Restructure some code.

No functional change, but leave the BLPOP'ing to the Queue, as the
queues know how to pop themselves.
main
Vincent Driessen 13 years ago
parent 652ced2580
commit f492a5ae2b

@ -0,0 +1,5 @@
class NoQueueError(Exception):
pass
class NoMoreWorkError(Exception):
pass

@ -1,7 +1,7 @@
import uuid
from pickle import loads, dumps
from .proxy import conn
from .exceptions import NoMoreWorkError
class DelayedResult(object):
def __init__(self, key):
@ -18,7 +18,6 @@ class DelayedResult(object):
return self._rv
def to_queue_key(queue_name):
return 'rq:%s' % (queue_name,)
@ -56,5 +55,28 @@ class Queue(object):
s = conn.lpop(self.key)
return loads(s)
@classmethod
def _dequeue_any(cls, queues):
# Redis' BLPOP command takes multiple queue arguments, but LPOP can
# only take a single queue. Therefore, we need to loop over all
# queues manually, in order, and return None if no more work is
# available
for queue in queues:
value = conn.lpop(queue)
if value is not None:
return (queue, value)
return None
@classmethod
def dequeue_any(cls, queues, blocking):
if blocking:
queue, msg = conn.blpop(queues)
else:
value = cls._dequeue_any(queues)
if value is None:
raise NoMoreWorkError('No more work.')
queue, msg = value
return (queue, msg)
def __str__(self):
return self.name

@ -10,13 +10,11 @@ except ImportError:
from pickle import loads, dumps
from .queue import Queue
from .proxy import conn
from .exceptions import NoMoreWorkError, NoQueueError
def iterable(x):
return hasattr(x, '__iter__')
class NoQueueError(Exception): pass
class NoMoreWorkError(Exception): pass
class Worker(object):
def __init__(self, queues, rv_ttl=500):
if isinstance(queues, Queue):
@ -57,35 +55,13 @@ class Worker(object):
procname.setprocname('rq: %s' % (message,))
def multi_lpop(self, queues):
# Redis' BLPOP command takes multiple queue arguments, but LPOP can
# only take a single queue. Therefore, we need to loop over all
# queues manually, in order, and raise an exception is no more work
# is available
for queue in queues:
value = conn.lpop(queue)
if value is not None:
return (queue, value)
return None
def pop_next_job(self, blocking):
queues = self.queue_keys()
if blocking:
queue, msg = conn.blpop(queues)
else:
value = self.multi_lpop(queues)
if value is None:
raise NoMoreWorkError('No more work.')
queue, msg = value
return (queue, msg)
def _work(self, quit_when_done=False):
did_work = False
while True:
self.procline('Waiting on %s' % (', '.join(self.queue_names()),))
try:
wait_for_job = not quit_when_done
queue, msg = self.pop_next_job(wait_for_job)
queue, msg = Queue.dequeue_any(self.queues, wait_for_job)
did_work = True
except NoMoreWorkError:
break

Loading…
Cancel
Save