diff --git a/rq/queue.py b/rq/queue.py index e5d3df8..8f288ca 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -293,8 +293,10 @@ class Queue(object): # TODO: can probably be pipelined from .registry import DeferredJobRegistry + dependents_connection = pipeline if pipeline is not None else self.connection + while True: - job_id = as_text(self.connection.spop(job.dependents_key)) + job_id = as_text(dependents_connection.spop(job.dependents_key)) if job_id is None: break dependent = self.job_class.fetch(job_id, connection=self.connection) diff --git a/rq/worker.py b/rq/worker.py index 95bcded..39547cc 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -14,6 +14,8 @@ import traceback import warnings from datetime import timedelta +from redis import WatchError + from rq.compat import as_text, string_types, text_type from .compat import PY2 @@ -667,24 +669,34 @@ class Worker(object): # to use the same exc handling when pickling fails job._result = rv - self.set_current_job_id(None, pipeline=pipeline) - result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: job.ended_at = utcnow() - job.set_status(JobStatus.FINISHED, pipeline=pipeline) - job.save(pipeline=pipeline) - finished_job_registry = FinishedJobRegistry(job.origin, - self.connection) - finished_job_registry.add(job, result_ttl, pipeline) + while True: + try: + self.set_current_job_id(None, pipeline=pipeline) + + if result_ttl != 0: + job.set_status(JobStatus.FINISHED, pipeline=pipeline) + job.save(pipeline=pipeline) - queue.enqueue_dependents(job, pipeline=pipeline) - job.cleanup(result_ttl, pipeline=pipeline, - remove_from_queue=False) - started_job_registry.remove(job, pipeline=pipeline) + finished_job_registry = FinishedJobRegistry(job.origin, + self.connection) + finished_job_registry.add(job, result_ttl, pipeline) - pipeline.execute() + # avoid missing dependents that where inserted after enqueue_dependents() + pipeline.watch(job.dependents_key) + queue.enqueue_dependents(job, pipeline=pipeline) + + job.cleanup(result_ttl, pipeline=pipeline, + remove_from_queue=False) + started_job_registry.remove(job, pipeline=pipeline) + + pipeline.execute() + break + except WatchError: + continue except Exception: self.handle_job_failure(