Only enqueue dependents for all dependencies are FINISHED

main
thomas 5 years ago committed by Thomas Matecki
parent ee215a1853
commit 4a64244f40

@ -464,9 +464,25 @@ class Queue(object):
dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection)
for job_id in pipe.smembers(dependents_key)]
dependencies_statuses = [
dependent.get_dependencies_statuses(watch=True, pipeline=pipe)
for dependent in dependent_jobs
]
pipe.multi()
for dependent, dependents_dependencies in dependent_jobs:
for dependent, dependents_dependencies in zip(dependent_jobs,
dependencies_statuses):
# Enqueue this dependent job only if all of it's _other_
# dependencies are FINISHED.
if not all(
status == JobStatus.FINISHED
for job_id, status
in dependents_dependencies
if job_id != job.id
):
continue
registry = DeferredJobRegistry(dependent.origin,
self.connection,

@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import json
from datetime import datetime, timedelta
from mock.mock import patch
from rq import Queue
from rq.compat import utc
@ -524,6 +525,102 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [])
def test_enqueue_job_with_multiple_queued_dependencies(self):
parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
for job in parent_jobs:
job._status = JobStatus.QUEUED
job.save()
job_create = Job.create
def create_job_patch(*args, **kwargs):
# patch Job#create to set parent jobs as dependencies.
job = job_create(*args, **kwargs)
job._dependency_ids = [job.id for job in parent_jobs]
return job
q = Queue()
with patch.object(Job, 'create', create_job_patch) as patch_create_job:
job = q.enqueue(say_hello, depends_on=parent_jobs[0])
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
self.assertEqual(q.job_ids, [])
self.assertEqual(job.fetch_dependencies(), parent_jobs)
def test_enqueue_job_with_multiple_finished_dependencies(self):
parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
for job in parent_jobs:
job._status = JobStatus.FINISHED
job.save()
job_create = Job.create
def create_job_patch(*args, **kwargs):
# patch Job#create to set parent jobs as dependencies.
job = job_create(*args, **kwargs)
job._dependency_ids = [job.id for job in parent_jobs]
return job
q = Queue()
with patch.object(Job, 'create', create_job_patch) as patch_create_job:
job = q.enqueue(say_hello, depends_on=parent_jobs[0])
self.assertEqual(job.get_status(), JobStatus.QUEUED)
self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.fetch_dependencies(), parent_jobs)
def test_enqueues_dependent_if_other_dependencies_finished(self):
started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED)
started_dependency.save()
finished_dependency = Job.create(func=say_hello, status=JobStatus.FINISHED)
finished_dependency.save()
job_create = Job.create
def create_job_patch(*args, **kwargs):
# patch Job#create to set parent jobs as dependencies.
job = job_create(*args, **kwargs)
job._dependency_ids = [job.id for job in [started_dependency, finished_dependency]]
return job
q = Queue()
with patch.object(Job, 'create', create_job_patch) as patch_create_job:
dependent_job = q.enqueue(say_hello, depends_on=[started_dependency])
self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
q.enqueue_dependents(started_dependency)
self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED)
self.assertEqual(q.job_ids, [dependent_job.id])
def test_does_not_enqueue_dependent_if_other_dependencies_not_finished(self):
started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED)
started_dependency.save()
queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED)
queued_dependency.save()
job_create = Job.create
def create_job_patch(*args, **kwargs):
# patch Job#create to set parent jobs as dependencies.
job = job_create(*args, **kwargs)
job._dependency_ids = [job.id for job in [started_dependency, queued_dependency]]
return job
q = Queue()
with patch.object(Job, 'create', create_job_patch) as patch_create_job:
dependent_job = q.enqueue(say_hello, depends_on=[started_dependency])
self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
q.enqueue_dependents(started_dependency)
self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
self.assertEqual(q.job_ids, [])
def test_fetch_job_successful(self):
"""Fetch a job from a queue."""
q = Queue('example')

Loading…
Cancel
Save