diff --git a/rq/exceptions.py b/rq/exceptions.py index 982a580..7f8df37 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -17,3 +17,7 @@ class UnpickleError(Exception): class DequeueTimeout(Exception): pass + + +class EnqueueError(Exception): + pass \ No newline at end of file diff --git a/rq/queue.py b/rq/queue.py index 31d33c9..da0154a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,10 +1,12 @@ import times from .connections import resolve_connection from .job import Job, Status -from .exceptions import (NoSuchJobError, UnpickleError, - InvalidJobOperationError, DequeueTimeout) +from .exceptions import (DequeueTimeout, EnqueueError, InvalidJobOperationError, + NoSuchJobError, UnpickleError) from .compat import total_ordering +from redis import WatchError + def get_failed_queue(connection=None): """Returns a handle to the special failed queue.""" @@ -127,12 +129,23 @@ class Queue(object): job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, parent=after) - # If job depends on another job to finish, register itself on it's + + # If job depends on an unfinished job, register itself on it's # parent's waitlist instead of enqueueing it if after is not None: - job.register_dependency() - job.save() - return job + with self.connection.pipeline() as pipe: + try: + pipe.watch(after.key) + if after.status != Status.FINISHED: + job.register_dependency() + job.save() + return job + except WatchError: + raise EnqueueError( + 'Parent job (%s) modified during enqueue process. ' + + 'Bailing out to avoid race conditions' % after.id + ) + return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): diff --git a/tests/test_queue.py b/tests/test_queue.py index b596004..9304e42 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -256,12 +256,19 @@ class TestQueue(RQTestCase): self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) def test_enqueue_job_with_dependency(self): - """Job with dependency is not queued right away""" + """Test enqueueing job with dependency""" + # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) q = Queue() q.enqueue_call(say_hello, after=parent_job) self.assertEqual(q.job_ids, []) + # Jobs dependent on finished jobs are immediately enqueued + parent_job.status = 'finished' + parent_job.save() + job = q.enqueue_call(say_hello, after=parent_job) + self.assertEqual(q.job_ids, [job.id]) + class TestFailedQueue(RQTestCase): def test_requeue_job(self): diff --git a/tests/test_worker.py b/tests/test_worker.py index 8615801..7e4bd2e 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -236,7 +236,7 @@ class TestWorker(RQTestCase): self.assertEqual(job.is_failed, True) def test_job_dependency(self): - """Waitlisted jobs are enqueued only if their parents don't fail""" + """Enqueue waitlisted jobs only if their parents don't fail""" q = Queue() w = Worker([q]) parent_job = q.enqueue(say_hello)