diff --git a/rq/exceptions.py b/rq/exceptions.py new file mode 100644 index 0000000..00583d7 --- /dev/null +++ b/rq/exceptions.py @@ -0,0 +1,5 @@ +class NoQueueError(Exception): + pass + +class NoMoreWorkError(Exception): + pass diff --git a/rq/queue.py b/rq/queue.py index 58c4ed1..eeac2e7 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,7 +1,7 @@ import uuid from pickle import loads, dumps from .proxy import conn - +from .exceptions import NoMoreWorkError class DelayedResult(object): def __init__(self, key): @@ -18,7 +18,6 @@ class DelayedResult(object): return self._rv - def to_queue_key(queue_name): return 'rq:%s' % (queue_name,) @@ -56,5 +55,28 @@ class Queue(object): s = conn.lpop(self.key) return loads(s) + @classmethod + def _dequeue_any(cls, queues): + # Redis' BLPOP command takes multiple queue arguments, but LPOP can + # only take a single queue. Therefore, we need to loop over all + # queues manually, in order, and return None if no more work is + # available + for queue in queues: + value = conn.lpop(queue) + if value is not None: + return (queue, value) + return None + + @classmethod + def dequeue_any(cls, queues, blocking): + if blocking: + queue, msg = conn.blpop(queues) + else: + value = cls._dequeue_any(queues) + if value is None: + raise NoMoreWorkError('No more work.') + queue, msg = value + return (queue, msg) + def __str__(self): return self.name diff --git a/rq/worker.py b/rq/worker.py index 9055f70..07fdcca 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -10,13 +10,11 @@ except ImportError: from pickle import loads, dumps from .queue import Queue from .proxy import conn +from .exceptions import NoMoreWorkError, NoQueueError def iterable(x): return hasattr(x, '__iter__') -class NoQueueError(Exception): pass -class NoMoreWorkError(Exception): pass - class Worker(object): def __init__(self, queues, rv_ttl=500): if isinstance(queues, Queue): @@ -57,35 +55,13 @@ class Worker(object): procname.setprocname('rq: %s' % (message,)) - def multi_lpop(self, queues): - # Redis' BLPOP command takes multiple queue arguments, but LPOP can - # only take a single queue. Therefore, we need to loop over all - # queues manually, in order, and raise an exception is no more work - # is available - for queue in queues: - value = conn.lpop(queue) - if value is not None: - return (queue, value) - return None - - def pop_next_job(self, blocking): - queues = self.queue_keys() - if blocking: - queue, msg = conn.blpop(queues) - else: - value = self.multi_lpop(queues) - if value is None: - raise NoMoreWorkError('No more work.') - queue, msg = value - return (queue, msg) - def _work(self, quit_when_done=False): did_work = False while True: self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) try: wait_for_job = not quit_when_done - queue, msg = self.pop_next_job(wait_for_job) + queue, msg = Queue.dequeue_any(self.queues, wait_for_job) did_work = True except NoMoreWorkError: break