From fdce187c27df4e6d7acb90583c17fc7d99152161 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 30 Jan 2012 20:55:52 +0100 Subject: [PATCH] Putting failed jobs on the failure queue. --- rq/job.py | 2 +- rq/worker.py | 16 ++++++++++++---- tests/test_worker.py | 17 +++++++++++++++++ 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/rq/job.py b/rq/job.py index 3e67a76..36831d5 100644 --- a/rq/job.py +++ b/rq/job.py @@ -14,7 +14,7 @@ class Job(object): unpickled_obj = loads(pickle_data) assert isinstance(unpickled_obj, Job) return unpickled_obj - except (AssertionError, AttributeError, IndexError, TypeError): + except (AssertionError, AttributeError, IndexError, TypeError, KeyError): raise UnpickleError('Could not unpickle Job.', pickle_data) def __init__(self, func, *args, **kwargs): diff --git a/rq/worker.py b/rq/worker.py index 49cdd97..b2e6484 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,6 +1,7 @@ import sys import os import errno +import datetime import random import time import procname @@ -81,6 +82,7 @@ class Worker(object): self._horse_pid = 0 self._stopped = False self.log = Logger('worker') + self.failure_queue = Queue('failure') def validate_queues(self): @@ -253,13 +255,13 @@ class Worker(object): try: job = Queue.dequeue_any(self.queues, wait_for_job) except UnpickleError as e: - self.log.warning('*** Ignoring unpickleable data on %s.', e.queue) + self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,)) self.log.debug('Data follows:') self.log.debug(e.raw_data) self.log.debug('End of unreadable data.') - q = Queue('failure') - q._push(e.raw_data) + fq = self.failure_queue + fq._push(e.raw_data) continue if job is None: @@ -313,9 +315,15 @@ class Worker(object): self.log.info(msg) try: rv = job.perform() - except Exception, e: + except Exception as e: rv = e self.log.exception(e) + + fq = self.failure_queue + self.log.warning('Moving job to %s queue.' % (fq.name,)) + job.ended_at = datetime.datetime.utcnow() + job.exc_info = e + fq._push(job.pickle()) else: if rv is not None: self.log.info('Job result = %s' % (rv,)) diff --git a/tests/test_worker.py b/tests/test_worker.py index 182c3ea..b4cd0aa 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -49,4 +49,21 @@ class TestWorker(RQTestCase): self.assertEquals(failure_q.count, 1) + def test_work_fails(self): + """Worker processes failing job.""" + q = Queue() + failure_q = Queue('failure') + + self.assertEquals(failure_q.count, 0) + self.assertEquals(q.count, 0) + + q.enqueue(failing_job) + + self.assertEquals(q.count, 1) + w = Worker([q]) + w.work(burst=True) # should silently pass + self.assertEquals(q.count, 0) + + self.assertEquals(failure_q.count, 1) +