Merge branch 'selwin-decorator'

main
Vincent Driessen 13 years ago
commit 4057ae768b

@ -1,8 +1,22 @@
### 0.3.0 ### 0.3.0
(not released) (not released)
- Removes the possible ambiguity of passing in a `timeout` argument to - `.enqueue()` does not consume the `timeout` kwarg anymore. Instead, to pass
`.enqueue()`. Instead, now use the `.enqueue_call()` method. RQ a timeout value while enqueueing a function, use the explicit invocation
instead:
q.enqueue(do_something, args=(1, 2), kwargs={'a': 1}, timeout=30)
- Add a `@job` decorator, which can be used to do Celery-style delayed
invocations:
from rq.decorators import job
@job('high', timeout=10)
def some_work(x, y):
return x + y
some_work.delay(2, 3)
### 0.2.1 ### 0.2.1

@ -51,6 +51,20 @@ def get_current_connection():
return _connection_stack.top return _connection_stack.top
def resolve_connection(connection=None):
"""Convenience function to resolve the given or the current connection.
Raises an exception if it cannot resolve a connection now.
"""
if connection is not None:
return connection
connection = get_current_connection()
if connection is None:
raise NoRedisConnectionException(
'Could not resolve a Redis connection.')
return connection
_connection_stack = LocalStack() _connection_stack = LocalStack()
__all__ = ['Connection', __all__ = ['Connection',

@ -0,0 +1,34 @@
from functools import wraps
from .queue import Queue
from .connections import resolve_connection
class job(object):
def __init__(self, queue, connection=None, timeout=None):
"""A decorator that adds a ``delay`` method to the decorated function,
which in turn creates a RQ job when called. Accepts a required
``queue`` argument that can be either a ``Queue`` instance or a string
denoting the queue name. For example:
@job(queue='default')
def simple_add(x, y):
return x + y
simple_add.delay(1, 2) # Puts simple_add function into queue
"""
self.queue = queue
self.connection = resolve_connection(connection)
self.timeout = timeout
def __call__(self, f):
@wraps(f)
def delay(*args, **kwargs):
if isinstance(self.queue, basestring):
queue = Queue(name=self.queue, connection=self.connection)
else:
queue = self.queue
return queue.enqueue_call(f, args=args, kwargs=kwargs,
timeout=self.timeout)
f.delay = delay
return f

@ -1,5 +1,5 @@
import times import times
from .connections import get_current_connection from .connections import resolve_connection
from .job import Job from .job import Job
from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError
from .compat import total_ordering from .compat import total_ordering
@ -23,8 +23,7 @@ class Queue(object):
"""Returns an iterable of all Queues. """Returns an iterable of all Queues.
""" """
prefix = cls.redis_queue_namespace_prefix prefix = cls.redis_queue_namespace_prefix
if connection is None: connection = resolve_connection(connection)
connection = get_current_connection()
def to_queue(queue_key): def to_queue(queue_key):
return cls.from_queue_key(queue_key, connection=connection) return cls.from_queue_key(queue_key, connection=connection)
@ -43,9 +42,7 @@ class Queue(object):
return cls(name, connection=connection) return cls(name, connection=connection)
def __init__(self, name='default', default_timeout=None, connection=None): def __init__(self, name='default', default_timeout=None, connection=None):
if connection is None: self.connection = resolve_connection(connection)
connection = get_current_connection()
self.connection = connection
prefix = self.redis_queue_namespace_prefix prefix = self.redis_queue_namespace_prefix
self.name = name self.name = name
self._key = '%s%s' % (prefix, name) self._key = '%s%s' % (prefix, name)
@ -107,7 +104,7 @@ class Queue(object):
"""Pushes a job ID on the corresponding Redis queue.""" """Pushes a job ID on the corresponding Redis queue."""
self.connection.rpush(self.key, job_id) self.connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, **options): def enqueue_call(self, func, args=None, kwargs=None, timeout=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
@ -115,7 +112,7 @@ class Queue(object):
and kwargs as explicit arguments. Any kwargs passed to this function and kwargs as explicit arguments. Any kwargs passed to this function
contain options for RQ itself. contain options for RQ itself.
""" """
timeout = options.get('timeout', self._default_timeout) timeout = timeout or self._default_timeout
job = Job.create(func, args, kwargs, connection=self.connection) job = Job.create(func, args, kwargs, connection=self.connection)
return self.enqueue_job(job, timeout=timeout) return self.enqueue_job(job, timeout=timeout)
@ -138,15 +135,17 @@ class Queue(object):
'Functions from the __main__ module cannot be processed ' 'Functions from the __main__ module cannot be processed '
'by workers.') 'by workers.')
# Warn about the timeout flag that has been removed # Detect explicit invocations, i.e. of the form:
if 'timeout' in kwargs: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30)
import warnings timeout = None
warnings.warn('The use of the timeout kwarg is not supported ' if 'args' in kwargs or 'kwargs' in kwargs:
'anymore. If you meant to pass this argument to RQ ' assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
'(rather than to %r), use the `.enqueue_call()` ' timeout = kwargs.pop('timeout', None)
'method instead.' % f, DeprecationWarning) args = kwargs.pop('args', None)
kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs) return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout)
def enqueue_job(self, job, timeout=None, set_meta_data=True): def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
@ -185,8 +184,7 @@ class Queue(object):
Until Redis receives a specific method for this, we'll have to wrap it Until Redis receives a specific method for this, we'll have to wrap it
this way. this way.
""" """
if connection is None: connection = resolve_connection(connection)
connection = get_current_connection()
if blocking: if blocking:
queue_key, job_id = connection.blpop(queue_keys) queue_key, job_id = connection.blpop(queue_keys)
return queue_key, job_id return queue_key, job_id

