Fixed job not getting enqueued due to parallel modification (#1615)

The handling of WatchErrors in setup_dependencies() did not reset
the local status of the job, so if, due to parallel processing, all
dependencies get finished while enqueuing the job (causing
WatchError to be raised), the job returned to enqueue_call()
remained in DEFERRED state, which resulted in no execution at all.
main
th3hamm0r 3 years ago committed by GitHub
parent c5a1ef1734
commit 7be119068f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -343,6 +343,7 @@ class Queue:
# something else has modified either the set of dependencies or the # something else has modified either the set of dependencies or the
# status of one of them. In this case, we simply retry. # status of one of them. In this case, we simply retry.
if len(job._dependency_ids) > 0: if len(job._dependency_ids) > 0:
orig_status = job.get_status(refresh=False)
pipe = pipeline if pipeline is not None else self.connection.pipeline() pipe = pipeline if pipeline is not None else self.connection.pipeline()
while True: while True:
try: try:
@ -360,6 +361,8 @@ class Queue:
for dependency in dependencies: for dependency in dependencies:
if dependency.get_status(refresh=False) != JobStatus.FINISHED: if dependency.get_status(refresh=False) != JobStatus.FINISHED:
# NOTE: If the following code changes local variables, those values probably have
# to be set back to their original values in the handling of WatchError below!
job.set_status(JobStatus.DEFERRED, pipeline=pipe) job.set_status(JobStatus.DEFERRED, pipeline=pipe)
job.register_dependency(pipeline=pipe) job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe) job.save(pipeline=pipe)
@ -370,6 +373,11 @@ class Queue:
break break
except WatchError: except WatchError:
if pipeline is None: if pipeline is None:
# The call to job.set_status(JobStatus.DEFERRED, pipeline=pipe) above has changed the
# internal "_status". We have to reset it to its original value (probably QUEUED), so
# if during the next run no unfinished dependencies exist anymore, the job gets
# enqueued correctly by enqueue_call().
job._status = orig_status
continue continue
else: else:
# if pipeline comes from caller, re-raise to them # if pipeline comes from caller, re-raise to them

Loading…
Cancel
Save