Avoid race conditions when enqueueing job with dependency.

main
Selwin Ong 12 years ago
parent 6550f86646
commit 18ff57ef35

@ -17,3 +17,7 @@ class UnpickleError(Exception):
class DequeueTimeout(Exception): class DequeueTimeout(Exception):
pass pass
class EnqueueError(Exception):
pass

@ -1,10 +1,12 @@
import times import times
from .connections import resolve_connection from .connections import resolve_connection
from .job import Job, Status from .job import Job, Status
from .exceptions import (NoSuchJobError, UnpickleError, from .exceptions import (DequeueTimeout, EnqueueError, InvalidJobOperationError,
InvalidJobOperationError, DequeueTimeout) NoSuchJobError, UnpickleError)
from .compat import total_ordering from .compat import total_ordering
from redis import WatchError
def get_failed_queue(connection=None): def get_failed_queue(connection=None):
"""Returns a handle to the special failed queue.""" """Returns a handle to the special failed queue."""
@ -127,12 +129,23 @@ class Queue(object):
job = Job.create(func, args, kwargs, connection=self.connection, job = Job.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, status=Status.QUEUED,
parent=after) parent=after)
# If job depends on another job to finish, register itself on it's
# If job depends on an unfinished job, register itself on it's
# parent's waitlist instead of enqueueing it # parent's waitlist instead of enqueueing it
if after is not None: if after is not None:
job.register_dependency() with self.connection.pipeline() as pipe:
job.save() try:
return job pipe.watch(after.key)
if after.status != Status.FINISHED:
job.register_dependency()
job.save()
return job
except WatchError:
raise EnqueueError(
'Parent job (%s) modified during enqueue process. ' +
'Bailing out to avoid race conditions' % after.id
)
return self.enqueue_job(job, timeout=timeout) return self.enqueue_job(job, timeout=timeout)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):

@ -256,12 +256,19 @@ class TestQueue(RQTestCase):
self.assertFalse(self.testconn.exists(parent_job.waitlist_key)) self.assertFalse(self.testconn.exists(parent_job.waitlist_key))
def test_enqueue_job_with_dependency(self): def test_enqueue_job_with_dependency(self):
"""Job with dependency is not queued right away""" """Test enqueueing job with dependency"""
# Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
q = Queue() q = Queue()
q.enqueue_call(say_hello, after=parent_job) q.enqueue_call(say_hello, after=parent_job)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
# Jobs dependent on finished jobs are immediately enqueued
parent_job.status = 'finished'
parent_job.save()
job = q.enqueue_call(say_hello, after=parent_job)
self.assertEqual(q.job_ids, [job.id])
class TestFailedQueue(RQTestCase): class TestFailedQueue(RQTestCase):
def test_requeue_job(self): def test_requeue_job(self):

@ -236,7 +236,7 @@ class TestWorker(RQTestCase):
self.assertEqual(job.is_failed, True) self.assertEqual(job.is_failed, True)
def test_job_dependency(self): def test_job_dependency(self):
"""Waitlisted jobs are enqueued only if their parents don't fail""" """Enqueue waitlisted jobs only if their parents don't fail"""
q = Queue() q = Queue()
w = Worker([q]) w = Worker([q])
parent_job = q.enqueue(say_hello) parent_job = q.enqueue(say_hello)

Loading…
Cancel
Save