Don't fail if job dependency is modified during enqueue process.

main
Selwin Ong 12 years ago
parent e7e8579888
commit 6ee45597ca

@ -15,9 +15,6 @@ class UnpickleError(Exception):
super(UnpickleError, self).__init__(message, inner_exception) super(UnpickleError, self).__init__(message, inner_exception)
self.raw_data = raw_data self.raw_data = raw_data
class DequeueTimeout(Exception):
pass
class DequeueTimeout(Exception):
class EnqueueError(Exception):
pass pass

@ -1,7 +1,7 @@
import times import times
from .connections import resolve_connection from .connections import resolve_connection
from .job import Job, Status from .job import Job, Status
from .exceptions import (DequeueTimeout, EnqueueError, InvalidJobOperationError, from .exceptions import (DequeueTimeout, InvalidJobOperationError,
NoSuchJobError, UnpickleError) NoSuchJobError, UnpickleError)
from .compat import total_ordering from .compat import total_ordering
@ -145,20 +145,21 @@ class Queue(object):
dependency=after) dependency=after)
# If job depends on an unfinished job, register itself on it's # If job depends on an unfinished job, register itself on it's
# parent's waitlist instead of enqueueing it # parent's waitlist instead of enqueueing it.
# If WatchError is raised in the process, that means something else is
# modifying the dependency. In this case we simply retry
if after is not None: if after is not None:
with self.connection.pipeline() as pipe: with self.connection.pipeline() as pipe:
try: while True:
pipe.watch(after.key) try:
if after.status != Status.FINISHED: pipe.watch(after.key)
job.register_dependency() if after.status != Status.FINISHED:
job.save() job.register_dependency()
return job job.save()
except WatchError: return job
raise EnqueueError( break
'Parent job (%s) modified during enqueue process. ' + except WatchError:
'Bailing out to avoid race conditions' % after.id continue
)
return self.enqueue_job(job, timeout=timeout) return self.enqueue_job(job, timeout=timeout)

Loading…
Cancel
Save