@ -3,6 +3,8 @@ This file contains all jobs that are used in tests. Each of these test
fixtures has a slighty different characteristics. fixtures has a slighty different characteristics.
""" """
import time import time
from rq import Connection
from rq.decorators import job
def say_hello(name=None): def say_hello(name=None):
@ -46,5 +48,11 @@ class Calculator(object):
def __init__(self, denominator): def __init__(self, denominator):
self.denominator = denominator self.denominator = denominator
def calculate(x, y): def calculate(self, x, y):
return x * y / self.denominator return x * y / self.denominator
with Connection():
@job(queue='default')
def decorated_job(x, y):
return x + y

@ -0,0 +1,36 @@
from tests import RQTestCase
from tests.fixtures import decorated_job
from rq.decorators import job
from rq.job import Job
class TestDecorator(RQTestCase):
def setUp(self):
super(TestDecorator, self).setUp()
def test_decorator_preserves_functionality(self):
"""Ensure that a decorated function's functionality is still preserved.
"""
self.assertEqual(decorated_job(1, 2), 3)
def test_decorator_adds_delay_attr(self):
"""Ensure that decorator adds a delay attribute to function that returns
a Job instance when called.
"""
self.assertTrue(hasattr(decorated_job, 'delay'))
result = decorated_job.delay(1, 2)
self.assertTrue(isinstance(result, Job))
# Ensure that job returns the right result when performed
self.assertEqual(result.perform(), 3)
def test_decorator_accepts_queue_name_as_argument(self):
"""Ensure that passing in queue name to the decorator puts the job in
the right queue.
"""
@job(queue='queue_name')
def hello():
return 'Hi'
result = hello.delay()
self.assertEqual(result.origin, 'queue_name')

@ -157,8 +157,8 @@ class TestWorker(RQTestCase):
w = Worker([q]) w = Worker([q])
# Put it on the queue with a timeout value # Put it on the queue with a timeout value
res = q.enqueue_call( res = q.enqueue(
func=create_file_after_timeout, create_file_after_timeout,
args=(sentinel_file, 4), args=(sentinel_file, 4),
timeout=1) timeout=1)

Loading…
Cancel
Save