Multi Dependency Support - Registration & Enqueue Call (#1155)

* Multi Dependency Support - Registration & Enqueue Call

Internal API changes to support multiple dependencies.
* Store all of a job's _dependencies_ in a redis set. Delete that set when a job is deleted.
* Add Job#fetch_dependencies method - which return all jobs a job is dependent upon and optionally _WATCHES_ all dependency ids.
* Use Job#fetch_dependencies in Queue#call_enqueue. `fetch_dependencies` now sets WATCH and raises InvalidJobDependency, rather than call_enqueue.

`Queue` and `Job` public APIs still expect single ids of jobs for `depends_on` but internally register them in a way that could support multiple jobs being passed as dependencies.

Next up: need to update Queue#enqueue_dependents

* Use existing fetch_many method to get dependencies.

Modify fetch_dependencies to use fetch_many.

* Remove default value for fetch_many's connection parameter

* PR review housekeeping

* Remove a duplicate test
* Oneline something
* Fix missing colon in dependencies key
* Delete job key, dependents and dependencies at once

* More Fixes From Code Review

Updates to Job, Queue and associated tests.

* When Checking dependencies Avoid, trip to Redis

* When checking the status of a job, we have a 'clean' status of all dependencies(returned from Job#fetch_dependencies) and the job keys are WATCHed, so there's no reason to go back to Redis to get the status _again_.
* Looks as though, the `_status` set in `Job#restore` was bytes while it was converted to text(`as_text`) in `Job#get_status` - for consistency(and tests) converting to text in `restore` as well.
* In `Queue#enqueue_call`, moved WATCH of dependencies_key to before fetching dependencies. This doesn't really matter but seems more _correct_ - one can imagine some rogue API adding a dependency after they've been fetched but before they've been WATCHEed.

* Update Job#get_status to get _local_ status

* If refresh=False is passed, don't get status from Redis; return the value of _status. This is to avoid a trip to Redis if the caller can guarantee that the value of `_status` is _clean_.

* More Fixups

* Expire dependency keys in Job#cleanup
* Consistency in Job#fetch_dependencies
main
Thomas Matecki 5 years ago committed by Selwin Ong
parent af678243e1
commit 80c82f731f

@ -11,7 +11,7 @@ from uuid import uuid4
from rq.compat import as_text, decode_redis_hash, string_types, text_type from rq.compat import as_text, decode_redis_hash, string_types, text_type
from .connections import resolve_connection from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError from .exceptions import InvalidJobDependency, NoSuchJobError, UnpickleError
from .local import LocalStack from .local import LocalStack
from .utils import (enum, import_attribute, parse_timeout, str_to_date, from .utils import (enum, import_attribute, parse_timeout, str_to_date,
utcformat, utcnow) utcformat, utcnow)
@ -140,8 +140,10 @@ class Job(object):
job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on] job._dependency_ids = [depends_on.id if isinstance(depends_on, Job) else depends_on]
return job return job
def get_status(self): def get_status(self, refresh=True):
self._status = as_text(self.connection.hget(self.key, 'status')) if refresh:
self._status = as_text(self.connection.hget(self.key, 'status'))
return self._status return self._status
def set_status(self, status, pipeline=None): def set_status(self, status, pipeline=None):
@ -297,8 +299,13 @@ class Job(object):
return job return job
@classmethod @classmethod
def fetch_many(cls, job_ids, connection=None): def fetch_many(cls, job_ids, connection):
"""Bulk version of Job.fetch""" """
Bulk version of Job.fetch
For any job_ids which a job does not exist, the corresponding item in
the returned list will be None.
"""
with connection.pipeline() as pipeline: with connection.pipeline() as pipeline:
for job_id in job_ids: for job_id in job_ids:
pipeline.hgetall(cls.key_for(job_id)) pipeline.hgetall(cls.key_for(job_id))
@ -393,6 +400,31 @@ class Job(object):
"""The Redis key that is used to store job dependents hash under.""" """The Redis key that is used to store job dependents hash under."""
return self.dependents_key_for(self.id) return self.dependents_key_for(self.id)
@property
def dependencies_key(self):
return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id)
def fetch_dependencies(self, watch=False, pipeline=None):
"""
Fetch all of a job's dependencies. If a pipeline is supplied, and
watch is true, then set WATCH on all the keys of all dependencies.
Returned jobs will use self's connection, not the pipeline supplied.
"""
connection = pipeline if pipeline is not None else self.connection
if watch and self._dependency_ids:
connection.watch(*self._dependency_ids)
jobs = self.fetch_many(self._dependency_ids, connection=self.connection)
for i, job in enumerate(jobs):
if not job:
raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i]))
return jobs
@property @property
def result(self): def result(self):
"""Returns the return value of the job. """Returns the return value of the job.
@ -444,7 +476,7 @@ class Job(object):
self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None self.timeout = parse_timeout(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa self.failure_ttl = int(obj.get('failure_ttl')) if obj.get('failure_ttl') else None # noqa
self._status = obj.get('status') if obj.get('status') else None self._status = as_text(obj.get('status')) if obj.get('status') else None
dependency_id = obj.get('dependency_id', None) dependency_id = obj.get('dependency_id', None)
self._dependency_ids = [as_text(dependency_id)] if dependency_id else [] self._dependency_ids = [as_text(dependency_id)] if dependency_id else []
@ -591,8 +623,7 @@ class Job(object):
if delete_dependents: if delete_dependents:
self.delete_dependents(pipeline=pipeline) self.delete_dependents(pipeline=pipeline)
connection.delete(self.key) connection.delete(self.key, self.dependents_key, self.dependencies_key)
connection.delete(self.dependents_key)
def delete_dependents(self, pipeline=None): def delete_dependents(self, pipeline=None):
"""Delete jobs depending on this job.""" """Delete jobs depending on this job."""
@ -668,6 +699,8 @@ class Job(object):
elif ttl > 0: elif ttl > 0:
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl) connection.expire(self.key, ttl)
connection.expire(self.dependents_key, ttl)
connection.expire(self.dependencies_key, ttl)
@property @property
def failed_job_registry(self): def failed_job_registry(self):
@ -698,5 +731,6 @@ class Job(object):
for dependency_id in self._dependency_ids: for dependency_id in self._dependency_ids:
dependents_key = self.dependents_key_for(dependency_id) dependents_key = self.dependents_key_for(dependency_id)
connection.sadd(dependents_key, self.id) connection.sadd(dependents_key, self.id)
connection.sadd(self.dependencies_key, dependency_id)
_job_stack = LocalStack() _job_stack = LocalStack()

