Refactor dequeue_any to accept arbitrary timeouts

The 'blocking' parameter was replaced with a 'timeout' parameter.

The timeout parameter is interpreted thus:
    0 - no timeout (block forever, equivalent to blocking=True)
    None - non-blocking (return value or None immediately, equivalent to
                         blocking=False)
    <integer> - maximum seconds to block

Upon timing out, a dequeue operation will raise DequeueTimeout.
main
Yaniv Aknin 12 years ago
parent 0bc451f75b
commit 74c2351232

@ -14,3 +14,6 @@ class UnpickleError(Exception):
def __init__(self, message, raw_data): def __init__(self, message, raw_data):
super(UnpickleError, self).__init__(message) super(UnpickleError, self).__init__(message)
self.raw_data = raw_data self.raw_data = raw_data
class DequeueTimeout(Exception):
pass

@ -1,7 +1,8 @@
import times import times
from .connections import resolve_connection from .connections import resolve_connection
from .job import Job, Status from .job import Job, Status
from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError from .exceptions import (NoSuchJobError, UnpickleError,
InvalidJobOperationError, DequeueTimeout)
from .compat import total_ordering from .compat import total_ordering
@ -186,7 +187,7 @@ class Queue(object):
return self.connection.lpop(self.key) return self.connection.lpop(self.key)
@classmethod @classmethod
def lpop(cls, queue_keys, blocking, connection=None): def lpop(cls, queue_keys, timeout, connection=None):
"""Helper method. Intermediate method to abstract away from some """Helper method. Intermediate method to abstract away from some
Redis API details, where LPOP accepts only a single key, whereas BLPOP Redis API details, where LPOP accepts only a single key, whereas BLPOP
accepts multiple. So if we want the non-blocking LPOP, we need to accepts multiple. So if we want the non-blocking LPOP, we need to
@ -194,10 +195,18 @@ class Queue(object):
Until Redis receives a specific method for this, we'll have to wrap it Until Redis receives a specific method for this, we'll have to wrap it
this way. this way.
The timeout parameter is interpreted thus:
0 - no timeout (block forever)
None - non-blocking (return value or None immediately)
<integer> - maximum seconds to block
""" """
connection = resolve_connection(connection) connection = resolve_connection(connection)
if blocking: if timeout is not None:
queue_key, job_id = connection.blpop(queue_keys) result = connection.blpop(queue_keys, timeout)
if result is None:
raise DequeueTimeout(timeout, queue_keys)
queue_key, job_id = result
return queue_key, job_id return queue_key, job_id
else: else:
for queue_key in queue_keys: for queue_key in queue_keys:
@ -228,16 +237,19 @@ class Queue(object):
return job return job
@classmethod @classmethod
def dequeue_any(cls, queues, blocking, connection=None): def dequeue_any(cls, queues, timeout, connection=None):
"""Class method returning the Job instance at the front of the given """Class method returning the Job instance at the front of the given
set of Queues, where the order of the queues is important. set of Queues, where the order of the queues is important.
When all of the Queues are empty, depending on the `blocking` argument, When all of the Queues are empty, depending on the `timout` argument,
either blocks execution of this function until new messages arrive on either blocks execution of this function for the duration of the
any of the queues, or returns None. timout or until new messages arrive on any of the queues, or returns
None.
See the documentation of cls.lpop for the interpretation of timeout.
""" """
queue_keys = [q.key for q in queues] queue_keys = [q.key for q in queues]
result = cls.lpop(queue_keys, blocking, connection=connection) result = cls.lpop(queue_keys, timeout, connection=connection)
if result is None: if result is None:
return None return None
queue_key, job_id = result queue_key, job_id = result
@ -247,7 +259,7 @@ class Queue(object):
except NoSuchJobError: except NoSuchJobError:
# Silently pass on jobs that don't exist (anymore), # Silently pass on jobs that don't exist (anymore),
# and continue by reinvoking the same function recursively # and continue by reinvoking the same function recursively
return cls.dequeue_any(queues, blocking, connection=connection) return cls.dequeue_any(queues, timeout, connection=connection)
except UnpickleError as e: except UnpickleError as e:
# Attach queue information on the exception for improved error # Attach queue information on the exception for improved error
# reporting # reporting

@ -301,9 +301,9 @@ class Worker(object):
self.log.info('') self.log.info('')
self.log.info('*** Listening on %s...' % \ self.log.info('*** Listening on %s...' % \
green(', '.join(qnames))) green(', '.join(qnames)))
wait_for_job = not burst timeout = None if burst else 0
try: try:
result = Queue.dequeue_any(self.queues, wait_for_job, \ result = Queue.dequeue_any(self.queues, timeout, \
connection=self.connection) connection=self.connection)
if result is None: if result is None:
break break

@ -166,11 +166,11 @@ class TestQueue(RQTestCase):
fooq = Queue('foo') fooq = Queue('foo')
barq = Queue('bar') barq = Queue('bar')
self.assertEquals(Queue.dequeue_any([fooq, barq], False), None) self.assertEquals(Queue.dequeue_any([fooq, barq], None), None)
# Enqueue a single item # Enqueue a single item
barq.enqueue(say_hello) barq.enqueue(say_hello)
job, queue = Queue.dequeue_any([fooq, barq], False) job, queue = Queue.dequeue_any([fooq, barq], None)
self.assertEquals(job.func, say_hello) self.assertEquals(job.func, say_hello)
self.assertEquals(queue, barq) self.assertEquals(queue, barq)
@ -178,14 +178,14 @@ class TestQueue(RQTestCase):
barq.enqueue(say_hello, 'for Bar') barq.enqueue(say_hello, 'for Bar')
fooq.enqueue(say_hello, 'for Foo') fooq.enqueue(say_hello, 'for Foo')
job, queue = Queue.dequeue_any([fooq, barq], False) job, queue = Queue.dequeue_any([fooq, barq], None)
self.assertEquals(queue, fooq) self.assertEquals(queue, fooq)
self.assertEquals(job.func, say_hello) self.assertEquals(job.func, say_hello)
self.assertEquals(job.origin, fooq.name) self.assertEquals(job.origin, fooq.name)
self.assertEquals(job.args[0], 'for Foo', self.assertEquals(job.args[0], 'for Foo',
'Foo should be dequeued first.') 'Foo should be dequeued first.')
job, queue = Queue.dequeue_any([fooq, barq], False) job, queue = Queue.dequeue_any([fooq, barq], None)
self.assertEquals(queue, barq) self.assertEquals(queue, barq)
self.assertEquals(job.func, say_hello) self.assertEquals(job.func, say_hello)
self.assertEquals(job.origin, barq.name) self.assertEquals(job.origin, barq.name)
@ -201,7 +201,7 @@ class TestQueue(RQTestCase):
# Dequeue simply ignores the missing job and returns None # Dequeue simply ignores the missing job and returns None
self.assertEquals(q.count, 1) self.assertEquals(q.count, 1)
self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), # noqa self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], None), # noqa
None) None)
self.assertEquals(q.count, 0) self.assertEquals(q.count, 0)

Loading…
Cancel
Save