From 4750eb43161cf6d2dee5cab4c20b7257476fbc1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20R=C3=BCegg?= Date: Sat, 1 Oct 2022 11:48:30 +0200 Subject: [PATCH] Fix broken pipeline / transaction (#1715) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jan Rüegg --- rq/worker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 8a0c40e..9e940fe 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1027,9 +1027,14 @@ class Worker: # if dependencies are inserted after enqueue_dependents # a WatchError is thrown by execute() pipeline.watch(job.dependents_key) - # enqueue_dependents calls multi() on the pipeline! + # enqueue_dependents might call multi() on the pipeline queue.enqueue_dependents(job, pipeline=pipeline) + if not pipeline.explicit_transaction: + # enqueue_dependents didn't call multi after all! + # We have to do it ourselves to make sure everything runs in a transaction + pipeline.multi() + self.set_current_job_id(None, pipeline=pipeline) self.increment_successful_job_count(pipeline=pipeline) self.increment_total_working_time(