From 4a64244f405348e534512c40894cba5d9355b126 Mon Sep 17 00:00:00 2001 From: thomas Date: Sat, 14 Dec 2019 20:57:39 -0500 Subject: [PATCH] Only enqueue dependents for all dependencies are FINISHED --- rq/queue.py | 18 ++++++++- tests/test_queue.py | 97 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 114 insertions(+), 1 deletion(-) diff --git a/rq/queue.py b/rq/queue.py index 56ec31a..a354d1c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -464,9 +464,25 @@ class Queue(object): dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection) for job_id in pipe.smembers(dependents_key)] + dependencies_statuses = [ + dependent.get_dependencies_statuses(watch=True, pipeline=pipe) + for dependent in dependent_jobs + ] + pipe.multi() - for dependent, dependents_dependencies in dependent_jobs: + for dependent, dependents_dependencies in zip(dependent_jobs, + dependencies_statuses): + + # Enqueue this dependent job only if all of it's _other_ + # dependencies are FINISHED. + if not all( + status == JobStatus.FINISHED + for job_id, status + in dependents_dependencies + if job_id != job.id + ): + continue registry = DeferredJobRegistry(dependent.origin, self.connection, diff --git a/tests/test_queue.py b/tests/test_queue.py index 7004aad..4b43677 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function, import json from datetime import datetime, timedelta +from mock.mock import patch from rq import Queue from rq.compat import utc @@ -524,6 +525,102 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, []) + def test_enqueue_job_with_multiple_queued_dependencies(self): + + parent_jobs = [Job.create(func=say_hello) for _ in range(2)] + + for job in parent_jobs: + job._status = JobStatus.QUEUED + job.save() + + job_create = Job.create + + def create_job_patch(*args, **kwargs): + # patch Job#create to set parent jobs as dependencies. + job = job_create(*args, **kwargs) + job._dependency_ids = [job.id for job in parent_jobs] + return job + + q = Queue() + with patch.object(Job, 'create', create_job_patch) as patch_create_job: + job = q.enqueue(say_hello, depends_on=parent_jobs[0]) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + self.assertEqual(q.job_ids, []) + self.assertEqual(job.fetch_dependencies(), parent_jobs) + + def test_enqueue_job_with_multiple_finished_dependencies(self): + + parent_jobs = [Job.create(func=say_hello) for _ in range(2)] + + for job in parent_jobs: + job._status = JobStatus.FINISHED + job.save() + + job_create = Job.create + + def create_job_patch(*args, **kwargs): + # patch Job#create to set parent jobs as dependencies. + job = job_create(*args, **kwargs) + job._dependency_ids = [job.id for job in parent_jobs] + return job + + q = Queue() + with patch.object(Job, 'create', create_job_patch) as patch_create_job: + job = q.enqueue(say_hello, depends_on=parent_jobs[0]) + self.assertEqual(job.get_status(), JobStatus.QUEUED) + self.assertEqual(q.job_ids, [job.id]) + self.assertEqual(job.fetch_dependencies(), parent_jobs) + + def test_enqueues_dependent_if_other_dependencies_finished(self): + + started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED) + started_dependency.save() + + finished_dependency = Job.create(func=say_hello, status=JobStatus.FINISHED) + finished_dependency.save() + + job_create = Job.create + + def create_job_patch(*args, **kwargs): + # patch Job#create to set parent jobs as dependencies. + job = job_create(*args, **kwargs) + job._dependency_ids = [job.id for job in [started_dependency, finished_dependency]] + return job + + q = Queue() + with patch.object(Job, 'create', create_job_patch) as patch_create_job: + dependent_job = q.enqueue(say_hello, depends_on=[started_dependency]) + self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED) + + q.enqueue_dependents(started_dependency) + self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED) + self.assertEqual(q.job_ids, [dependent_job.id]) + + def test_does_not_enqueue_dependent_if_other_dependencies_not_finished(self): + + started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED) + started_dependency.save() + + queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED) + queued_dependency.save() + + job_create = Job.create + + def create_job_patch(*args, **kwargs): + # patch Job#create to set parent jobs as dependencies. + job = job_create(*args, **kwargs) + job._dependency_ids = [job.id for job in [started_dependency, queued_dependency]] + return job + + q = Queue() + with patch.object(Job, 'create', create_job_patch) as patch_create_job: + dependent_job = q.enqueue(say_hello, depends_on=[started_dependency]) + self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED) + + q.enqueue_dependents(started_dependency) + self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED) + self.assertEqual(q.job_ids, []) + def test_fetch_job_successful(self): """Fetch a job from a queue.""" q = Queue('example')