Bugfix: LPOP does not support multiple queue arguments.

Redis' BLPOP command takes multiple queue arguments, but LPOP can only
take a single queue.  Therefore, we need to loop over all queues
manually, in order, and raise an exception is no more work is available.
main
Vincent Driessen 13 years ago
parent a77c3d9104
commit 04c88577ed

@ -12,6 +12,7 @@ from .queue import Queue
from .proxy import conn from .proxy import conn
class NoQueueError(Exception): pass class NoQueueError(Exception): pass
class NoMoreWorkError(Exception): pass
class Worker(object): class Worker(object):
def __init__(self, queue_names, rv_ttl=500): def __init__(self, queue_names, rv_ttl=500):
@ -45,16 +46,36 @@ class Worker(object):
self.log.debug(message) self.log.debug(message)
procname.setprocname('rq: %s' % (message,)) procname.setprocname('rq: %s' % (message,))
def multi_lpop(self, queues):
# Redis' BLPOP command takes multiple queue arguments, but LPOP can
# only take a single queue. Therefore, we need to loop over all
# queues manually, in order, and raise an exception is no more work
# is available
for queue in queues:
value = conn.lpop(queue)
if value is not None:
return value
return None
def pop_next_job(self, blocking):
queues = self.queue_keys()
if blocking:
queue, msg = conn.blpop(queues)
else:
value = self.multi_lpop(queues)
if value is None:
raise NoMoreWorkError('No more work.')
queue, msg = queue, value
return (queue, msg)
def work(self, quit_when_done=False): def work(self, quit_when_done=False):
while True: while True:
self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) self.procline('Waiting on %s' % (', '.join(self.queue_names()),))
if quit_when_done: try:
value = conn.lpop(self.queue_keys()) wait_for_job = not quit_when_done
if value is None: queue, msg = self.pop_next_job(wait_for_job)
break # No more work, so quitting except NoMoreWorkError:
queue, msg = value break
else:
queue, msg = conn.blpop(self.queue_keys())
self.fork_and_perform_job(queue, msg) self.fork_and_perform_job(queue, msg)
def fork_and_perform_job(self, queue, msg): def fork_and_perform_job(self, queue, msg):

Loading…
Cancel
Save