|
|
@ -675,6 +675,13 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
|
|
|
|
# if dependencies are inserted after enqueue_dependents
|
|
|
|
|
|
|
|
# a WatchError is thrown by execute()
|
|
|
|
|
|
|
|
pipeline.watch(job.dependents_key)
|
|
|
|
|
|
|
|
queue.enqueue_dependents(job, pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# pipeline all following commands (reads won't work!)
|
|
|
|
|
|
|
|
pipeline.multi()
|
|
|
|
self.set_current_job_id(None, pipeline=pipeline)
|
|
|
|
self.set_current_job_id(None, pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
if result_ttl != 0:
|
|
|
|
if result_ttl != 0:
|
|
|
@ -685,10 +692,6 @@ class Worker(object):
|
|
|
|
self.connection)
|
|
|
|
self.connection)
|
|
|
|
finished_job_registry.add(job, result_ttl, pipeline)
|
|
|
|
finished_job_registry.add(job, result_ttl, pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
# avoid missing dependents that where inserted after enqueue_dependents()
|
|
|
|
|
|
|
|
pipeline.watch(job.dependents_key)
|
|
|
|
|
|
|
|
queue.enqueue_dependents(job, pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
job.cleanup(result_ttl, pipeline=pipeline,
|
|
|
|
job.cleanup(result_ttl, pipeline=pipeline,
|
|
|
|
remove_from_queue=False)
|
|
|
|
remove_from_queue=False)
|
|
|
|
started_job_registry.remove(job, pipeline=pipeline)
|
|
|
|
started_job_registry.remove(job, pipeline=pipeline)
|
|
|
|