protecting dependency enqueuing with redis WATCH

main
Stefan Hammer 8 years ago
parent 69b43daa72
commit 7b8d4e075e

@ -293,8 +293,10 @@ class Queue(object):
# TODO: can probably be pipelined
from .registry import DeferredJobRegistry
dependents_connection = pipeline if pipeline is not None else self.connection
while True:
job_id = as_text(self.connection.spop(job.dependents_key))
job_id = as_text(dependents_connection.spop(job.dependents_key))
if job_id is None:
break
dependent = self.job_class.fetch(job_id, connection=self.connection)

@ -14,6 +14,8 @@ import traceback
import warnings
from datetime import timedelta
from redis import WatchError
from rq.compat import as_text, string_types, text_type
from .compat import PY2
@ -667,24 +669,34 @@ class Worker(object):
# to use the same exc handling when pickling fails
job._result = rv
self.set_current_job_id(None, pipeline=pipeline)
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.ended_at = utcnow()
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
job.save(pipeline=pipeline)
finished_job_registry = FinishedJobRegistry(job.origin,
self.connection)
finished_job_registry.add(job, result_ttl, pipeline)
while True:
try:
self.set_current_job_id(None, pipeline=pipeline)
if result_ttl != 0:
job.set_status(JobStatus.FINISHED, pipeline=pipeline)
job.save(pipeline=pipeline)
queue.enqueue_dependents(job, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline,
remove_from_queue=False)
started_job_registry.remove(job, pipeline=pipeline)
finished_job_registry = FinishedJobRegistry(job.origin,
self.connection)
finished_job_registry.add(job, result_ttl, pipeline)
pipeline.execute()
# avoid missing dependents that where inserted after enqueue_dependents()
pipeline.watch(job.dependents_key)
queue.enqueue_dependents(job, pipeline=pipeline)
job.cleanup(result_ttl, pipeline=pipeline,
remove_from_queue=False)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
break
except WatchError:
continue
except Exception:
self.handle_job_failure(

Loading…
Cancel
Save