From 74c23512322c565e3ea2a04bff9ee0eeb524d1bc Mon Sep 17 00:00:00 2001 From: Yaniv Aknin Date: Sat, 26 Jan 2013 18:21:03 +0200 Subject: [PATCH] Refactor dequeue_any to accept arbitrary timeouts The 'blocking' parameter was replaced with a 'timeout' parameter. The timeout parameter is interpreted thus: 0 - no timeout (block forever, equivalent to blocking=True) None - non-blocking (return value or None immediately, equivalent to blocking=False) - maximum seconds to block Upon timing out, a dequeue operation will raise DequeueTimeout. --- rq/exceptions.py | 3 +++ rq/queue.py | 32 ++++++++++++++++++++++---------- rq/worker.py | 4 ++-- tests/test_queue.py | 10 +++++----- 4 files changed, 32 insertions(+), 17 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 4d2cb6a..135ff3d 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -14,3 +14,6 @@ class UnpickleError(Exception): def __init__(self, message, raw_data): super(UnpickleError, self).__init__(message) self.raw_data = raw_data + +class DequeueTimeout(Exception): + pass diff --git a/rq/queue.py b/rq/queue.py index b962b09..0a2791c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,7 +1,8 @@ import times from .connections import resolve_connection from .job import Job, Status -from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError +from .exceptions import (NoSuchJobError, UnpickleError, + InvalidJobOperationError, DequeueTimeout) from .compat import total_ordering @@ -186,7 +187,7 @@ class Queue(object): return self.connection.lpop(self.key) @classmethod - def lpop(cls, queue_keys, blocking, connection=None): + def lpop(cls, queue_keys, timeout, connection=None): """Helper method. Intermediate method to abstract away from some Redis API details, where LPOP accepts only a single key, whereas BLPOP accepts multiple. So if we want the non-blocking LPOP, we need to @@ -194,10 +195,18 @@ class Queue(object): Until Redis receives a specific method for this, we'll have to wrap it this way. + + The timeout parameter is interpreted thus: + 0 - no timeout (block forever) + None - non-blocking (return value or None immediately) + - maximum seconds to block """ connection = resolve_connection(connection) - if blocking: - queue_key, job_id = connection.blpop(queue_keys) + if timeout is not None: + result = connection.blpop(queue_keys, timeout) + if result is None: + raise DequeueTimeout(timeout, queue_keys) + queue_key, job_id = result return queue_key, job_id else: for queue_key in queue_keys: @@ -228,16 +237,19 @@ class Queue(object): return job @classmethod - def dequeue_any(cls, queues, blocking, connection=None): + def dequeue_any(cls, queues, timeout, connection=None): """Class method returning the Job instance at the front of the given set of Queues, where the order of the queues is important. - When all of the Queues are empty, depending on the `blocking` argument, - either blocks execution of this function until new messages arrive on - any of the queues, or returns None. + When all of the Queues are empty, depending on the `timout` argument, + either blocks execution of this function for the duration of the + timout or until new messages arrive on any of the queues, or returns + None. + + See the documentation of cls.lpop for the interpretation of timeout. """ queue_keys = [q.key for q in queues] - result = cls.lpop(queue_keys, blocking, connection=connection) + result = cls.lpop(queue_keys, timeout, connection=connection) if result is None: return None queue_key, job_id = result @@ -247,7 +259,7 @@ class Queue(object): except NoSuchJobError: # Silently pass on jobs that don't exist (anymore), # and continue by reinvoking the same function recursively - return cls.dequeue_any(queues, blocking, connection=connection) + return cls.dequeue_any(queues, timeout, connection=connection) except UnpickleError as e: # Attach queue information on the exception for improved error # reporting diff --git a/rq/worker.py b/rq/worker.py index 6337dd6..60273f2 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -301,9 +301,9 @@ class Worker(object): self.log.info('') self.log.info('*** Listening on %s...' % \ green(', '.join(qnames))) - wait_for_job = not burst + timeout = None if burst else 0 try: - result = Queue.dequeue_any(self.queues, wait_for_job, \ + result = Queue.dequeue_any(self.queues, timeout, \ connection=self.connection) if result is None: break diff --git a/tests/test_queue.py b/tests/test_queue.py index 4a88f65..ae70ac7 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -166,11 +166,11 @@ class TestQueue(RQTestCase): fooq = Queue('foo') barq = Queue('bar') - self.assertEquals(Queue.dequeue_any([fooq, barq], False), None) + self.assertEquals(Queue.dequeue_any([fooq, barq], None), None) # Enqueue a single item barq.enqueue(say_hello) - job, queue = Queue.dequeue_any([fooq, barq], False) + job, queue = Queue.dequeue_any([fooq, barq], None) self.assertEquals(job.func, say_hello) self.assertEquals(queue, barq) @@ -178,14 +178,14 @@ class TestQueue(RQTestCase): barq.enqueue(say_hello, 'for Bar') fooq.enqueue(say_hello, 'for Foo') - job, queue = Queue.dequeue_any([fooq, barq], False) + job, queue = Queue.dequeue_any([fooq, barq], None) self.assertEquals(queue, fooq) self.assertEquals(job.func, say_hello) self.assertEquals(job.origin, fooq.name) self.assertEquals(job.args[0], 'for Foo', 'Foo should be dequeued first.') - job, queue = Queue.dequeue_any([fooq, barq], False) + job, queue = Queue.dequeue_any([fooq, barq], None) self.assertEquals(queue, barq) self.assertEquals(job.func, say_hello) self.assertEquals(job.origin, barq.name) @@ -201,7 +201,7 @@ class TestQueue(RQTestCase): # Dequeue simply ignores the missing job and returns None self.assertEquals(q.count, 1) - self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), # noqa + self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], None), # noqa None) self.assertEquals(q.count, 0)