diff --git a/rq/worker.py b/rq/worker.py index 39547cc..b44ffe7 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -675,6 +675,13 @@ class Worker(object): while True: try: + # if dependencies are inserted after enqueue_dependents + # a WatchError is thrown by execute() + pipeline.watch(job.dependents_key) + queue.enqueue_dependents(job, pipeline=pipeline) + + # pipeline all following commands (reads won't work!) + pipeline.multi() self.set_current_job_id(None, pipeline=pipeline) if result_ttl != 0: @@ -685,10 +692,6 @@ class Worker(object): self.connection) finished_job_registry.add(job, result_ttl, pipeline) - # avoid missing dependents that where inserted after enqueue_dependents() - pipeline.watch(job.dependents_key) - queue.enqueue_dependents(job, pipeline=pipeline) - job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) started_job_registry.remove(job, pipeline=pipeline)