Merge pull request #1259 from rq/multi-dependencies

Multi dependencies
Selwin Ong 5 years ago committed by GitHub
commit 91a22949c6

rq/job.py

@@ -3,20 +3,26 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)

 import inspect
+import pickle
 import warnings
 import zlib

+from functools import partial
 from uuid import uuid4

 from rq.compat import (as_text, decode_redis_hash, hmset, string_types,
                        text_type)

 from .connections import resolve_connection
 from .exceptions import NoSuchJobError
 from .local import LocalStack
+from .serializers import resolve_serializer
 from .utils import (enum, import_attribute, parse_timeout, str_to_date,
                     utcformat, utcnow)
-from .serializers import resolve_serializer

+# Serialize pickle dumps using the highest pickle protocol (binary, default
+# uses ascii)
+dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
+loads = pickle.loads

 JobStatus = enum(
     'JobStatus',
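A note on the serialization hunk above: pickle protocol 0 (the Python 2 default) emits ASCII, while pickle.HIGHEST_PROTOCOL selects the compact binary protocol, and functools.partial bakes that choice into a reusable dumps. A standalone sketch, independent of RQ:

    import pickle
    from functools import partial

    # Bind the protocol once so every call serializes in binary form.
    dumps = partial(pickle.dumps, protocol=pickle.HIGHEST_PROTOCOL)
    loads = pickle.loads

    payload = {'func': 'my_module.say_hello', 'args': (1, 2)}  # hypothetical job data
    blob = dumps(payload)
    assert loads(blob) == payload  # round-trips unchanged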
@@ -125,7 +131,7 @@ class Job(object):

     def set_status(self, status, pipeline=None):
         self._status = status
-        connection = pipeline or self.connection
+        connection = pipeline if pipeline is not None else self.connection
         connection.hset(self.key, 'status', self._status)

     @property
@@ -392,21 +398,20 @@ class Job(object):
         watch is true, then set WATCH on all the keys of all dependencies.

         Returned jobs will use self's connection, not the pipeline supplied.
+
+        If a job has been deleted from redis, it is not returned.
         """
         connection = pipeline if pipeline is not None else self.connection

         if watch and self._dependency_ids:
             connection.watch(*self._dependency_ids)

-        jobs = self.fetch_many(self._dependency_ids, connection=self.connection)
-
-        for i, job in enumerate(jobs):
-            if not job:
-                raise NoSuchJobError('Dependency {0} does not exist'.format(self._dependency_ids[i]))
+        jobs = [job
+                for job in self.fetch_many(self._dependency_ids, connection=self.connection)
+                if job]

         return jobs

     @property
     def result(self):
         """Returns the return value of the job.
@@ -726,4 +731,45 @@ class Job(object):
             connection.sadd(dependents_key, self.id)
             connection.sadd(self.dependencies_key, dependency_id)

+    @property
+    def dependency_ids(self):
+        dependencies = self.connection.smembers(self.dependencies_key)
+        return [Job.key_for(_id.decode())
+                for _id in dependencies]
+
+    def dependencies_are_met(self, exclude_job_id=None, pipeline=None):
+        """Returns a boolean indicating if all of this job's dependencies are _FINISHED_.
+
+        If a pipeline is passed, all dependencies are WATCHed.
+
+        `exclude_job_id` allows us to exclude some job id from the status
+        check. This is useful when enqueueing the dependents of a
+        _successful_ job -- its `FINISHED` status may not yet be set in
+        redis, but the job is indeed _done_ and this method is called while
+        its dependents are being enqueued.
+        """
+        connection = pipeline if pipeline is not None else self.connection
+
+        if pipeline is not None:
+            connection.watch(*self.dependency_ids)
+
+        dependencies_ids = {_id.decode()
+                            for _id in connection.smembers(self.dependencies_key)}
+
+        if exclude_job_id:
+            dependencies_ids.discard(exclude_job_id)
+
+        with connection.pipeline() as pipeline:
+            for key in dependencies_ids:
+                pipeline.hget(self.key_for(key), 'status')
+
+            dependencies_statuses = pipeline.execute()
+
+        return all(
+            status.decode() == JobStatus.FINISHED
+            for status in dependencies_statuses
+            if status
+        )

 _job_stack = LocalStack()
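The new dependencies_are_met() is the heart of the PR: it answers "may this job run yet?" and, when given a pipeline, WATCHes every dependency key so the answer stays valid through the caller's MULTI/EXEC. A usage sketch under the same assumptions as above (running Redis, hypothetical importable task):

    from redis import Redis, WatchError
    from rq import Queue

    queue = Queue(connection=Redis())
    parent = queue.enqueue('my_module.say_hello')
    child = queue.enqueue('my_module.say_hello', depends_on=parent)

    # Plain check, no locking: False while the parent is still queued.
    assert not child.dependencies_are_met()

    # Transactional check: a concurrent status change to any watched
    # dependency aborts the transaction with WatchError.
    with queue.connection.pipeline() as pipe:
        try:
            if child.dependencies_are_met(pipeline=pipe):
                pipe.multi()
                pipe.hset(child.key, 'status', 'queued')  # stand-in for real enqueue logic
                pipe.execute()
            else:
                pipe.unwatch()
        except WatchError:
            pass  # a dependency changed under us; a real caller would retry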

rq/queue.py

@@ -4,7 +4,6 @@ from __future__ import (absolute_import, division, print_function,
 import uuid
 import warnings

 from datetime import datetime

 from redis import WatchError
@@ -14,8 +13,8 @@ from .connections import resolve_connection
 from .defaults import DEFAULT_RESULT_TTL
 from .exceptions import DequeueTimeout, NoSuchJobError
 from .job import Job, JobStatus
-from .utils import backend_class, import_attribute, parse_timeout, utcnow
 from .serializers import resolve_serializer
+from .utils import backend_class, import_attribute, parse_timeout, utcnow


 def compact(lst):
@@ -283,7 +282,7 @@ class Queue(object):
                      at_front=False, meta=None):
         """Creates a job to represent the delayed function call and enqueues
         it.
-nd
+
         It is much like `.enqueue()`, except that it takes the function's args
         and kwargs as explicit arguments. Any kwargs passed to this function
         contain options for RQ itself.
@@ -463,12 +462,23 @@ class Queue(object):
             if pipeline is None:
                 pipe.watch(dependents_key)

-            dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection)
-                              for job_id in pipe.smembers(dependents_key)]
+            dependent_job_ids = [as_text(_id)
+                                 for _id in pipe.smembers(dependents_key)]
+
+            jobs_to_enqueue = [
+                dependent_job for dependent_job
+                in self.job_class.fetch_many(
+                    dependent_job_ids,
+                    connection=self.connection
+                ) if dependent_job.dependencies_are_met(
+                    exclude_job_id=job.id,
+                    pipeline=pipe
+                )
+            ]

             pipe.multi()

-            for dependent in dependent_jobs:
+            for dependent in jobs_to_enqueue:
                 registry = DeferredJobRegistry(dependent.origin,
                                                self.connection,
                                                job_class=self.job_class)
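The exclude_job_id=job.id argument above is the subtle part: enqueue_dependents runs while the finishing job's own FINISHED status has not yet been committed inside the open transaction, so each dependent's check must treat that one dependency as already done and only inspect the others. End to end, the flow looks like this (single dependency via the public API; assumes a running Redis and a hypothetical importable task):

    from redis import Redis
    from rq import Queue
    from rq.job import JobStatus

    queue = Queue(connection=Redis())
    parent = queue.enqueue('my_module.say_hello')
    child = queue.enqueue('my_module.say_hello', depends_on=parent)
    assert child.get_status() == JobStatus.DEFERRED

    # What the worker effectively does when `parent` succeeds; the status
    # write and the dependent scan happen around one transaction, hence
    # exclude_job_id=parent.id internally.
    parent.set_status(JobStatus.FINISHED)
    queue.enqueue_dependents(parent)
    assert child.get_status() == JobStatus.QUEUED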

tests/test_job.py

@@ -6,7 +6,7 @@ import json
 import time
 import queue
 import zlib

-from datetime import datetime
+from datetime import datetime, timedelta

 from redis import WatchError

@@ -17,7 +17,7 @@ from rq.queue import Queue
 from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
                          FinishedJobRegistry, StartedJobRegistry,
                          ScheduledJobRegistry)
-from rq.utils import utcformat
+from rq.utils import utcformat, utcnow
 from rq.worker import Worker

 from tests import RQTestCase, fixtures
@@ -773,8 +773,12 @@ class TestJob(RQTestCase):

         dependency_job.delete()

-        with self.assertRaises(NoSuchJobError):
-            dependent_job.fetch_dependencies(pipeline=self.testconn)
+        self.assertNotIn(
+            dependent_job.id,
+            [job.id for job in dependent_job.fetch_dependencies(
+                pipeline=self.testconn
+            )]
+        )

     def test_fetch_dependencies_watches(self):
         queue = Queue(connection=self.testconn)
@@ -796,3 +800,115 @@ class TestJob(RQTestCase):
             self.testconn.set(dependency_job.id, 'somethingelsehappened')
             pipeline.touch(dependency_job.id)
             pipeline.execute()
+
+    def test_dependencies_finished_returns_false_if_dependencies_queued(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job_ids = [
+            queue.enqueue(fixtures.say_hello).id
+            for _ in range(5)
+        ]
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = dependency_job_ids
+        dependent_job.register_dependency()
+
+        dependencies_finished = dependent_job.dependencies_are_met()
+
+        self.assertFalse(dependencies_finished)
+
+    def test_dependencies_finished_returns_true_if_no_dependencies(self):
+        queue = Queue(connection=self.testconn)
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job.register_dependency()
+
+        dependencies_finished = dependent_job.dependencies_are_met()
+
+        self.assertTrue(dependencies_finished)
+
+    def test_dependencies_finished_returns_true_if_all_dependencies_finished(self):
+        dependency_jobs = [
+            Job.create(fixtures.say_hello)
+            for _ in range(5)
+        ]
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [job.id for job in dependency_jobs]
+        dependent_job.register_dependency()
+
+        now = utcnow()
+
+        # Set ended_at timestamps
+        for i, job in enumerate(dependency_jobs):
+            job._status = JobStatus.FINISHED
+            job.ended_at = now - timedelta(seconds=i)
+            job.save()
+
+        dependencies_finished = dependent_job.dependencies_are_met()
+
+        self.assertTrue(dependencies_finished)
+
+    def test_dependencies_finished_returns_false_if_unfinished_job(self):
+        dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)]
+
+        dependency_jobs[0]._status = JobStatus.FINISHED
+        dependency_jobs[0].ended_at = utcnow()
+        dependency_jobs[0].save()
+
+        dependency_jobs[1]._status = JobStatus.STARTED
+        dependency_jobs[1].ended_at = None
+        dependency_jobs[1].save()
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [job.id for job in dependency_jobs]
+        dependent_job.register_dependency()
+
+        now = utcnow()
+
+        dependencies_finished = dependent_job.dependencies_are_met()
+
+        self.assertFalse(dependencies_finished)
+
+    def test_dependencies_finished_watches_job(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello)
+
+        dependent_job = Job.create(func=fixtures.say_hello)
+        dependent_job._dependency_ids = [dependency_job.id]
+        dependent_job.register_dependency()
+
+        with self.testconn.pipeline() as pipeline:
+            dependent_job.dependencies_are_met(
+                pipeline=pipeline,
+            )
+
+            dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn)
+            pipeline.multi()
+
+            with self.assertRaises(WatchError):
+                pipeline.touch(Job.key_for(dependent_job.id))
+                pipeline.execute()
+
+    def test_can_enqueue_job_if_dependency_is_deleted(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0)
+
+        w = Worker([queue])
+        w.work(burst=True)
+
+        assert queue.enqueue(fixtures.say_hello, depends_on=dependency_job)
+
+    def test_dependents_are_met_if_dependency_is_deleted(self):
+        queue = Queue(connection=self.testconn)
+
+        dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0)
+        dependent_job = queue.enqueue(fixtures.say_hello, depends_on=dependency_job)
+
+        w = Worker([queue])
+        w.work(burst=True, max_jobs=1)
+
+        assert dependent_job.dependencies_are_met()
+        assert dependent_job.get_status() == JobStatus.QUEUED
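test_dependencies_finished_watches_job above leans on redis's optimistic locking; for readers unfamiliar with it, here is the same mechanism in bare redis-py, independent of RQ (assumes a local server):

    from redis import Redis, WatchError

    conn = Redis()
    conn.set('dep-status', 'started')

    with conn.pipeline() as pipe:
        pipe.watch('dep-status')           # what dependencies_are_met() does
        conn.set('dep-status', 'failed')   # a concurrent writer sneaks in
        pipe.multi()
        pipe.get('dep-status')
        try:
            pipe.execute()                 # EXEC aborts: the watched key changed
        except WatchError:
            print('dependency changed; transaction aborted')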

tests/test_queue.py

@@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function,
 import json
 from datetime import datetime, timedelta

+from mock.mock import patch

 from rq import Queue
 from rq.compat import utc
@@ -23,6 +24,20 @@ class CustomJob(Job):
     pass


+class MultipleDependencyJob(Job):
+    """
+    Allows for the patching of `_dependency_ids` to simulate multi-dependency
+    support without modifying the public interface of `Job`.
+    """
+    create_job = Job.create
+
+    @classmethod
+    def create(cls, *args, **kwargs):
+        dependency_ids = kwargs.pop('kwargs').pop('_dependency_ids')
+        _job = cls.create_job(*args, **kwargs)
+        _job._dependency_ids = dependency_ids
+        return _job
+
+
 class TestQueue(RQTestCase):
     def test_create_queue(self):
         """Creating queues."""
@@ -410,6 +425,9 @@ class TestQueue(RQTestCase):
         job_2 = q.enqueue(say_hello, depends_on=parent_job)

         registry = DeferredJobRegistry(q.name, connection=self.testconn)
+
+        parent_job.set_status(JobStatus.FINISHED)
+
         self.assertEqual(
             set(registry.get_job_ids()),
             set([job_1.id, job_2.id])
@@ -440,6 +458,9 @@ class TestQueue(RQTestCase):
             set([job_1.id])
         )

         registry_2 = DeferredJobRegistry(q_2.name, connection=self.testconn)
+
+        parent_job.set_status(JobStatus.FINISHED)
+
         self.assertEqual(
             set(registry_2.get_job_ids()),
             set([job_2.id])
@@ -510,18 +531,83 @@ class TestQueue(RQTestCase):
         self.assertEqual(q.job_ids, [job.id])
         self.assertEqual(job.timeout, 123)

-    def test_enqueue_job_with_invalid_dependency(self):
-        """Enqueuing a job fails, if the dependency does not exist at all."""
-        parent_job = Job.create(func=say_hello)
-        # without save() the job is not visible to others
-
+    def test_enqueue_job_with_multiple_queued_dependencies(self):
+        parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
+
+        for job in parent_jobs:
+            job._status = JobStatus.QUEUED
+            job.save()
+
         q = Queue()
-        with self.assertRaises(NoSuchJobError):
-            q.enqueue_call(say_hello, depends_on=parent_job)
-
-        with self.assertRaises(NoSuchJobError):
-            q.enqueue_call(say_hello, depends_on=parent_job.id)
-
+        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
+            job = q.enqueue(say_hello, depends_on=parent_jobs[0],
+                            _dependency_ids=[job.id for job in parent_jobs])
+
+        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+        self.assertEqual(q.job_ids, [])
+        self.assertEqual(job.fetch_dependencies(), parent_jobs)
+
+    def test_enqueue_job_with_multiple_finished_dependencies(self):
+        parent_jobs = [Job.create(func=say_hello) for _ in range(2)]
+
+        for job in parent_jobs:
+            job._status = JobStatus.FINISHED
+            job.save()
+
+        q = Queue()
+        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
+            job = q.enqueue(say_hello, depends_on=parent_jobs[0],
+                            _dependency_ids=[job.id for job in parent_jobs])
+
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+        self.assertEqual(q.job_ids, [job.id])
+        self.assertEqual(job.fetch_dependencies(), parent_jobs)
+
+    def test_enqueues_dependent_if_other_dependencies_finished(self):
+        parent_jobs = [Job.create(func=say_hello) for _ in range(3)]
+
+        parent_jobs[0]._status = JobStatus.STARTED
+        parent_jobs[0].save()
+
+        parent_jobs[1]._status = JobStatus.FINISHED
+        parent_jobs[1].save()
+
+        parent_jobs[2]._status = JobStatus.FINISHED
+        parent_jobs[2].save()
+
+        q = Queue()
+        with patch('rq.queue.Job.create',
+                   new=MultipleDependencyJob.create):
+            # dependent job deferred, b/c parent_job 0 is still 'started'
+            dependent_job = q.enqueue(say_hello, depends_on=parent_jobs[0],
+                                      _dependency_ids=[job.id for job in parent_jobs])

+        self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
+
+        # now set parent job 0 to 'finished'
+        parent_jobs[0].set_status(JobStatus.FINISHED)
+
+        q.enqueue_dependents(parent_jobs[0])
+
+        self.assertEqual(dependent_job.get_status(), JobStatus.QUEUED)
+        self.assertEqual(q.job_ids, [dependent_job.id])
+
+    def test_does_not_enqueue_dependent_if_other_dependencies_not_finished(self):
+        started_dependency = Job.create(func=say_hello, status=JobStatus.STARTED)
+        started_dependency.save()
+
+        queued_dependency = Job.create(func=say_hello, status=JobStatus.QUEUED)
+        queued_dependency.save()
+
+        q = Queue()
+        with patch('rq.queue.Job.create', new=MultipleDependencyJob.create):
+            dependent_job = q.enqueue(say_hello, depends_on=[started_dependency],
+                                      _dependency_ids=[started_dependency.id,
+                                                       queued_dependency.id])
+
+        self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
+
+        q.enqueue_dependents(started_dependency)
+
+        self.assertEqual(dependent_job.get_status(), JobStatus.DEFERRED)
         self.assertEqual(q.job_ids, [])

     def test_fetch_job_successful(self):
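Taken together with test_can_enqueue_job_if_dependency_is_deleted in test_job.py, the replaced test marks a user-visible change: enqueueing against a dependency whose hash has expired from redis no longer raises NoSuchJobError. A sketch of the new behavior (running Redis, hypothetical importable task):

    from redis import Redis
    from rq import Queue, Worker

    queue = Queue(connection=Redis())
    dependency = queue.enqueue('my_module.say_hello', result_ttl=0)

    Worker([queue]).work(burst=True)  # runs the job; result_ttl=0 deletes it

    # Pre-PR this raised NoSuchJobError; now the missing dependency is
    # tolerated and the new job is created normally.
    job = queue.enqueue('my_module.say_hello', depends_on=dependency)
    assert job is not None  # mirrored from the test; no exception is raised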
