From e05acfedce0143a48e898fcbab01e3d5029d9472 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 10 Feb 2012 17:19:30 +0100 Subject: [PATCH] Fix putting jobs on the failure queue when they fail. --- rq/queue.py | 6 +++++- rq/worker.py | 49 +++++++++++++++++++++----------------------- tests/test_queue.py | 17 +++++++++++++++ tests/test_worker.py | 16 ++++++++++++++- 4 files changed, 60 insertions(+), 28 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index a1fdc57..efccff8 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -83,11 +83,15 @@ class Queue(object): raise ValueError('Functions from the __main__ module cannot be processed by workers.') job = Job.for_call(f, *args, **kwargs) + return self.enqueue_job(job) + + def enqueue_job(self, job): + """Enqueues a job for delayed execution.""" job.origin = self.name job.enqueued_at = times.now() job.save() self.push_job_id(job.id) - return Job(job.id) + return job def requeue(self, job): """Requeues an existing (typically a failed job) onto the queue.""" diff --git a/rq/worker.py b/rq/worker.py index cc4b282..f52c659 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -7,6 +7,7 @@ import times import procname import socket import signal +import traceback from pickle import dumps try: from logbook import Logger @@ -284,19 +285,9 @@ class Worker(object): self._is_horse = True random.seed() self.log = Logger('horse') - try: - self.perform_job(job) - except Exception as e: - self.log.exception(e) - - # Store the exception information... - job.exc_info = e - job.save() - - # ...and put the job on the failure queue - self.failure_queue.push_job_id(job.id) - sys.exit(1) - sys.exit(0) + + success = self.perform_job(job) + sys.exit(int(not success)) else: self._horse_pid = child_pid self.procline('Forked %d at %d' % (child_pid, time.time())) @@ -317,29 +308,35 @@ class Worker(object): def perform_job(self, job): self.procline('Processing %s from %s since %s' % ( job.func.__name__, - job.origin.name, time.time())) + job.origin, time.time())) msg = 'Got job %s from %s' % ( job.description, - job.origin.name) + job.origin) self.log.info(msg) try: rv = job.perform() except Exception as e: - rv = e - self.log.exception(e) - fq = self.failure_queue + self.log.exception(e) self.log.warning('Moving job to %s queue.' % (fq.name,)) + + # Store the exception information... job.ended_at = times.now() - job.exc_info = e - fq._push(job.pickle()) + job.exc_info = traceback.format_exc() + + # ------ REFACTOR THIS ------------------------- + job.save() + # ...and put the job on the failure queue + fq.push_job_id(job.id) + # ------ UNTIL HERE ---------------------------- + # (should be as easy as fq.enqueue(job) or so) + + return False else: - if rv is not None: - self.log.info('Job result = %s' % (rv,)) - else: - self.log.info('Job ended normally without result') + self.log.info('Job OK, result = %s' % (rv,)) if rv is not None: p = conn.pipeline() - p.set(job.rv_key, dumps(rv)) - p.expire(job.rv_key, self.rv_ttl) + p.set(job.result, dumps(rv)) + p.expire(job.result, self.rv_ttl) p.execute() + return True diff --git a/tests/test_queue.py b/tests/test_queue.py index 0cfe613..d1fa44e 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,6 +1,7 @@ from tests import RQTestCase from tests import testjob from rq import Queue +from rq.job import Job class TestQueue(RQTestCase): @@ -50,6 +51,22 @@ class TestQueue(RQTestCase): self.assertEquals(self.testconn.llen(q_key), 1) self.assertEquals(self.testconn.lrange(q_key, 0, -1)[0], job_id) + def test_enqueue_sets_metadata(self): + """Enqueueing job onto queues modifies meta data.""" + q = Queue() + job = Job.for_call(testjob, 'Nick', foo='bar') + + # Preconditions + self.assertIsNone(job.origin) + self.assertIsNone(job.enqueued_at) + + # Action + q.enqueue_job(job) + + # Postconditions + self.assertEquals(job.origin, q.name) + self.assertIsNotNone(job.enqueued_at) + def test_pop_job_id(self): """Popping job IDs from queues.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 5065f83..8a0f2cf 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -56,15 +56,29 @@ class TestWorker(RQTestCase): q = Queue() failure_q = Queue('failure') + # Preconditions self.assertEquals(failure_q.count, 0) self.assertEquals(q.count, 0) - q.enqueue(failing_job) + # Action + job = q.enqueue(failing_job) self.assertEquals(q.count, 1) + # keep for later + enqueued_at_date = strip_milliseconds(job.enqueued_at) + w = Worker([q]) w.work(burst=True) # should silently pass + + # Postconditions self.assertEquals(q.count, 0) self.assertEquals(failure_q.count, 1) + # Check the job + job = Job.fetch(job.id) + self.assertEquals(job.origin, q.name) + + # should be the original enqueued_at date, not the date of enqueueing to the failure queue + self.assertEquals(job.enqueued_at, enqueued_at_date) + self.assertIsNotNone(job.exc_info) # should contain exc_info