mirror of https://github.com/peter4431/rq.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
71 lines
2.2 KiB
Python
71 lines
2.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import (absolute_import, division, print_function,
|
|
unicode_literals)
|
|
|
|
from redis import Redis
|
|
|
|
from rq import Connection, Queue, use_connection, get_current_connection, pop_connection
|
|
from rq.connections import NoRedisConnectionException
|
|
|
|
from tests import find_empty_redis_database, RQTestCase
|
|
from tests.fixtures import do_nothing
|
|
|
|
|
|
def new_connection():
|
|
return find_empty_redis_database()
|
|
|
|
|
|
class TestConnectionInheritance(RQTestCase):
|
|
def test_connection_detection(self):
|
|
"""Automatic detection of the connection."""
|
|
q = Queue()
|
|
self.assertEqual(q.connection, self.testconn)
|
|
|
|
def test_connection_stacking(self):
|
|
"""Connection stacking."""
|
|
conn1 = Redis(db=4)
|
|
conn2 = Redis(db=5)
|
|
|
|
with Connection(conn1):
|
|
q1 = Queue()
|
|
with Connection(conn2):
|
|
q2 = Queue()
|
|
self.assertNotEqual(q1.connection, q2.connection)
|
|
|
|
def test_connection_pass_thru(self):
|
|
"""Connection passed through from queues to jobs."""
|
|
q1 = Queue()
|
|
with Connection(new_connection()):
|
|
q2 = Queue()
|
|
job1 = q1.enqueue(do_nothing)
|
|
job2 = q2.enqueue(do_nothing)
|
|
self.assertEqual(q1.connection, job1.connection)
|
|
self.assertEqual(q2.connection, job2.connection)
|
|
|
|
|
|
class TestConnectionHelpers(RQTestCase):
|
|
def test_use_connection(self):
|
|
"""Test function use_connection works as expected."""
|
|
conn = new_connection()
|
|
use_connection(conn)
|
|
|
|
self.assertEqual(conn, get_current_connection())
|
|
|
|
use_connection()
|
|
|
|
self.assertNotEqual(conn, get_current_connection())
|
|
|
|
use_connection(self.testconn) # Restore RQTestCase connection
|
|
|
|
with self.assertRaises(AssertionError):
|
|
with Connection(new_connection()):
|
|
use_connection()
|
|
with Connection(new_connection()):
|
|
use_connection()
|
|
|
|
def test_resolve_connection_raises_on_no_connection(self):
|
|
"""Test function resolve_connection raises if there is no connection."""
|
|
pop_connection()
|
|
with self.assertRaises(NoRedisConnectionException):
|
|
Queue()
|