From 591f11bcc32805e975dd32eaa05f19e7458883aa Mon Sep 17 00:00:00 2001 From: Josh Cohen Date: Mon, 21 Jun 2021 23:30:33 -0400 Subject: [PATCH] Ensure pipeline in multi mode after dep setup (#1498) --- rq/queue.py | 2 ++ tests/test_queue.py | 16 ++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/rq/queue.py b/rq/queue.py index 8890d27..e5072cb 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -366,6 +366,8 @@ class Queue: else: # if pipeline comes from caller, re-raise to them raise + elif pipeline is not None: + pipeline.multi() # Ensure pipeline in multi mode before returning to caller return job def enqueue_call(self, func, args=None, kwargs=None, timeout=None, diff --git a/tests/test_queue.py b/tests/test_queue.py index 9d00545..510c671 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -539,6 +539,22 @@ class TestQueue(RQTestCase): self.assertEqual(job.timeout, Queue.DEFAULT_TIMEOUT) self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED) + def test_enqueue_job_with_no_dependency_prior_watch_and_pipeline(self): + """Jobs are enqueued only when their dependencies are finished, and by the caller when passing a pipeline.""" + q = Queue() + with q.connection.pipeline() as pipe: + pipe.watch(b'fake_key') # Test watch then enqueue + job = q.enqueue_call(say_hello, pipeline=pipe) + self.assertEqual(q.job_ids, []) + self.assertEqual(job.get_status(refresh=False), JobStatus.QUEUED) + # Not in queue before execute, since passed in pipeline + self.assertEqual(len(q), 0) + # Make sure modifying key doesn't cause issues, if in multi mode won't fail + pipe.set(b'fake_key', b'fake_value') + pipe.execute() + # Only in registry after execute, since passed in pipeline + self.assertEqual(len(q), 1) + def test_enqueue_many_internal_pipeline(self): """Jobs should be enqueued in bulk with an internal pipeline, enqueued in order provided (but at_front still applies)"""