@ -13,7 +13,7 @@ from .defaults import DEFAULT_RESULT_TTL
from .exceptions import (DequeueTimeout, InvalidJobDependency, NoSuchJobError, from .exceptions import (DequeueTimeout, InvalidJobDependency, NoSuchJobError,
UnpickleError) UnpickleError)
from .job import Job, JobStatus from .job import Job, JobStatus
from .utils import backend_class, import_attribute, utcnow, parse_timeout from .utils import backend_class, import_attribute, parse_timeout, utcnow
def compact(lst): def compact(lst):
@ -252,39 +252,41 @@ class Queue(object):
depends_on=depends_on, timeout=timeout, id=job_id, depends_on=depends_on, timeout=timeout, id=job_id,
origin=self.name, meta=meta) origin=self.name, meta=meta)
# If job depends on an unfinished job, register itself on it's # If a _dependent_ job depends on any unfinished job, register all the
# parent's dependents instead of enqueueing it. #_dependent_ job's dependencies instead of enqueueing it.
# If WatchError is raised in the process, that means something else is #
# modifying the dependency. In this case we simply retry # `Job#fetch_dependencies` sets WATCH on all dependencies. If
# WatchError is raised in the when the pipeline is executed, that means
# something else has modified either the set of dependencies or the
# status of one of them. In this case, we simply retry.
if depends_on is not None: if depends_on is not None:
if not isinstance(depends_on, self.job_class):
depends_on = self.job_class(id=depends_on,
connection=self.connection)
with self.connection.pipeline() as pipe: with self.connection.pipeline() as pipe:
while True: while True:
try: try:
pipe.watch(depends_on.key)
pipe.watch(job.dependencies_key)
# If the dependency does not exist, raise an
# exception to avoid creating an orphaned job. dependencies = job.fetch_dependencies(
if not self.job_class.exists(depends_on.id, watch=True,
self.connection): pipeline=pipe
raise InvalidJobDependency('Job {0} does not exist'.format(depends_on.id)) )
if depends_on.get_status() != JobStatus.FINISHED: pipe.multi()
pipe.multi()
job.set_status(JobStatus.DEFERRED) for dependency in dependencies:
job.register_dependency(pipeline=pipe) if dependency.get_status(refresh=False) != JobStatus.FINISHED:
job.save(pipeline=pipe) job.set_status(JobStatus.DEFERRED, pipeline=pipe)
job.cleanup(ttl=job.ttl, pipeline=pipe) job.register_dependency(pipeline=pipe)
pipe.execute() job.save(pipeline=pipe)
return job job.cleanup(ttl=job.ttl, pipeline=pipe)
pipe.execute()
return job
break break
except WatchError: except WatchError:
continue continue
job = self.enqueue_job(job, at_front=at_front) job = self.enqueue_job(job, at_front=at_front)
return job return job
def run_job(self, job): def run_job(self, job):

