From 04c88577edb0478573ef45ab0f2549f81df6b18d Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 15 Nov 2011 01:31:49 +0100 Subject: [PATCH] Bugfix: LPOP does not support multiple queue arguments. Redis' BLPOP command takes multiple queue arguments, but LPOP can only take a single queue. Therefore, we need to loop over all queues manually, in order, and raise an exception is no more work is available. --- rq/worker.py | 35 ++++++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index f24548b..ee9ae3f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -12,6 +12,7 @@ from .queue import Queue from .proxy import conn class NoQueueError(Exception): pass +class NoMoreWorkError(Exception): pass class Worker(object): def __init__(self, queue_names, rv_ttl=500): @@ -45,16 +46,36 @@ class Worker(object): self.log.debug(message) procname.setprocname('rq: %s' % (message,)) + def multi_lpop(self, queues): + # Redis' BLPOP command takes multiple queue arguments, but LPOP can + # only take a single queue. Therefore, we need to loop over all + # queues manually, in order, and raise an exception is no more work + # is available + for queue in queues: + value = conn.lpop(queue) + if value is not None: + return value + return None + + def pop_next_job(self, blocking): + queues = self.queue_keys() + if blocking: + queue, msg = conn.blpop(queues) + else: + value = self.multi_lpop(queues) + if value is None: + raise NoMoreWorkError('No more work.') + queue, msg = queue, value + return (queue, msg) + def work(self, quit_when_done=False): while True: self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) - if quit_when_done: - value = conn.lpop(self.queue_keys()) - if value is None: - break # No more work, so quitting - queue, msg = value - else: - queue, msg = conn.blpop(self.queue_keys()) + try: + wait_for_job = not quit_when_done + queue, msg = self.pop_next_job(wait_for_job) + except NoMoreWorkError: + break self.fork_and_perform_job(queue, msg) def fork_and_perform_job(self, queue, msg):