From 59d1b40d142ac0d70e07d306f3aea3d7ebae9417 Mon Sep 17 00:00:00 2001
From: skieffer
Date: Tue, 2 Feb 2021 06:22:15 -0500
Subject: [PATCH] Multidependencies (#1397)

* Also accept lists and tuples as the value of `depends_on`.
  The elements of the lists/tuples may be either Jobs or Job IDs.
  A single Job / Job ID is still accepted as well.

* Represent _all_ job dependencies in `Job.to_dict()`. We now represent
  the entire list, instead of just the first element.

* Fix some doctext regarding plurality of dependencies.

* Add unit tests for job dependencies. One unit test establishes a
  pattern for checking execution order as affected by dependencies.
  Another unit test applies this pattern to the new capability to name
  multiple dependencies.

* Add unit test for new `depends_on` input formats. Also test that
  these are properly persisted.

* Repair `Job.restore()`. Need to convert bytes back to strings when
  reloading `dependency_ids`.

* Maintain backwards compat. in `Job.to_dict()`. Keep the old
  `dependency_id` (singular) key.

* Provide coverage for new test fixture.

* Simplify some code. Cut some superfluous `as_text()` calls left over
  from an earlier commit.

* Check for `dependency_id` in `Job.restore()` for backwd. compat.
  Also eliminate use of `as_text()` here, in favor of `.decode()`.

* Switch to snake case instead of camel case.

* Eliminate some usages of `as_text()`. Also cut some `print`
  statements.

* Cleanup.

* Accept arbitrary iterables for `Job`'s `depends_on` kwarg. Instead of
  requiring a list or tuple, we now make use of `ensure_list()`.

* Add test fixtures:

  * Provide a system to get two workers working simultaneously, using
    `multiprocessing`.
  * Define a simple job that just says whether its dependencies are met.
  * In `rpush`, add an option to record the name of the worker.

* Improve unit tests on execution order with dependencies. These now
  actually have two workers going, which makes for a more thorough test.

* Add unit test examining `Job.dependencies_are_met()` at execution time.

* Redesign dependency execution order unit tests.

* Simplify assertions.

* Improve doctext and formatting.

* Move fixture tests to new, dedicated module `test_fixtures.py`.

* Use `enqueue` instead of `enqueue_call` in new unit tests.
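For illustration, a minimal sketch of the new input formats (`say_hello`
and `report` are hypothetical task functions, not part of this patch):

    from redis import Redis
    from rq import Queue

    queue = Queue(connection=Redis())
    job_a = queue.enqueue(say_hello, job_id='A')
    job_b = queue.enqueue(say_hello, job_id='B')

    # depends_on now accepts a single Job / Job ID, or any iterable
    # mixing the two:
    job_c = queue.enqueue(report, depends_on=[job_a, 'B'])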
---
 rq/job.py              |  26 ++++++-----
 tests/fixtures.py      |  59 ++++++++++++++++++++++-
 tests/test_fixtures.py |  20 ++++++++
 tests/test_job.py      | 103 ++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 195 insertions(+), 13 deletions(-)
 create mode 100644 tests/test_fixtures.py

diff --git a/rq/job.py b/rq/job.py
index 29e9fc2..6075132 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -19,7 +19,7 @@ from .exceptions import NoSuchJobError
 from .local import LocalStack
 from .serializers import resolve_serializer
 from .utils import (enum, get_version, import_attribute, parse_timeout, str_to_date,
-                    utcformat, utcnow)
+                    utcformat, utcnow, ensure_list)
 
 # Serialize pickle dumps using the highest pickle protocol (binary, default
 # uses ascii)
@@ -126,9 +126,10 @@ class Job(object):
         job._status = status
         job.meta = meta or {}
 
-        # dependency could be job instance or id
+        # dependency could be job instance or id, or iterable thereof
         if depends_on is not None:
-            job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
+            job._dependency_ids = [dep.id if isinstance(dep, Job) else dep
+                                   for dep in ensure_list(depends_on)]
         return job
 
     def get_position(self):
@@ -175,7 +176,7 @@ class Job(object):
 
     @property
     def _dependency_id(self):
-        """Returns the first item in self._dependency_ids. Present
+        """Returns the first item in self._dependency_ids. Present
         to preserve compatibility with third party packages.
         """
         if self._dependency_ids:
@@ -183,7 +184,7 @@ class Job(object):
 
     @property
     def dependency(self):
-        """Returns a job's dependency. To avoid repeated Redis fetches, we cache
+        """Returns a job's first dependency. To avoid repeated Redis fetches, we cache
         job.dependency as job._dependency.
         """
         if not self._dependency_ids:
@@ -498,8 +499,10 @@ class Job(object):
         self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None  # noqa
         self._status = obj.get('status').decode() if obj.get('status') else None
 
-        dependency_id = obj.get('dependency_id', None)
-        self._dependency_ids = [as_text(dependency_id)] if dependency_id else []
+        dep_ids = obj.get('dependency_ids')
+        dep_id = obj.get('dependency_id')  # for backwards compatibility
+        self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
+                                else [dep_id.decode()] if dep_id else [])
 
         self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
         self.meta = self.serializer.loads(obj.get('meta')) if obj.get('meta') else {}
@@ -571,7 +574,8 @@ class Job(object):
         if self._status is not None:
             obj['status'] = self._status
         if self._dependency_ids:
-            obj['dependency_id'] = self._dependency_ids[0]
+            obj['dependency_id'] = self._dependency_ids[0]  # for backwards compatibility
+            obj['dependency_ids'] = json.dumps(self._dependency_ids)
         if self.meta and include_meta:
             obj['meta'] = self.serializer.dumps(self.meta)
         if self.ttl:
@@ -786,14 +790,14 @@ class Job(object):
         return self.retry_intervals[index]
 
     def register_dependency(self, pipeline=None):
-        """Jobs may have dependencies. Jobs are enqueued only if the job they
-        depend on is successfully performed. We record this relation as
+        """Jobs may have dependencies. Jobs are enqueued only if the jobs they
+        depend on are successfully performed. We record this relation as
         a reverse dependency (a Redis set), with a key that looks something
         like:
 
             rq:job:job_id:dependents = {'job_id_1', 'job_id_2'}
 
-        This method adds the job in its dependency's dependents set
+        This method adds the job to its dependencies' dependents sets,
         and adds the job to DeferredJobRegistry.
         """
         from .registry import DeferredJobRegistry
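To make the persistence change concrete: `to_dict()` now writes the full
list under a new `dependency_ids` key (JSON-encoded), while keeping the
old singular `dependency_id` key, and `restore()` prefers the new key.
A minimal round-trip sketch of just these fields (hand-built dict;
assumes, as in the code above, that hash values come back from Redis as
bytes):

    import json

    # What to_dict() now emits for a job that depends on jobs 'A' and 'B':
    obj = {
        'dependency_id': 'A',                      # old singular key, kept for compatibility
        'dependency_ids': json.dumps(['A', 'B']),  # new key: the full list, JSON-encoded
    }

    # restore() reads these values back from the Redis hash as bytes,
    # so it decodes before parsing:
    raw = obj['dependency_ids'].encode()
    assert json.loads(raw.decode()) == ['A', 'B']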
""" from .registry import DeferredJobRegistry diff --git a/tests/fixtures.py b/tests/fixtures.py index e50ecb0..b2d4af1 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -11,11 +11,14 @@ import time import signal import sys import subprocess +import contextlib +from multiprocessing import Process +from redis import Redis from rq import Connection, get_current_job, get_current_connection, Queue from rq.decorators import job from rq.compat import text_type -from rq.worker import HerokuWorker +from rq.worker import HerokuWorker, Worker def say_pid(): @@ -55,6 +58,18 @@ def some_calculation(x, y, z=1): """ return x * y / z +def rpush(key, value, append_worker_name=False, sleep=0): + """Push a value into a list in Redis. Useful for detecting the order in + which jobs were executed.""" + if sleep: + time.sleep(sleep) + if append_worker_name: + value += ':' + get_current_job().worker_name + redis = get_current_connection() + redis.rpush(key, value) + +def check_dependencies_are_met(): + return get_current_job().dependencies_are_met() def create_file(path): """Creates a file at the given path. Actually, leaves evidence that the @@ -196,3 +211,45 @@ class Serializer(object): def dumps(self): pass + +def start_worker(queue_name, conn_kwargs, worker_name, burst): + """ + Start a worker. We accept only serializable args, so that this can be + executed via multiprocessing. + """ + # Silence stdout (thanks to ) + with open(os.devnull, 'w') as devnull: + with contextlib.redirect_stdout(devnull): + w = Worker([queue_name], name=worker_name, connection=Redis(**conn_kwargs)) + w.work(burst=burst) + +def start_worker_process(queue_name, connection=None, worker_name=None, burst=False): + """ + Use multiprocessing to start a new worker in a separate process. + """ + connection = connection or get_current_connection() + conn_kwargs = connection.connection_pool.connection_kwargs + p = Process(target=start_worker, args=(queue_name, conn_kwargs, worker_name, burst)) + p.start() + return p + +def burst_two_workers(queue, timeout=2, tries=5, pause=0.1): + """ + Get two workers working simultaneously in burst mode, on a given queue. + Return after both workers have finished handling jobs, up to a fixed timeout + on the worker that runs in another process. + """ + w1 = start_worker_process(queue.name, worker_name='w1', burst=True) + w2 = Worker(queue, name='w2') + jobs = queue.jobs + if jobs: + first_job = jobs[0] + # Give the first worker process time to get started on the first job. + # This is helpful in tests where we want to control which worker takes which job. + n = 0 + while n < tries and not first_job.is_started: + time.sleep(pause) + n += 1 + # Now can start the second worker. 
diff --git a/tests/test_fixtures.py b/tests/test_fixtures.py
new file mode 100644
index 0000000..1f98a58
--- /dev/null
+++ b/tests/test_fixtures.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+from __future__ import (absolute_import, division, print_function,
+                        unicode_literals)
+
+from rq import Queue
+
+from tests import RQTestCase, fixtures
+
+
+class TestFixtures(RQTestCase):
+    def test_rpush_fixture(self):
+        fixtures.rpush('foo', 'bar')
+        assert self.testconn.lrange('foo', 0, 0)[0].decode() == 'bar'
+
+    def test_start_worker_fixture(self):
+        queue = Queue(name='testing', connection=self.testconn)
+        queue.enqueue(fixtures.say_hello)
+        conn_kwargs = self.testconn.connection_pool.connection_kwargs
+        fixtures.start_worker(queue.name, conn_kwargs, 'w1', True)
+        assert not queue.jobs
diff --git a/tests/test_job.py b/tests/test_job.py
index f990afd..8b921d7 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -432,6 +432,35 @@ class TestJob(RQTestCase):
             Job.fetch(job.id, connection=self.testconn)
         self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
 
+    def test_multiple_dependencies_are_accepted_and_persisted(self):
+        """Ensure job._dependency_ids accepts different input formats, and
+        is set and restored properly."""
+        job_A = Job.create(func=fixtures.some_calculation, args=(3, 1, 4), id="A")
+        job_B = Job.create(func=fixtures.some_calculation, args=(2, 7, 2), id="B")
+
+        # No dependencies
+        job = Job.create(func=fixtures.say_hello)
+        job.save()
+        job = Job.fetch(job.id, connection=self.testconn)
+        self.assertEqual(job._dependency_ids, [])
+
+        # Various ways of specifying dependencies
+        cases = [
+            ["A", ["A"]],
+            [job_A, ["A"]],
+            [["A", "B"], ["A", "B"]],
+            [[job_A, job_B], ["A", "B"]],
+            [["A", job_B], ["A", "B"]],
+            [("A", "B"), ["A", "B"]],
+            [(job_A, job_B), ["A", "B"]],
+            [(job_A, "B"), ["A", "B"]],
+        ]
+        for given, expected in cases:
+            job = Job.create(func=fixtures.say_hello, depends_on=given)
+            job.save()
+            job = Job.fetch(job.id, connection=self.testconn)
+            self.assertEqual(job._dependency_ids, expected)
+
     def test_prepare_for_execution(self):
         """job.prepare_for_execution works properly"""
         job = Job.create(func=fixtures.say_hello)
@@ -943,7 +972,79 @@ class TestJob(RQTestCase):
 
         assert dependent_job.dependencies_are_met()
         assert dependent_job.get_status() == JobStatus.QUEUED
-
+
+    def test_dependencies_are_met_at_execution_time(self):
+        queue = Queue(connection=self.testconn)
+
+        queue.enqueue(fixtures.say_hello, job_id="A")
+        queue.enqueue(fixtures.say_hello, job_id="B")
+        job_C = queue.enqueue(fixtures.check_dependencies_are_met, job_id="C", depends_on=["A", "B"])
+
+        w = Worker([queue])
+        w.work(burst=True)
+
+        assert job_C.result
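The fixture driving this last test is a thin wrapper: a job can ask, while
it is running, whether all of the jobs it `depends_on` have finished. A
sketch of the same check inside a user-defined task (`my_task` is a
hypothetical name):

    from rq import get_current_job

    def my_task():
        # True only once every dependency of this job has finished.
        return get_current_job().dependencies_are_met()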
+    def test_execution_order_with_sole_dependency(self):
+        queue = Queue(connection=self.testconn)
+        key = 'test_job:job_order'
+
+        # When there are no dependencies, the two fast jobs ("A" and "B") run
+        # in the order enqueued. Worker 1 will be busy with the slow job, so
+        # worker 2 will complete both fast jobs.
+        job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", True, 0.5], job_id='slow_job')
+        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", True])
+        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True])
+        fixtures.burst_two_workers(queue)
+        time.sleep(0.75)
+        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 2)]
+        self.assertEqual(queue.count, 0)
+        self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B]))
+        self.assertEqual(jobs_completed, ["A:w2", "B:w2", "slow:w1"])
+        self.testconn.delete(key)
+
+        # When job "A" depends on the slow job, job "B" finishes before "A".
+        # There is no clear requirement on which worker should take job "A",
+        # so we stay silent on that.
+        job_slow = queue.enqueue(fixtures.rpush, args=[key, "slow", True, 0.5], job_id='slow_job')
+        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", False], depends_on='slow_job')
+        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True])
+        fixtures.burst_two_workers(queue)
+        time.sleep(0.75)
+        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 2)]
+        self.assertEqual(queue.count, 0)
+        self.assertTrue(all(job.is_finished for job in [job_slow, job_A, job_B]))
+        self.assertEqual(jobs_completed, ["B:w2", "slow:w1", "A"])
+
+    def test_execution_order_with_dual_dependency(self):
+        queue = Queue(connection=self.testconn)
+        key = 'test_job:job_order'
+
+        # When there are no dependencies, the two fast jobs ("A" and "B") run
+        # in the order enqueued.
+        job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", True, 0.5], job_id='slow_1')
+        job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", True, 0.75], job_id='slow_2')
+        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", True])
+        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True])
+        fixtures.burst_two_workers(queue)
+        time.sleep(1)
+        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 3)]
+        self.assertEqual(queue.count, 0)
+        self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
+        self.assertEqual(jobs_completed, ["slow_1:w1", "A:w1", "B:w1", "slow_2:w2"])
+        self.testconn.delete(key)
+
+        # This time job "A" depends on two slow jobs, while job "B" depends
+        # only on the faster of the two. Job "B" should be completed before
+        # job "A". There is no clear requirement on which worker should take
+        # job "A", so we stay silent on that.
+        job_slow_1 = queue.enqueue(fixtures.rpush, args=[key, "slow_1", True, 0.5], job_id='slow_1')
+        job_slow_2 = queue.enqueue(fixtures.rpush, args=[key, "slow_2", True, 0.75], job_id='slow_2')
+        job_A = queue.enqueue(fixtures.rpush, args=[key, "A", False], depends_on=['slow_1', 'slow_2'])
+        job_B = queue.enqueue(fixtures.rpush, args=[key, "B", True], depends_on=['slow_1'])
+        fixtures.burst_two_workers(queue)
+        time.sleep(1)
+        jobs_completed = [v.decode() for v in self.testconn.lrange(key, 0, 3)]
+        self.assertEqual(queue.count, 0)
+        self.assertTrue(all(job.is_finished for job in [job_slow_1, job_slow_2, job_A, job_B]))
+        self.assertEqual(jobs_completed, ["slow_1:w1", "B:w1", "slow_2:w2", "A"])
+
     def test_retry(self):
         """Retry parses `max` and `interval` correctly"""
         retry = Retry(max=1)
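Taken together, an end-to-end sketch of the behavior these tests pin down
(`step_one`, `step_two`, and `combine` are hypothetical task functions; a
single burst worker suffices, as in `test_dependencies_are_met_at_execution_time`):

    from redis import Redis
    from rq import Queue, Worker

    redis = Redis()
    queue = Queue(connection=redis)
    a = queue.enqueue(step_one)
    b = queue.enqueue(step_two)
    c = queue.enqueue(combine, depends_on=[a, b])  # deferred until a and b finish

    # One burst is enough: c is enqueued as its dependencies complete.
    Worker([queue], connection=redis).work(burst=True)
    assert c.is_finished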