Multi Dependency Support [Internal API Changes] (#1147)

* Convert `_dependency_id` to `_dependency_ids`

Change how a `Job` tracks its dependencies from a single _id_ to a list of _id_s (see the sketch after this list). This change should stay internal to `Job`; in particular, `Job#to_dict` and `Job#restore` keep their treatment of a single 'dependency_id' intact.

This change modifies existing tests.

* Remove reliance upon `_dependency_id` in tests

Use the public `dependency.id`, not the private `_dependency_id`.

* Re-add assertions for falsy values

* Add `_dependency_id` property

For backwards compatibility with third-party libraries such as django-rq and rq-scheduler; a sketch of the pattern follows the `rq/job.py` hunks below.
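As a rough illustration of the representation change described above, here is a minimal sketch; plain dicts stand in for a job's internal state, and this is not rq's actual `Job` class:

```python
# Minimal sketch of the representation change; plain dicts stand in for a
# Job's internal state here. This is not rq's actual Job class.

# Before: a single dependency id, or None when there is no dependency.
old_state = {'_dependency_id': 'a1b2c3'}

# After: a list of dependency ids; an empty list means "no dependencies".
new_state = {'_dependency_ids': ['a1b2c3']}

# The serialized form produced by Job#to_dict (and read back by Job#restore)
# still carries a single 'dependency_id' key, taken from the first entry.
serialized = {}
if new_state['_dependency_ids']:
    serialized['dependency_id'] = new_state['_dependency_ids'][0]

assert serialized == {'dependency_id': 'a1b2c3'}
```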
Authored by Thomas Matecki, committed by Selwin Ong
commit 75644ba948 (parent cfc02816ea)

--- a/rq/job.py
+++ b/rq/job.py
@@ -137,7 +137,7 @@ class Job(object):
         # dependency could be job instance or id
         if depends_on is not None:
-            job._dependency_id = depends_on.id if isinstance(depends_on, Job) else depends_on
+            job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
         return job
 
     def get_status(self):
@@ -169,16 +169,24 @@ class Job(object):
     def is_deferred(self):
         return self.get_status() == JobStatus.DEFERRED
 
+    @property
+    def _dependency_id(self):
+        """Returns the first item in self._dependency_ids. Present to
+        preserve compatibility with third-party packages.
+        """
+        if self._dependency_ids:
+            return self._dependency_ids[0]
+
     @property
     def dependency(self):
         """Returns a job's dependency. To avoid repeated Redis fetches, we cache
         job.dependency as job._dependency.
         """
-        if self._dependency_id is None:
+        if not self._dependency_ids:
             return None
 
         if hasattr(self, '_dependency'):
             return self._dependency
-        job = self.fetch(self._dependency_id, connection=self.connection)
+        job = self.fetch(self._dependency_ids[0], connection=self.connection)
         self._dependency = job
         return job
@@ -328,7 +336,7 @@ class Job(object):
         self.failure_ttl = None
         self.ttl = None
         self._status = None
-        self._dependency_id = None
+        self._dependency_ids = []
         self.meta = {}
 
     def __repr__(self):  # noqa  # pragma: no cover
@@ -437,7 +445,10 @@ class Job(object):
         self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None  # noqa
         self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None  # noqa
         self._status = obj.get('status') if obj.get('status') else None
-        self._dependency_id = as_text(obj.get('dependency_id', None))
+
+        dependency_id = obj.get('dependency_id', None)
+        self._dependency_ids = [as_text(dependency_id)] if dependency_id else []
+
         self.ttl = int(obj.get('ttl')) if obj.get('ttl') else None
         self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {}
@@ -497,8 +508,8 @@ class Job(object):
         obj['failure_ttl'] = self.failure_ttl
         if self._status is not None:
             obj['status'] = self._status
-        if self._dependency_id is not None:
-            obj['dependency_id'] = self._dependency_id
+        if self._dependency_ids:
+            obj['dependency_id'] = self._dependency_ids[0]
         if self.meta and include_meta:
             obj['meta'] = dumps(self.meta)
         if self.ttl:
@@ -683,7 +694,9 @@ class Job(object):
         registry.add(self, pipeline=pipeline)
 
         connection = pipeline if pipeline is not None else self.connection
-        connection.sadd(self.dependents_key_for(self._dependency_id), self.id)
+        for dependency_id in self._dependency_ids:
+            dependents_key = self.dependents_key_for(dependency_id)
+            connection.sadd(dependents_key, self.id)
 
 
 _job_stack = LocalStack()
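The `_dependency_id` property added above is what keeps third-party callers working. The standalone class below is a hedged sketch of the same pattern; it is not rq's `Job`, and the `JobLike` name is made up for illustration:

```python
class JobLike(object):
    """Sketch of the backwards-compatibility pattern used in rq/job.py above."""

    def __init__(self, dependency_ids=None):
        # New internal representation: always a list.
        self._dependency_ids = dependency_ids or []

    @property
    def _dependency_id(self):
        # Old single-id attribute re-exposed as a read-only property, so code
        # in packages such as django-rq or rq-scheduler that reads
        # job._dependency_id keeps working. Falls through to None when the
        # job has no dependencies, matching the old default.
        if self._dependency_ids:
            return self._dependency_ids[0]


assert JobLike()._dependency_id is None
assert JobLike(['a1b2c3'])._dependency_id == 'a1b2c3'
```

Returning only the first id keeps single-dependency behaviour identical while leaving room for jobs with multiple dependencies later.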

--- a/tests/test_decorators.py
+++ b/tests/test_decorators.py
@@ -4,11 +4,11 @@ from __future__ import (absolute_import, division, print_function,
 
 import mock
 from redis import Redis
 
 from rq.decorators import job
 from rq.job import Job
-from rq.worker import DEFAULT_RESULT_TTL
 from rq.queue import Queue
+from rq.worker import DEFAULT_RESULT_TTL
 
 from tests import RQTestCase
 from tests.fixtures import decorated_job
@@ -109,11 +109,12 @@ class TestDecorator(RQTestCase):
         bar_job = bar.delay()
 
+        self.assertEqual(foo_job._dependency_ids, [])
         self.assertIsNone(foo_job._dependency_id)
-
+        self.assertEqual(foo_job.dependency, None)
 
         self.assertEqual(bar_job.dependency, foo_job)
-
+        self.assertEqual(bar_job.dependency.id, foo_job.id)
+        self.assertEqual(bar_job._dependency_id, foo_job.id)
 
     def test_decorator_delay_accepts_depends_on_as_argument(self):
         """Ensure that passing in depends_on to the delay method of
@@ -145,8 +146,11 @@ class TestDecorator(RQTestCase):
 
         self.assertIsNone(foo_job._dependency_id)
         self.assertIsNone(bar_job._dependency_id)
-        self.assertEqual(baz_job.dependency, bar_job)
+        self.assertEqual(foo_job._dependency_ids, [])
+        self.assertEqual(bar_job._dependency_ids, [])
         self.assertEqual(baz_job._dependency_id, bar_job.id)
+        self.assertEqual(baz_job.dependency, bar_job)
+        self.assertEqual(baz_job.dependency.id, bar_job.id)
 
     @mock.patch('rq.queue.resolve_connection')
     def test_decorator_connection_laziness(self, resolve_connection):

--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -2,28 +2,28 @@
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
-from datetime import datetime
-import time
 import sys
+import time
 import zlib
+from datetime import datetime
 
-is_py2 = sys.version[0] == '2'
-if is_py2:
-    import Queue as queue
-else:
-    import queue as queue
-
-from tests import fixtures, RQTestCase
 from rq.compat import PY2, as_text
 from rq.exceptions import NoSuchJobError, UnpickleError
-from rq.job import Job, get_current_job, JobStatus, cancel_job
+from rq.job import Job, JobStatus, cancel_job, get_current_job
 from rq.queue import Queue
 from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
                          FinishedJobRegistry, StartedJobRegistry)
 from rq.utils import utcformat
 from rq.worker import Worker
+from tests import RQTestCase, fixtures
+
+is_py2 = sys.version[0] == '2'
+if is_py2:
+    import Queue as queue
+else:
+    import queue as queue
 
 try:
     from cPickle import loads, dumps
@@ -227,12 +227,16 @@ class TestJob(RQTestCase):
         job.save()
         stored_job = Job.fetch(job.id)
         self.assertEqual(stored_job._dependency_id, parent_job.id)
+        self.assertEqual(stored_job._dependency_ids, [parent_job.id])
+        self.assertEqual(stored_job.dependency.id, parent_job.id)
         self.assertEqual(stored_job.dependency, parent_job)
 
         job = Job.create(func=fixtures.some_calculation, depends_on=parent_job.id)
         job.save()
         stored_job = Job.fetch(job.id)
         self.assertEqual(stored_job._dependency_id, parent_job.id)
+        self.assertEqual(stored_job._dependency_ids, [parent_job.id])
+        self.assertEqual(stored_job.dependency.id, parent_job.id)
         self.assertEqual(stored_job.dependency, parent_job)
 
     def test_store_then_fetch(self):
