From 6550f866463cf3d25da5a4ee6b5ec5dcf31f01d3 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Thu, 18 Apr 2013 22:18:56 +0700 Subject: [PATCH] Don't enqueue waitlisted jobs on failed execution. --- rq/queue.py | 3 ++- rq/worker.py | 3 ++- tests/test_worker.py | 9 ++++++++- 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index f70033a..31d33c9 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -113,7 +113,7 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None, + def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, after=None): """Creates a job to represent the delayed function call and enqueues it. @@ -123,6 +123,7 @@ class Queue(object): contain options for RQ itself. """ timeout = timeout or self._default_timeout + # TODO: job with dependency shouldn't have "queued" as status job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, parent=after) diff --git a/rq/worker.py b/rq/worker.py index 8d43f9f..6d35656 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -328,7 +328,8 @@ class Worker(object): self.connection.expire(self.key, (job.timeout or 180) + 60) self.fork_and_perform_job(job) self.connection.expire(self.key, self.default_worker_ttl) - queue.enqueue_waitlist(job) + if job.status == 'finished': + queue.enqueue_waitlist(job) did_perform_work = True finally: diff --git a/tests/test_worker.py b/tests/test_worker.py index 143fdbc..8615801 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -236,10 +236,17 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_failed, True) def test_job_dependency(self): + """Waitlisted jobs are enqueued only if their parents don't fail""" q = Queue() w = Worker([q]) parent_job = q.enqueue(say_hello) job = q.enqueue_call(say_hello, after=parent_job) w.work(burst=True) job = Job.fetch(job.id) - self.assertEqual(job.status, 'finished') + self.assertEqual(job.status, 'finished') + + parent_job = q.enqueue(div_by_zero) + job = q.enqueue_call(say_hello, after=parent_job) + w.work(burst=True) + job = Job.fetch(job.id) + self.assertNotEqual(job.status, 'finished')