From a5ea45af57d16081d963b4143ec5130956a48311 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 16 Nov 2011 12:44:33 +0100 Subject: [PATCH] Make the dequeue methods return values consistent. I merely refactored the internal calls. No external API changes have been made in this commit. In order to make the dequeueing methods consistent, each dequeue method now returns a Job instance, which is just a nice lightweight wrapper around the job tuple. The Job class makes it easier to pass the method call info around, along with some possible meta information, like the queue the job originated from. This fixes #7. --- rq/exceptions.py | 3 -- rq/queue.py | 82 +++++++++++++++++++++++++++++++++++------------- rq/worker.py | 30 +++++++++--------- tests/test_rq.py | 55 +++++++++++++++++++++++++++++--- 4 files changed, 125 insertions(+), 45 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 00583d7..6269e81 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -1,5 +1,2 @@ class NoQueueError(Exception): pass - -class NoMoreWorkError(Exception): - pass diff --git a/rq/queue.py b/rq/queue.py index eeac2e7..793fb39 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,7 +1,6 @@ import uuid from pickle import loads, dumps from .proxy import conn -from .exceptions import NoMoreWorkError class DelayedResult(object): def __init__(self, key): @@ -17,15 +16,44 @@ class DelayedResult(object): self._rv = loads(rv) return self._rv +class Job(object): + """A Job is just a convenient datastructure to pass around job (meta) data. + """ -def to_queue_key(queue_name): - return 'rq:%s' % (queue_name,) + @classmethod + def unpickle(cls, pickle_data): + job_tuple = loads(pickle_data) + return Job(job_tuple) + + def __init__(self, job_tuple, origin=None): + self.func, self.args, self.kwargs, self.rv_key = job_tuple + self.origin = origin + + def perform(self): + """Invokes the job function with the job arguments. + """ + return self.func(*self.args, **self.kwargs) class Queue(object): + redis_queue_namespace_prefix = 'rq:' + + @classmethod + def from_queue_key(cls, queue_key): + """Returns a Queue instance, based on the naming conventions for naming + the internal Redis keys. Can be used to reverse-lookup Queues by their + Redis keys. + """ + prefix = cls.redis_queue_namespace_prefix + if not queue_key.startswith(prefix): + raise ValueError('Not a valid RQ queue key: %s' % (queue_key,)) + name = queue_key[len(prefix):] + return Queue(name) + def __init__(self, name='default'): + prefix = self.redis_queue_namespace_prefix self.name = name - self._key = to_queue_key(name) + self._key = '%s%s' % (prefix, name) @property def key(self): @@ -52,31 +80,43 @@ class Queue(object): return DelayedResult(rv_key) def dequeue(self): - s = conn.lpop(self.key) - return loads(s) + blob = conn.lpop(self.key) + if blob is None: + return None + job = Job.unpickle(blob) + job.origin = self + return job @classmethod - def _dequeue_any(cls, queues): - # Redis' BLPOP command takes multiple queue arguments, but LPOP can - # only take a single queue. Therefore, we need to loop over all - # queues manually, in order, and return None if no more work is - # available - for queue in queues: - value = conn.lpop(queue) - if value is not None: - return (queue, value) + def _lpop_any(cls, queue_keys): + """Helper method. You should not call this directly. + + Redis' BLPOP command takes multiple queue arguments, but LPOP can only + take a single queue. Therefore, we need to loop over all queues + manually, in order, and return None if no more work is available. + """ + for queue_key in queue_keys: + blob = conn.lpop(queue_key) + if blob is not None: + return (queue_key, blob) return None @classmethod def dequeue_any(cls, queues, blocking): + queue_keys = map(lambda q: q.key, queues) if blocking: - queue, msg = conn.blpop(queues) + queue_key, blob = conn.blpop(queue_keys) else: - value = cls._dequeue_any(queues) - if value is None: - raise NoMoreWorkError('No more work.') - queue, msg = value - return (queue, msg) + redis_result = cls._lpop_any(queue_keys) + if redis_result is None: + return None + queue_key, blob = redis_result + + job = Job.unpickle(blob) + queue = Queue.from_queue_key(queue_key) + job.origin = queue + return job + def __str__(self): return self.name diff --git a/rq/worker.py b/rq/worker.py index 07fdcca..9ce92e5 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -3,14 +3,14 @@ import os import random import time import procname +from pickle import dumps try: from logbook import Logger except ImportError: from logging import Logger -from pickle import loads, dumps from .queue import Queue from .proxy import conn -from .exceptions import NoMoreWorkError, NoQueueError +from .exceptions import NoQueueError def iterable(x): return hasattr(x, '__iter__') @@ -59,13 +59,12 @@ class Worker(object): did_work = False while True: self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) - try: - wait_for_job = not quit_when_done - queue, msg = Queue.dequeue_any(self.queues, wait_for_job) - did_work = True - except NoMoreWorkError: + wait_for_job = not quit_when_done + job = Queue.dequeue_any(self.queues, wait_for_job) + if job is None: break - self.fork_and_perform_job(queue, msg) + did_work = True + self.fork_and_perform_job(job) return did_work def work_forever(self): @@ -74,7 +73,7 @@ class Worker(object): def work(self): return self._work(True) - def fork_and_perform_job(self, queue, msg): + def fork_and_perform_job(self, job): child_pid = os.fork() if child_pid == 0: random.seed() @@ -82,7 +81,7 @@ class Worker(object): try: self.procline('Processing work since %d' % (time.time(),)) self._working = True - self.perform_job(queue, msg) + self.perform_job(job) except Exception, e: self.log.exception(e) sys.exit(1) @@ -92,11 +91,10 @@ class Worker(object): os.waitpid(child_pid, 0) self._working = False - def perform_job(self, queue, msg): - func, args, kwargs, rv_key = loads(msg) - self.procline('Processing %s from %s since %s' % (func.__name__, queue, time.time())) + def perform_job(self, job): + self.procline('Processing %s from %s since %s' % (job.func.__name__, job.origin.name, time.time())) try: - rv = func(*args, **kwargs) + rv = job.perform() except Exception, e: rv = e self.log.exception(e) @@ -107,6 +105,6 @@ class Worker(object): self.log.info('Job ended normally without result') if rv is not None: p = conn.pipeline() - p.set(rv_key, dumps(rv)) - p.expire(rv_key, self.rv_ttl) + p.set(job.rv_key, dumps(rv)) + p.expire(job.rv_key, self.rv_ttl) p.execute() diff --git a/tests/test_rq.py b/tests/test_rq.py index d48936f..fb8dced 100644 --- a/tests/test_rq.py +++ b/tests/test_rq.py @@ -2,7 +2,7 @@ import unittest from pickle import loads from blinker import signal from redis import Redis -from rq import conn, Queue +from rq import conn, Queue, Worker # Test data def testjob(name=None): @@ -85,10 +85,55 @@ class TestQueue(RQTestCase): q.enqueue(testjob, 'Rick', foo='bar') # Pull it off the queue (normally, a worker would do this) - f, args, kwargs, rv_key = q.dequeue() - self.assertEquals(f, testjob) - self.assertEquals(args[0], 'Rick') - self.assertEquals(kwargs['foo'], 'bar') + job = q.dequeue() + self.assertEquals(job.func, testjob) + self.assertEquals(job.origin, q) + self.assertEquals(job.args[0], 'Rick') + self.assertEquals(job.kwargs['foo'], 'bar') + + + def test_dequeue_any(self): + """Fetching work from any given queue.""" + fooq = Queue('foo') + barq = Queue('bar') + + self.assertEquals(Queue.dequeue_any([fooq, barq], False), None) + + # Enqueue a single item + barq.enqueue(testjob) + job = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(job.func, testjob) + + # Enqueue items on both queues + barq.enqueue(testjob, 'for Bar') + fooq.enqueue(testjob, 'for Foo') + + job = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(job.func, testjob) + self.assertEquals(job.origin, fooq) + self.assertEquals(job.args[0], 'for Foo', 'Foo should be dequeued first.') + + job = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(job.func, testjob) + self.assertEquals(job.origin, barq) + self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.') + + +class TestWorker(RQTestCase): + def test_create_worker(self): + """Worker creation.""" + fooq, barq = Queue('foo'), Queue('bar') + w = Worker([fooq, barq]) + self.assertEquals(w.queues, [fooq, barq]) + + def test_work_and_quit(self): + """Worker processes work, then quits.""" + fooq, barq = Queue('foo'), Queue('bar') + w = Worker([fooq, barq]) + self.assertEquals(w.work(), False, 'Did not expect any work on the queue.') + + fooq.enqueue(testjob, name='Frank') + self.assertEquals(w.work(), True, 'Expected at least some work done.') if __name__ == '__main__':