Job should not be enqueued if dependency is canceled (#1695)

* Fix job.dependencies_are_met() if dependency is canceled

* Slightly better test coverage on dependencies_are_met()

* Fixed job.cancel(enqueue_dependent=True)
main
Selwin Ong 2 years ago committed by GitHub
parent 9db728921d
commit 8e3283dab3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -734,7 +734,7 @@ class Job:
# Only WATCH if no pipeline passed, otherwise caller is responsible # Only WATCH if no pipeline passed, otherwise caller is responsible
if pipeline is None: if pipeline is None:
pipe.watch(self.dependents_key) pipe.watch(self.dependents_key)
q.enqueue_dependents(self, pipeline=pipeline) q.enqueue_dependents(self, pipeline=pipeline, exclude_job_id=self.id)
self._remove_from_registries( self._remove_from_registries(
pipeline=pipe, pipeline=pipe,
remove_from_queue=True remove_from_queue=True
@ -982,7 +982,7 @@ class Job:
return [Job.key_for(_id.decode()) return [Job.key_for(_id.decode())
for _id in dependencies] for _id in dependencies]
def dependencies_are_met(self, parent_job=None, pipeline=None): def dependencies_are_met(self, parent_job=None, pipeline=None, exclude_job_id=None):
"""Returns a boolean indicating if all of this job's dependencies are _FINISHED_ """Returns a boolean indicating if all of this job's dependencies are _FINISHED_
If a pipeline is passed, all dependencies are WATCHed. If a pipeline is passed, all dependencies are WATCHed.
@ -1001,13 +1001,18 @@ class Job:
dependencies_ids = {_id.decode() dependencies_ids = {_id.decode()
for _id in connection.smembers(self.dependencies_key)} for _id in connection.smembers(self.dependencies_key)}
if exclude_job_id:
dependencies_ids.discard(exclude_job_id)
if parent_job.id == exclude_job_id:
parent_job = None
if parent_job: if parent_job:
# If parent job is canceled, no need to check for status # If parent job is canceled, treat dependency as failed
# If parent job is not finished, we should only continue # If parent job is not finished, we should only continue
# if this job allows parent job to fail # if this job allows parent job to fail
dependencies_ids.discard(parent_job.id) dependencies_ids.discard(parent_job.id)
if parent_job._status == JobStatus.CANCELED: if parent_job._status == JobStatus.CANCELED:
pass return False
elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures: elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures:
return False return False

@ -602,7 +602,7 @@ class Queue:
return job return job
def enqueue_dependents(self, job, pipeline=None): def enqueue_dependents(self, job, pipeline=None, exclude_job_id=None):
"""Enqueues all jobs in the given job's dependents set and clears it. """Enqueues all jobs in the given job's dependents set and clears it.
When called without a pipeline, this method uses WATCH/MULTI/EXEC. When called without a pipeline, this method uses WATCH/MULTI/EXEC.
@ -638,6 +638,7 @@ class Queue:
) if dependent_job and dependent_job.dependencies_are_met( ) if dependent_job and dependent_job.dependencies_are_met(
parent_job=job, parent_job=job,
pipeline=pipe, pipeline=pipe,
exclude_job_id=exclude_job_id,
) )
] ]

@ -1,7 +1,7 @@
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello from tests.fixtures import check_dependencies_are_met, div_by_zero, say_hello
from rq import Queue, SimpleWorker from rq import Queue, SimpleWorker, Worker
from rq.job import Job, JobStatus, Dependency from rq.job import Job, JobStatus, Dependency
@ -97,3 +97,48 @@ class TestDependencies(RQTestCase):
w.work(burst=True) w.work(burst=True)
job = Job.fetch(job.id, connection=self.testconn) job = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.get_status(), JobStatus.FINISHED) self.assertEqual(job.get_status(), JobStatus.FINISHED)
def test_dependencies_are_met_if_parent_is_canceled(self):
"""When parent job is canceled, it should be treated as failed"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
job.set_status(JobStatus.CANCELED)
dependent_job = queue.enqueue(say_hello, depends_on=job)
# dependencies_are_met() should return False, whether or not
# parent_job is provided
self.assertFalse(dependent_job.dependencies_are_met(job))
self.assertFalse(dependent_job.dependencies_are_met())
def test_can_enqueue_job_if_dependency_is_deleted(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(say_hello, result_ttl=0)
w = Worker([queue])
w.work(burst=True)
assert queue.enqueue(say_hello, depends_on=dependency_job)
def test_dependencies_are_met_if_dependency_is_deleted(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(say_hello, result_ttl=0)
dependent_job = queue.enqueue(say_hello, depends_on=dependency_job)
w = Worker([queue])
w.work(burst=True, max_jobs=1)
assert dependent_job.dependencies_are_met()
assert dependent_job.get_status() == JobStatus.QUEUED
def test_dependencies_are_met_at_execution_time(self):
queue = Queue(connection=self.testconn)
queue.empty()
queue.enqueue(say_hello, job_id="A")
queue.enqueue(say_hello, job_id="B")
job_c = queue.enqueue(check_dependencies_are_met, job_id="C", depends_on=["A", "B"])
job_c.dependencies_are_met()
w = Worker([queue])
w.work(burst=True)
assert job_c.result

@ -1119,40 +1119,6 @@ class TestJob(RQTestCase):
pipeline.touch(Job.key_for(dependent_job.id)) pipeline.touch(Job.key_for(dependent_job.id))
pipeline.execute() pipeline.execute()
def test_can_enqueue_job_if_dependency_is_deleted(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0)
w = Worker([queue])
w.work(burst=True)
assert queue.enqueue(fixtures.say_hello, depends_on=dependency_job)
def test_dependents_are_met_if_dependency_is_deleted(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0)
dependent_job = queue.enqueue(fixtures.say_hello, depends_on=dependency_job)
w = Worker([queue])
w.work(burst=True, max_jobs=1)
assert dependent_job.dependencies_are_met()
assert dependent_job.get_status() == JobStatus.QUEUED
def test_dependencies_are_met_at_execution_time(self):
queue = Queue(connection=self.testconn)
queue.empty()
queue.enqueue(fixtures.say_hello, job_id="A")
queue.enqueue(fixtures.say_hello, job_id="B")
job_C = queue.enqueue(fixtures.check_dependencies_are_met, job_id="C", depends_on=["A", "B"])
job_C.dependencies_are_met()
w = Worker([queue])
w.work(burst=True)
assert job_C.result
def test_execution_order_with_sole_dependency(self): def test_execution_order_with_sole_dependency(self):
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
key = 'test_job:job_order' key = 'test_job:job_order'

Loading…
Cancel
Save