@ -7,6 +7,8 @@ import time
import zlib import zlib
from datetime import datetime from datetime import datetime
from redis import WatchError
from rq.compat import PY2, as_text from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, JobStatus, cancel_job, get_current_job from rq.job import Job, JobStatus, cancel_job, get_current_job
@ -23,8 +25,6 @@ if is_py2:
else: else:
import queue as queue import queue as queue
try: try:
from cPickle import loads, dumps from cPickle import loads, dumps
except ImportError: except ImportError:
@ -45,7 +45,8 @@ class TestJob(RQTestCase):
expected_string = "myfunc(12, '', null=None, snowman='')" expected_string = "myfunc(12, '', null=None, snowman='')"
else: else:
# Python 2 # Python 2
expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode('utf-8') expected_string = u"myfunc(12, u'\\u2603', null=None, snowman=u'\\u2603')".decode(
'utf-8')
self.assertEqual( self.assertEqual(
job.description, job.description,
@ -241,7 +242,8 @@ class TestJob(RQTestCase):
def test_store_then_fetch(self): def test_store_then_fetch(self):
"""Store, then fetch.""" """Store, then fetch."""
job = Job.create(func=fixtures.some_calculation, timeout='1h', args=(3, 4), kwargs=dict(z=2)) job = Job.create(func=fixtures.some_calculation, timeout='1h', args=(3, 4),
kwargs=dict(z=2))
job.save() job.save()
job2 = Job.fetch(job.id) job2 = Job.fetch(job.id)
@ -331,7 +333,6 @@ class TestJob(RQTestCase):
job.refresh() job.refresh()
self.assertEqual(job.data, job_data) self.assertEqual(job.data, job_data)
def test_custom_meta_is_persisted(self): def test_custom_meta_is_persisted(self):
"""Additional meta data on jobs are stored persisted correctly.""" """Additional meta data on jobs are stored persisted correctly."""
job = Job.create(func=fixtures.say_hello, args=('Lionel',)) job = Job.create(func=fixtures.say_hello, args=('Lionel',))
@ -397,7 +398,8 @@ class TestJob(RQTestCase):
def test_description_is_persisted(self): def test_description_is_persisted(self):
"""Ensure that job's custom description is set properly""" """Ensure that job's custom description is set properly"""
job = Job.create(func=fixtures.say_hello, args=('Lionel',), description='Say hello!') job = Job.create(func=fixtures.say_hello, args=('Lionel',),
description='Say hello!')
job.save() job.save()
Job.fetch(job.id, connection=self.testconn) Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.description, 'Say hello!') self.assertEqual(job.description, 'Say hello!')
@ -499,6 +501,22 @@ class TestJob(RQTestCase):
job.cleanup(ttl=0) job.cleanup(ttl=0)
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
def test_cleanup_expires_dependency_keys(self):
dependency_job = Job.create(func=fixtures.say_hello)
dependency_job.save()
dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
dependent_job.register_dependency()
dependent_job.save()
dependent_job.cleanup(ttl=100)
dependency_job.cleanup(ttl=100)
self.assertEqual(self.testconn.ttl(dependent_job.dependencies_key), 100)
self.assertEqual(self.testconn.ttl(dependency_job.dependents_key), 100)
def test_job_with_dependents_delete_parent(self): def test_job_with_dependents_delete_parent(self):
"""job.delete() deletes itself from Redis but not dependents. """job.delete() deletes itself from Redis but not dependents.
Wthout a save, the dependent job is never saved into redis. The delete Wthout a save, the dependent job is never saved into redis. The delete
@ -613,6 +631,33 @@ class TestJob(RQTestCase):
self.assertNotIn(job.id, queue.get_job_ids()) self.assertNotIn(job.id, queue.get_job_ids())
def test_dependent_job_creates_dependencies_key(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello)
dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
dependent_job.register_dependency()
dependent_job.save()
self.assertTrue(self.testconn.exists(dependent_job.dependencies_key))
def test_dependent_job_deletes_dependencies_key(self):
"""
job.delete() deletes itself from Redis.
"""
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello)
dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
dependent_job.register_dependency()
dependent_job.save()
dependent_job.delete()
self.assertTrue(self.testconn.exists(dependency_job.key))
self.assertFalse(self.testconn.exists(dependent_job.dependencies_key))
self.assertFalse(self.testconn.exists(dependent_job.key))
def test_create_job_with_id(self): def test_create_job_with_id(self):
"""test creating jobs with a custom ID""" """test creating jobs with a custom ID"""
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
@ -626,7 +671,8 @@ class TestJob(RQTestCase):
"""test call string with unicode keyword arguments""" """test call string with unicode keyword arguments"""
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.echo, arg_with_unicode=fixtures.UnicodeStringObject()) job = queue.enqueue(fixtures.echo,
arg_with_unicode=fixtures.UnicodeStringObject())
self.assertIsNotNone(job.get_call_string()) self.assertIsNotNone(job.get_call_string())
job.perform() job.perform()
@ -665,3 +711,67 @@ class TestJob(RQTestCase):
key = Job.key_for(job_id=job_id) key = Job.key_for(job_id=job_id)
assert key == (Job.redis_job_namespace_prefix + job_id).encode('utf-8') assert key == (Job.redis_job_namespace_prefix + job_id).encode('utf-8')
def test_dependencies_key_should_have_prefixed_job_id(self):
job_id = 'random'
job = Job(id=job_id)
expected_key = Job.redis_job_namespace_prefix + ":" + job_id + ':dependencies'
assert job.dependencies_key == expected_key
def test_fetch_dependencies_returns_dependency_jobs(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello)
dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
dependent_job.register_dependency()
dependent_job.save()
dependencies = dependent_job.fetch_dependencies(pipeline=self.testconn)
self.assertListEqual(dependencies, [dependency_job])
def test_fetch_dependencies_returns_empty_if_not_dependent_job(self):
queue = Queue(connection=self.testconn)
dependent_job = Job.create(func=fixtures.say_hello)
dependent_job.register_dependency()
dependent_job.save()
dependencies = dependent_job.fetch_dependencies(pipeline=self.testconn)
self.assertListEqual(dependencies, [])
def test_fetch_dependencies_raises_if_dependency_deleted(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello)
dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
dependent_job.register_dependency()
dependent_job.save()
dependency_job.delete()
with self.assertRaises(NoSuchJobError):
dependent_job.fetch_dependencies(pipeline=self.testconn)
def test_fetch_dependencies_watches(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello)
dependent_job = Job.create(func=fixtures.say_hello, depends_on=dependency_job)
dependent_job.register_dependency()
dependent_job.save()
with self.testconn.pipeline() as pipeline:
dependent_job.fetch_dependencies(
watch=True,
pipeline=pipeline
)
pipeline.multi()
with self.assertRaises(WatchError):
self.testconn.set(dependency_job.id, 'somethingelsehappened')
pipeline.touch(dependency_job.id)
pipeline.execute()

@ -2,14 +2,13 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
from tests import RQTestCase
from tests.fixtures import echo, say_hello
from rq import Queue from rq import Queue
from rq.exceptions import InvalidJobDependency from rq.exceptions import InvalidJobDependency, NoSuchJobError
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
from rq.worker import Worker from rq.worker import Worker
from tests import RQTestCase
from tests.fixtures import echo, say_hello
class CustomJob(Job): class CustomJob(Job):
@ -488,10 +487,10 @@ class TestQueue(RQTestCase):
# without save() the job is not visible to others # without save() the job is not visible to others
q = Queue() q = Queue()
with self.assertRaises(InvalidJobDependency): with self.assertRaises(NoSuchJobError):
q.enqueue_call(say_hello, depends_on=parent_job) q.enqueue_call(say_hello, depends_on=parent_job)
with self.assertRaises(InvalidJobDependency): with self.assertRaises(NoSuchJobError):
q.enqueue_call(say_hello, depends_on=parent_job.id) q.enqueue_call(say_hello, depends_on=parent_job.id)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])

Loading…
Cancel
Save