Make the dequeue methods' return values consistent.

I merely refactored the internal calls. No external API changes have been made in this commit. In order to make the dequeueing methods consistent, each dequeue method now returns a Job instance, which is just a nice lightweight wrapper around the job tuple.

The Job class makes it easier to pass the method call info around, along with some possible meta information, like the queue the job originated from.

This fixes #7.
main
Vincent Driessen 13 years ago
parent f492a5ae2b
commit a5ea45af57

@ -1,5 +1,2 @@
class NoQueueError(Exception):
    """Raised when an operation that needs at least one queue has none to act
    on. (NOTE(review): raise site not visible here -- confirm against worker.)
    """
    pass

@ -1,7 +1,6 @@
import uuid import uuid
from pickle import loads, dumps from pickle import loads, dumps
from .proxy import conn from .proxy import conn
from .exceptions import NoMoreWorkError
class DelayedResult(object): class DelayedResult(object):
def __init__(self, key): def __init__(self, key):
@ -17,15 +16,44 @@ class DelayedResult(object):
self._rv = loads(rv) self._rv = loads(rv)
return self._rv return self._rv
class Job(object):
    """A Job is just a convenient datastructure to pass around job (meta) data.

    It wraps the (func, args, kwargs, rv_key) job tuple and optionally records
    the Queue the job originated from.
    """

    @classmethod
    def unpickle(cls, pickle_data):
        """Constructs a Job instance from the given pickled job tuple.

        NOTE: pickle.loads must only be fed trusted data; job payloads are
        assumed to come from our own Redis queues.
        """
        job_tuple = loads(pickle_data)
        # Use cls (not a hardcoded Job) so subclasses unpickle to their own
        # type.
        return cls(job_tuple)

    def __init__(self, job_tuple, origin=None):
        """Unpacks the (func, args, kwargs, rv_key) tuple.

        origin is the Queue this job was popped from, when known.
        """
        self.func, self.args, self.kwargs, self.rv_key = job_tuple
        self.origin = origin

    def perform(self):
        """Invokes the job function with the job arguments.
        """
        return self.func(*self.args, **self.kwargs)
