From bdc1af28d14fb152b92872c973a8add760ad0fa4 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Mon, 23 Jul 2012 13:25:31 +0700 Subject: [PATCH 01/12] Added a job decorator. --- rq/decorators.py | 35 +++++++++++++++++++++++++++++++++++ tests/fixtures.py | 6 ++++++ tests/test_decorator.py | 29 +++++++++++++++++++++++++++++ 3 files changed, 70 insertions(+) create mode 100644 rq/decorators.py create mode 100644 tests/test_decorator.py diff --git a/rq/decorators.py b/rq/decorators.py new file mode 100644 index 0000000..53eb4cb --- /dev/null +++ b/rq/decorators.py @@ -0,0 +1,35 @@ +from .connections import use_connection, get_current_connection +from .queue import Queue + + +class job(object): + + def __init__(self, queue=None): + """ + A decorator that adds a ``delay`` method to the decorated function, + which in turn creates a RQ job when called. Accepts a ``queue`` instance + as an optional argument. For example: + + from rq import Queue, use_connection + use_connection() + q = Queue() + + @job(queue=q) + def simple_add(x, y): + return x + y + + simple_add.delay(1, 2) # Puts simple_add function into queue + + """ + self.queue = queue + + def __call__(self, f): + def delay(*args, **kwargs): + if self.queue is None: + use_connection(get_current_connection()) + self.queue = Queue() + return self.queue.enqueue(f, *args, **kwargs) + f.delay = delay + return f + + diff --git a/tests/fixtures.py b/tests/fixtures.py index 32003de..ada2bf5 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -4,6 +4,8 @@ fixtures has a slighty different characteristics. """ import time +from rq.decorators import job + def say_hello(name=None): """A job with a single argument and a return value.""" @@ -48,3 +50,7 @@ class Calculator(object): def calculate(x, y): return x * y / self.denominator + +@job() +def decorated_job(x, y): + return x + y diff --git a/tests/test_decorator.py b/tests/test_decorator.py new file mode 100644 index 0000000..4a6fcb3 --- /dev/null +++ b/tests/test_decorator.py @@ -0,0 +1,29 @@ +from tests import RQTestCase +from tests.fixtures import decorated_job + +from rq.decorators import job +from rq.job import Job + + +class TestDecorator(RQTestCase): + + def setUp(self): + super(TestDecorator, self).setUp() + + def test_decorator_preserves_functionality(self): + """ + Ensure that a decorated function's functionality is still preserved + """ + self.assertEqual(decorated_job(1, 2), 3) + + def test_decorator_adds_delay_attr(self): + """ + Ensure that decorator adds a delay attribute to function that returns + a Job instance when called. + """ + self.assertTrue(hasattr(decorated_job, 'delay')) + result = decorated_job.delay(1, 2) + self.assertTrue(isinstance(result, Job)) + # Ensure that job returns the right result when performed + self.assertEqual(result.perform(), 3) + From f6374f2dfa7401dfc52541d843efa6b13f17a66c Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 08:33:28 +0200 Subject: [PATCH 02/12] Add new way of invoking .enqueue(), either implicitly or explicitly. --- rq/queue.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 9253aa1..3e73f71 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -138,15 +138,17 @@ class Queue(object): 'Functions from the __main__ module cannot be processed ' 'by workers.') - # Warn about the timeout flag that has been removed - if 'timeout' in kwargs: - import warnings - warnings.warn('The use of the timeout kwarg is not supported ' - 'anymore. If you meant to pass this argument to RQ ' - '(rather than to %r), use the `.enqueue_call()` ' - 'method instead.' % f, DeprecationWarning) - - return self.enqueue_call(func=f, args=args, kwargs=kwargs) + # Detect explicit invocations, i.e. of the form: + # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) + if 'args' in kwargs or 'kwargs' in kwargs: + assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa + options = kwargs + args = kwargs.pop('args', None) + kwargs = kwargs.pop('kwargs', None) + else: + options = {} + + return self.enqueue_call(func=f, args=args, kwargs=kwargs, **options) def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. From d66939ff4af362d67043b1ee73c70786c84b66a7 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 10:51:58 +0200 Subject: [PATCH 03/12] Don't use the (internal) .enqueue_call() in unit tests. --- tests/test_worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 8b12c6a..9ea338b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -157,8 +157,8 @@ class TestWorker(RQTestCase): w = Worker([q]) # Put it on the queue with a timeout value - res = q.enqueue_call( - func=create_file_after_timeout, + res = q.enqueue( + create_file_after_timeout, args=(sentinel_file, 4), timeout=1) From 8c3292d35b667863fbb5547817832ac5316baed5 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 24 Jul 2012 16:03:49 +0700 Subject: [PATCH 04/12] Make "queue" argument in job decorator required. job decorator now uses Queue's "enqueue_call" method. --- rq/decorators.py | 22 +++++++++------------- tests/fixtures.py | 2 +- tests/test_decorator.py | 10 ++++++++++ 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/rq/decorators.py b/rq/decorators.py index 53eb4cb..b577667 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,20 +1,15 @@ -from .connections import use_connection, get_current_connection from .queue import Queue class job(object): - def __init__(self, queue=None): + def __init__(self, queue, connection=None, timeout=None): """ A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when called. Accepts a ``queue`` instance as an optional argument. For example: - from rq import Queue, use_connection - use_connection() - q = Queue() - - @job(queue=q) + @job(queue='default') def simple_add(x, y): return x + y @@ -22,14 +17,15 @@ class job(object): """ self.queue = queue + self.connection = connection + self.timeout = timeout def __call__(self, f): def delay(*args, **kwargs): - if self.queue is None: - use_connection(get_current_connection()) - self.queue = Queue() - return self.queue.enqueue(f, *args, **kwargs) + if isinstance(self.queue, basestring): + queue = Queue(name=self.queue, connection=self.connection) + else: + queue = self.queue + return queue.enqueue_call(f, args=args, kwargs=kwargs, timeout=self.timeout) f.delay = delay return f - - diff --git a/tests/fixtures.py b/tests/fixtures.py index ada2bf5..1edebc4 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -51,6 +51,6 @@ class Calculator(object): def calculate(x, y): return x * y / self.denominator -@job() +@job(queue='default') def decorated_job(x, y): return x + y diff --git a/tests/test_decorator.py b/tests/test_decorator.py index 4a6fcb3..cf6d4e6 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -27,3 +27,13 @@ class TestDecorator(RQTestCase): # Ensure that job returns the right result when performed self.assertEqual(result.perform(), 3) + def test_decorator_accepts_queue_name_as_argument(self): + """ + Ensure that passing in queue name to the decorator puts the job in the + right queue. + """ + @job(queue='queue_name') + def hello(): + return 'Hi' + result = hello.delay() + self.assertEqual(result.origin, 'queue_name') From abac4a5f41efe6c9faef7f8d1f28124c1fe94c3f Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 11:13:06 +0200 Subject: [PATCH 05/12] Since we only have the timeout option, don't be too generic. --- rq/queue.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index 3e73f71..7d78875 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -107,7 +107,7 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, **options): + def enqueue_call(self, func, args=None, kwargs=None, timeout=None): """Creates a job to represent the delayed function call and enqueues it. @@ -115,7 +115,7 @@ class Queue(object): and kwargs as explicit arguments. Any kwargs passed to this function contain options for RQ itself. """ - timeout = options.get('timeout', self._default_timeout) + timeout = timeout or self._default_timeout job = Job.create(func, args, kwargs, connection=self.connection) return self.enqueue_job(job, timeout=timeout) @@ -140,15 +140,15 @@ class Queue(object): # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) + timeout = None if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa - options = kwargs + timeout = kwargs.pop('timeout', None) args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) - else: - options = {} - return self.enqueue_call(func=f, args=args, kwargs=kwargs, **options) + return self.enqueue_call(func=f, args=args, kwargs=kwargs, + timeout=timeout) def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. From 35dedf3db4b2257f243651d7a8b9d4ec8aaecc82 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 11:15:23 +0200 Subject: [PATCH 06/12] Make test descriptions show up in short (oneliner) mode, as ./run_tests does. --- tests/test_decorator.py | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/tests/test_decorator.py b/tests/test_decorator.py index cf6d4e6..4f95f9c 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -11,14 +11,12 @@ class TestDecorator(RQTestCase): super(TestDecorator, self).setUp() def test_decorator_preserves_functionality(self): - """ - Ensure that a decorated function's functionality is still preserved + """Ensure that a decorated function's functionality is still preserved. """ self.assertEqual(decorated_job(1, 2), 3) def test_decorator_adds_delay_attr(self): - """ - Ensure that decorator adds a delay attribute to function that returns + """Ensure that decorator adds a delay attribute to function that returns a Job instance when called. """ self.assertTrue(hasattr(decorated_job, 'delay')) @@ -28,9 +26,8 @@ class TestDecorator(RQTestCase): self.assertEqual(result.perform(), 3) def test_decorator_accepts_queue_name_as_argument(self): - """ - Ensure that passing in queue name to the decorator puts the job in the - right queue. + """Ensure that passing in queue name to the decorator puts the job in + the right queue. """ @job(queue='queue_name') def hello(): From 6b79e51033f7d87764ec59645c1e3ca716b784db Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 11:19:04 +0200 Subject: [PATCH 07/12] PEP8ify. --- rq/decorators.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rq/decorators.py b/rq/decorators.py index b577667..5eeb6d1 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -6,8 +6,8 @@ class job(object): def __init__(self, queue, connection=None, timeout=None): """ A decorator that adds a ``delay`` method to the decorated function, - which in turn creates a RQ job when called. Accepts a ``queue`` instance - as an optional argument. For example: + which in turn creates a RQ job when called. Accepts a ``queue`` + instance as an optional argument. For example: @job(queue='default') def simple_add(x, y): @@ -26,6 +26,7 @@ class job(object): queue = Queue(name=self.queue, connection=self.connection) else: queue = self.queue - return queue.enqueue_call(f, args=args, kwargs=kwargs, timeout=self.timeout) + return queue.enqueue_call(f, args=args, kwargs=kwargs, + timeout=self.timeout) f.delay = delay return f From a1d08a64fcc5eec26e16a8c5b819f08c6722acf2 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 11:19:25 +0200 Subject: [PATCH 08/12] Add @wraps() call. --- rq/decorators.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rq/decorators.py b/rq/decorators.py index 5eeb6d1..0c64f3d 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,3 +1,4 @@ +from functools import wraps from .queue import Queue @@ -21,6 +22,7 @@ class job(object): self.timeout = timeout def __call__(self, f): + @wraps(f) def delay(*args, **kwargs): if isinstance(self.queue, basestring): queue = Queue(name=self.queue, connection=self.connection) From bfc3c3d89e71b1cdd25a50b382ad632d2f816e84 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 11:28:18 +0200 Subject: [PATCH 09/12] Add changes to the change log. --- CHANGES.md | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 4036dba..30399a5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,8 +1,22 @@ ### 0.3.0 (not released) -- Removes the possible ambiguity of passing in a `timeout` argument to - `.enqueue()`. Instead, now use the `.enqueue_call()` method. +- `.enqueue()` does not consume the `timeout` kwarg anymore. Instead, to pass + RQ a timeout value while enqueueing a function, use the explicit invocation + instead: + + q.enqueue(do_something, args=(1, 2), kwargs={'a': 1}, timeout=30) + +- Add a `@job` decorator, which can be used to do Celery-style delayed + invocations: + + from rq.decorators import job + + @job('high', timeout=10) + def some_work(x, y): + return x + y + + some_work.delay(2, 3) ### 0.2.1 From dbf101bc06424644c67eb816b5d60c31fa63f9d5 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 11:31:19 +0200 Subject: [PATCH 10/12] Update comment. --- rq/decorators.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/rq/decorators.py b/rq/decorators.py index 0c64f3d..54b53f8 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -5,17 +5,16 @@ from .queue import Queue class job(object): def __init__(self, queue, connection=None, timeout=None): - """ - A decorator that adds a ``delay`` method to the decorated function, - which in turn creates a RQ job when called. Accepts a ``queue`` - instance as an optional argument. For example: + """A decorator that adds a ``delay`` method to the decorated function, + which in turn creates a RQ job when called. Accepts a required ``queue`` + argument that can be either a ``Queue`` instance or a string denoting + the queue name. For example: @job(queue='default') def simple_add(x, y): return x + y simple_add.delay(1, 2) # Puts simple_add function into queue - """ self.queue = queue self.connection = connection From c7225ba2577e012b20c03451be21604889eb8104 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 12:30:37 +0200 Subject: [PATCH 11/12] Minor bug in test case. --- tests/fixtures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index 1edebc4..1b20248 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -48,7 +48,7 @@ class Calculator(object): def __init__(self, denominator): self.denominator = denominator - def calculate(x, y): + def calculate(self, x, y): return x * y / self.denominator @job(queue='default') From d697ddb93a33c59d0ce3f7b6f139ecd88abaceea Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 24 Jul 2012 12:33:22 +0200 Subject: [PATCH 12/12] Resolve connections early. Fixes #101. --- rq/connections.py | 14 ++++++++++++++ rq/decorators.py | 9 +++++---- rq/queue.py | 12 ++++-------- tests/fixtures.py | 10 ++++++---- 4 files changed, 29 insertions(+), 16 deletions(-) diff --git a/rq/connections.py b/rq/connections.py index 7aa7cf7..05de2f1 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -51,6 +51,20 @@ def get_current_connection(): return _connection_stack.top +def resolve_connection(connection=None): + """Convenience function to resolve the given or the current connection. + Raises an exception if it cannot resolve a connection now. + """ + if connection is not None: + return connection + + connection = get_current_connection() + if connection is None: + raise NoRedisConnectionException( + 'Could not resolve a Redis connection.') + return connection + + _connection_stack = LocalStack() __all__ = ['Connection', diff --git a/rq/decorators.py b/rq/decorators.py index 54b53f8..d5851c3 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,14 +1,15 @@ from functools import wraps from .queue import Queue +from .connections import resolve_connection class job(object): def __init__(self, queue, connection=None, timeout=None): """A decorator that adds a ``delay`` method to the decorated function, - which in turn creates a RQ job when called. Accepts a required ``queue`` - argument that can be either a ``Queue`` instance or a string denoting - the queue name. For example: + which in turn creates a RQ job when called. Accepts a required + ``queue`` argument that can be either a ``Queue`` instance or a string + denoting the queue name. For example: @job(queue='default') def simple_add(x, y): @@ -17,7 +18,7 @@ class job(object): simple_add.delay(1, 2) # Puts simple_add function into queue """ self.queue = queue - self.connection = connection + self.connection = resolve_connection(connection) self.timeout = timeout def __call__(self, f): diff --git a/rq/queue.py b/rq/queue.py index 7d78875..b02e799 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,5 +1,5 @@ import times -from .connections import get_current_connection +from .connections import resolve_connection from .job import Job from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .compat import total_ordering @@ -23,8 +23,7 @@ class Queue(object): """Returns an iterable of all Queues. """ prefix = cls.redis_queue_namespace_prefix - if connection is None: - connection = get_current_connection() + connection = resolve_connection(connection) def to_queue(queue_key): return cls.from_queue_key(queue_key, connection=connection) @@ -43,9 +42,7 @@ class Queue(object): return cls(name, connection=connection) def __init__(self, name='default', default_timeout=None, connection=None): - if connection is None: - connection = get_current_connection() - self.connection = connection + self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix self.name = name self._key = '%s%s' % (prefix, name) @@ -187,8 +184,7 @@ class Queue(object): Until Redis receives a specific method for this, we'll have to wrap it this way. """ - if connection is None: - connection = get_current_connection() + connection = resolve_connection(connection) if blocking: queue_key, job_id = connection.blpop(queue_keys) return queue_key, job_id diff --git a/tests/fixtures.py b/tests/fixtures.py index 1b20248..3d45ab6 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -3,7 +3,7 @@ This file contains all jobs that are used in tests. Each of these test fixtures has a slighty different characteristics. """ import time - +from rq import Connection from rq.decorators import job @@ -51,6 +51,8 @@ class Calculator(object): def calculate(self, x, y): return x * y / self.denominator -@job(queue='default') -def decorated_job(x, y): - return x + y + +with Connection(): + @job(queue='default') + def decorated_job(x, y): + return x + y