From fcca48a9d7525f74375e3554a0fb64ec20d06bdf Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 7 Feb 2012 20:53:06 +0100 Subject: [PATCH 01/13] Rename empty property -> is_empty() method. --- rq/queue.py | 3 +-- tests/test_queue.py | 8 ++++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index dbef8c0..e7f86bd 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -72,8 +72,7 @@ class Queue(object): """Returns the Redis key for this Queue.""" return self._key - @property - def empty(self): + def is_empty(self): """Returns whether the current queue is empty.""" return self.count == 0 diff --git a/tests/test_queue.py b/tests/test_queue.py index c7cbc2e..ff18e9e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -32,20 +32,20 @@ class TestQueue(RQTestCase): def test_queue_empty(self): """Detecting empty queues.""" q = Queue('my-queue') - self.assertEquals(q.empty, True) + self.assertEquals(q.is_empty(), True) self.testconn.rpush('rq:queue:my-queue', 'some val') - self.assertEquals(q.empty, False) + self.assertEquals(q.is_empty(), False) def test_enqueue(self): """Putting work on queues.""" q = Queue('my-queue') - self.assertEquals(q.empty, True) + self.assertEquals(q.is_empty(), True) # testjob spec holds which queue this is sent to q.enqueue(testjob, 'Nick', foo='bar') - self.assertEquals(q.empty, False) + self.assertEquals(q.is_empty(), False) self.assertQueueContains(q, testjob) From 7fff52d99c00ad480cf01c6d7505de6be1ed1eca Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 7 Feb 2012 20:59:29 +0100 Subject: [PATCH 02/13] Get rid of ugly custom assertion. --- tests/__init__.py | 9 --------- tests/test_queue.py | 6 +++++- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index 3109684..b912bed 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -53,12 +53,3 @@ class RQTestCase(unittest.TestCase): testconn = conn.pop() assert testconn == cls.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.' - - def assertQueueContains(self, queue, that_func): - # Do a queue scan (this is O(n), but we're in a test, so hey) - for job in queue.jobs: - if job.func == that_func: - return - self.fail('Queue %s does not contain message for function %s' % - (queue.key, that_func)) - diff --git a/tests/test_queue.py b/tests/test_queue.py index ff18e9e..071891f 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -46,7 +46,11 @@ class TestQueue(RQTestCase): # testjob spec holds which queue this is sent to q.enqueue(testjob, 'Nick', foo='bar') self.assertEquals(q.is_empty(), False) - self.assertQueueContains(q, testjob) + for job in q.jobs: + if job.func == testjob: + break + else: + self.fail('Job not found on queue.') def test_dequeue(self): From 65105b44c3999c577ec615dcda7dbf34867d8fdb Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 8 Feb 2012 00:40:43 +0100 Subject: [PATCH 03/13] CHECKPOINT: Initial part of the big refactor. 
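In rough strokes, the model this checkpoint works toward (a sketch; the details are in the diffs below): each job gets its own Redis hash under rq:job:<id>, and construction, persistence and retrieval become explicit steps instead of one pickled blob living on the queue list:

    job = Job.for_call(some_func, 3, 4, z=2)  # some_func: any importable function
    job.save()                                # writes the rq:job:<uuid> hash
    job2 = Job.fetch(job.id)                  # reads it back (via refresh())

Queues then carry only job IDs, never pickled payloads.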
--- rq/exceptions.py | 3 + rq/job.py | 158 ++++++++++++++++++++++++++++++++++++------- rq/queue.py | 45 ++---------- setup.py | 2 +- tests/test_job.py | 90 ++++++++++++++++++++---- tests/test_queue.py | 22 +++--- tests/test_worker.py | 2 +- 7 files changed, 232 insertions(+), 90 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 0b885f7..813bcde 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -1,3 +1,6 @@ +class NoSuchJobError(Exception): + pass + class NoQueueError(Exception): pass diff --git a/rq/job.py b/rq/job.py index 0ae5bb1..a17e490 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,51 +1,131 @@ -from datetime import datetime +import times from uuid import uuid4 from pickle import loads, dumps -from .exceptions import UnpickleError +from .proxy import conn +from .exceptions import UnpickleError, NoSuchJobError class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ + + # Job construction @classmethod - def unpickle(cls, pickle_data): - """Constructs a Job instance form the given pickle'd job tuple data.""" - try: - unpickled_obj = loads(pickle_data) - assert isinstance(unpickled_obj, Job) - return unpickled_obj - except (AssertionError, AttributeError, IndexError, TypeError, KeyError): - raise UnpickleError('Could not unpickle Job.', pickle_data) + def for_call(cls, func, *args, **kwargs): + """Creates a new Job instance for the given function, arguments, and + keyword arguments. + """ + job = Job() + job.func = func + job.args = args + job.kwargs = kwargs + return job - def __init__(self, func, *args, **kwargs): - self._id = unicode(uuid4()) - self.func = func - self.args = args - self.kwargs = kwargs + @classmethod + def fetch(cls, id): + """Fetches a persisted job from its corresponding Redis key and + instantiates it. + """ + job = Job(id) + job.refresh() + return job + + def __init__(self, id=None): + self._id = id + self.func = None + self.args = None + self.kwargs = None self.origin = None - self.created_at = datetime.utcnow() + self.created_at = times.now() self.enqueued_at = None + self.result = None self.exc_info = None - def pickle(self): - """Returns the pickle'd string represenation of a Job. Suitable for writing to Redis.""" - return dumps(self) + + # Data access + def get_id(self): + """The job ID for this job instance. Generates an ID lazily the + first time the ID is requested. + """ + if self._id is None: + self._id = unicode(uuid4()) + return self._id + + def set_id(self, value): + """Sets a job ID for the given job.""" + self._id = value + + id = property(get_id, set_id) @property - def rv_key(self): - """Returns the Redis key under which the Job's result will be stored, if applicable.""" - return 'rq:result:%s' % (self._id,) + def key(self): + """The Redis key that is used to store job data under.""" + return 'rq:job:%s' % (self.id,) + @property - def id(self): - """Returns the Job's internal ID.""" - return self._id + def job_tuple(self): + """Returns the job tuple that encodes the actual function call that this job represents.""" + return (self.func, self.args, self.kwargs) + + @property + def return_value(self): + """Returns the return value of the job. + + Initially, right after enqueueing a job, the return value will be None. + But when the job has been executed, and had a return value or exception, + this will return that value or exception. + + Note that, when the job has no return value (i.e. returns None), the + ReadOnlyJob object is useless, as the result won't be written back to + Redis. 
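+
+        For example (a sketch; slow_add stands in for any enqueued
+        function):
+
+            job = q.enqueue(slow_add, 2, 3)
+            job.return_value  # None while pending; 5 once a worker ran it
+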
+ + Also note that you cannot draw the conclusion that a job has _not_ been + executed when its return value is None, since return values written back + to Redis will expire after a given amount of time (500 seconds by + default). + """ + if self._cached_result is None: + rv = conn.hget(self.key, 'result') + if rv is not None: + # cache the result + self._cached_result = loads(rv) + return self._cached_result + + # Persistence + def refresh(self): + """Overwrite the current instance's properties with the values in the + corresponding Redis key. + + Will raise a NoSuchJobError if no corresponding Redis key exists. + """ + key = self.key + pickled_data = conn.hget(key, 'data') + if pickled_data is None: + raise NoSuchJobError('No such job: %s' % (key,)) + + self.func, self.args, self.kwargs = loads(pickled_data) + + self.created_at = times.to_universal(conn.hget(key, 'created_at')) + + def save(self): + """Persists the current job instance to its corresponding Redis key.""" + pickled_data = dumps(self.job_tuple) + + key = self.key + conn.hset(key, 'data', pickled_data) + conn.hset(key, 'created_at', times.format(self.created_at, 'UTC')) + + + # Job execution def perform(self): """Invokes the job function with the job arguments. """ return self.func(*self.args, **self.kwargs) + + # Representation @property def call_string(self): """Returns a string representation of the call, formatted as a regular @@ -59,5 +139,31 @@ class Job(object): def __str__(self): return '' % (self.id, self.call_string) + + # Job equality def __eq__(self, other): - return cmp(self.id, other.id) + return self.id == other.id + + def __hash__(self): + return hash(self.id) + + + # TODO: TO REFACTOR / REMOVE + def pickle(self): + """Returns the pickle'd string represenation of a Job. Suitable for + writing to Redis. + """ + return dumps(self) + + @classmethod + def unpickle(cls, pickle_data): + """Constructs a Job instance form the given pickle'd job tuple data.""" + try: + unpickled_obj = loads(pickle_data) + assert isinstance(unpickled_obj, Job) + return unpickled_obj + except (AssertionError, AttributeError, IndexError, TypeError, KeyError): + + raise UnpickleError('Could not unpickle Job.', pickle_data) + + diff --git a/rq/queue.py b/rq/queue.py index e7f86bd..43d0db0 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,44 +1,10 @@ from datetime import datetime from functools import total_ordering -from pickle import loads from .proxy import conn from .job import Job from .exceptions import UnpickleError -class DelayedResult(object): - """Proxy object that is returned as a result of `Queue.enqueue()` calls. - Instances of DelayedResult can be polled for their return values. - """ - def __init__(self, key): - self.key = key - self._rv = None - - @property - def return_value(self): - """Returns the return value of the job. - - Initially, right after enqueueing a job, the return value will be None. - But when the job has been executed, and had a return value or exception, - this will return that value or exception. - - Note that, when the job has no return value (i.e. returns None), the - DelayedResult object is useless, as the result won't be written back to - Redis. - - Also note that you cannot draw the conclusion that a job has _not_ been - executed when its return value is None, since return values written back - to Redis will expire after a given amount of time (500 seconds by - default). 
- """ - if self._rv is None: - rv = conn.get(self.key) - if rv is not None: - # cache the result - self._rv = loads(rv) - return self._rv - - @total_ordering class Queue(object): redis_queue_namespace_prefix = 'rq:queue:' @@ -99,13 +65,14 @@ class Queue(object): if f.__module__ == '__main__': raise ValueError('Functions from the __main__ module cannot be processed by workers.') - job = Job(f, *args, **kwargs) + job = Job.for_call(f, *args, **kwargs) job.origin = self.name return job - def _push(self, pickled_job): + def enqueue_job(self, job): """Enqueues a pickled_job on the corresponding Redis queue.""" - conn.rpush(self.key, pickled_job) + job.save() + conn.rpush(self.key, job.id) def enqueue(self, f, *args, **kwargs): """Enqueues a function call for delayed execution. @@ -115,8 +82,8 @@ class Queue(object): """ job = self._create_job(f, *args, **kwargs) job.enqueued_at = datetime.utcnow() - self._push(job.pickle()) - return DelayedResult(job.rv_key) + self.enqueue_job(job) + return Job(job.id) def requeue(self, job): """Requeues an existing (typically a failed job) onto the queue.""" diff --git a/setup.py b/setup.py index 8dcee49..5263a04 100644 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ def get_version(): raise RuntimeError('No version info found.') def get_dependencies(): - deps = ['redis', 'procname'] + deps = ['redis', 'procname', 'times'] deps += ['logbook'] # should be soft dependency? if sys.version_info < (2, 7) or \ sys.version_info >= (3, 0) and sys.version_info < (3, 2): diff --git a/tests/test_job.py b/tests/test_job.py index de6c78e..45bdf97 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,8 +1,8 @@ +from datetime import datetime from tests import RQTestCase from pickle import dumps, loads from rq.job import Job -#from rq import Queue, Worker -from rq.exceptions import UnpickleError +from rq.exceptions import NoSuchJobError, UnpickleError def arbitrary_function(x, y, z=1): @@ -10,25 +10,91 @@ def arbitrary_function(x, y, z=1): class TestJob(RQTestCase): - def test_create_job(self): - """Creation of jobs.""" - job = Job(arbitrary_function, 3, 4, z=2) + def test_create_empty_job(self): + """Creation of new empty jobs.""" + job = Job() + + # Jobs have a random UUID + self.assertIsNotNone(job.id) + + # Jobs have no data yet... + self.assertEquals(job.func, None) + self.assertEquals(job.args, None) + self.assertEquals(job.kwargs, None) + self.assertEquals(job.origin, None) + self.assertEquals(job.enqueued_at, None) + self.assertEquals(job.result, None) + self.assertEquals(job.exc_info, None) + + # ...except for a created_at property + self.assertIsNotNone(job.created_at) + + def test_create_normal_job(self): + """Creation of jobs for function calls.""" + job = Job.for_call(arbitrary_function, 3, 4, z=2) + + # Jobs have a random UUID + self.assertIsNotNone(job.id) + self.assertIsNotNone(job.created_at) + + # Job data is set... 
self.assertEquals(job.func, arbitrary_function) self.assertEquals(job.args, (3, 4)) self.assertEquals(job.kwargs, {'z': 2}) + + # ...but metadata is not self.assertIsNone(job.origin) - self.assertIsNotNone(job.created_at) self.assertIsNone(job.enqueued_at) - self.assertIsNotNone(job.rv_key) + self.assertIsNone(job.result) + + + def test_save(self): + """Storing jobs.""" + job = Job.for_call(arbitrary_function, 3, 4, z=2) - def test_pickle_job(self): - """Pickling of jobs.""" - job = Job(arbitrary_function, 3, 4, z=2) - job2 = loads(dumps(job)) + # Saving creates a Redis hash + self.assertEquals(self.testconn.exists(job.key), False) + job.save() + self.assertEquals(self.testconn.type(job.key), 'hash') + + # Saving writes pickled job data + unpickled_data = loads(self.testconn.hget(job.key, 'data')) + self.assertEquals(unpickled_data[0], arbitrary_function) + + def test_fetch(self): + """Fetching jobs.""" + # Prepare test + self.testconn.hset('rq:job:some_id', 'data', "(ctest_job\narbitrary_function\np0\n(I3\nI4\ntp1\n(dp2\nS'z'\np3\nI2\nstp4\n.") + self.testconn.hset('rq:job:some_id', 'created_at', "2012-02-07 22:13:24+0000") + + # Fetch returns a job + job = Job.fetch('some_id') + self.assertEquals(job.id, 'some_id') + self.assertEquals(job.func, arbitrary_function) + self.assertEquals(job.args, (3, 4)) + self.assertEquals(job.kwargs, dict(z=2)) + self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) + + + def test_persistence_of_jobs(self): + """Storing and fetching of jobs.""" + job = Job.for_call(arbitrary_function, 3, 4, z=2) + job.save() + + job2 = Job.fetch(job.id) self.assertEquals(job.func, job2.func) self.assertEquals(job.args, job2.args) self.assertEquals(job.kwargs, job2.kwargs) + # Mathematical equation + self.assertEquals(job, job2) + + def test_fetching_can_fail(self): + """Fetching fails for non-existing jobs.""" + with self.assertRaises(NoSuchJobError): + Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0') + + def test_unpickle_errors(self): """Handling of unpickl'ing errors.""" with self.assertRaises(UnpickleError): @@ -37,7 +103,7 @@ class TestJob(RQTestCase): with self.assertRaises(UnpickleError): Job.unpickle(13) - pickle_data = dumps(Job(arbitrary_function, 2, 3)) + pickle_data = dumps(Job.for_call(arbitrary_function, 2, 3)) corrupt_data = pickle_data.replace('arbitrary', 'b0rken') with self.assertRaises(UnpickleError): Job.unpickle(corrupt_data) diff --git a/tests/test_queue.py b/tests/test_queue.py index 071891f..80e1acc 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -31,26 +31,26 @@ class TestQueue(RQTestCase): def test_queue_empty(self): """Detecting empty queues.""" - q = Queue('my-queue') + q = Queue('example') self.assertEquals(q.is_empty(), True) - self.testconn.rpush('rq:queue:my-queue', 'some val') + self.testconn.rpush('rq:queue:example', 'sentinel message') self.assertEquals(q.is_empty(), False) def test_enqueue(self): - """Putting work on queues.""" - q = Queue('my-queue') + """Enqueueing writes job IDs to queues.""" + q = Queue() self.assertEquals(q.is_empty(), True) # testjob spec holds which queue this is sent to - q.enqueue(testjob, 'Nick', foo='bar') - self.assertEquals(q.is_empty(), False) - for job in q.jobs: - if job.func == testjob: - break - else: - self.fail('Job not found on queue.') + job = q.enqueue(testjob, 'Nick', foo='bar') + job_id = job.id + + # Inspect data inside Redis + q_key = 'rq:queue:default' + self.assertEquals(self.testconn.llen(q_key), 1) + self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], 
job_id) def test_dequeue(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index b4cd0aa..06242f6 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -31,7 +31,7 @@ class TestWorker(RQTestCase): # NOTE: We have to fake this enqueueing for this test case. # What we're simulating here is a call to a function that is not # importable from the worker process. - job = Job(failing_job, 3) + job = Job.for_call(failing_job, 3) pickled_job = job.pickle() invalid_data = pickled_job.replace( 'failing_job', 'nonexisting_job') From b1650cb9b91c257c7d8a525e92e0c73713086240 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 8 Feb 2012 14:18:17 +0100 Subject: [PATCH 04/13] CHECKPOINT: Second part of the big refactoring. Jobs are now stored in separate keys, and only job IDs are put on Redis queues. Much of the code has been hit by this change, but it is for the good. No really. --- rq/job.py | 29 +++++++------ rq/queue.py | 99 +++++++++++++++++++++++--------------------- rq/worker.py | 8 ++-- tests/test_job.py | 30 +++++++++----- tests/test_queue.py | 79 +++++++++++++++++++++-------------- tests/test_worker.py | 11 ++--- 6 files changed, 144 insertions(+), 112 deletions(-) diff --git a/rq/job.py b/rq/job.py index a17e490..c68af23 100644 --- a/rq/job.py +++ b/rq/job.py @@ -5,6 +5,21 @@ from .proxy import conn from .exceptions import UnpickleError, NoSuchJobError +def unpickle(pickled_string): + """Unpickles a string, but raises a unified UnpickleError in case anything + fails. + + This is a helper method to not have to deal with the fact that `loads()` + potentially raises many types of exceptions (e.g. AttributeError, + IndexError, TypeError, KeyError, etc.) + """ + try: + obj = loads(pickled_string) + except StandardError: + raise UnpickleError('Could not unpickle.', pickled_string) + return obj + + class Job(object): """A Job is just a convenient datastructure to pass around job (meta) data. """ @@ -105,7 +120,7 @@ class Job(object): if pickled_data is None: raise NoSuchJobError('No such job: %s' % (key,)) - self.func, self.args, self.kwargs = loads(pickled_data) + self.func, self.args, self.kwargs = unpickle(pickled_data) self.created_at = times.to_universal(conn.hget(key, 'created_at')) @@ -155,15 +170,3 @@ class Job(object): """ return dumps(self) - @classmethod - def unpickle(cls, pickle_data): - """Constructs a Job instance form the given pickle'd job tuple data.""" - try: - unpickled_obj = loads(pickle_data) - assert isinstance(unpickled_obj, Job) - return unpickled_obj - except (AssertionError, AttributeError, IndexError, TypeError, KeyError): - - raise UnpickleError('Could not unpickle Job.', pickle_data) - - diff --git a/rq/queue.py b/rq/queue.py index 43d0db0..a938523 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,8 +1,8 @@ -from datetime import datetime +import times from functools import total_ordering from .proxy import conn from .job import Job -from .exceptions import UnpickleError +from .exceptions import NoSuchJobError, UnpickleError @total_ordering @@ -58,21 +58,9 @@ class Queue(object): return conn.llen(self.key) - def _create_job(self, f, *args, **kwargs): - """Creates a Job instance for the given function call and attaches queue - meta data to it. 
- """ - if f.__module__ == '__main__': - raise ValueError('Functions from the __main__ module cannot be processed by workers.') - - job = Job.for_call(f, *args, **kwargs) - job.origin = self.name - return job - - def enqueue_job(self, job): - """Enqueues a pickled_job on the corresponding Redis queue.""" - job.save() - conn.rpush(self.key, job.id) + def push_job_id(self, job_id): + """Pushes a job ID on the corresponding Redis queue.""" + conn.rpush(self.key, job_id) def enqueue(self, f, *args, **kwargs): """Enqueues a function call for delayed execution. @@ -80,25 +68,55 @@ class Queue(object): Expects the function to call, along with the arguments and keyword arguments. """ - job = self._create_job(f, *args, **kwargs) - job.enqueued_at = datetime.utcnow() - self.enqueue_job(job) + if f.__module__ == '__main__': + raise ValueError('Functions from the __main__ module cannot be processed by workers.') + + job = Job.for_call(f, *args, **kwargs) + job.origin = self.name + job.enqueued_at = times.now() + job.save() + self.push_job_id(job.id) return Job(job.id) def requeue(self, job): """Requeues an existing (typically a failed job) onto the queue.""" raise NotImplementedError('Implement this') + def pop_job_id(self): + """Pops a given job ID from this Redis queue.""" + return conn.lpop(self.key) + + @classmethod + def lpop(cls, queue_keys, blocking): + """Helper method. Intermediate method to abstract away from some Redis + API details, where LPOP accepts only a single key, whereas BLPOP accepts + multiple. So if we want the non-blocking LPOP, we need to iterate over + all queues, do individual LPOPs, and return the result. + + Until Redis receives a specific method for this, we'll have to wrap it + this way. + """ + if blocking: + queue_key, job_id = conn.blpop(queue_keys) + else: + for queue_key in queue_keys: + blob = conn.lpop(queue_key) + if blob is not None: + return queue_key, blob + return None + def dequeue(self): - """Dequeues the function call at the front of this Queue. + """Dequeues the front-most job from this queue. Returns a Job instance, which can be executed or inspected. """ - blob = conn.lpop(self.key) - if blob is None: + job_id = self.pop_job_id() + if job_id is None: return None try: - job = Job.unpickle(blob) + job = Job.fetch(job_id) + except NoSuchJobError as e: + return None except UnpickleError as e: # Attach queue information on the exception for improved error # reporting @@ -107,20 +125,6 @@ class Queue(object): job.origin = self return job - @classmethod - def _lpop_any(cls, queue_keys): - """Helper method. You should not call this directly. - - Redis' BLPOP command takes multiple queue arguments, but LPOP can only - take a single queue. Therefore, we need to loop over all queues - manually, in order, and return None if no more work is available. - """ - for queue_key in queue_keys: - blob = conn.lpop(queue_key) - if blob is not None: - return (queue_key, blob) - return None - @classmethod def dequeue_any(cls, queues, blocking): """Class method returning the Job instance at the front of the given set @@ -130,18 +134,17 @@ class Queue(object): either blocks execution of this function until new messages arrive on any of the queues, or returns None. 
""" - queue_keys = map(lambda q: q.key, queues) - if blocking: - queue_key, blob = conn.blpop(queue_keys) - else: - redis_result = cls._lpop_any(queue_keys) - if redis_result is None: - return None - queue_key, blob = redis_result - + queue_keys = [q.key for q in queues] + result = cls.lpop(queue_keys, blocking) + if result is None: + return None + queue_key, job_id = result queue = Queue.from_queue_key(queue_key) try: - job = Job.unpickle(blob) + job = Job.fetch(job_id) + except NoSuchJobError: + # Silently pass on jobs that don't exist (anymore) + return None except UnpickleError as e: # Attach queue information on the exception for improved error # reporting diff --git a/rq/worker.py b/rq/worker.py index b2e6484..7c6911c 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,9 +1,9 @@ import sys import os import errno -import datetime import random import time +import times import procname import socket import signal @@ -233,7 +233,7 @@ class Worker(object): Pops and performs all jobs on the current list of queues. When all queues are empty, block and wait for new jobs to arrive on any of the - queues, unless `burst` is True. + queues, unless `burst` mode is enabled. The return value indicates whether any jobs were processed. """ @@ -249,7 +249,7 @@ class Worker(object): break self.state = 'idle' qnames = self.queue_names() - self.procline('Listening on %s' % (','.join(qnames))) + self.procline('Listening on %s' % ','.join(qnames)) self.log.info('*** Listening for work on %s...' % (', '.join(qnames))) wait_for_job = not burst try: @@ -321,7 +321,7 @@ class Worker(object): fq = self.failure_queue self.log.warning('Moving job to %s queue.' % (fq.name,)) - job.ended_at = datetime.datetime.utcnow() + job.ended_at = times.now() job.exc_info = e fq._push(job.pickle()) else: diff --git a/tests/test_job.py b/tests/test_job.py index 45bdf97..112a5f2 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,6 +1,6 @@ from datetime import datetime from tests import RQTestCase -from pickle import dumps, loads +from pickle import loads from rq.job import Job from rq.exceptions import NoSuchJobError, UnpickleError @@ -95,16 +95,26 @@ class TestJob(RQTestCase): Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0') - def test_unpickle_errors(self): - """Handling of unpickl'ing errors.""" - with self.assertRaises(UnpickleError): - Job.unpickle('this is no pickle data') + def test_dequeue_unreadable_data(self): + """Dequeue fails on unreadable data.""" + # Set up + job = Job.for_call(arbitrary_function, 3, 4, z=2) + job.save() + # Just replace the data hkey with some random noise + self.testconn.hset(job.key, 'data', 'this is no pickle string') with self.assertRaises(UnpickleError): - Job.unpickle(13) + job.refresh() - pickle_data = dumps(Job.for_call(arbitrary_function, 2, 3)) - corrupt_data = pickle_data.replace('arbitrary', 'b0rken') - with self.assertRaises(UnpickleError): - Job.unpickle(corrupt_data) + # Set up (part B) + job = Job.for_call(arbitrary_function, 3, 4, z=2) + job.save() + # Now slightly modify the job to make it unpickl'able (this is + # equivalent to a worker not having the most up-to-date source code and + # unable to import the function) + data = self.testconn.hget(job.key, 'data') + unimportable_data = data.replace('arbitrary_function', 'broken') + self.testconn.hset(job.key, 'data', unimportable_data) + with self.assertRaises(UnpickleError): + job.refresh() diff --git a/tests/test_queue.py b/tests/test_queue.py index 80e1acc..a1866dc 100644 --- a/tests/test_queue.py +++ 
b/tests/test_queue.py @@ -39,7 +39,7 @@ class TestQueue(RQTestCase): def test_enqueue(self): - """Enqueueing writes job IDs to queues.""" + """Enqueueing job onto queues.""" q = Queue() self.assertEquals(q.is_empty(), True) @@ -53,18 +53,51 @@ class TestQueue(RQTestCase): self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id) + def test_pop_job_id(self): + """Popping job IDs from queues.""" + # Set up + q = Queue() + uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da' + q.push_job_id(uuid) + + # Pop it off the queue... + self.assertEquals(q.count, 1) + self.assertEquals(q.pop_job_id(), uuid) + + # ...and assert the queue count when down + self.assertEquals(q.count, 0) + + def test_dequeue(self): - """Fetching work from specific queue.""" - q = Queue('foo') - q.enqueue(testjob, 'Rick', foo='bar') + """Dequeueing jobs from queues.""" + # Set up + q = Queue() + result = q.enqueue(testjob, 'Rick', foo='bar') - # Pull it off the queue (normally, a worker would do this) + # Dequeue a job (not a job ID) off the queue + self.assertEquals(q.count, 1) job = q.dequeue() + self.assertEquals(job.id, result.id) self.assertEquals(job.func, testjob) self.assertEquals(job.origin, q) self.assertEquals(job.args[0], 'Rick') self.assertEquals(job.kwargs['foo'], 'bar') + # ...and assert the queue count when down + self.assertEquals(q.count, 0) + + def test_dequeue_ignores_nonexisting_jobs(self): + """Dequeuing silently ignores non-existing jobs.""" + + q = Queue() + uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8' + q.push_job_id(uuid) + + # Dequeue simply ignores the missing job and returns None + self.assertEquals(q.count, 1) + self.assertEquals(q.dequeue(), None) + self.assertEquals(q.count, 0) + def test_dequeue_any(self): """Fetching work from any given queue.""" fooq = Queue('foo') @@ -91,33 +124,15 @@ class TestQueue(RQTestCase): self.assertEquals(job.origin, barq) self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.') - def test_dequeue_unpicklable_data(self): - """Error handling of invalid pickle data.""" - - # Push non-pickle data on the queue - q = Queue('foo') - blob = 'this is nothing like pickled data' - self.testconn.rpush(q._key, blob) - - with self.assertRaises(UnpickleError): - q.dequeue() # error occurs when perform()'ing - - # Push value pickle data, but not representing a job tuple - q = Queue('foo') - blob = dumps('this is pickled, but not a job tuple') - self.testconn.rpush(q._key, blob) - - with self.assertRaises(UnpickleError): - q.dequeue() # error occurs when perform()'ing - - # Push slightly incorrect pickled data onto the queue (simulate - # a function that can't be imported from the worker) - q = Queue('foo') + def test_dequeue_any_ignores_nonexisting_jobs(self): + """Dequeuing (from any queue) silently ignores non-existing jobs.""" - job_tuple = dumps((testjob, [], dict(name='Frank'), 'unused')) - blob = job_tuple.replace('testjob', 'fooobar') - self.testconn.rpush(q._key, blob) + q = Queue('low') + uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8' + q.push_job_id(uuid) - with self.assertRaises(UnpickleError): - q.dequeue() # error occurs when dequeue()'ing + # Dequeue simply ignores the missing job and returns None + self.assertEquals(q.count, 1) + self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), None) + self.assertEquals(q.count, 0) diff --git a/tests/test_worker.py b/tests/test_worker.py index 06242f6..b2ddacb 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -21,7 +21,7 @@ class TestWorker(RQTestCase): 
self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') def test_work_is_unreadable(self): - """Worker processes unreadable job.""" + """Worker ignores unreadable job.""" q = Queue() failure_q = Queue('failure') @@ -32,13 +32,14 @@ class TestWorker(RQTestCase): # What we're simulating here is a call to a function that is not # importable from the worker process. job = Job.for_call(failing_job, 3) - pickled_job = job.pickle() - invalid_data = pickled_job.replace( - 'failing_job', 'nonexisting_job') + job.save() + data = self.testconn.hget(job.key, 'data') + invalid_data = data.replace('failing_job', 'nonexisting_job') + self.testconn.hset(job.key, 'data', invalid_data) # We use the low-level internal function to enqueue any data (bypassing # validity checks) - q._push(invalid_data) + q.push_job_id(invalid_data) self.assertEquals(q.count, 1) From f516f8df2e3342784ea8820dfc8440ce0b6fa96f Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 8 Feb 2012 15:18:24 +0100 Subject: [PATCH 05/13] CHECKPOINT: Handle failing and unreadable jobs. Failing (or unreadable) jobs are correctly put on the failure queue by the worker now. --- rq/queue.py | 10 +++++++--- rq/worker.py | 11 ++++++++--- tests/test_queue.py | 11 ++++++----- tests/test_worker.py | 9 ++++----- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index a938523..734d4f5 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -116,7 +116,9 @@ class Queue(object): try: job = Job.fetch(job_id) except NoSuchJobError as e: - return None + # Silently pass on jobs that don't exist (anymore), + # and continue by reinvoking itself recursively + return self.dequeue() except UnpickleError as e: # Attach queue information on the exception for improved error # reporting @@ -143,11 +145,13 @@ class Queue(object): try: job = Job.fetch(job_id) except NoSuchJobError: - # Silently pass on jobs that don't exist (anymore) - return None + # Silently pass on jobs that don't exist (anymore), + # and continue by reinvoking the same function recursively + return cls.dequeue_any(queues, blocking) except UnpickleError as e: # Attach queue information on the exception for improved error # reporting + e.job_id = job_id e.queue = queue raise e job.origin = queue diff --git a/rq/worker.py b/rq/worker.py index 7c6911c..bf3671f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -259,9 +259,7 @@ class Worker(object): self.log.debug('Data follows:') self.log.debug(e.raw_data) self.log.debug('End of unreadable data.') - - fq = self.failure_queue - fq._push(e.raw_data) + self.failure_queue.push_job_id(e.job_id) continue if job is None: @@ -286,6 +284,13 @@ class Worker(object): self.perform_job(job) except Exception as e: self.log.exception(e) + + # Store the exception information... 
+ job.exc_info = e + job.save() + + # ...and put the job on the failure queue + self.failure_queue.push_job_id(job.id) sys.exit(1) sys.exit(0) else: diff --git a/tests/test_queue.py b/tests/test_queue.py index a1866dc..0fb0b6e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,8 +1,6 @@ from tests import RQTestCase from tests import testjob -from pickle import dumps from rq import Queue -from rq.exceptions import UnpickleError class TestQueue(RQTestCase): @@ -67,7 +65,6 @@ class TestQueue(RQTestCase): # ...and assert the queue count when down self.assertEquals(q.count, 0) - def test_dequeue(self): """Dequeueing jobs from queues.""" # Set up @@ -92,10 +89,14 @@ class TestQueue(RQTestCase): q = Queue() uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8' q.push_job_id(uuid) + q.push_job_id(uuid) + result = q.enqueue(testjob, 'Nick', foo='bar') + q.push_job_id(uuid) # Dequeue simply ignores the missing job and returns None - self.assertEquals(q.count, 1) - self.assertEquals(q.dequeue(), None) + self.assertEquals(q.count, 4) + self.assertEquals(q.dequeue().id, result.id) + self.assertIsNone(q.dequeue()) self.assertEquals(q.count, 0) def test_dequeue_any(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index b2ddacb..2be9fe1 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -21,7 +21,7 @@ class TestWorker(RQTestCase): self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') def test_work_is_unreadable(self): - """Worker ignores unreadable job.""" + """Unreadable jobs are put on the failure queue.""" q = Queue() failure_q = Queue('failure') @@ -39,7 +39,7 @@ class TestWorker(RQTestCase): # We use the low-level internal function to enqueue any data (bypassing # validity checks) - q.push_job_id(invalid_data) + q.push_job_id(job.id) self.assertEquals(q.count, 1) @@ -51,7 +51,7 @@ class TestWorker(RQTestCase): self.assertEquals(failure_q.count, 1) def test_work_fails(self): - """Worker processes failing job.""" + """Failing jobs are put on the failure queue.""" q = Queue() failure_q = Queue('failure') @@ -59,12 +59,11 @@ class TestWorker(RQTestCase): self.assertEquals(q.count, 0) q.enqueue(failing_job) - self.assertEquals(q.count, 1) + w = Worker([q]) w.work(burst=True) # should silently pass self.assertEquals(q.count, 0) - self.assertEquals(failure_q.count, 1) From 370399f8f7331b32961b948fe5846303e6fa63fa Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 8 Feb 2012 17:55:38 +0100 Subject: [PATCH 06/13] CHECKPOINT: dequeue_any now returns the queue that was popped from. 
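Callers now unpack both values (sketch):

    result = Queue.dequeue_any([foo_q, bar_q], False)
    if result is not None:
        job, queue = result   # queue is the Queue instance it was popped from

Related change: job.origin now round-trips through the job hash as the originating queue's name (a plain string), no longer a Queue object.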
--- rq/job.py | 3 ++- rq/queue.py | 24 +++++++++++++++++------- rq/worker.py | 18 +++++++++++------- tests/test_queue.py | 15 +++++++++------ 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/rq/job.py b/rq/job.py index c68af23..0ceccd2 100644 --- a/rq/job.py +++ b/rq/job.py @@ -120,8 +120,8 @@ class Job(object): if pickled_data is None: raise NoSuchJobError('No such job: %s' % (key,)) + self.origin = conn.hget(key, 'origin') self.func, self.args, self.kwargs = unpickle(pickled_data) - self.created_at = times.to_universal(conn.hget(key, 'created_at')) def save(self): @@ -130,6 +130,7 @@ class Job(object): key = self.key conn.hset(key, 'data', pickled_data) + conn.hset(key, 'origin', self.origin) conn.hset(key, 'created_at', times.format(self.created_at, 'UTC')) diff --git a/rq/queue.py b/rq/queue.py index 734d4f5..80138a2 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -5,6 +5,10 @@ from .job import Job from .exceptions import NoSuchJobError, UnpickleError +def compact(lst): + return [item for item in lst if item is not None] + + @total_ordering class Queue(object): redis_queue_namespace_prefix = 'rq:queue:' @@ -43,14 +47,21 @@ class Queue(object): return self.count == 0 @property - def messages(self): - """Returns a list of all messages (pickled job data) in the queue.""" + def job_ids(self): + """Returns a list of all job IDS in the queue.""" return conn.lrange(self.key, 0, -1) @property def jobs(self): - """Returns a list of all jobs in the queue.""" - return map(Job.unpickle, self.messages) + """Returns a list of all (valid) jobs in the queue.""" + def safe_fetch(job_id): + try: + job = Job.fetch(job_id) + except UnpickleError: + return None + return job + + return compact([safe_fetch(job_id) for job_id in self.job_ids]) @property def count(self): @@ -98,6 +109,7 @@ class Queue(object): """ if blocking: queue_key, job_id = conn.blpop(queue_keys) + return queue_key, job_id else: for queue_key in queue_keys: blob = conn.lpop(queue_key) @@ -124,7 +136,6 @@ class Queue(object): # reporting e.queue = self raise e - job.origin = self return job @classmethod @@ -154,8 +165,7 @@ class Queue(object): e.job_id = job_id e.queue = queue raise e - job.origin = queue - return job + return job, queue # Total ordering defition (the rest of the required Python methods are diff --git a/rq/worker.py b/rq/worker.py index bf3671f..9268205 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -15,8 +15,11 @@ except ImportError: from logging import Logger from .queue import Queue from .proxy import conn +from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError +green = make_colorizer('darkgreen') + def iterable(x): return hasattr(x, '__iter__') @@ -239,7 +242,7 @@ class Worker(object): """ self._install_signal_handlers() - did_work = False + did_perform_work = False self.register_birth() self.state = 'starting' try: @@ -253,7 +256,11 @@ class Worker(object): self.log.info('*** Listening for work on %s...' % (', '.join(qnames))) wait_for_job = not burst try: - job = Queue.dequeue_any(self.queues, wait_for_job) + result = Queue.dequeue_any(self.queues, wait_for_job) + if result is None: + break + job, queue = result + self.log.info('%s: %s' % (green(queue.name), job)) except UnpickleError as e: self.log.warning('*** Ignoring unpickleable data on %s.' 
% (e.queue.name,)) self.log.debug('Data follows:') @@ -262,17 +269,14 @@ class Worker(object): self.failure_queue.push_job_id(e.job_id) continue - if job is None: - break self.state = 'busy' - self.fork_and_perform_job(job) - did_work = True + did_perform_work = True finally: if not self.is_horse: self.register_death() - return did_work + return did_perform_work def fork_and_perform_job(self, job): child_pid = os.fork() diff --git a/tests/test_queue.py b/tests/test_queue.py index 0fb0b6e..0cfe613 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -76,7 +76,7 @@ class TestQueue(RQTestCase): job = q.dequeue() self.assertEquals(job.id, result.id) self.assertEquals(job.func, testjob) - self.assertEquals(job.origin, q) + self.assertEquals(job.origin, q.name) self.assertEquals(job.args[0], 'Rick') self.assertEquals(job.kwargs['foo'], 'bar') @@ -108,21 +108,24 @@ class TestQueue(RQTestCase): # Enqueue a single item barq.enqueue(testjob) - job = Queue.dequeue_any([fooq, barq], False) + job, queue = Queue.dequeue_any([fooq, barq], False) self.assertEquals(job.func, testjob) + self.assertEquals(queue, barq) # Enqueue items on both queues barq.enqueue(testjob, 'for Bar') fooq.enqueue(testjob, 'for Foo') - job = Queue.dequeue_any([fooq, barq], False) + job, queue = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(queue, fooq) self.assertEquals(job.func, testjob) - self.assertEquals(job.origin, fooq) + self.assertEquals(job.origin, fooq.name) self.assertEquals(job.args[0], 'for Foo', 'Foo should be dequeued first.') - job = Queue.dequeue_any([fooq, barq], False) + job, queue = Queue.dequeue_any([fooq, barq], False) + self.assertEquals(queue, barq) self.assertEquals(job.func, testjob) - self.assertEquals(job.origin, barq) + self.assertEquals(job.origin, barq.name) self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.') def test_dequeue_any_ignores_nonexisting_jobs(self): From bffe6cbbdeb4f0b2b73e1b426adf5e072659de18 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 10 Feb 2012 17:15:16 +0100 Subject: [PATCH 07/13] Encapsulate internal function call representation. This means it's not allowed anymore to directly set func, args, and kwargs. Instead, use the for_call() constructor. --- rq/job.py | 50 +++++++++++++++++++++++++++++++++----------------- rq/queue.py | 2 +- rq/worker.py | 2 +- 3 files changed, 35 insertions(+), 19 deletions(-) diff --git a/rq/job.py b/rq/job.py index 0ceccd2..afbc3a8 100644 --- a/rq/job.py +++ b/rq/job.py @@ -31,11 +31,24 @@ class Job(object): keyword arguments. 
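        For example (a sketch; send_report is illustrative):

            job = Job.for_call(send_report, 2012, quarter=1)
            job.func, job.args, job.kwargs   # now read-only properties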
""" job = Job() - job.func = func - job.args = args - job.kwargs = kwargs + job._func = func + job._args = args + job._kwargs = kwargs + job.description = job.get_call_string() return job + @property + def func(self): + return self._func + + @property + def args(self): + return self._args + + @property + def kwargs(self): + return self._kwargs + @classmethod def fetch(cls, id): """Fetches a persisted job from its corresponding Redis key and @@ -47,12 +60,14 @@ class Job(object): def __init__(self, id=None): self._id = id - self.func = None - self.args = None - self.kwargs = None - self.origin = None self.created_at = times.now() + self._func = None + self._args = None + self._kwargs = None + self.description = None + self.origin = None self.enqueued_at = None + self.ended_at = None self.result = None self.exc_info = None @@ -121,13 +136,12 @@ class Job(object): raise NoSuchJobError('No such job: %s' % (key,)) self.origin = conn.hget(key, 'origin') - self.func, self.args, self.kwargs = unpickle(pickled_data) + self._func, self._args, self._kwargs = unpickle(data) self.created_at = times.to_universal(conn.hget(key, 'created_at')) + def save(self): """Persists the current job instance to its corresponding Redis key.""" - pickled_data = dumps(self.job_tuple) - key = self.key conn.hset(key, 'data', pickled_data) conn.hset(key, 'origin', self.origin) @@ -142,18 +156,20 @@ class Job(object): # Representation - @property - def call_string(self): + def get_call_string(self): """Returns a string representation of the call, formatted as a regular Python function invocation statement. """ - arg_list = map(repr, self.args) - arg_list += map(lambda tup: '%s=%r' % (tup[0], tup[1]), - self.kwargs.items()) - return '%s(%s)' % (self.func.__name__, ', '.join(arg_list)) + if self.func is None: + return None + + arg_list = [repr(arg) for arg in self.args] + arg_list += ['%s=%r' % (k, v) for k, v in self.kwargs.items()] + args = ', '.join(arg_list) + return '%s(%s)' % (self.func.__name__, args) def __str__(self): - return '' % (self.id, self.call_string) + return '' % (self.id, self.description) # Job equality diff --git a/rq/queue.py b/rq/queue.py index 80138a2..a1fdc57 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -74,7 +74,7 @@ class Queue(object): conn.rpush(self.key, job_id) def enqueue(self, f, *args, **kwargs): - """Enqueues a function call for delayed execution. + """Creates a job to represent the delayed function call and enqueues it. Expects the function to call, along with the arguments and keyword arguments. diff --git a/rq/worker.py b/rq/worker.py index 9268205..cc4b282 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -319,7 +319,7 @@ class Worker(object): job.func.__name__, job.origin.name, time.time())) msg = 'Got job %s from %s' % ( - job.call_string, + job.description, job.origin.name) self.log.info(msg) try: From 7c903e45ef0af28c8f2eeafe163fb1226e6071cc Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 10 Feb 2012 17:17:38 +0100 Subject: [PATCH 08/13] Simplify the persistence of jobs. Fixes #23 and #24. --- rq/job.py | 46 +++++++++++++++++++++++++----- tests/helpers.py | 6 ++++ tests/test_job.py | 68 ++++++++++++++++++++++++++++++++------------ tests/test_worker.py | 1 + 4 files changed, 96 insertions(+), 25 deletions(-) create mode 100644 tests/helpers.py diff --git a/rq/job.py b/rq/job.py index afbc3a8..9563e1c 100644 --- a/rq/job.py +++ b/rq/job.py @@ -131,21 +131,53 @@ class Job(object): Will raise a NoSuchJobError if no corresponding Redis key exists. 
""" key = self.key - pickled_data = conn.hget(key, 'data') - if pickled_data is None: + properties = ['data', 'created_at', 'origin', 'description', + 'enqueued_at', 'ended_at', 'result', 'exc_info'] + data, created_at, origin, description, \ + enqueued_at, ended_at, result, \ + exc_info = conn.hmget(key, properties) + if data is None: raise NoSuchJobError('No such job: %s' % (key,)) - self.origin = conn.hget(key, 'origin') + def to_date(date_str): + if date_str is None: + return None + else: + return times.to_universal(date_str) + self._func, self._args, self._kwargs = unpickle(data) - self.created_at = times.to_universal(conn.hget(key, 'created_at')) + self.created_at = to_date(created_at) + self.origin = origin + self.description = description + self.enqueued_at = to_date(enqueued_at) + self.ended_at = to_date(ended_at) + self.result = result + self.exc_info = exc_info def save(self): """Persists the current job instance to its corresponding Redis key.""" key = self.key - conn.hset(key, 'data', pickled_data) - conn.hset(key, 'origin', self.origin) - conn.hset(key, 'created_at', times.format(self.created_at, 'UTC')) + + obj = {} + obj['created_at'] = times.format(self.created_at, 'UTC') + + if self.func is not None: + obj['data'] = dumps(self.job_tuple) + if self.origin is not None: + obj['origin'] = self.origin + if self.description is not None: + obj['description'] = self.description + if self.enqueued_at is not None: + obj['enqueued_at'] = times.format(self.enqueued_at, 'UTC') + if self.ended_at is not None: + obj['ended_at'] = times.format(self.ended_at, 'UTC') + if self.result is not None: + obj['result'] = self.result + if self.exc_info is not None: + obj['exc_info'] = self.exc_info + + conn.hmset(key, obj) # Job execution diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 0000000..32d47ed --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,6 @@ +import times + + +def strip_milliseconds(date): + return times.to_universal(times.format(date, 'UTC')) + diff --git a/tests/test_job.py b/tests/test_job.py index 112a5f2..c62c1d9 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,5 +1,7 @@ +import times from datetime import datetime from tests import RQTestCase +from tests.helpers import strip_milliseconds from pickle import loads from rq.job import Job from rq.exceptions import NoSuchJobError, UnpickleError @@ -14,28 +16,28 @@ class TestJob(RQTestCase): """Creation of new empty jobs.""" job = Job() - # Jobs have a random UUID + # Jobs have a random UUID and a creation date self.assertIsNotNone(job.id) - - # Jobs have no data yet... 
- self.assertEquals(job.func, None) - self.assertEquals(job.args, None) - self.assertEquals(job.kwargs, None) - self.assertEquals(job.origin, None) - self.assertEquals(job.enqueued_at, None) - self.assertEquals(job.result, None) - self.assertEquals(job.exc_info, None) - - # ...except for a created_at property self.assertIsNotNone(job.created_at) - def test_create_normal_job(self): + # ...and nothing else + self.assertIsNone(job.func, None) + self.assertIsNone(job.args, None) + self.assertIsNone(job.kwargs, None) + self.assertIsNone(job.origin, None) + self.assertIsNone(job.enqueued_at, None) + self.assertIsNone(job.ended_at, None) + self.assertIsNone(job.result, None) + self.assertIsNone(job.exc_info, None) + + def test_create_typical_job(self): """Creation of jobs for function calls.""" job = Job.for_call(arbitrary_function, 3, 4, z=2) # Jobs have a random UUID self.assertIsNotNone(job.id) self.assertIsNotNone(job.created_at) + self.assertIsNotNone(job.description) # Job data is set... self.assertEquals(job.func, arbitrary_function) @@ -76,8 +78,39 @@ class TestJob(RQTestCase): self.assertEquals(job.created_at, datetime(2012, 2, 7, 22, 13, 24)) - def test_persistence_of_jobs(self): - """Storing and fetching of jobs.""" + def test_persistence_of_empty_jobs(self): + """Storing empty jobs.""" + job = Job() + job.save() + + expected_date = strip_milliseconds(job.created_at) + stored_date = self.testconn.hget(job.key, 'created_at') + self.assertEquals( + times.to_universal(stored_date), + expected_date) + + # ... and no other keys are stored + self.assertItemsEqual( + self.testconn.hkeys(job.key), + ['created_at']) + + def test_persistence_of_typical_jobs(self): + """Storing typical jobs.""" + job = Job.for_call(arbitrary_function, 3, 4, z=2) + job.save() + + expected_date = strip_milliseconds(job.created_at) + stored_date = self.testconn.hget(job.key, 'created_at') + self.assertEquals( + times.to_universal(stored_date), + expected_date) + + # ... and no other keys are stored + self.assertItemsEqual( + self.testconn.hkeys(job.key), + ['created_at', 'data', 'description']) + + def test_store_then_fetch(self): job = Job.for_call(arbitrary_function, 3, 4, z=2) job.save() @@ -94,9 +127,8 @@ class TestJob(RQTestCase): with self.assertRaises(NoSuchJobError): Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0') - - def test_dequeue_unreadable_data(self): - """Dequeue fails on unreadable data.""" + def test_fetching_unreadable_data(self): + """Fetching fails on unreadable data.""" # Set up job = Job.for_call(arbitrary_function, 3, 4, z=2) job.save() diff --git a/tests/test_worker.py b/tests/test_worker.py index 2be9fe1..5065f83 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,5 +1,6 @@ from tests import RQTestCase from tests import testjob, failing_job +from tests.helpers import strip_milliseconds from rq import Queue, Worker from rq.job import Job From e05acfedce0143a48e898fcbab01e3d5029d9472 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 10 Feb 2012 17:19:30 +0100 Subject: [PATCH 09/13] Fix putting jobs on the failure queue when they fail. 
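The failure path in the worker boils down to this (condensed from the diff below):

    try:
        rv = job.perform()
    except Exception:
        job.ended_at = times.now()
        job.exc_info = traceback.format_exc()  # a string now, not the exception object
        job.save()
        fq.push_job_id(job.id)  # fq is the failure queue

enqueue() is also split up, so that enqueue_job(job) accepts an already-constructed job; that is the piece a future requeue() implementation can reuse (see the REFACTOR note in the diff).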
--- rq/queue.py | 6 +++++- rq/worker.py | 49 +++++++++++++++++++++----------------------- tests/test_queue.py | 17 +++++++++++++++ tests/test_worker.py | 16 ++++++++++++++- 4 files changed, 60 insertions(+), 28 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index a1fdc57..efccff8 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -83,11 +83,15 @@ class Queue(object): raise ValueError('Functions from the __main__ module cannot be processed by workers.') job = Job.for_call(f, *args, **kwargs) + return self.enqueue_job(job) + + def enqueue_job(self, job): + """Enqueues a job for delayed execution.""" job.origin = self.name job.enqueued_at = times.now() job.save() self.push_job_id(job.id) - return Job(job.id) + return job def requeue(self, job): """Requeues an existing (typically a failed job) onto the queue.""" diff --git a/rq/worker.py b/rq/worker.py index cc4b282..f52c659 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -7,6 +7,7 @@ import times import procname import socket import signal +import traceback from pickle import dumps try: from logbook import Logger @@ -284,19 +285,9 @@ class Worker(object): self._is_horse = True random.seed() self.log = Logger('horse') - try: - self.perform_job(job) - except Exception as e: - self.log.exception(e) - - # Store the exception information... - job.exc_info = e - job.save() - - # ...and put the job on the failure queue - self.failure_queue.push_job_id(job.id) - sys.exit(1) - sys.exit(0) + + success = self.perform_job(job) + sys.exit(int(not success)) else: self._horse_pid = child_pid self.procline('Forked %d at %d' % (child_pid, time.time())) @@ -317,29 +308,35 @@ class Worker(object): def perform_job(self, job): self.procline('Processing %s from %s since %s' % ( job.func.__name__, - job.origin.name, time.time())) + job.origin, time.time())) msg = 'Got job %s from %s' % ( job.description, - job.origin.name) + job.origin) self.log.info(msg) try: rv = job.perform() except Exception as e: - rv = e - self.log.exception(e) - fq = self.failure_queue + self.log.exception(e) self.log.warning('Moving job to %s queue.' % (fq.name,)) + + # Store the exception information... 
job.ended_at = times.now() - job.exc_info = e - fq._push(job.pickle()) + job.exc_info = traceback.format_exc() + + # ------ REFACTOR THIS ------------------------- + job.save() + # ...and put the job on the failure queue + fq.push_job_id(job.id) + # ------ UNTIL HERE ---------------------------- + # (should be as easy as fq.enqueue(job) or so) + + return False else: - if rv is not None: - self.log.info('Job result = %s' % (rv,)) - else: - self.log.info('Job ended normally without result') + self.log.info('Job OK, result = %s' % (rv,)) if rv is not None: p = conn.pipeline() - p.set(job.rv_key, dumps(rv)) - p.expire(job.rv_key, self.rv_ttl) + p.set(job.result, dumps(rv)) + p.expire(job.result, self.rv_ttl) p.execute() + return True diff --git a/tests/test_queue.py b/tests/test_queue.py index 0cfe613..d1fa44e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,6 +1,7 @@ from tests import RQTestCase from tests import testjob from rq import Queue +from rq.job import Job class TestQueue(RQTestCase): @@ -50,6 +51,22 @@ class TestQueue(RQTestCase): self.assertEquals(self.testconn.llen(q_key), 1) self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id) + def test_enqueue_sets_metadata(self): + """Enqueueing job onto queues modifies meta data.""" + q = Queue() + job = Job.for_call(testjob, 'Nick', foo='bar') + + # Preconditions + self.assertIsNone(job.origin) + self.assertIsNone(job.enqueued_at) + + # Action + q.enqueue_job(job) + + # Postconditions + self.assertEquals(job.origin, q.name) + self.assertIsNotNone(job.enqueued_at) + def test_pop_job_id(self): """Popping job IDs from queues.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 5065f83..8a0f2cf 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -56,15 +56,29 @@ class TestWorker(RQTestCase): q = Queue() failure_q = Queue('failure') + # Preconditions self.assertEquals(failure_q.count, 0) self.assertEquals(q.count, 0) - q.enqueue(failing_job) + # Action + job = q.enqueue(failing_job) self.assertEquals(q.count, 1) + # keep for later + enqueued_at_date = strip_milliseconds(job.enqueued_at) + w = Worker([q]) w.work(burst=True) # should silently pass + + # Postconditions self.assertEquals(q.count, 0) self.assertEquals(failure_q.count, 1) + # Check the job + job = Job.fetch(job.id) + self.assertEquals(job.origin, q.name) + + # should be the original enqueued_at date, not the date of enqueueing to the failure queue + self.assertEquals(job.enqueued_at, enqueued_at_date) + self.assertIsNotNone(job.exc_info) # should contain exc_info From 791f8169f57eada94bf27f02751f59bad99ad64c Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 10 Feb 2012 17:52:05 +0100 Subject: [PATCH 10/13] Remove dead code. --- rq/job.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/rq/job.py b/rq/job.py index 9563e1c..386e594 100644 --- a/rq/job.py +++ b/rq/job.py @@ -154,7 +154,6 @@ class Job(object): self.result = result self.exc_info = exc_info - def save(self): """Persists the current job instance to its corresponding Redis key.""" key = self.key @@ -211,11 +210,3 @@ class Job(object): def __hash__(self): return hash(self.id) - - # TODO: TO REFACTOR / REMOVE - def pickle(self): - """Returns the pickle'd string represenation of a Job. Suitable for - writing to Redis. - """ - return dumps(self) - From 63ef198fd6fedb8d19168f8f2f0aa686e7196112 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 10 Feb 2012 18:32:59 +0100 Subject: [PATCH 11/13] Improve work generator. 
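For reference, dummy.fib below is the naive doubly recursive variant, so the sample workloads scale steeply; the small values can be checked by hand:

    >>> from rq.dummy import fib
    >>> fib(8)
    34
    >>> fib(24)
    75025

That steepness is exactly what makes fib(24) and fib(30) useful as long-running sample jobs next to the instant do_nothing calls.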
--- bin/rqgenload | 9 +++++---- rq/dummy.py | 6 ++++++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/bin/rqgenload b/bin/rqgenload index 674889c..7eb6f49 100755 --- a/bin/rqgenload +++ b/bin/rqgenload @@ -18,10 +18,7 @@ def main(): use_redis() - #funcs = filter(lambda s: not s.startswith('_'), dir(rq.dummy)) - #print(funcs) - - queues = ('default', 'high', 'normal', 'low') + queues = ('default', 'high', 'low') sample_calls = [ (dummy.do_nothing, [], {}), @@ -30,6 +27,10 @@ def main(): (dummy.do_nothing, [], {}), (dummy.do_nothing, [], {}), (dummy.sleep, [1], {}), + (dummy.fib, [8], {}), # normal result + (dummy.fib, [24], {}), # takes pretty long + (dummy.div_by_zero, [], {}), # 5 / 0 => div by zero exc + (dummy.fib, [30], {}), # takes long, then crashes ] for i in range(opts.count): diff --git a/rq/dummy.py b/rq/dummy.py index f4b8dc2..b2ad055 100644 --- a/rq/dummy.py +++ b/rq/dummy.py @@ -21,6 +21,12 @@ def endless_loop(): def div_by_zero(): 1/0 +def fib(n): + if n <= 1: + return 1 + else: + return fib(n-2) + fib(n-1) + def yield_stuff(): yield 7 yield 'foo' From 90a458ca8e33459365db808abc4d1e7da44e7162 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 10 Feb 2012 18:33:11 +0100 Subject: [PATCH 12/13] Add more colorful terminal output. For better visual parsability. --- rq/worker.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index f52c659..a6bbd51 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -20,6 +20,8 @@ from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError green = make_colorizer('darkgreen') +yellow = make_colorizer('darkyellow') +blue = make_colorizer('darkblue') def iterable(x): return hasattr(x, '__iter__') @@ -254,14 +256,13 @@ class Worker(object): self.state = 'idle' qnames = self.queue_names() self.procline('Listening on %s' % ','.join(qnames)) - self.log.info('*** Listening for work on %s...' % (', '.join(qnames))) + self.log.info('') + self.log.info('*** Listening on %s...' % (green(', '.join(qnames)))) wait_for_job = not burst try: result = Queue.dequeue_any(self.queues, wait_for_job) if result is None: break - job, queue = result - self.log.info('%s: %s' % (green(queue.name), job)) except UnpickleError as e: self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,)) self.log.debug('Data follows:') @@ -270,6 +271,9 @@ class Worker(object): self.failure_queue.push_job_id(e.job_id) continue + job, queue = result + self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id)) + self.state = 'busy' self.fork_and_perform_job(job) @@ -309,10 +313,6 @@ class Worker(object): self.procline('Processing %s from %s since %s' % ( job.func.__name__, job.origin, time.time())) - msg = 'Got job %s from %s' % ( - job.description, - job.origin) - self.log.info(msg) try: rv = job.perform() except Exception as e: @@ -333,7 +333,10 @@ class Worker(object): return False else: - self.log.info('Job OK, result = %s' % (rv,)) + if rv is None: + self.log.info('Job OK') + else: + self.log.info('Job OK, result = %s' % (yellow(rv),)) if rv is not None: p = conn.pipeline() p.set(job.result, dumps(rv)) From 39f106cdb305cf5dd5d9e634c7d56772934c06ad Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 13 Feb 2012 09:06:39 +0100 Subject: [PATCH 13/13] Have the test suite find an empty Redis database. Since the test suite `flushdb()`'s after running each test, we should make sure the database is empty before we even start running tests. 
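(The per-test cleanup referred to here is assumed to look roughly like this sketch:

    def tearDown(self):
        self.testconn.flushdb()  # irreversibly drops every key in the selected database

so pointing the suite at a database that holds real data would wipe it.)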
This patch will make sure to never destroy any local production data inside the running Redis instance. This fixes #25. --- tests/__init__.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/__init__.py b/tests/__init__.py index b912bed..b833a91 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -14,6 +14,18 @@ def failing_job(x): return x / 0 +def find_empty_redis_database(): + """Tries each Redis database numbered 4 through 16 in turn, and returns + a connection to the first one that contains no keys. + """ + for dbnum in range(4, 17): + testconn = Redis(db=dbnum) + empty = len(testconn.keys('*')) == 0 + if empty: + return testconn + assert False, 'No empty Redis database found to run tests in.' + + class RQTestCase(unittest.TestCase): """Base class to inherit test cases from for RQ. @@ -27,7 +39,7 @@ class RQTestCase(unittest.TestCase): @classmethod def setUpClass(cls): # Set up connection to Redis - testconn = Redis() + testconn = find_empty_redis_database() conn.push(testconn) # Store the connection (for sanity checking)