class Queue(object): class Queue(object):
redis_queue_namespace_prefix = 'rq:'
@classmethod
def from_queue_key(cls, queue_key):
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
Redis keys.
"""
prefix = cls.redis_queue_namespace_prefix
if not queue_key.startswith(prefix):
raise ValueError('Not a valid RQ queue key: %s' % (queue_key,))
name = queue_key[len(prefix):]
return Queue(name)
def __init__(self, name='default'): def __init__(self, name='default'):
prefix = self.redis_queue_namespace_prefix
self.name = name self.name = name
self._key = to_queue_key(name) self._key = '%s%s' % (prefix, name)
@property @property
def key(self): def key(self):
@ -52,31 +80,43 @@ class Queue(object):
return DelayedResult(rv_key) return DelayedResult(rv_key)
def dequeue(self): def dequeue(self):
s = conn.lpop(self.key) blob = conn.lpop(self.key)
return loads(s) if blob is None:
return None
job = Job.unpickle(blob)
job.origin = self
return job
@classmethod @classmethod
def _dequeue_any(cls, queues): def _lpop_any(cls, queue_keys):
# Redis' BLPOP command takes multiple queue arguments, but LPOP can """Helper method. You should not call this directly.
# only take a single queue. Therefore, we need to loop over all
# queues manually, in order, and return None if no more work is Redis' BLPOP command takes multiple queue arguments, but LPOP can only
# available take a single queue. Therefore, we need to loop over all queues
for queue in queues: manually, in order, and return None if no more work is available.
value = conn.lpop(queue) """
if value is not None: for queue_key in queue_keys:
return (queue, value) blob = conn.lpop(queue_key)
if blob is not None:
return (queue_key, blob)
return None return None
@classmethod @classmethod
def dequeue_any(cls, queues, blocking): def dequeue_any(cls, queues, blocking):
queue_keys = map(lambda q: q.key, queues)
if blocking: if blocking:
queue, msg = conn.blpop(queues) queue_key, blob = conn.blpop(queue_keys)
else: else:
value = cls._dequeue_any(queues) redis_result = cls._lpop_any(queue_keys)
if value is None: if redis_result is None:
raise NoMoreWorkError('No more work.') return None
queue, msg = value queue_key, blob = redis_result
return (queue, msg)
job = Job.unpickle(blob)
queue = Queue.from_queue_key(queue_key)
job.origin = queue
return job
def __str__(self): def __str__(self):
return self.name return self.name

@ -3,14 +3,14 @@ import os
import random import random
import time import time
import procname import procname
from pickle import dumps
try: try:
from logbook import Logger from logbook import Logger
except ImportError: except ImportError:
from logging import Logger from logging import Logger
from pickle import loads, dumps
from .queue import Queue from .queue import Queue
from .proxy import conn from .proxy import conn
from .exceptions import NoMoreWorkError, NoQueueError from .exceptions import NoQueueError
def iterable(x): def iterable(x):
return hasattr(x, '__iter__') return hasattr(x, '__iter__')
@ -59,13 +59,12 @@ class Worker(object):
did_work = False did_work = False
while True: while True:
self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) self.procline('Waiting on %s' % (', '.join(self.queue_names()),))
try: wait_for_job = not quit_when_done
wait_for_job = not quit_when_done job = Queue.dequeue_any(self.queues, wait_for_job)
queue, msg = Queue.dequeue_any(self.queues, wait_for_job) if job is None:
did_work = True
except NoMoreWorkError:
break break
self.fork_and_perform_job(queue, msg) did_work = True
self.fork_and_perform_job(job)
return did_work return did_work
def work_forever(self): def work_forever(self):
@ -74,7 +73,7 @@ class Worker(object):
def work(self): def work(self):
return self._work(True) return self._work(True)
def fork_and_perform_job(self, queue, msg): def fork_and_perform_job(self, job):
child_pid = os.fork() child_pid = os.fork()
if child_pid == 0: if child_pid == 0:
random.seed() random.seed()
@ -82,7 +81,7 @@ class Worker(object):
try: try:
self.procline('Processing work since %d' % (time.time(),)) self.procline('Processing work since %d' % (time.time(),))
self._working = True self._working = True
self.perform_job(queue, msg) self.perform_job(job)
except Exception, e: except Exception, e:
self.log.exception(e) self.log.exception(e)
sys.exit(1) sys.exit(1)
@ -92,11 +91,10 @@ class Worker(object):
os.waitpid(child_pid, 0) os.waitpid(child_pid, 0)
self._working = False self._working = False
def perform_job(self, queue, msg): def perform_job(self, job):
func, args, kwargs, rv_key = loads(msg) self.procline('Processing %s from %s since %s' % (job.func.__name__, job.origin.name, time.time()))
self.procline('Processing %s from %s since %s' % (func.__name__, queue, time.time()))
try: try:
rv = func(*args, **kwargs) rv = job.perform()
except Exception, e: except Exception, e:
rv = e rv = e
self.log.exception(e) self.log.exception(e)
@ -107,6 +105,6 @@ class Worker(object):
self.log.info('Job ended normally without result') self.log.info('Job ended normally without result')
if rv is not None: if rv is not None:
p = conn.pipeline() p = conn.pipeline()
p.set(rv_key, dumps(rv)) p.set(job.rv_key, dumps(rv))
p.expire(rv_key, self.rv_ttl) p.expire(job.rv_key, self.rv_ttl)
p.execute() p.execute()

@ -2,7 +2,7 @@ import unittest
from pickle import loads from pickle import loads
from blinker import signal from blinker import signal
from redis import Redis from redis import Redis
from rq import conn, Queue from rq import conn, Queue, Worker
# Test data # Test data
def testjob(name=None): def testjob(name=None):
@ -85,10 +85,55 @@ class TestQueue(RQTestCase):
q.enqueue(testjob, 'Rick', foo='bar') q.enqueue(testjob, 'Rick', foo='bar')
# Pull it off the queue (normally, a worker would do this) # Pull it off the queue (normally, a worker would do this)
f, args, kwargs, rv_key = q.dequeue() job = q.dequeue()
self.assertEquals(f, testjob) self.assertEquals(job.func, testjob)
self.assertEquals(args[0], 'Rick') self.assertEquals(job.origin, q)
self.assertEquals(kwargs['foo'], 'bar') self.assertEquals(job.args[0], 'Rick')
self.assertEquals(job.kwargs['foo'], 'bar')
def test_dequeue_any(self):
"""Fetching work from any given queue."""
fooq = Queue('foo')
barq = Queue('bar')
self.assertEquals(Queue.dequeue_any([fooq, barq], False), None)
# Enqueue a single item
barq.enqueue(testjob)
job = Queue.dequeue_any([fooq, barq], False)
self.assertEquals(job.func, testjob)
# Enqueue items on both queues
barq.enqueue(testjob, 'for Bar')
fooq.enqueue(testjob, 'for Foo')
job = Queue.dequeue_any([fooq, barq], False)
self.assertEquals(job.func, testjob)
self.assertEquals(job.origin, fooq)
self.assertEquals(job.args[0], 'for Foo', 'Foo should be dequeued first.')
job = Queue.dequeue_any([fooq, barq], False)
self.assertEquals(job.func, testjob)
self.assertEquals(job.origin, barq)
self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.')
class TestWorker(RQTestCase):
    def test_create_worker(self):
        """Worker creation."""
        fooq, barq = Queue('foo'), Queue('bar')
        worker = Worker([fooq, barq])
        self.assertEquals(worker.queues, [fooq, barq])

    def test_work_and_quit(self):
        """Worker processes work, then quits."""
        fooq, barq = Queue('foo'), Queue('bar')
        worker = Worker([fooq, barq])

        # No jobs yet, so a single pass should do nothing
        self.assertEquals(worker.work(), False, 'Did not expect any work on the queue.')

        fooq.enqueue(testjob, name='Frank')
        self.assertEquals(worker.work(), True, 'Expected at least some work done.')
if __name__ == '__main__': if __name__ == '__main__':

Loading…
Cancel
Save