From 75644ba9486ef3d8b17e8b9cf4dac91fab552375 Mon Sep 17 00:00:00 2001 From: Thomas Matecki Date: Mon, 21 Oct 2019 21:34:47 -0400 Subject: [PATCH] Multi Dependency Support [Internal API Changes] (#1147) * Convert `_dependency_id` to `_dependency_ids` Change `Job`s tracking from a single id of it's dependencies from a single _id_ to a list of _id_s. This change should be private to `Job` - especially leaving `Job#to_dict` and `Job#restore`s treatment of a single 'dependency_id' intact. This change modifies existing tests. * Remove reliance upon dependency property in tests ... use dependency.id not `_dependency_id` * Re-add assertions for Falsey Values * Add _dependency_id property For backwards compatibility with other libs such as django-rq and rq-scheduler --- rq/job.py | 29 +++++++++++++++++++++-------- tests/test_decorator.py | 14 +++++++++----- tests/test_job.py | 28 ++++++++++++++++------------ 3 files changed, 46 insertions(+), 25 deletions(-) diff --git a/rq/job.py b/rq/job.py index 7c97e20..4981f12 100644 --- a/rq/job.py +++ b/rq/job.py @@ -137,7 +137,7 @@ class Job(object): # dependency could be job instance or id if depends_on is not None: - job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on + job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on] return job def get_status(self): @@ -169,16 +169,24 @@ class Job(object): def is_deferred(self): return self.get_status() == JobStatus.DEFERRED + @property + def _dependency_id(self): + """Returns the first item in self._dependency_ids. Present + preserve compatibility with third party packages.. + """ + if self._dependency_ids: + return self._dependency_ids[0] + @property def dependency(self): """Returns a job's dependency. To avoid repeated Redis fetches, we cache job.dependency as job._dependency. """ - if self._dependency_id is None: + if not self._dependency_ids: return None if hasattr(self, '_dependency'): return self._dependency - job = self.fetch(self._dependency_id, connection=self.connection) + job = self.fetch(self._dependency_ids[0], connection=self.connection) self._dependency = job return job @@ -328,7 +336,7 @@ class Job(object): self.failure_ttl = None self.ttl = None self._status = None - self._dependency_id = None + self._dependency_ids = [] self.meta = {} def __repr__(self): # noqa # pragma: no cover @@ -437,7 +445,10 @@ class Job(object): self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa self._status = obj.get('status') if obj.get('status') else None - self._dependency_id = as_text(obj.get('dependency_id', None)) + + dependency_id = obj.get('dependency_id', None) + self._dependency_ids = [as_text(dependency_id)] if dependency_id else [] + self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} @@ -497,8 +508,8 @@ class Job(object): obj['failure_ttl'] = self.failure_ttl if self._status is not None: obj['status'] = self._status - if self._dependency_id is not None: - obj['dependency_id'] = self._dependency_id + if self._dependency_ids: + obj['dependency_id'] = self._dependency_ids[0] if self.meta and include_meta: obj['meta'] = dumps(self.meta) if self.ttl: @@ -683,7 +694,9 @@ class Job(object): registry.add(self, pipeline=pipeline) connection = pipeline if pipeline is not None else self.connection - connection.sadd(self.dependents_key_for(self._dependency_id), self.id) + for dependency_id in self._dependency_ids: + dependents_key = self.dependents_key_for(dependency_id) + connection.sadd(dependents_key, self.id) _job_stack = LocalStack() diff --git a/tests/test_decorator.py b/tests/test_decorator.py index 5b7f38e..47c4226 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -4,11 +4,11 @@ from __future__ import (absolute_import, division, print_function, import mock from redis import Redis + from rq.decorators import job from rq.job import Job -from rq.worker import DEFAULT_RESULT_TTL from rq.queue import Queue - +from rq.worker import DEFAULT_RESULT_TTL from tests import RQTestCase from tests.fixtures import decorated_job @@ -109,11 +109,12 @@ class TestDecorator(RQTestCase): bar_job = bar.delay() + self.assertEqual(foo_job._dependency_ids,[]) self.assertIsNone(foo_job._dependency_id) + self.assertEqual(foo_job.dependency, None) self.assertEqual(bar_job.dependency, foo_job) - - self.assertEqual(bar_job._dependency_id, foo_job.id) + self.assertEqual(bar_job.dependency.id, foo_job.id) def test_decorator_delay_accepts_depends_on_as_argument(self): """Ensure that passing in depends_on to the delay method of @@ -145,8 +146,11 @@ class TestDecorator(RQTestCase): self.assertIsNone(foo_job._dependency_id) self.assertIsNone(bar_job._dependency_id) - self.assertEqual(baz_job.dependency, bar_job) + self.assertEqual(foo_job._dependency_ids,[]) + self.assertEqual(bar_job._dependency_ids,[]) self.assertEqual(baz_job._dependency_id, bar_job.id) + self.assertEqual(baz_job.dependency, bar_job) + self.assertEqual(baz_job.dependency.id, bar_job.id) @mock.patch('rq.queue.resolve_connection') def test_decorator_connection_laziness(self, resolve_connection): diff --git a/tests/test_job.py b/tests/test_job.py index a990dd7..95a2358 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -2,28 +2,28 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) -from datetime import datetime - -import time import sys +import time import zlib - -is_py2 = sys.version[0] == '2' -if is_py2: - import Queue as queue -else: - import queue as queue - -from tests import fixtures, RQTestCase +from datetime import datetime from rq.compat import PY2, as_text from rq.exceptions import NoSuchJobError, UnpickleError -from rq.job import Job, get_current_job, JobStatus, cancel_job +from rq.job import Job, JobStatus, cancel_job, get_current_job from rq.queue import Queue from rq.registry import (DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry) from rq.utils import utcformat from rq.worker import Worker +from tests import RQTestCase, fixtures + +is_py2 = sys.version[0] == '2' +if is_py2: + import Queue as queue +else: + import queue as queue + + try: from cPickle import loads, dumps @@ -227,12 +227,16 @@ class TestJob(RQTestCase): job.save() stored_job = Job.fetch(job.id) self.assertEqual(stored_job._dependency_id, parent_job.id) + self.assertEqual(stored_job._dependency_ids, [parent_job.id]) + self.assertEqual(stored_job.dependency.id, parent_job.id) self.assertEqual(stored_job.dependency, parent_job) job = Job.create(func=fixtures.some_calculation, depends_on=parent_job.id) job.save() stored_job = Job.fetch(job.id) self.assertEqual(stored_job._dependency_id, parent_job.id) + self.assertEqual(stored_job._dependency_ids, [parent_job.id]) + self.assertEqual(stored_job.dependency.id, parent_job.id) self.assertEqual(stored_job.dependency, parent_job) def test_store_then_fetch(self):