Dependency with failures (#1681)

* added Dependency class with allow_failures

* Requested changes

* Check type before setting `job.dependency_allow_fail` within `Job.create`

* Set `job.dependency_allow_fail` within `Job.create`

* Added test to ensure persistence of `dependency_allow_fail`

* Removed typing and allow mixed list of ints and Job objects

* Convert dependency_allow_fail boolean to integer during serialization to avoid redis DataError

* Updated `test_multiple_dependencies_are_accepted_and_persisted` test to include `Dependency` cases

* Adding placeholder test to test actual behavior of new `Dependency` usage in `depends_on`

* Updated `test_job_dependency` to include cases using `Dependency`

* Added dependency_allow_fail logic to `Job.restore`

* Renamed `dependency_allow_fail` to a simpler `allow_failure`

* Update docs to add section about the new `Dependency` class and use-case

* Updated `Job.dependencies_are_met` logic to take `FAILED` and `STOPPED` jobs into account when `allow_failure=True`

* Updated `test_job_dependency` test. Still failing with `Dependency` case.

* Fix `allow_failure` type coercion in `Job.restore`

* Re-arrange tests, so default `Dependency.allow_failure` is before explicit `allow_failure=True`

* Fixed Dependency, so it works correctly when allow_failure=True

* Attempt to execute pipeline prior to queueing a failed job's dependents. test_create_and_cancel_job_enqueue_dependents_in_registry test now passes.

* Added `Depedency` test utilizing multiple dependencies

* Removed irrelevant on_success and on_failure keyword arguments in example

* Replaced use of long_running_job

* Add test to verify `Dependency.jobs` contraints

* Suppress connection error in handle_job_failure

* test_dependencies have passed

* All tests pass if enqueue_dependents called without pipeline.watch()

* All tests now pass

* Removed print statements

* Cleanup Dependency implementation

* Renamed job.allow_failure to job.allow_dependency_failures

Co-authored-by: mattchan <mattchan@tencent.com>
Co-authored-by: Mike Hill <mhilluniversal@gmail.com>
main
Selwin Ong 2 years ago committed by GitHub
parent d82af1469f
commit 5b95725dc4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -155,9 +155,33 @@ baz_job = queue.enqueue(baz, depends_on=[foo_job, bar_job])
```
The ability to handle job dependencies allows you to split a big job into
several smaller ones. A job that is dependent on another is enqueued only when
several smaller ones. By default, a job that is dependent on another is enqueued only when
its dependency finishes *successfully*.
_New in 1.11.0._
If you want a job's dependencies to execute regardless if the job completes or fails, RQ provides
the `Dependency` class that will allow you to dictate how to handle job failures.
The `Dependency(jobs=...)` parameter accepts:
- a string representing a single job id
- a Job object
- an iteratable of job id strings and/or Job objects
Example:
```python
from redis import Redis
from rq.job import Dependency
from rq import Queue
queue = Queue(connection=Redis())
job_1 = queue.enqueue(div_by_zero)
dependency = Dependency(jobs=[job_1], allow_failure=True) # allow_failure defaults to False
job_2 = queue.enqueue(say_hello, depends_on=dependency)
# job_2 will execute even though its dependency (job_1) fails
```
## Job Callbacks
_New in version 1.9.0._

@ -38,6 +38,22 @@ class JobStatus(str, Enum):
CANCELED = 'canceled'
class Dependency:
def __init__(self, jobs, allow_failure: bool = False):
jobs = ensure_list(jobs)
if not all(
isinstance(job, Job) or isinstance(job, str)
for job in jobs
if job
):
raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids")
elif len(jobs) < 1:
raise ValueError("jobs: cannot be empty.")
self.dependencies = jobs
self.allow_failure = allow_failure
# Sentinel value to mark that some of our lazily evaluated properties have not
# yet been evaluated.
UNEVALUATED = object()
@ -134,8 +150,16 @@ class Job:
# dependency could be job instance or id, or iterable thereof
if depends_on is not None:
job._dependency_ids = [dep.id if isinstance(dep, Job) else dep
for dep in ensure_list(depends_on)]
if isinstance(depends_on, Dependency):
job.allow_dependency_failures = depends_on.allow_failure
depends_on_list = depends_on.dependencies
else:
depends_on_list = ensure_list(depends_on)
job._dependency_ids = [
dep.id if isinstance(dep, Job) else dep
for dep in depends_on_list
]
return job
def get_position(self):
@ -404,6 +428,7 @@ class Job:
self.retry_intervals = None
self.redis_server_version = None
self.last_heartbeat = None
self.allow_dependency_failures = None
def __repr__(self): # noqa # pragma: no cover
return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@ -560,7 +585,7 @@ class Job:
dep_id = obj.get('dependency_id') # for backwards compatibility
self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
else [dep_id.decode()] if dep_id else [])
self.allow_dependency_failures = bool(int(obj.get('allow_dependency_failures'))) if obj.get('allow_dependency_failures') else None
self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@ -640,6 +665,10 @@ class Job:
if self.ttl:
obj['ttl'] = self.ttl
if self.allow_dependency_failures is not None:
# convert boolean to integer to avoid redis.exception.DataError
obj["allow_dependency_failures"] = int(self.allow_dependency_failures)
return obj
def save(self, pipeline=None, include_meta=True):
@ -685,12 +714,12 @@ class Job:
You can enqueue the jobs dependents optionally,
Same pipelining behavior as Queue.enqueue_dependents on whether or not a pipeline is passed in.
"""
if self.is_canceled:
raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id()))
from .registry import CanceledJobRegistry
from .queue import Queue
pipe = pipeline or self.connection.pipeline()
while True:
try:
q = Queue(
@ -699,6 +728,8 @@ class Job:
job_class=self.__class__,
serializer=self.serializer
)
self.set_status(JobStatus.CANCELED, pipeline=pipe)
if enqueue_dependents:
# Only WATCH if no pipeline passed, otherwise caller is responsible
if pipeline is None:
@ -709,8 +740,6 @@ class Job:
remove_from_queue=True
)
self.set_status(JobStatus.CANCELED, pipeline=pipe)
registry = CanceledJobRegistry(
self.origin,
self.connection,
@ -726,7 +755,7 @@ class Job:
continue
else:
# if the pipeline comes from the caller, we re-raise the
# exception as it it the responsibility of the caller to
# exception as it is the responsibility of the caller to
# handle it
raise
@ -953,17 +982,16 @@ class Job:
return [Job.key_for(_id.decode())
for _id in dependencies]
def dependencies_are_met(self, exclude_job_id=None, pipeline=None):
"""Returns a boolean indicating if all of this jobs dependencies are _FINISHED_
def dependencies_are_met(self, parent_job=None, pipeline=None):
"""Returns a boolean indicating if all of this job's dependencies are _FINISHED_
If a pipeline is passed, all dependencies are WATCHed.
`exclude` allows us to exclude some job id from the status check. This is useful
when enqueueing the dependents of a _successful_ job -- that status of
`parent_job` allows us to directly pass parent_job for the status check.
This is useful when enqueueing the dependents of a _successful_ job -- that status of
`FINISHED` may not be yet set in redis, but said job is indeed _done_ and this
method is _called_ in the _stack_ of it's dependents are being enqueued.
method is _called_ in the _stack_ of its dependents are being enqueued.
"""
connection = pipeline if pipeline is not None else self.connection
if pipeline is not None:
@ -973,8 +1001,19 @@ class Job:
dependencies_ids = {_id.decode()
for _id in connection.smembers(self.dependencies_key)}
if exclude_job_id:
dependencies_ids.discard(exclude_job_id)
if parent_job:
# If parent job is canceled, no need to check for status
# If parent job is not finished, we should only continue
# if this job allows parent job to fail
dependencies_ids.discard(parent_job.id)
if parent_job._status == JobStatus.CANCELED:
pass
elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures:
return False
# If the only dependency is parent job, dependency has been met
if not dependencies_ids:
return True
with connection.pipeline() as pipeline:
for key in dependencies_ids:
@ -982,8 +1021,13 @@ class Job:
dependencies_statuses = pipeline.execute()
if self.allow_dependency_failures:
allowed_statuses = [JobStatus.FINISHED, JobStatus.FAILED]
else:
allowed_statuses = [JobStatus.FINISHED]
return all(
status.decode() == JobStatus.FINISHED
status.decode() in allowed_statuses
for status
in dependencies_statuses
if status

@ -11,7 +11,7 @@ from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
from .job import Job, JobStatus
from .job import Job, JobStatus, Dependency
from .serializers import resolve_serializer
from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow
@ -48,6 +48,7 @@ class Queue:
return cls.from_queue_key(as_text(queue_key),
connection=connection,
job_class=job_class, serializer=serializer)
return [to_queue(rq_key)
for rq_key in connection.smembers(cls.redis_queues_keys)
if rq_key]
@ -324,6 +325,9 @@ class Queue:
job.retries_left = retry.max
job.retry_intervals = retry.intervals
if isinstance(depends_on, Dependency):
job.allow_dependency_failures = depends_on.allow_failure
return job
def setup_dependencies(
@ -386,9 +390,8 @@ class Queue:
result_ttl=None, ttl=None, failure_ttl=None, description=None,
depends_on=None, job_id=None, at_front=False, meta=None,
retry=None, on_success=None, on_failure=None, pipeline=None):
"""Creates a job to represent the delayed function call and enqueues
it.
nd
"""Creates a job to represent the delayed function call and enqueues it.
It is much like `.enqueue()`, except that it takes the function's args
and kwargs as explicit arguments. Any kwargs passed to this function
contain options for RQ itself.
@ -611,14 +614,19 @@ nd
dependents_key = job.dependents_key
while True:
try:
# if a pipeline is passed, the caller is responsible for calling WATCH
# to ensure all jobs are enqueued
if pipeline is None:
pipe.watch(dependents_key)
dependent_job_ids = [as_text(_id)
for _id in pipe.smembers(dependents_key)]
dependent_job_ids = {as_text(_id)
for _id in pipe.smembers(dependents_key)}
# There's no dependents
if not dependent_job_ids:
break
jobs_to_enqueue = [
dependent_job for dependent_job
@ -627,13 +635,16 @@ nd
connection=self.connection,
serializer=self.serializer
) if dependent_job and dependent_job.dependencies_are_met(
exclude_job_id=job.id,
pipeline=pipe
parent_job=job,
pipeline=pipe,
)
]
pipe.multi()
if not jobs_to_enqueue:
break
for dependent in jobs_to_enqueue:
registry = DeferredJobRegistry(dependent.origin,
self.connection,
@ -646,11 +657,15 @@ nd
queue = self.__class__(name=dependent.origin, connection=self.connection)
queue.enqueue_job(dependent, pipeline=pipe)
pipe.delete(dependents_key)
# Only delete dependents_key if all dependents have been enqueued
if len(jobs_to_enqueue) == len(dependent_job_ids):
pipe.delete(dependents_key)
else:
enqueued_job_ids = [job.id for job in jobs_to_enqueue]
pipe.srem(dependents_key, *enqueued_job_ids)
if pipeline is None:
pipe.execute()
break
except WatchError:
if pipeline is None:

@ -18,7 +18,7 @@ try:
from signal import SIGKILL
except ImportError:
from signal import SIGTERM as SIGKILL
from contextlib import suppress
import redis.exceptions
from . import worker_registration
@ -54,7 +54,6 @@ green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue')
logger = logging.getLogger(__name__)
@ -226,7 +225,11 @@ class Worker:
)
self.ip_address = 'unknown'
else:
self.ip_address = [client['addr'] for client in connection.client_list() if client['name'] == self.name][0]
self.ip_address = [
client['addr']
for client in connection.client_list()
if client['name'] == self.name
][0]
else:
self.hostname = None
self.pid = None
@ -971,6 +974,8 @@ class Worker:
job_class=self.job_class, serializer=job.serializer)
failed_job_registry.add(job, ttl=job.failure_ttl,
exc_string=exc_string, pipeline=pipeline)
with suppress(redis.exceptions.ConnectionError):
pipeline.execute()
self.set_current_job_id(None, pipeline=pipeline)
self.increment_failed_job_count(pipeline)
@ -981,9 +986,14 @@ class Worker:
if retry:
job.retry(queue, pipeline)
enqueue_dependents = False
else:
enqueue_dependents = True
try:
pipeline.execute()
if enqueue_dependents:
queue.enqueue_dependents(job)
except Exception:
# Ensure that custom exception handlers are called
# even if Redis is down
@ -991,6 +1001,7 @@ class Worker:
def handle_job_success(self, job, queue, started_job_registry):
self.log.debug('Handling successful execution of job %s', job.id)
with self.connection.pipeline() as pipeline:
while True:
try:
@ -1252,6 +1263,7 @@ class RoundRobinWorker(Worker):
"""
Modified version of Worker that dequeues jobs from the queues using a round-robin strategy.
"""
def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue)
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1]

@ -0,0 +1,99 @@
from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
from rq import Queue, SimpleWorker
from rq.job import Job, JobStatus, Dependency
class TestDependencies(RQTestCase):
def test_allow_failure_is_persisted(self):
"""Ensure that job.allow_dependency_failures is properly set
when providing Dependency object to depends_on."""
dep_job = Job.create(func=say_hello)
# default to False, maintaining current behavior
job = Job.create(func=say_hello, depends_on=Dependency([dep_job]))
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertFalse(job.allow_dependency_failures)
job = Job.create(func=say_hello, depends_on=Dependency([dep_job], allow_failure=True))
job.save()
job = Job.fetch(job.id, connection=self.testconn)
self.assertTrue(job.allow_dependency_failures)
jobs = Job.fetch_many([job.id], connection=self.testconn)
self.assertTrue(jobs[0].allow_dependency_failures)
def test_job_dependency(self):
"""Enqueue dependent jobs only when appropriate"""
q = Queue(connection=self.testconn)
w = SimpleWorker([q], connection=q.connection)
# enqueue dependent job when parent successfully finishes
parent_job = q.enqueue(say_hello)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
q.empty()
# don't enqueue dependent job when parent fails
parent_job = q.enqueue(div_by_zero)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.testconn)
self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
q.empty()
# don't enqueue dependent job when Dependency.allow_failure=False (the default)
parent_job = q.enqueue(div_by_zero)
dependency = Dependency(jobs=parent_job)
job = q.enqueue_call(say_hello, depends_on=dependency)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.testconn)
self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
# enqueue dependent job when Dependency.allow_failure=True
parent_job = q.enqueue(div_by_zero)
dependency = Dependency(jobs=parent_job, allow_failure=True)
job = q.enqueue_call(say_hello, depends_on=dependency)
job = Job.fetch(job.id, connection=self.testconn)
self.assertTrue(job.allow_dependency_failures)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
# When a failing job has multiple dependents, only enqueue those
# with allow_failure=True
parent_job = q.enqueue(div_by_zero)
job_allow_failure = q.enqueue(say_hello,
depends_on=Dependency(jobs=parent_job, allow_failure=True))
job = q.enqueue(say_hello,
depends_on=Dependency(jobs=parent_job, allow_failure=False))
w.work(burst=True, max_jobs=1)
self.assertEqual(parent_job.get_status(), JobStatus.FAILED)
self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED)
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
q.empty()
# only enqueue dependent job when all dependencies have finished/failed
first_parent_job = q.enqueue(div_by_zero)
second_parent_job = q.enqueue(say_hello)
dependencies = Dependency(jobs=[first_parent_job, second_parent_job], allow_failure=True)
job = q.enqueue_call(say_hello, depends_on=dependencies)
w.work(burst=True, max_jobs=1)
self.assertEqual(first_parent_job.get_status(), JobStatus.FAILED)
self.assertEqual(second_parent_job.get_status(), JobStatus.QUEUED)
self.assertEqual(job.get_status(), JobStatus.DEFERRED)
# When second job finishes, dependent job should be queued
w.work(burst=True, max_jobs=1)
self.assertEqual(second_parent_job.get_status(), JobStatus.FINISHED)
self.assertEqual(job.get_status(), JobStatus.QUEUED)
w.work(burst=True)
job = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.get_status(), JobStatus.FINISHED)

@ -9,7 +9,7 @@ from redis import WatchError
from rq.compat import as_text
from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
from rq.job import Job, JobStatus, cancel_job, get_current_job
from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job
from rq.queue import Queue
from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry,
@ -434,6 +434,14 @@ class TestJob(RQTestCase):
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
def test_dependency_parameter_constraints(self):
"""Ensures the proper constraints are in place for values passed in as job references."""
dep_job = Job.create(func=fixtures.say_hello)
# raise error on empty jobs
self.assertRaises(ValueError, Dependency, jobs=[])
# raise error on non-str/Job value in jobs iterable
self.assertRaises(ValueError, Dependency, jobs=[dep_job, 1])
def test_multiple_dependencies_are_accepted_and_persisted(self):
"""Ensure job._dependency_ids accepts different input formats, and
is set and restored properly"""
@ -456,6 +464,14 @@ class TestJob(RQTestCase):
[("A", "B"), ["A", "B"]],
[(job_A, job_B), ["A", "B"]],
[(job_A, "B"), ["A", "B"]],
[Dependency("A"), ["A"]],
[Dependency(job_A), ["A"]],
[Dependency(["A", "B"]), ["A", "B"]],
[Dependency([job_A, job_B]), ["A", "B"]],
[Dependency(["A", job_B]), ["A", "B"]],
[Dependency(("A", "B")), ["A", "B"]],
[Dependency((job_A, job_B)), ["A", "B"]],
[Dependency((job_A, "B")), ["A", "B"]],
]
for given, expected in cases:
job = Job.create(func=fixtures.say_hello, depends_on=given)
@ -865,19 +881,27 @@ class TestJob(RQTestCase):
queue = Queue(connection=self.testconn)
dependency = queue.enqueue(fixtures.raise_exc)
dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency)
print('# Post enqueue', self.testconn.smembers(dependency.dependents_key))
self.assertTrue(dependency.dependent_ids)
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(1, len(queue.deferred_job_registry))
w = Worker([queue])
w.work(burst=True, max_jobs=1)
self.assertTrue(dependency.dependent_ids)
print('# Post work', self.testconn.smembers(dependency.dependents_key))
dependency.refresh()
dependent.refresh()
self.assertEqual(0, len(queue.get_jobs()))
self.assertEqual(1, len(queue.deferred_job_registry))
self.assertEqual(1, len(queue.failed_job_registry))
print('# Pre cancel', self.testconn.smembers(dependency.dependents_key))
cancel_job(dependency.id, enqueue_dependents=True)
dependency.refresh()
dependent.refresh()
print('#Post cancel', self.testconn.smembers(dependency.dependents_key))
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(0, len(queue.deferred_job_registry))
self.assertEqual(0, len(queue.failed_job_registry))
@ -1119,14 +1143,14 @@ class TestJob(RQTestCase):
def test_dependencies_are_met_at_execution_time(self):
queue = Queue(connection=self.testconn)
queue.empty()
queue.enqueue(fixtures.say_hello, job_id="A")
queue.enqueue(fixtures.say_hello, job_id="B")
job_C = queue.enqueue(fixtures.check_dependencies_are_met, job_id="C", depends_on=["A", "B"])
job_C.dependencies_are_met()
w = Worker([queue])
w.work(burst=True)
assert job_C.result
def test_execution_order_with_sole_dependency(self):

@ -29,7 +29,7 @@ from tests.fixtures import (
from rq import Queue, SimpleWorker, Worker, get_current_connection
from rq.compat import as_text, PY2
from rq.job import Job, JobStatus, Retry
from rq.job import Job, JobStatus, Dependency, Retry
from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
from rq.suspension import resume, suspend
from rq.utils import utcnow
@ -468,7 +468,7 @@ class TestWorker(RQTestCase):
worker = Worker([queue])
# If job if configured to retry, it will be put back in the queue
# If job is configured to retry, it will be put back in the queue
# and not put in the FailedJobRegistry.
# This is the original execution
queue.empty()
@ -695,22 +695,6 @@ class TestWorker(RQTestCase):
self.assertEqual(job.is_finished, False)
self.assertEqual(job.is_failed, True)
def test_job_dependency(self):
"""Enqueue dependent jobs only if their parents don't fail"""
q = Queue()
w = Worker([q])
parent_job = q.enqueue(say_hello, result_ttl=0)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertEqual(job.get_status(), JobStatus.FINISHED)
parent_job = q.enqueue(div_by_zero)
job = q.enqueue_call(say_hello, depends_on=parent_job)
w.work(burst=True)
job = Job.fetch(job.id)
self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
def test_get_current_job(self):
"""Ensure worker.get_current_job() works properly"""
q = Queue()

Loading…
Cancel
Save