diff --git a/rq/__init__.py b/rq/__init__.py index 9ed8d19..42881a4 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,5 +1,4 @@ from .proxy import conn from .queue import Queue -from .job import job -__all__ = ['conn', 'Queue', 'job'] +__all__ = ['conn', 'Queue'] diff --git a/rq/job.py b/rq/job.py deleted file mode 100644 index 9de638e..0000000 --- a/rq/job.py +++ /dev/null @@ -1,50 +0,0 @@ -import uuid -from pickle import loads, dumps -from .proxy import conn -from .queue import Queue - - -class DelayedResult(object): - def __init__(self, key): - self.key = key - self._rv = None - - @property - def return_value(self): - if self._rv is None: - rv = conn.get(self.key) - if rv is not None: - # cache the result - self._rv = loads(rv) - return self._rv - - -class job(object): - """The @job decorator extends the given function with two new methods: - `delay` and `enqueue`. - """ - - def __init__(self, queue_name=None): - if queue_name is not None: - self.queue = Queue(queue_name) - else: - self.queue = None - - def __call__(self, f): - def enqueue(queue, *args, **kwargs): - if not isinstance(queue, Queue): - raise ValueError('Argument queue must be a Queue.') - rv_key = '%s:result:%s' % (queue.key, str(uuid.uuid4())) - if f.__module__ == '__main__': - raise ValueError('Functions from the __main__ module cannot be processed by workers.') - s = dumps((f, rv_key, args, kwargs)) - conn.rpush(queue.key, s) - return DelayedResult(rv_key) - f.enqueue = enqueue - - def delay(*args, **kwargs): - if self.queue is None: - raise ValueError('This job has no default queue set.') - return f.enqueue(self.queue, *args, **kwargs) - f.delay = delay - return f diff --git a/rq/queue.py b/rq/queue.py index 81d3854..58c4ed1 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,16 +1,32 @@ -from pickle import loads +import uuid +from pickle import loads, dumps from .proxy import conn + +class DelayedResult(object): + def __init__(self, key): + self.key = key + self._rv = None + + @property + def return_value(self): + if self._rv is None: + rv = conn.get(self.key) + if rv is not None: + # cache the result + self._rv = loads(rv) + return self._rv + + + def to_queue_key(queue_name): return 'rq:%s' % (queue_name,) class Queue(object): - def __init__(self, friendly_name): - if not friendly_name: - raise ValueError("Please specify a valid queue name (Got '%s')." % friendly_name) - self.name = friendly_name - self._key = to_queue_key(friendly_name) + def __init__(self, name='default'): + self.name = name + self._key = to_queue_key(name) @property def key(self): @@ -29,7 +45,12 @@ class Queue(object): return conn.llen(self.key) def enqueue(self, job, *args, **kwargs): - return job.enqueue(self, *args, **kwargs) + rv_key = '%s:result:%s' % (self.key, str(uuid.uuid4())) + if job.__module__ == '__main__': + raise ValueError('Functions from the __main__ module cannot be processed by workers.') + message = dumps((job, args, kwargs, rv_key)) + conn.rpush(self.key, message) + return DelayedResult(rv_key) def dequeue(self): s = conn.lpop(self.key) diff --git a/rq/worker.py b/rq/worker.py index def0230..df4c74f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -11,20 +11,28 @@ from pickle import loads, dumps from .queue import Queue from .proxy import conn +def iterable(x): + return hasattr(x, '__iter__') + class NoQueueError(Exception): pass class NoMoreWorkError(Exception): pass class Worker(object): - def __init__(self, queue_names, rv_ttl=500): - self.queues = map(Queue, queue_names) + def __init__(self, queues, rv_ttl=500): + if isinstance(queues, Queue): + queues = [queues] + self.queues = queues + self.validate_queues() self.rv_ttl = rv_ttl self._working = False self.log = Logger('worker') - self.validate_queues() def validate_queues(self): - if not self.queues: - raise NoQueueError('Give each worker at least one queue.') + if not iterable(self.queues): + raise ValueError('Argument queues not iterable.') + for queue in self.queues: + if not isinstance(queue, Queue): + raise NoQueueError('Give each worker at least one Queue.') def queue_names(self): return map(lambda q: q.name, self.queues) @@ -32,12 +40,14 @@ class Worker(object): def queue_keys(self): return map(lambda q: q.key, self.queues) + def is_idle(self): return not self.is_working() def is_working(self): return self._working + @property def pid(self): return os.getpid() @@ -46,6 +56,7 @@ class Worker(object): self.log.debug(message) procname.setprocname('rq: %s' % (message,)) + def multi_lpop(self, queues): # Redis' BLPOP command takes multiple queue arguments, but LPOP can # only take a single queue. Therefore, we need to loop over all @@ -68,7 +79,7 @@ class Worker(object): queue, msg = value return (queue, msg) - def work(self, quit_when_done=False): + def _work(self, quit_when_done=False): while True: self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) try: @@ -78,6 +89,12 @@ class Worker(object): break self.fork_and_perform_job(queue, msg) + def work_forever(self): + return self._work(False) + + def work(self): + return self._work(True) + def fork_and_perform_job(self, queue, msg): child_pid = os.fork() if child_pid == 0: @@ -97,7 +114,7 @@ class Worker(object): self._working = False def perform_job(self, queue, msg): - func, key, args, kwargs = loads(msg) + func, args, kwargs, rv_key = loads(msg) self.procline('Processing %s from %s since %s' % (func.__name__, queue, time.time())) try: rv = func(*args, **kwargs) @@ -111,6 +128,6 @@ class Worker(object): self.log.info('Job ended normally without result') if rv is not None: p = conn.pipeline() - p.set(key, dumps(rv)) - p.expire(key, self.rv_ttl) + p.set(rv_key, dumps(rv)) + p.expire(rv_key, self.rv_ttl) p.execute() diff --git a/tests/test_rq.py b/tests/test_rq.py index ab8818b..d48936f 100644 --- a/tests/test_rq.py +++ b/tests/test_rq.py @@ -2,21 +2,14 @@ import unittest from pickle import loads from blinker import signal from redis import Redis -from rq import conn, Queue, job +from rq import conn, Queue # Test data -@job('my-queue') def testjob(name=None): if name is None: name = 'Stranger' return 'Hi there, %s!' % (name,) -@job() # no queue spec'ed -def queueless_job(name=None): - if name is None: - name = 'Stranger' - return 'Hi there, %s!' % (name,) - class RQTestCase(unittest.TestCase): def setUp(self): @@ -62,6 +55,11 @@ class TestQueue(RQTestCase): q = Queue('my-queue') self.assertEquals(q.name, 'my-queue') + def test_create_default_queue(self): + """Instantiating the default queue.""" + q = Queue() + self.assertEquals(q.name, 'default') + def test_queue_empty(self): """Detecting empty queues.""" q = Queue('my-queue') @@ -72,94 +70,25 @@ class TestQueue(RQTestCase): def test_enqueue(self): - """Putting work on queues using delay.""" + """Putting work on queues.""" q = Queue('my-queue') self.assertEquals(q.empty, True) # testjob spec holds which queue this is sent to - testjob.delay() - self.assertEquals(q.empty, False) - self.assertQueueContains(q, testjob) - - def test_enqueue_to_different_queue(self): - """Putting work on alternative queues using enqueue.""" - - # Override testjob spec holds which queue - q = Queue('different-queue') - self.assertEquals(q.empty, True) - testjob.enqueue(q, 'Nick') - self.assertEquals(q.empty, False) - self.assertQueueContains(q, testjob) - - def test_enqueue_to_different_queue_reverse(self): - """Putting work on specific queues using the Queue object.""" - - q = Queue('alt-queue') - self.assertEquals(q.empty, True) - q.enqueue(testjob) + q.enqueue(testjob, 'Nick', foo='bar') self.assertEquals(q.empty, False) self.assertQueueContains(q, testjob) - def test_dequeue(self): """Fetching work from specific queue.""" q = Queue('foo') - testjob.enqueue(q, 'Rick') + q.enqueue(testjob, 'Rick', foo='bar') # Pull it off the queue (normally, a worker would do this) - f, rv_key, args, kwargs = q.dequeue() + f, args, kwargs, rv_key = q.dequeue() self.assertEquals(f, testjob) self.assertEquals(args[0], 'Rick') - - -class TestJob(RQTestCase): - def test_job_methods(self): - """Jobs have methods to enqueue them.""" - self.assertTrue(hasattr(testjob, 'delay')) - self.assertTrue(hasattr(testjob, 'enqueue')) - self.assertTrue(hasattr(queueless_job, 'delay')) - self.assertTrue(hasattr(queueless_job, 'enqueue')) - - def test_queue_empty(self): - """Detecting empty queues.""" - q = Queue('my-queue') - self.assertEquals(q.empty, True) - - conn.rpush('rq:my-queue', 'some val') - self.assertEquals(q.empty, False) - - def test_put_work_on_queue(self): - """Putting work on queues using delay.""" - q = Queue('my-queue') - self.assertEquals(q.empty, True) - - # testjob spec holds which queue this is sent to - testjob.delay() - self.assertEquals(q.empty, False) - self.assertQueueContains(q, testjob) - - def test_put_work_on_queue_fails_for_queueless_jobs(self): - """Putting work on queues using delay fails for queueless jobs.""" - self.assertRaises(ValueError, queueless_job.delay, 'Rick') - - def test_put_work_on_different_queue(self): - """Putting work on alternative queues using enqueue.""" - - # Override testjob spec holds which queue - q = Queue('different-queue') - self.assertEquals(q.empty, True) - testjob.enqueue(q) - self.assertEquals(q.empty, False) - self.assertQueueContains(q, testjob) - - def test_put_work_on_different_queue_reverse(self): - """Putting work on specific queues using the Queue object.""" - - q = Queue('alt-queue') - self.assertEquals(q.empty, True) - q.enqueue(testjob, 'Simon') - self.assertEquals(q.empty, False) - self.assertQueueContains(q, testjob) + self.assertEquals(kwargs['foo'], 'bar') if __name__ == '__main__':