diff --git a/rq/job.py b/rq/job.py
index a17e490..c68af23 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -5,6 +5,21 @@
 from .proxy import conn
 from .exceptions import UnpickleError, NoSuchJobError
 
+def unpickle(pickled_string):
+    """Unpickles a string, but raises a unified UnpickleError in case anything
+    fails.
+
+    This is a helper function that shields callers from the fact that
+    `loads()` can raise many different exception types (e.g. AttributeError,
+    IndexError, TypeError, KeyError).
+    """
+    try:
+        obj = loads(pickled_string)
+    except StandardError:
+        raise UnpickleError('Could not unpickle.', pickled_string)
+    return obj
+
+
 class Job(object):
     """A Job is just a convenient datastructure to pass around job (meta) data.
     """
@@ -105,7 +120,7 @@ class Job(object):
         if pickled_data is None:
             raise NoSuchJobError('No such job: %s' % (key,))
 
-        self.func, self.args, self.kwargs = loads(pickled_data)
+        self.func, self.args, self.kwargs = unpickle(pickled_data)
 
         self.created_at = times.to_universal(conn.hget(key, 'created_at'))
 
@@ -155,15 +170,3 @@ class Job(object):
         """
         return dumps(self)
 
-    @classmethod
-    def unpickle(cls, pickle_data):
-        """Constructs a Job instance form the given pickle'd job tuple data."""
-        try:
-            unpickled_obj = loads(pickle_data)
-            assert isinstance(unpickled_obj, Job)
-            return unpickled_obj
-        except (AssertionError, AttributeError, IndexError, TypeError, KeyError):
-
-            raise UnpickleError('Could not unpickle Job.', pickle_data)
-
-
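For reference, a rough sketch (not part of the patch itself) of how the new
module-level helper is meant to be used; the invalid payload and the
print-based reporting are made up for illustration:

    from rq.job import unpickle
    from rq.exceptions import UnpickleError

    try:
        func, args, kwargs = unpickle('this is no pickle string')
    except UnpickleError as e:
        # The exception's second argument carries the offending payload,
        # which callers can use for error reporting.
        print 'Could not unpickle: %r' % (e.args[1],)
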
diff --git a/rq/queue.py b/rq/queue.py
index 43d0db0..a938523 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -1,8 +1,8 @@
-from datetime import datetime
+import times
 from functools import total_ordering
 from .proxy import conn
 from .job import Job
-from .exceptions import UnpickleError
+from .exceptions import NoSuchJobError, UnpickleError
 
 
 @total_ordering
@@ -58,21 +58,9 @@ class Queue(object):
         return conn.llen(self.key)
 
-    def _create_job(self, f, *args, **kwargs):
-        """Creates a Job instance for the given function call and attaches queue
-        meta data to it.
-        """
-        if f.__module__ == '__main__':
-            raise ValueError('Functions from the __main__ module cannot be processed by workers.')
-
-        job = Job.for_call(f, *args, **kwargs)
-        job.origin = self.name
-        return job
-
-    def enqueue_job(self, job):
-        """Enqueues a pickled_job on the corresponding Redis queue."""
-        job.save()
-        conn.rpush(self.key, job.id)
+    def push_job_id(self, job_id):
+        """Pushes a job ID onto the corresponding Redis queue."""
+        conn.rpush(self.key, job_id)
 
     def enqueue(self, f, *args, **kwargs):
         """Enqueues a function call for delayed execution.
@@ -80,25 +68,55 @@ class Queue(object):
         Expects the function to call, along with the arguments and keyword
         arguments.
         """
-        job = self._create_job(f, *args, **kwargs)
-        job.enqueued_at = datetime.utcnow()
-        self.enqueue_job(job)
+        if f.__module__ == '__main__':
+            raise ValueError('Functions from the __main__ module cannot be processed by workers.')
+
+        job = Job.for_call(f, *args, **kwargs)
+        job.origin = self.name
+        job.enqueued_at = times.now()
+        job.save()
+        self.push_job_id(job.id)
         return Job(job.id)
 
     def requeue(self, job):
         """Requeues an existing (typically a failed job) onto the queue."""
         raise NotImplementedError('Implement this')
 
+    def pop_job_id(self):
+        """Pops a job ID off the front of this Redis queue."""
+        return conn.lpop(self.key)
+
+    @classmethod
+    def lpop(cls, queue_keys, blocking):
+        """Helper method that abstracts away some Redis API details: LPOP
+        accepts only a single key, whereas BLPOP accepts multiple.  So for the
+        non-blocking case, we need to iterate over all queues, do individual
+        LPOPs, and return the first result.
+
+        Until Redis receives a specific method for this, we'll have to wrap it
+        this way.
+        """
+        if blocking:
+            queue_key, job_id = conn.blpop(queue_keys)
+            return queue_key, job_id
+        else:
+            for queue_key in queue_keys:
+                blob = conn.lpop(queue_key)
+                if blob is not None:
+                    return queue_key, blob
+            return None
+
     def dequeue(self):
-        """Dequeues the function call at the front of this Queue.
+        """Dequeues the front-most job from this queue.
 
         Returns a Job instance, which can be executed or inspected.
         """
-        blob = conn.lpop(self.key)
-        if blob is None:
+        job_id = self.pop_job_id()
+        if job_id is None:
             return None
         try:
-            job = Job.unpickle(blob)
+            job = Job.fetch(job_id)
+        except NoSuchJobError:
+            return None
         except UnpickleError as e:
             # Attach queue information on the exception for improved error
             # reporting
@@ -107,20 +125,6 @@ class Queue(object):
             e.queue = self
             raise e
         job.origin = self
         return job
 
-    @classmethod
-    def _lpop_any(cls, queue_keys):
-        """Helper method. You should not call this directly.
-
-        Redis' BLPOP command takes multiple queue arguments, but LPOP can only
-        take a single queue. Therefore, we need to loop over all queues
-        manually, in order, and return None if no more work is available.
-        """
-        for queue_key in queue_keys:
-            blob = conn.lpop(queue_key)
-            if blob is not None:
-                return (queue_key, blob)
-        return None
-
     @classmethod
     def dequeue_any(cls, queues, blocking):
         """Class method returning the Job instance at the front of the given set
         of Queues, if one exists.
 
         When all of the Queues are empty, depending on the `blocking` argument,
@@ -130,18 +134,17 @@ class Queue(object):
         either blocks execution of this function until new messages arrive on
         any of the queues, or returns None.
         """
-        queue_keys = map(lambda q: q.key, queues)
-        if blocking:
-            queue_key, blob = conn.blpop(queue_keys)
-        else:
-            redis_result = cls._lpop_any(queue_keys)
-            if redis_result is None:
-                return None
-            queue_key, blob = redis_result
-
+        queue_keys = [q.key for q in queues]
+        result = cls.lpop(queue_keys, blocking)
+        if result is None:
+            return None
+        queue_key, job_id = result
         queue = Queue.from_queue_key(queue_key)
         try:
-            job = Job.unpickle(blob)
+            job = Job.fetch(job_id)
+        except NoSuchJobError:
+            # Silently pass on jobs that don't exist (anymore)
+            return None
         except UnpickleError as e:
             # Attach queue information on the exception for improved error
             # reporting
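With these changes, the Redis list holds only job IDs, while the job data
itself lives in a per-job hash. A minimal sketch of the resulting flow,
assuming a configured Redis connection and an importable module (`mymodule`
and `add` are made-up names):

    from rq import Queue
    from mymodule import add      # functions from __main__ raise ValueError

    q = Queue('default')
    job = q.enqueue(add, 2, 3)    # job.save() writes the hash; only job.id is pushed
    job = q.dequeue()             # pop_job_id(), then Job.fetch() reads the hash back
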
diff --git a/rq/worker.py b/rq/worker.py
index b2e6484..7c6911c 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -1,9 +1,9 @@
 import sys
 import os
 import errno
-import datetime
 import random
 import time
+import times
 import procname
 import socket
 import signal
@@ -233,7 +233,7 @@ class Worker(object):
 
         Pops and performs all jobs on the current list of queues.  When all
         queues are empty, block and wait for new jobs to arrive on any of the
-        queues, unless `burst` is True.
+        queues, unless `burst` mode is enabled.
 
         The return value indicates whether any jobs were processed.
         """
@@ -249,7 +249,7 @@ class Worker(object):
                 break
             self.state = 'idle'
             qnames = self.queue_names()
-            self.procline('Listening on %s' % (','.join(qnames)))
+            self.procline('Listening on %s' % ','.join(qnames))
             self.log.info('*** Listening for work on %s...' % (', '.join(qnames)))
             wait_for_job = not burst
             try:
@@ -321,7 +321,7 @@ class Worker(object):
                 fq = self.failure_queue
                 self.log.warning('Moving job to %s queue.'
                                  % (fq.name,))
-                job.ended_at = datetime.datetime.utcnow()
+                job.ended_at = times.now()
                 job.exc_info = e
                 fq._push(job.pickle())
             else:
diff --git a/tests/test_job.py b/tests/test_job.py
index 45bdf97..112a5f2 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -1,6 +1,6 @@
 from datetime import datetime
 from tests import RQTestCase
-from pickle import dumps, loads
+from pickle import loads
 from rq.job import Job
 from rq.exceptions import NoSuchJobError, UnpickleError
 
@@ -95,16 +95,26 @@ class TestJob(RQTestCase):
             Job.fetch('b4a44d44-da16-4620-90a6-798e8cd72ca0')
 
-    def test_unpickle_errors(self):
-        """Handling of unpickl'ing errors."""
-        with self.assertRaises(UnpickleError):
-            Job.unpickle('this is no pickle data')
+    def test_dequeue_unreadable_data(self):
+        """Dequeue fails on unreadable data."""
+        # Set up
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
 
+        # Just replace the data hash key with some random noise
+        self.testconn.hset(job.key, 'data', 'this is no pickle string')
         with self.assertRaises(UnpickleError):
-            Job.unpickle(13)
+            job.refresh()
 
-        pickle_data = dumps(Job.for_call(arbitrary_function, 2, 3))
-        corrupt_data = pickle_data.replace('arbitrary', 'b0rken')
-        with self.assertRaises(UnpickleError):
-            Job.unpickle(corrupt_data)
+        # Set up (part B)
+        job = Job.for_call(arbitrary_function, 3, 4, z=2)
+        job.save()
+
+        # Now slightly modify the job to make it unpickl'able (this is
+        # equivalent to a worker not having the most up-to-date source code
+        # and thus being unable to import the function)
+        data = self.testconn.hget(job.key, 'data')
+        unimportable_data = data.replace('arbitrary_function', 'broken')
+        self.testconn.hset(job.key, 'data', unimportable_data)
+        with self.assertRaises(UnpickleError):
+            job.refresh()
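Both the worker and the job module now go through the `times` library instead
of naive `datetime.utcnow()` calls. A quick illustration of the calls
involved, as I understand that library's API (worth double-checking against
its documentation):

    import times

    now = times.now()                  # universal (UTC) datetime
    text = times.format(now, 'UTC')    # serialize, e.g. for a Redis hash field
    back = times.to_universal(text)    # parse it back, as Job.refresh() does
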
diff --git a/tests/test_queue.py b/tests/test_queue.py
index 80e1acc..a1866dc 100644
--- a/tests/test_queue.py
+++ b/tests/test_queue.py
@@ -39,7 +39,7 @@ class TestQueue(RQTestCase):
 
     def test_enqueue(self):
-        """Enqueueing writes job IDs to queues."""
+        """Enqueueing jobs onto queues."""
         q = Queue()
         self.assertEquals(q.is_empty(), True)
 
@@ -53,18 +53,51 @@ class TestQueue(RQTestCase):
         self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id)
 
+    def test_pop_job_id(self):
+        """Popping job IDs from queues."""
+        # Set up
+        q = Queue()
+        uuid = '112188ae-4e9d-4a5b-a5b3-f26f2cb054da'
+        q.push_job_id(uuid)
+
+        # Pop it off the queue...
+        self.assertEquals(q.count, 1)
+        self.assertEquals(q.pop_job_id(), uuid)
+
+        # ...and assert the queue count went down
+        self.assertEquals(q.count, 0)
+
     def test_dequeue(self):
-        """Fetching work from specific queue."""
-        q = Queue('foo')
-        q.enqueue(testjob, 'Rick', foo='bar')
+        """Dequeueing jobs from queues."""
+        # Set up
+        q = Queue()
+        result = q.enqueue(testjob, 'Rick', foo='bar')
 
-        # Pull it off the queue (normally, a worker would do this)
+        # Dequeue a job (not a job ID) off the queue
+        self.assertEquals(q.count, 1)
         job = q.dequeue()
+        self.assertEquals(job.id, result.id)
         self.assertEquals(job.func, testjob)
         self.assertEquals(job.origin, q)
         self.assertEquals(job.args[0], 'Rick')
         self.assertEquals(job.kwargs['foo'], 'bar')
 
+        # ...and assert the queue count went down
+        self.assertEquals(q.count, 0)
+
+    def test_dequeue_ignores_nonexisting_jobs(self):
+        """Dequeueing silently ignores non-existing jobs."""
+
+        q = Queue()
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
+
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 1)
+        self.assertEquals(q.dequeue(), None)
+        self.assertEquals(q.count, 0)
+
     def test_dequeue_any(self):
         """Fetching work from any given queue."""
         fooq = Queue('foo')
@@ -91,33 +124,15 @@ class TestQueue(RQTestCase):
         self.assertEquals(job.origin, barq)
         self.assertEquals(job.args[0], 'for Bar', 'Bar should be dequeued second.')
 
-    def test_dequeue_unpicklable_data(self):
-        """Error handling of invalid pickle data."""
-
-        # Push non-pickle data on the queue
-        q = Queue('foo')
-        blob = 'this is nothing like pickled data'
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push value pickle data, but not representing a job tuple
-        q = Queue('foo')
-        blob = dumps('this is pickled, but not a job tuple')
-        self.testconn.rpush(q._key, blob)
-
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when perform()'ing
-
-        # Push slightly incorrect pickled data onto the queue (simulate
-        # a function that can't be imported from the worker)
-        q = Queue('foo')
+    def test_dequeue_any_ignores_nonexisting_jobs(self):
+        """Dequeueing (from any queue) silently ignores non-existing jobs."""
 
-        job_tuple = dumps((testjob, [], dict(name='Frank'), 'unused'))
-        blob = job_tuple.replace('testjob', 'fooobar')
-        self.testconn.rpush(q._key, blob)
+        q = Queue('low')
+        uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8'
+        q.push_job_id(uuid)
 
-        with self.assertRaises(UnpickleError):
-            q.dequeue()  # error occurs when dequeue()'ing
+        # Dequeue simply ignores the missing job and returns None
+        self.assertEquals(q.count, 1)
+        self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), None)
+        self.assertEquals(q.count, 0)
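These tests pin down the new ID-based dequeue semantics. For completeness, a
small usage sketch of `Queue.dequeue_any()` in both modes (queue names are
illustrative):

    from rq import Queue

    high, low = Queue('high'), Queue('low')

    # Non-blocking: LPOPs each queue key in order; returns None when all are
    # empty, or when the popped job ID's hash no longer exists.
    job = Queue.dequeue_any([high, low], False)

    # Blocking: issues a single BLPOP across both queue keys and waits.
    job = Queue.dequeue_any([high, low], True)
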
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 06242f6..b2ddacb 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -21,7 +21,7 @@ class TestWorker(RQTestCase):
         self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
 
     def test_work_is_unreadable(self):
-        """Worker processes unreadable job."""
+        """Worker ignores unreadable job."""
         q = Queue()
         failure_q = Queue('failure')
 
@@ -32,13 +32,14 @@ class TestWorker(RQTestCase):
         # What we're simulating here is a call to a function that is not
         # importable from the worker process.
         job = Job.for_call(failing_job, 3)
-        pickled_job = job.pickle()
-        invalid_data = pickled_job.replace(
-            'failing_job', 'nonexisting_job')
+        job.save()
+        data = self.testconn.hget(job.key, 'data')
+        invalid_data = data.replace('failing_job', 'nonexisting_job')
+        self.testconn.hset(job.key, 'data', invalid_data)
 
         # We use the low-level internal function to enqueue any data (bypassing
         # validity checks)
-        q._push(invalid_data)
+        q.push_job_id(job.id)
 
         self.assertEquals(q.count, 1)
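Finally, a minimal sketch of the worker loop these tests drive, assuming
`Worker` is importable from the top-level package as in the test fixtures
(the construction shown here is my assumption):

    from rq import Queue, Worker

    q = Queue()
    w = Worker([q])
    w.work(burst=True)    # drains the queues once and returns; True if any
                          # jobs were processed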