diff --git a/rq/connections.py b/rq/connections.py index 7aa7cf7..05de2f1 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -51,6 +51,20 @@ def get_current_connection(): return _connection_stack.top +def resolve_connection(connection=None): + """Convenience function to resolve the given or the current connection. + Raises an exception if it cannot resolve a connection now. + """ + if connection is not None: + return connection + + connection = get_current_connection() + if connection is None: + raise NoRedisConnectionException( + 'Could not resolve a Redis connection.') + return connection + + _connection_stack = LocalStack() __all__ = ['Connection', diff --git a/rq/decorators.py b/rq/decorators.py index 54b53f8..d5851c3 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,14 +1,15 @@ from functools import wraps from .queue import Queue +from .connections import resolve_connection class job(object): def __init__(self, queue, connection=None, timeout=None): """A decorator that adds a ``delay`` method to the decorated function, - which in turn creates a RQ job when called. Accepts a required ``queue`` - argument that can be either a ``Queue`` instance or a string denoting - the queue name. For example: + which in turn creates a RQ job when called. Accepts a required + ``queue`` argument that can be either a ``Queue`` instance or a string + denoting the queue name. For example: @job(queue='default') def simple_add(x, y): @@ -17,7 +18,7 @@ class job(object): simple_add.delay(1, 2) # Puts simple_add function into queue """ self.queue = queue - self.connection = connection + self.connection = resolve_connection(connection) self.timeout = timeout def __call__(self, f): diff --git a/rq/queue.py b/rq/queue.py index 7d78875..b02e799 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,5 +1,5 @@ import times -from .connections import get_current_connection +from .connections import resolve_connection from .job import Job from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .compat import total_ordering @@ -23,8 +23,7 @@ class Queue(object): """Returns an iterable of all Queues. """ prefix = cls.redis_queue_namespace_prefix - if connection is None: - connection = get_current_connection() + connection = resolve_connection(connection) def to_queue(queue_key): return cls.from_queue_key(queue_key, connection=connection) @@ -43,9 +42,7 @@ class Queue(object): return cls(name, connection=connection) def __init__(self, name='default', default_timeout=None, connection=None): - if connection is None: - connection = get_current_connection() - self.connection = connection + self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix self.name = name self._key = '%s%s' % (prefix, name) @@ -187,8 +184,7 @@ class Queue(object): Until Redis receives a specific method for this, we'll have to wrap it this way. """ - if connection is None: - connection = get_current_connection() + connection = resolve_connection(connection) if blocking: queue_key, job_id = connection.blpop(queue_keys) return queue_key, job_id diff --git a/tests/fixtures.py b/tests/fixtures.py index 1b20248..3d45ab6 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -3,7 +3,7 @@ This file contains all jobs that are used in tests. Each of these test fixtures has a slighty different characteristics. """ import time - +from rq import Connection from rq.decorators import job @@ -51,6 +51,8 @@ class Calculator(object): def calculate(self, x, y): return x * y / self.denominator -@job(queue='default') -def decorated_job(x, y): - return x + y + +with Connection(): + @job(queue='default') + def decorated_job(x, y): + return x + y