From b1650cb9b91c257c7d8a525e92e0c73713086240 Mon Sep 17 00:00:00 2001
From: Vincent Driessen
Date: Wed, 8 Feb 2012 14:18:17 +0100
Subject: [PATCH] CHECKPOINT: Second part of the big refactoring.

Jobs are now stored in separate keys, and only job IDs are put on Redis
queues.

Much of the code has been hit by this change, but it is for the good.  No
really.
---
 rq/job.py            | 29 +++++++++------
 rq/queue.py          | 99 +++++++++++++++++++++++---------------
 rq/worker.py         |  8 ++--
 tests/test_job.py    | 30 +++++++++-----
 tests/test_queue.py  | 79 +++++++++++++++++++++--------------
 tests/test_worker.py | 11 ++---
 6 files changed, 144 insertions(+), 112 deletions(-)

diff --git a/rq/job.py b/rq/job.py
index a17e490..c68af23 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -5,6 +5,21 @@ from .proxy import conn
 from .exceptions import UnpickleError, NoSuchJobError
 
 
+def unpickle(pickled_string):
+    """Unpickles a string, but raises a unified UnpickleError in case anything
+    fails.
+
+    This is a helper function so callers don't have to deal with the fact
+    that `loads()` potentially raises many types of exceptions (e.g.
+    AttributeError, IndexError, TypeError, KeyError).
+    """
+    try:
+        obj = loads(pickled_string)
+    except StandardError:
+        raise UnpickleError('Could not unpickle.', pickled_string)
+    return obj
+
+
 class Job(object):
     """A Job is just a convenient datastructure to pass around job (meta) data.
     """
@@ -105,7 +120,7 @@ class Job(object):
         if pickled_data is None:
             raise NoSuchJobError('No such job: %s' % (key,))
 
-        self.func, self.args, self.kwargs = loads(pickled_data)
+        self.func, self.args, self.kwargs = unpickle(pickled_data)
 
         self.created_at = times.to_universal(conn.hget(key, 'created_at'))
 
@@ -155,15 +170,3 @@ class Job(object):
         """
         return dumps(self)
 
-    @classmethod
-    def unpickle(cls, pickle_data):
-        """Constructs a Job instance form the given pickle'd job tuple data."""
-        try:
-            unpickled_obj = loads(pickle_data)
-            assert isinstance(unpickled_obj, Job)
-            return unpickled_obj
-        except (AssertionError, AttributeError, IndexError, TypeError, KeyError):
-
-            raise UnpickleError('Could not unpickle Job.', pickle_data)
-
-
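The rq/job.py change above is the core of the new storage model: a job's
pickled call data now lives under its own Redis key (a hash with a 'data'
field and metadata such as 'created_at'), and queues only ever carry job IDs.
The following sketch is illustrative only -- the key names ('job:<id>',
'queue:default') are invented for the example and are not taken from this
patch -- but it shows the round trip the refactoring is aiming for, written
in the same Python 2 idiom as the code above:

    # Illustrative sketch only: key names below are made up; the 'data' field
    # and the hash/list commands mirror the hunks above.
    from pickle import dumps, loads
    from uuid import uuid4
    import redis

    conn = redis.Redis()

    def some_task(x, y):
        return x + y

    # Saving a job: the pickled call goes into a per-job hash...
    job_id = str(uuid4())
    conn.hset('job:%s' % job_id, 'data', dumps((some_task, (3, 4), {})))

    # ...while the queue itself only ever holds the ID.
    conn.rpush('queue:default', job_id)

    # A worker pops an ID off the queue, then fetches the job data separately.
    # (Under Python 3 you would decode the bytes redis-py returns first.)
    popped_id = conn.lpop('queue:default')
    func, args, kwargs = loads(conn.hget('job:%s' % popped_id, 'data'))
    assert func(*args, **kwargs) == 7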
""" - job = self._create_job(f, *args, **kwargs) - job.enqueued_at = datetime.utcnow() - self.enqueue_job(job) + if f.__module__ == '__main__': + raise ValueError('Functions from the __main__ module cannot be processed by workers.') + + job = Job.for_call(f, *args, **kwargs) + job.origin = self.name + job.enqueued_at = times.now() + job.save() + self.push_job_id(job.id) return Job(job.id) def requeue(self, job): """Requeues an existing (typically a failed job) onto the queue.""" raise NotImplementedError('Implement this') + def pop_job_id(self): + """Pops a given job ID from this Redis queue.""" + return conn.lpop(self.key) + + @classmethod + def lpop(cls, queue_keys, blocking): + """Helper method. Intermediate method to abstract away from some Redis + API details, where LPOP accepts only a single key, whereas BLPOP accepts + multiple. So if we want the non-blocking LPOP, we need to iterate over + all queues, do individual LPOPs, and return the result. + + Until Redis receives a specific method for this, we'll have to wrap it + this way. + """ + if blocking: + queue_key, job_id = conn.blpop(queue_keys) + else: + for queue_key in queue_keys: + blob = conn.lpop(queue_key) + if blob is not None: + return queue_key, blob + return None + def dequeue(self): - """Dequeues the function call at the front of this Queue. + """Dequeues the front-most job from this queue. Returns a Job instance, which can be executed or inspected. """ - blob = conn.lpop(self.key) - if blob is None: + job_id = self.pop_job_id() + if job_id is None: return None try: - job = Job.unpickle(blob) + job = Job.fetch(job_id) + except NoSuchJobError as e: + return None except UnpickleError as e: # Attach queue information on the exception for improved error # reporting @@ -107,20 +125,6 @@ class Queue(object): job.origin = self return job - @classmethod - def _lpop_any(cls, queue_keys): - """Helper method. You should not call this directly. - - Redis' BLPOP command takes multiple queue arguments, but LPOP can only - take a single queue. Therefore, we need to loop over all queues - manually, in order, and return None if no more work is available. - """ - for queue_key in queue_keys: - blob = conn.lpop(queue_key) - if blob is not None: - return (queue_key, blob) - return None - @classmethod def dequeue_any(cls, queues, blocking): """Class method returning the Job instance at the front of the given set @@ -130,18 +134,17 @@ class Queue(object): either blocks execution of this function until new messages arrive on any of the queues, or returns None. 
""" - queue_keys = map(lambda q: q.key, queues) - if blocking: - queue_key, blob = conn.blpop(queue_keys) - else: - redis_result = cls._lpop_any(queue_keys) - if redis_result is None: - return None - queue_key, blob = redis_result - + queue_keys = [q.key for q in queues] + result = cls.lpop(queue_keys, blocking) + if result is None: + return None + queue_key, job_id = result queue = Queue.from_queue_key(queue_key) try: - job = Job.unpickle(blob) + job = Job.fetch(job_id) + except NoSuchJobError: + # Silently pass on jobs that don't exist (anymore) + return None except UnpickleError as e: # Attach queue information on the exception for improved error # reporting diff --git a/rq/worker.py b/rq/worker.py index b2e6484..7c6911c 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,9 +1,9 @@ import sys import os import errno -import datetime import random import time +import times import procname import socket import signal @@ -233,7 +233,7 @@ class Worker(object): Pops and performs all jobs on the current list of queues. When all queues are empty, block and wait for new jobs to arrive on any of the - queues, unless `burst` is True. + queues, unless `burst` mode is enabled. The return value indicates whether any jobs were processed. """ @@ -249,7 +249,7 @@ class Worker(object): break self.state = 'idle' qnames = self.queue_names() - self.procline('Listening on %s' % (','.join(qnames))) + self.procline('Listening on %s' % ','.join(qnames)) self.log.info('*** Listening for work on %s...' % (', '.join(qnames))) wait_for_job = not burst try: @@ -321,7 +321,7 @@ class Worker(object): fq = self.failure_queue self.log.warning('Moving job to %s queue.' % (fq.name,)) - job.ended_at = datetime.datetime.utcnow() + job.ended_at = times.now() job.exc_info = e fq._push(job.pickle()) else: diff --git a/tests/test_job.py b/tests/test_job.py index 45bdf97..112a5f2 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,6 +1,6 @@ from datetime import datetime from tests import RQTestCase -from pickle import dumps, loads +from pickle import loads from rq.job import Job from rq.exceptions import NoSuchJobError, UnpickleError @@ -95,16 +95,26 @@ class TestJob(RQTestCase): Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0') - def test_unpickle_errors(self): - """Handling of unpickl'ing errors.""" - with self.assertRaises(UnpickleError): - Job.unpickle('this is no pickle data') + def test_dequeue_unreadable_data(self): + """Dequeue fails on unreadable data.""" + # Set up + job = Job.for_call(arbitrary_function, 3, 4, z=2) + job.save() + # Just replace the data hkey with some random noise + self.testconn.hset(job.key, 'data', 'this is no pickle string') with self.assertRaises(UnpickleError): - Job.unpickle(13) + job.refresh() - pickle_data = dumps(Job.for_call(arbitrary_function, 2, 3)) - corrupt_data = pickle_data.replace('arbitrary', 'b0rken') - with self.assertRaises(UnpickleError): - Job.unpickle(corrupt_data) + # Set up (part B) + job = Job.for_call(arbitrary_function, 3, 4, z=2) + job.save() + # Now slightly modify the job to make it unpickl'able (this is + # equivalent to a worker not having the most up-to-date source code and + # unable to import the function) + data = self.testconn.hget(job.key, 'data') + unimportable_data = data.replace('arbitrary_function', 'broken') + self.testconn.hset(job.key, 'data', unimportable_data) + with self.assertRaises(UnpickleError): + job.refresh() diff --git a/tests/test_queue.py b/tests/test_queue.py index 80e1acc..a1866dc 100644 --- a/tests/test_queue.py +++ 
diff --git a/tests/test_job.py b/tests/test_job.py
index 45bdf97..112a5f2 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -1,6 +1,6 @@
 from datetime import datetime
 from tests import RQTestCase
-from pickle import dumps, loads
+from pickle import loads
 from rq.job import Job
 from rq.exceptions import NoSuchJobError, UnpickleError
 
@@ -95,16 +95,26 @@ class TestJob(RQTestCase):
             Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0')
 
-    def test_unpickle_errors(self):
-        """Handling of unpickl'ing errors."""
-        with self.assertRaises(UnpickleError):
-            Job.unpickle('this is no pickle data')
+    def test_dequeue_unreadable_data(self):
+        """Dequeue fails on unreadable data."""
+        # Set up
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
 
+        # Just replace the data hkey with some random noise
+        self.testconn.hset(job.key, 'data', 'this is no pickle string')
         with self.assertRaises(UnpickleError):
-            Job.unpickle(13)
+            job.refresh()
 
-        pickle_data = dumps(Job.for_call(arbitrary_function, 2, 3))
-        corrupt_data = pickle_data.replace('arbitrary', 'b0rken')
-        with self.assertRaises(UnpickleError):
-            Job.unpickle(corrupt_data)
+        # Set up (part B)
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        # Now slightly modify the job to make it unpickl'able (this is
+        # equivalent to a worker not having the most up-to-date source code
+        # and being unable to import the function)
+        data = self.testconn.hget(job.key, 'data')
+        unimportable_data = data.replace('arbitrary_function', 'broken')
+        self.testconn.hset(job.key, 'data', unimportable_data)
+        with self.assertRaises(UnpickleError):
+            job.refresh()
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 80e1acc..a1866dc 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -39,7 +39,7 @@ class TestQueue(RQTestCase):
 
     def test_enqueue(self):
-        """Enqueueing writes job IDs to queues."""
+        """Enqueueing job onto queues."""
         q = Queue()
         self.assertEquals(q.is_empty(), True)
 
@@ -53,18 +53,51 @@ class TestQueue(RQTestCase):
         self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)
 
+    def test_pop_job_id(self):
+        """Popping job IDs from queues."""
+        # Set up
+        q = Queue()
+        uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da'
+        q.push_job_id(uuid)
+
+        # Pop it off the queue...
+        self.assertEquals(q.count, 1)
+        self.assertEquals(q.pop_job_id(), uuid)
+
+        # ...and assert the queue count went down
+        self.assertEquals(q.count, 0)
+
     def test_dequeue(self):
-        """Fetching work from specific queue."""
-        q = Queue('foo')
-        q.enqueue(testjob, 'Rick', foo='bar')
+        """Dequeueing jobs from queues."""
+        # Set up
+        q = Queue()
+        result = q.enqueue(testjob, 'Rick', foo='bar')
 
-        # Pull it off the queue (normally, a worker would do this)
+        # Dequeue a job (not a job ID) off the queue
+        self.assertEquals(q.count, 1)
         job = q.dequeue()
+        self.assertEquals(job.id, result.id)
         self.assertEquals(job.func, testjob)
         self.assertEquals(job.origin, q)
         self.assertEquals(job.args[0], 'Rick')
         self.assertEquals(job.kwargs['foo'], 'bar')
 
+        # ...and assert the queue count went down
+        self.assertEquals(q.count, 0)
+
+    def test_dequeue_ignores_nonexisting_jobs(self):
+        """Dequeuing silently ignores non-existing jobs."""
+
+        q = Queue()
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
+
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 1)
+        self.assertEquals(q.dequeue(), None)
+        self.assertEquals(q.count, 0)
+
     def test_dequeue_any(self):
         """Fetching work from any given queue."""
         fooq = Queue('foo')
@@ -91,33 +124,15 @@ class TestQueue(RQTestCase):
         self.assertEquals(job.origin, barq)
         self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.')
 
-    def test_dequeue_unpicklable_data(self):
-        """Error handling of invalid pickle data."""
-
-        # Push non-pickle data on the queue
-        q = Queue('foo')
-        blob = 'this is nothing like pickled data'
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push value pickle data, but not representing a job tuple
-        q = Queue('foo')
-        blob = dumps('this is pickled, but not a job tuple')
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push slightly incorrect pickled data onto the queue (simulate
-        # a function that can't be imported from the worker)
-        q = Queue('foo')
+    def test_dequeue_any_ignores_nonexisting_jobs(self):
+        """Dequeuing (from any queue) silently ignores non-existing jobs."""
 
-        job_tuple = dumps((testjob, [], dict(name='Frank'), 'unused'))
-        blob = job_tuple.replace('testjob', 'fooobar')
-        self.testconn.rpush(q._key, blob)
+        q = Queue('low')
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
 
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when dequeue()'ing
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 1)
+        self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), None)
+        self.assertEquals(q.count, 0)
 
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 06242f6..b2ddacb 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -21,7 +21,7 @@ class TestWorker(RQTestCase):
         self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
 
     def test_work_is_unreadable(self):
-        """Worker processes unreadable job."""
+        """Worker ignores unreadable job."""
         q = Queue()
         failure_q = Queue('failure')
 
@@ -32,13 +32,14 @@ class TestWorker(RQTestCase):
 
         # What we're simulating here is a call to a function that is not
        # importable from the worker process.
         job = Job.for_call(failing_job, 3)
-        pickled_job = job.pickle()
-        invalid_data = pickled_job.replace(
-            'failing_job', 'nonexisting_job')
+        job.save()
+        data = self.testconn.hget(job.key, 'data')
+        invalid_data = data.replace('failing_job', 'nonexisting_job')
+        self.testconn.hset(job.key, 'data', invalid_data)
 
         # We use the low-level internal function to enqueue any data (bypassing
         # validity checks)
-        q._push(invalid_data)
+        q.push_job_id(invalid_data)
 
         self.assertEquals(q.count, 1)