diff --git a/docs/docs/index.md b/docs/docs/index.md
index e39f9b8..0748846 100644
--- a/docs/docs/index.md
+++ b/docs/docs/index.md
@@ -155,9 +155,33 @@ baz_job = queue.enqueue(baz, depends_on=[foo_job, bar_job])
 ```

 The ability to handle job dependencies allows you to split a big job into
-several smaller ones. A job that is dependent on another is enqueued only when
+several smaller ones. By default, a job that is dependent on another is enqueued only when
 its dependency finishes *successfully*.

+_New in version 1.11.0._
+
+If you want a dependent job to execute even when one of its dependencies fails, RQ
+provides the `Dependency` class, which allows you to dictate how job failures are handled.
+
+The `Dependency(jobs=...)` parameter accepts:
+- a string representing a single job id
+- a `Job` object
+- an iterable of job id strings and/or `Job` objects
+
+Example:
+
+```python
+from redis import Redis
+from rq.job import Dependency
+from rq import Queue
+
+queue = Queue(connection=Redis())
+job_1 = queue.enqueue(div_by_zero)
+dependency = Dependency(jobs=[job_1], allow_failure=True)  # allow_failure defaults to False
+job_2 = queue.enqueue(say_hello, depends_on=dependency)
+# job_2 will execute even though its dependency (job_1) fails
+```
+
 ## Job Callbacks
 _New in version 1.9.0._
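For reference, the three accepted `jobs` shapes can be exercised side by side. This is a minimal sketch, assuming a reachable Redis instance; `process` is a hypothetical placeholder for any task function a worker can import:

```python
from redis import Redis
from rq import Queue
from rq.job import Dependency


def process():  # hypothetical task; any worker-importable function works
    return 'done'


queue = Queue(connection=Redis())
first = queue.enqueue(process)
second = queue.enqueue(process)

# The three accepted input shapes for `jobs`:
dep_from_id = Dependency(jobs=first.id)          # a single job id string
dep_from_job = Dependency(jobs=first)            # a single Job object
dep_mixed = Dependency(jobs=[first, second.id])  # an iterable mixing both

queue.enqueue(process, depends_on=dep_mixed)
```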
diff --git a/rq/job.py b/rq/job.py
index 5ac40dd..fe8df0d 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -38,6 +38,22 @@ class JobStatus(str, Enum):
     CANCELED = 'canceled'


+class Dependency:
+    def __init__(self, jobs, allow_failure: bool = False):
+        jobs = ensure_list(jobs)
+        if not all(
+            isinstance(job, (Job, str))
+            for job in jobs
+            if job
+        ):
+            raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids")
+        elif len(jobs) < 1:
+            raise ValueError("jobs: cannot be empty.")
+
+        self.dependencies = jobs
+        self.allow_failure = allow_failure
+
+
 # Sentinel value to mark that some of our lazily evaluated properties have not
 # yet been evaluated.
 UNEVALUATED = object()

@@ -134,8 +150,16 @@ class Job:

         # dependency could be job instance or id, or iterable thereof
         if depends_on is not None:
-            job._dependency_ids = [dep.id if isinstance(dep, Job) else dep
-                                   for dep in ensure_list(depends_on)]
+            if isinstance(depends_on, Dependency):
+                job.allow_dependency_failures = depends_on.allow_failure
+                depends_on_list = depends_on.dependencies
+            else:
+                depends_on_list = ensure_list(depends_on)
+            job._dependency_ids = [
+                dep.id if isinstance(dep, Job) else dep
+                for dep in depends_on_list
+            ]
+
         return job

     def get_position(self):
@@ -404,6 +428,7 @@ class Job:
         self.retry_intervals = None
         self.redis_server_version = None
         self.last_heartbeat = None
+        self.allow_dependency_failures = None

     def __repr__(self):  # noqa  # pragma: no cover
         return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@@ -560,7 +585,7 @@ class Job:
         dep_id = obj.get('dependency_id')  # for backwards compatibility
         self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
                                 else [dep_id.decode()] if dep_id else [])
-
+        self.allow_dependency_failures = bool(int(obj.get('allow_dependency_failures'))) if obj.get('allow_dependency_failures') else None
         self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
         self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}

@@ -640,6 +665,10 @@ class Job:
         if self.ttl:
             obj['ttl'] = self.ttl

+        if self.allow_dependency_failures is not None:
+            # convert boolean to integer to avoid redis.exceptions.DataError
+            obj["allow_dependency_failures"] = int(self.allow_dependency_failures)
+
         return obj

     def save(self, pipeline=None, include_meta=True):
@@ -685,12 +714,12 @@ class Job:
         You can enqueue the jobs dependents optionally,
         Same pipelining behavior as Queue.enqueue_dependents on whether or not a pipeline is passed in.
         """
-
         if self.is_canceled:
             raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id()))
         from .registry import CanceledJobRegistry
         from .queue import Queue
         pipe = pipeline or self.connection.pipeline()
+
         while True:
             try:
                 q = Queue(
@@ -699,6 +728,8 @@ class Job:
                     job_class=self.__class__,
                     serializer=self.serializer
                 )
+
+                self.set_status(JobStatus.CANCELED, pipeline=pipe)
                 if enqueue_dependents:
                     # Only WATCH if no pipeline passed, otherwise caller is responsible
                     if pipeline is None:
@@ -709,13 +740,11 @@ class Job:
                         remove_from_queue=True
                     )

-                self.set_status(JobStatus.CANCELED, pipeline=pipe)
-
                 registry = CanceledJobRegistry(
                     self.origin,
                     self.connection,
                     job_class=self.__class__,
-                    serializer=self.serializer
+                    serializer=self.serializer
                 )
                 registry.add(self, pipeline=pipe)
                 if pipeline is None:
@@ -726,7 +755,7 @@ class Job:
                     continue
                 else:
                     # if the pipeline comes from the caller, we re-raise the
-                    # exception as it it the responsibility of the caller to
+                    # exception as it is the responsibility of the caller to
                     # handle it
                     raise
@@ -953,17 +982,16 @@ class Job:

         return [Job.key_for(_id.decode()) for _id in dependencies]

-    def dependencies_are_met(self, exclude_job_id=None, pipeline=None):
-        """Returns a boolean indicating if all of this jobs dependencies are _FINISHED_
+    def dependencies_are_met(self, parent_job=None, pipeline=None):
+        """Returns a boolean indicating if all of this job's dependencies are _FINISHED_

         If a pipeline is passed, all dependencies are WATCHed.

-        `exclude` allows us to exclude some job id from the status check. This is useful
-        when enqueueing the dependents of a _successful_ job -- that status of
+        `parent_job` allows us to pass the parent job in directly for the status check.
+        This is useful when enqueueing the dependents of a _successful_ job -- the status of
         `FINISHED` may not be yet set in redis, but said job is indeed _done_ and this
-        method is _called_ in the _stack_ of it's dependents are being enqueued.
+        method is _called_ in the _stack_ while its dependents are being enqueued.
         """
-
         connection = pipeline if pipeline is not None else self.connection

         if pipeline is not None:
@@ -973,8 +1001,19 @@ class Job:
         dependencies_ids = {_id.decode()
                             for _id in connection.smembers(self.dependencies_key)}

-        if exclude_job_id:
-            dependencies_ids.discard(exclude_job_id)
+        if parent_job:
+            # If the parent job is canceled, there is no status to check;
+            # if the parent job failed, we may only continue when this job
+            # allows its dependencies to fail
+            dependencies_ids.discard(parent_job.id)
+            if parent_job._status == JobStatus.CANCELED:
+                pass
+            elif parent_job._status == JobStatus.FAILED and not self.allow_dependency_failures:
+                return False
+
+            # If the only dependency is the parent job, the dependency has been met
+            if not dependencies_ids:
+                return True

         with connection.pipeline() as pipeline:
             for key in dependencies_ids:
@@ -982,8 +1021,13 @@ class Job:

         dependencies_statuses = pipeline.execute()

+        if self.allow_dependency_failures:
+            allowed_statuses = [JobStatus.FINISHED, JobStatus.FAILED]
+        else:
+            allowed_statuses = [JobStatus.FINISHED]
+
         return all(
-            status.decode() == JobStatus.FINISHED
+            status.decode() in allowed_statuses
             for status in dependencies_statuses
             if status
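The heart of the change in `dependencies_are_met()` is the widened set of acceptable terminal statuses. As a sanity check, the rule reduces to a small pure function; statuses appear here as plain strings, a simplification of the decoded Redis values the real method compares against `JobStatus` members:

```python
FINISHED, FAILED = 'finished', 'failed'


def dependencies_are_met(statuses, allow_dependency_failures=False):
    # mirrors the allowed_statuses branch added above
    allowed = {FINISHED, FAILED} if allow_dependency_failures else {FINISHED}
    return all(status in allowed for status in statuses if status)


assert dependencies_are_met([FINISHED, FINISHED])
assert not dependencies_are_met([FINISHED, FAILED])
assert dependencies_are_met([FINISHED, FAILED], allow_dependency_failures=True)
```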
diff --git a/rq/queue.py b/rq/queue.py
index 67c91a7..f7e6e3f 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -11,7 +11,7 @@ from .compat import as_text, string_types, total_ordering
 from .connections import resolve_connection
 from .defaults import DEFAULT_RESULT_TTL
 from .exceptions import DequeueTimeout, NoSuchJobError
-from .job import Job, JobStatus
+from .job import Job, JobStatus, Dependency
 from .serializers import resolve_serializer
 from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow

@@ -48,6 +48,7 @@ class Queue:
             return cls.from_queue_key(as_text(queue_key),
                                       connection=connection,
                                       job_class=job_class, serializer=serializer)
+
         return [to_queue(rq_key)
                 for rq_key in connection.smembers(cls.redis_queues_keys)
                 if rq_key]

@@ -324,6 +325,9 @@ class Queue:
             job.retries_left = retry.max
             job.retry_intervals = retry.intervals

+        if isinstance(depends_on, Dependency):
+            job.allow_dependency_failures = depends_on.allow_failure
+
         return job

     def setup_dependencies(
@@ -386,9 +390,8 @@ class Queue:
                      result_ttl=None, ttl=None, failure_ttl=None, description=None,
                      depends_on=None, job_id=None, at_front=False, meta=None,
                      retry=None, on_success=None, on_failure=None, pipeline=None):
-        """Creates a job to represent the delayed function call and enqueues
-        it.
-nd
+        """Creates a job to represent the delayed function call and enqueues it.
+
         It is much like `.enqueue()`, except that it takes the function's args
         and kwargs as explicit arguments. Any kwargs passed to this function
         contain options for RQ itself.
@@ -611,14 +614,19 @@ nd
         dependents_key = job.dependents_key

         while True:
+
             try:
                 # if a pipeline is passed, the caller is responsible for calling WATCH
                 # to ensure all jobs are enqueued
                 if pipeline is None:
                     pipe.watch(dependents_key)

-                dependent_job_ids = [as_text(_id)
-                                     for _id in pipe.smembers(dependents_key)]
+                dependent_job_ids = {as_text(_id)
+                                     for _id in pipe.smembers(dependents_key)}
+
+                # There are no dependents
+                if not dependent_job_ids:
+                    break

                 jobs_to_enqueue = [
                     dependent_job for dependent_job
@@ -627,13 +635,16 @@ nd
                         connection=self.connection,
                         serializer=self.serializer
                     ) if dependent_job and dependent_job.dependencies_are_met(
-                        exclude_job_id=job.id,
-                        pipeline=pipe
+                        parent_job=job,
+                        pipeline=pipe,
                     )
                 ]

                 pipe.multi()

+                if not jobs_to_enqueue:
+                    break
+
                 for dependent in jobs_to_enqueue:
                     registry = DeferredJobRegistry(dependent.origin,
                                                    self.connection,
@@ -646,11 +657,15 @@ nd
                     queue = self.__class__(name=dependent.origin, connection=self.connection)
                     queue.enqueue_job(dependent, pipeline=pipe)

-                pipe.delete(dependents_key)
+                # Only delete dependents_key if all dependents have been enqueued
+                if len(jobs_to_enqueue) == len(dependent_job_ids):
+                    pipe.delete(dependents_key)
+                else:
+                    enqueued_job_ids = [job.id for job in jobs_to_enqueue]
+                    pipe.srem(dependents_key, *enqueued_job_ids)

                 if pipeline is None:
                     pipe.execute()
-
                 break
             except WatchError:
                 if pipeline is None:
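The partial-deletion logic added to `enqueue_dependents` is easier to see with plain Python sets standing in for the Redis `SMEMBERS`/`SREM`/`DEL` calls (purely an illustrative assumption):

```python
def settle_dependents(dependents, ready):
    """Return what should remain under dependents_key.

    `dependents` mirrors the Redis set behind dependents_key;
    `ready` holds the ids whose dependencies_are_met() returned True.
    """
    if ready == dependents:
        return set()            # pipe.delete(dependents_key)
    return dependents - ready   # pipe.srem(dependents_key, *enqueued_job_ids)


assert settle_dependents({'a', 'b'}, {'a', 'b'}) == set()
assert settle_dependents({'a', 'b'}, {'a'}) == {'b'}  # 'b' stays deferred
```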
diff --git a/rq/worker.py b/rq/worker.py
index ff8f071..11a0acd 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -18,7 +18,7 @@ try:
     from signal import SIGKILL
 except ImportError:
     from signal import SIGTERM as SIGKILL
-
+from contextlib import suppress
 import redis.exceptions

 from . import worker_registration
@@ -54,7 +54,6 @@ green = make_colorizer('darkgreen')
 yellow = make_colorizer('darkyellow')
 blue = make_colorizer('darkblue')

-
 logger = logging.getLogger(__name__)

@@ -226,7 +225,11 @@ class Worker:
                 )
                 self.ip_address = 'unknown'
             else:
-                self.ip_address = [client['addr'] for client in connection.client_list() if client['name'] == self.name][0]
+                self.ip_address = [
+                    client['addr']
+                    for client in connection.client_list()
+                    if client['name'] == self.name
+                ][0]
         else:
             self.hostname = None
             self.pid = None
@@ -971,6 +974,8 @@ class Worker:
                                                        job_class=self.job_class, serializer=job.serializer)
                 failed_job_registry.add(job, ttl=job.failure_ttl,
                                         exc_string=exc_string, pipeline=pipeline)
+                with suppress(redis.exceptions.ConnectionError):
+                    pipeline.execute()

             self.set_current_job_id(None, pipeline=pipeline)
             self.increment_failed_job_count(pipeline)
@@ -981,9 +986,14 @@

             if retry:
                 job.retry(queue, pipeline)
+                enqueue_dependents = False
+            else:
+                enqueue_dependents = True

             try:
                 pipeline.execute()
+                if enqueue_dependents:
+                    queue.enqueue_dependents(job)
             except Exception:
                 # Ensure that custom exception handlers are called
                 # even if Redis is down
@@ -991,6 +1001,7 @@

     def handle_job_success(self, job, queue, started_job_registry):
         self.log.debug('Handling successful execution of job %s', job.id)
+
         with self.connection.pipeline() as pipeline:
             while True:
                 try:
@@ -1252,6 +1263,7 @@ class RoundRobinWorker(Worker):
     """
     Modified version of Worker that dequeues jobs from the queues using a round-robin strategy.
     """
+
     def reorder_queues(self, reference_queue):
         pos = self._ordered_queues.index(reference_queue)
         self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1]
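The ordering in `handle_job_failure` is deliberate: dependents are only enqueued after `pipeline.execute()` commits the `FAILED` status, and never when the job is being retried. A toy control-flow sketch, with a hypothetical `FakeQueue` standing in for `rq.Queue`:

```python
class FakeQueue:
    """Hypothetical stand-in for rq.Queue that just records calls."""
    def __init__(self):
        self.calls = []

    def enqueue_dependents(self, job):
        self.calls.append(('enqueue_dependents', job))


def after_failure(queue, job, retry):
    # mirrors the enqueue_dependents flag in Worker.handle_job_failure():
    # a retried job runs again later, so its dependents must stay deferred
    enqueue_dependents = not retry
    # ... pipeline.execute() happens here in the real method, committing the
    # FAILED status before any dependent's dependencies_are_met() reads it
    if enqueue_dependents:
        queue.enqueue_dependents(job)


queue = FakeQueue()
after_failure(queue, 'job-1', retry=False)
assert queue.calls == [('enqueue_dependents', 'job-1')]

queue.calls.clear()
after_failure(queue, 'job-1', retry=True)
assert queue.calls == []
```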
""" + def reorder_queues(self, reference_queue): pos = self._ordered_queues.index(reference_queue) self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1] diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py new file mode 100644 index 0000000..6d2f776 --- /dev/null +++ b/tests/test_dependencies.py @@ -0,0 +1,99 @@ +from tests import RQTestCase +from tests.fixtures import div_by_zero, say_hello + +from rq import Queue, SimpleWorker +from rq.job import Job, JobStatus, Dependency + + +class TestDependencies(RQTestCase): + + def test_allow_failure_is_persisted(self): + """Ensure that job.allow_dependency_failures is properly set + when providing Dependency object to depends_on.""" + dep_job = Job.create(func=say_hello) + + # default to False, maintaining current behavior + job = Job.create(func=say_hello, depends_on=Dependency([dep_job])) + job.save() + Job.fetch(job.id, connection=self.testconn) + self.assertFalse(job.allow_dependency_failures) + + job = Job.create(func=say_hello, depends_on=Dependency([dep_job], allow_failure=True)) + job.save() + job = Job.fetch(job.id, connection=self.testconn) + self.assertTrue(job.allow_dependency_failures) + + jobs = Job.fetch_many([job.id], connection=self.testconn) + self.assertTrue(jobs[0].allow_dependency_failures) + + def test_job_dependency(self): + """Enqueue dependent jobs only when appropriate""" + q = Queue(connection=self.testconn) + w = SimpleWorker([q], connection=q.connection) + + # enqueue dependent job when parent successfully finishes + parent_job = q.enqueue(say_hello) + job = q.enqueue_call(say_hello, depends_on=parent_job) + w.work(burst=True) + job = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + q.empty() + + # don't enqueue dependent job when parent fails + parent_job = q.enqueue(div_by_zero) + job = q.enqueue_call(say_hello, depends_on=parent_job) + w.work(burst=True) + job = Job.fetch(job.id, connection=self.testconn) + self.assertNotEqual(job.get_status(), JobStatus.FINISHED) + q.empty() + + # don't enqueue dependent job when Dependency.allow_failure=False (the default) + parent_job = q.enqueue(div_by_zero) + dependency = Dependency(jobs=parent_job) + job = q.enqueue_call(say_hello, depends_on=dependency) + w.work(burst=True) + job = Job.fetch(job.id, connection=self.testconn) + self.assertNotEqual(job.get_status(), JobStatus.FINISHED) + + # enqueue dependent job when Dependency.allow_failure=True + parent_job = q.enqueue(div_by_zero) + dependency = Dependency(jobs=parent_job, allow_failure=True) + job = q.enqueue_call(say_hello, depends_on=dependency) + + job = Job.fetch(job.id, connection=self.testconn) + self.assertTrue(job.allow_dependency_failures) + + w.work(burst=True) + job = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.get_status(), JobStatus.FINISHED) + + # When a failing job has multiple dependents, only enqueue those + # with allow_failure=True + parent_job = q.enqueue(div_by_zero) + job_allow_failure = q.enqueue(say_hello, + depends_on=Dependency(jobs=parent_job, allow_failure=True)) + job = q.enqueue(say_hello, + depends_on=Dependency(jobs=parent_job, allow_failure=False)) + w.work(burst=True, max_jobs=1) + self.assertEqual(parent_job.get_status(), JobStatus.FAILED) + self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED) + self.assertEqual(job.get_status(), JobStatus.DEFERRED) + q.empty() + + # only enqueue dependent job when all dependencies have finished/failed + 
diff --git a/tests/test_job.py b/tests/test_job.py
index 7dccda3..6106afb 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -9,7 +9,7 @@ from redis import WatchError

 from rq.compat import as_text
 from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
-from rq.job import Job, JobStatus, cancel_job, get_current_job
+from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job
 from rq.queue import Queue
 from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry,
                          FinishedJobRegistry, StartedJobRegistry,
@@ -434,6 +434,14 @@ class TestJob(RQTestCase):
         Job.fetch(job.id, connection=self.testconn)
         self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")

+    def test_dependency_parameter_constraints(self):
+        """Ensures the proper constraints are in place for values passed in as job references."""
+        dep_job = Job.create(func=fixtures.say_hello)
+        # raise error on empty jobs
+        self.assertRaises(ValueError, Dependency, jobs=[])
+        # raise error on non-str/Job value in jobs iterable
+        self.assertRaises(ValueError, Dependency, jobs=[dep_job, 1])
+
     def test_multiple_dependencies_are_accepted_and_persisted(self):
         """Ensure job._dependency_ids accepts different input formats,
         and is set and restored properly"""
@@ -456,6 +464,14 @@ class TestJob(RQTestCase):
             [("A", "B"), ["A", "B"]],
             [(job_A, job_B), ["A", "B"]],
             [(job_A, "B"), ["A", "B"]],
+            [Dependency("A"), ["A"]],
+            [Dependency(job_A), ["A"]],
+            [Dependency(["A", "B"]), ["A", "B"]],
+            [Dependency([job_A, job_B]), ["A", "B"]],
+            [Dependency(["A", job_B]), ["A", "B"]],
+            [Dependency(("A", "B")), ["A", "B"]],
+            [Dependency((job_A, job_B)), ["A", "B"]],
+            [Dependency((job_A, "B")), ["A", "B"]],
         ]
         for given, expected in cases:
             job = Job.create(func=fixtures.say_hello, depends_on=given)
@@ -865,19 +881,21 @@ class TestJob(RQTestCase):
         queue = Queue(connection=self.testconn)
         dependency = queue.enqueue(fixtures.raise_exc)
         dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency)
+        self.assertTrue(dependency.dependent_ids)

         self.assertEqual(1, len(queue.get_jobs()))
         self.assertEqual(1, len(queue.deferred_job_registry))
         w = Worker([queue])
         w.work(burst=True, max_jobs=1)
+        self.assertTrue(dependency.dependent_ids)
         dependency.refresh()
         dependent.refresh()
         self.assertEqual(0, len(queue.get_jobs()))
         self.assertEqual(1, len(queue.deferred_job_registry))
         self.assertEqual(1, len(queue.failed_job_registry))
         cancel_job(dependency.id, enqueue_dependents=True)
         dependency.refresh()
         dependent.refresh()
         self.assertEqual(1, len(queue.get_jobs()))
         self.assertEqual(0, len(queue.deferred_job_registry))
         self.assertEqual(0, len(queue.failed_job_registry))
@@ -1119,14 +1137,14 @@ class TestJob(RQTestCase):

     def test_dependencies_are_met_at_execution_time(self):
         queue = Queue(connection=self.testconn)
-
+        queue.empty()
         queue.enqueue(fixtures.say_hello, job_id="A")
         queue.enqueue(fixtures.say_hello, job_id="B")
         job_C = queue.enqueue(fixtures.check_dependencies_are_met, job_id="C", depends_on=["A", "B"])

+        job_C.dependencies_are_met()
         w = Worker([queue])
         w.work(burst=True)
-
         assert job_C.result

     def test_execution_order_with_sole_dependency(self):
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 7bc8fc0..936658e 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -29,7 +29,7 @@ from tests.fixtures import (

 from rq import Queue, SimpleWorker, Worker, get_current_connection
 from rq.compat import as_text, PY2
-from rq.job import Job, JobStatus, Retry
+from rq.job import Job, JobStatus, Dependency, Retry
 from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
 from rq.suspension import resume, suspend
 from rq.utils import utcnow
@@ -468,7 +468,7 @@ class TestWorker(RQTestCase):

         worker = Worker([queue])

-        # If job if configured to retry, it will be put back in the queue
+        # If job is configured to retry, it will be put back in the queue
         # and not put in the FailedJobRegistry.
         # This is the original execution
         queue.empty()
@@ -695,22 +695,6 @@ class TestWorker(RQTestCase):
         self.assertEqual(job.is_finished, False)
         self.assertEqual(job.is_failed, True)

-    def test_job_dependency(self):
-        """Enqueue dependent jobs only if their parents don't fail"""
-        q = Queue()
-        w = Worker([q])
-        parent_job = q.enqueue(say_hello, result_ttl=0)
-        job = q.enqueue_call(say_hello, depends_on=parent_job)
-        w.work(burst=True)
-        job = Job.fetch(job.id)
-        self.assertEqual(job.get_status(), JobStatus.FINISHED)
-
-        parent_job = q.enqueue(div_by_zero)
-        job = q.enqueue_call(say_hello, depends_on=parent_job)
-        w.work(burst=True)
-        job = Job.fetch(job.id)
-        self.assertNotEqual(job.get_status(), JobStatus.FINISHED)
-
     def test_get_current_job(self):
         """Ensure worker.get_current_job() works properly"""
         q = Queue()
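Taken together, the behavior the new suite pins down can be exercised end to end. A sketch assuming a local Redis instance and the `div_by_zero`/`say_hello` fixtures from `tests/fixtures.py`:

```python
from redis import Redis
from rq import Queue, SimpleWorker
from rq.job import Dependency
from tests.fixtures import div_by_zero, say_hello

queue = Queue(connection=Redis())
failing = queue.enqueue(div_by_zero)
succeeding = queue.enqueue(say_hello)

# Enqueued only once BOTH parents reach a terminal state; allow_failure=True
# means the FAILED parent also counts as met.
tail = queue.enqueue(
    say_hello,
    depends_on=Dependency(jobs=[failing, succeeding], allow_failure=True),
)

SimpleWorker([queue], connection=queue.connection).work(burst=True)
print(tail.get_status())  # 'finished'
```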