From f516f8df2e3342784ea8820dfc8440ce0b6fa96f Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 8 Feb 2012 15:18:24 +0100 Subject: [PATCH] CHECKPOINT: Handle failing and unreadable jobs. Failing (or unreadable) jobs are correctly put on the failure queue by the worker now. --- rq/queue.py | 10 +++++++--- rq/worker.py | 11 ++++++++--- tests/test_queue.py | 11 ++++++----- tests/test_worker.py | 9 ++++----- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index a938523..734d4f5 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -116,7 +116,9 @@ class Queue(object): try: job = Job.fetch(job_id) except NoSuchJobError as e: - return None + # Silently pass on jobs that don't exist (anymore), + # and continue by reinvoking itself recursively + return self.dequeue() except UnpickleError as e: # Attach queue information on the exception for improved error # reporting @@ -143,11 +145,13 @@ class Queue(object): try: job = Job.fetch(job_id) except NoSuchJobError: - # Silently pass on jobs that don't exist (anymore) - return None + # Silently pass on jobs that don't exist (anymore), + # and continue by reinvoking the same function recursively + return cls.dequeue_any(queues, blocking) except UnpickleError as e: # Attach queue information on the exception for improved error # reporting + e.job_id = job_id e.queue = queue raise e job.origin = queue diff --git a/rq/worker.py b/rq/worker.py index 7c6911c..bf3671f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -259,9 +259,7 @@ class Worker(object): self.log.debug('Data follows:') self.log.debug(e.raw_data) self.log.debug('End of unreadable data.') - - fq = self.failure_queue - fq._push(e.raw_data) + self.failure_queue.push_job_id(e.job_id) continue if job is None: @@ -286,6 +284,13 @@ class Worker(object): self.perform_job(job) except Exception as e: self.log.exception(e) + + # Store the exception information... + job.exc_info = e + job.save() + + # ...and put the job on the failure queue + self.failure_queue.push_job_id(job.id) sys.exit(1) sys.exit(0) else: diff --git a/tests/test_queue.py b/tests/test_queue.py index a1866dc..0fb0b6e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,8 +1,6 @@ from tests import RQTestCase from tests import testjob -from pickle import dumps from rq import Queue -from rq.exceptions import UnpickleError class TestQueue(RQTestCase): @@ -67,7 +65,6 @@ class TestQueue(RQTestCase): # ...and assert the queue count when down self.assertEquals(q.count, 0) - def test_dequeue(self): """Dequeueing jobs from queues.""" # Set up @@ -92,10 +89,14 @@ class TestQueue(RQTestCase): q = Queue() uuid = '49f205ab-8ea3-47dd-a1b5-bfa186870fc8' q.push_job_id(uuid) + q.push_job_id(uuid) + result = q.enqueue(testjob, 'Nick', foo='bar') + q.push_job_id(uuid) # Dequeue simply ignores the missing job and returns None - self.assertEquals(q.count, 1) - self.assertEquals(q.dequeue(), None) + self.assertEquals(q.count, 4) + self.assertEquals(q.dequeue().id, result.id) + self.assertIsNone(q.dequeue()) self.assertEquals(q.count, 0) def test_dequeue_any(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index b2ddacb..2be9fe1 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -21,7 +21,7 @@ class TestWorker(RQTestCase): self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') def test_work_is_unreadable(self): - """Worker ignores unreadable job.""" + """Unreadable jobs are put on the failure queue.""" q = Queue() failure_q = Queue('failure') @@ -39,7 +39,7 @@ class TestWorker(RQTestCase): # We use the low-level internal function to enqueue any data (bypassing # validity checks) - q.push_job_id(invalid_data) + q.push_job_id(job.id) self.assertEquals(q.count, 1) @@ -51,7 +51,7 @@ class TestWorker(RQTestCase): self.assertEquals(failure_q.count, 1) def test_work_fails(self): - """Worker processes failing job.""" + """Failing jobs are put on the failure queue.""" q = Queue() failure_q = Queue('failure') @@ -59,12 +59,11 @@ class TestWorker(RQTestCase): self.assertEquals(q.count, 0) q.enqueue(failing_job) - self.assertEquals(q.count, 1) + w = Worker([q]) w.work(burst=True) # should silently pass self.assertEquals(q.count, 0) - self.assertEquals(failure_q.count, 1)