diff --git a/rq/exceptions.py b/rq/exceptions.py index 7f8df37..25e4f0e 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -15,9 +15,6 @@ class UnpickleError(Exception): super(UnpickleError, self).__init__(message, inner_exception) self.raw_data = raw_data + class DequeueTimeout(Exception): pass - - -class EnqueueError(Exception): - pass \ No newline at end of file diff --git a/rq/queue.py b/rq/queue.py index 0e8bdb7..8a60102 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,7 +1,7 @@ import times from .connections import resolve_connection from .job import Job, Status -from .exceptions import (DequeueTimeout, EnqueueError, InvalidJobOperationError, +from .exceptions import (DequeueTimeout, InvalidJobOperationError, NoSuchJobError, UnpickleError) from .compat import total_ordering @@ -145,20 +145,21 @@ class Queue(object): dependency=after) # If job depends on an unfinished job, register itself on it's - # parent's waitlist instead of enqueueing it + # parent's waitlist instead of enqueueing it. + # If WatchError is raised in the process, that means something else is + # modifying the dependency. In this case we simply retry if after is not None: with self.connection.pipeline() as pipe: - try: - pipe.watch(after.key) - if after.status != Status.FINISHED: - job.register_dependency() - job.save() - return job - except WatchError: - raise EnqueueError( - 'Parent job (%s) modified during enqueue process. ' + - 'Bailing out to avoid race conditions' % after.id - ) + while True: + try: + pipe.watch(after.key) + if after.status != Status.FINISHED: + job.register_dependency() + job.save() + return job + break + except WatchError: + continue return self.enqueue_job(job, timeout=timeout)