Change get_dependency_statuses to dependencies_finished

Convert Job.get_dependencies_statuses to return a boolean and rename it to
dependencies_finished. Also use fetch_many in Queue#enqueue_dependents.
main
Thomas Matecki 5 years ago
parent a69d91d2b2
commit d5921814e4

@ -724,34 +724,29 @@ class Job(object):
connection.sadd(dependents_key, self.id) connection.sadd(dependents_key, self.id)
connection.sadd(self.dependencies_key, dependency_id) connection.sadd(self.dependencies_key, dependency_id)
def get_dependencies_statuses( def dependencies_finished(
self, self,
watch=False,
pipeline=None pipeline=None
): ):
"""Returns a list of tuples containing the job ids and status of all """Returns a boolean indicating if all of this job's dependencies are _FINISHED_
dependencies; e.g:
[('14462606-09c4-41c2-8bf1-fbd109092318', 'started'), If a pipeline is passed, all dependencies are WATCHed.
('e207328f-d5bc-4ea9-8d61-b449891e3230', 'finished'), ...]
As a minor optimization allowing callers to more quickly tell if all
dependencies are _FINISHED_, the returned list is sorted by the
`ended_at` timestamp, so those jobs which are not yet finished are at
the start of the list.
""" """
pipe = pipeline if pipeline is not None else self.connection pipe = pipeline if pipeline is not None else self.connection
if watch: if pipeline is not None:
pipe.watch(*[Job.key_for(_id) pipe.watch(*[Job.key_for(as_text(_id))
for _id in self.connection.smembers(self.dependencies_key)]) for _id in self.connection.smembers(self.dependencies_key)])
sort_by = self.redis_job_namespace_prefix + '*->ended_at' sort_by = self.redis_job_namespace_prefix + '*->ended_at'
get_field = self.redis_job_namespace_prefix + '*->status' get_field = self.redis_job_namespace_prefix + '*->status'
# Sorting here lexicographically works because these dates are stored in # As a minor optimization to more quickly tell if all dependencies
# an ISO 8601 format, so lexicographic order is the same as # are _FINISHED_, sort dependencies by the `ended_at` timestamp so
# those jobs which are not yet finished are at the start of the
# list. Sorting here lexicographically works because these dates are
# stored in an ISO 8601 format, so lexicographic order is the same as
# chronological order. # chronological order.
dependencies_statuses = [ dependencies_statuses = [
(as_text(_id), as_text(status)) (as_text(_id), as_text(status))
@ -759,6 +754,8 @@ class Job(object):
get=['#', get_field], alpha=True, groups=True, ) get=['#', get_field], alpha=True, groups=True, )
] ]
return dependencies_statuses return all(status == JobStatus.FINISHED
for job_id, status
in dependencies_statuses)
_job_stack = LocalStack() _job_stack = LocalStack()

@ -461,28 +461,18 @@ class Queue(object):
if pipeline is None: if pipeline is None:
pipe.watch(dependents_key) pipe.watch(dependents_key)
dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection) dependent_job_ids = [as_text(_id)
for job_id in pipe.smembers(dependents_key)] for _id in pipe.smembers(dependents_key)]
dependencies_statuses = [ dependent_jobs = [
dependent.get_dependencies_statuses(watch=True, pipeline=pipe) job for job in self.job_class.fetch_many(dependent_job_ids,
for dependent in dependent_jobs connection=self.connection)
if job.dependencies_finished(pipeline=pipe)
] ]
pipe.multi() pipe.multi()
for dependent, dependents_dependencies in zip(dependent_jobs, for dependent in dependent_jobs:
dependencies_statuses):
# Enqueue this dependent job only if all of its _other_
# dependencies are FINISHED.
if not all(
status == JobStatus.FINISHED
for job_id, status
in dependents_dependencies
):
continue
registry = DeferredJobRegistry(dependent.origin, registry = DeferredJobRegistry(dependent.origin,
self.connection, self.connection,
job_class=self.job_class) job_class=self.job_class)

@ -798,7 +798,7 @@ class TestJob(RQTestCase):
pipeline.touch(dependency_job.id) pipeline.touch(dependency_job.id)
pipeline.execute() pipeline.execute()
def test_get_dependencies_statuses_returns_ids_and_statuses(self): def test_dependencies_finished_returns_false_if_dependencies_queued(self):
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
dependency_job_ids = [ dependency_job_ids = [
@ -810,27 +810,21 @@ class TestJob(RQTestCase):
dependent_job._dependency_ids = dependency_job_ids dependent_job._dependency_ids = dependency_job_ids
dependent_job.register_dependency() dependent_job.register_dependency()
dependencies_statuses = dependent_job.get_dependencies_statuses() dependencies_finished = dependent_job.dependencies_finished()
self.assertSetEqual( self.assertFalse(dependencies_finished)
set(dependencies_statuses),
{(_id, JobStatus.QUEUED) for _id in dependency_job_ids}
)
def test_get_dependencies_statuses_returns_empty_list_if_no_dependencies(self): def test_dependencies_finished_returns_true_if_no_dependencies(self):
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
dependent_job = Job.create(func=fixtures.say_hello) dependent_job = Job.create(func=fixtures.say_hello)
dependent_job.register_dependency() dependent_job.register_dependency()
dependencies_statuses = dependent_job.get_dependencies_statuses() dependencies_finished = dependent_job.dependencies_finished()
self.assertListEqual( self.assertTrue(dependencies_finished)
dependencies_statuses,
[]
)
def test_get_dependencies_statuses_returns_ordered_by_end_time(self): def test_dependencies_finished_returns_true_if_all_dependencies_finished(self):
dependency_jobs = [ dependency_jobs = [
Job.create(fixtures.say_hello) Job.create(fixtures.say_hello)
for _ in range(5) for _ in range(5)
@ -842,19 +836,17 @@ class TestJob(RQTestCase):
now = utcnow() now = utcnow()
# Set ended_at timestamps
for i, job in enumerate(dependency_jobs): for i, job in enumerate(dependency_jobs):
job._status = JobStatus.FINISHED job._status = JobStatus.FINISHED
job.ended_at = now - timedelta(seconds=i) job.ended_at = now - timedelta(seconds=i)
job.save() job.save()
dependencies_statuses = dependent_job.get_dependencies_statuses() dependencies_finished = dependent_job.dependencies_finished()
self.assertListEqual( self.assertTrue(dependencies_finished)
dependencies_statuses,
[(job.id, JobStatus.FINISHED) for job in reversed(dependency_jobs)]
)
def test_get_dependencies_statuses_returns_not_finished_job_ordered_first(self): def test_dependencies_finished_returns_false_if_unfinished_job(self):
dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)] dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)]
dependency_jobs[0]._status = JobStatus.FINISHED dependency_jobs[0]._status = JobStatus.FINISHED
@ -871,19 +863,11 @@ class TestJob(RQTestCase):
now = utcnow() now = utcnow()
dependencies_statuses = dependent_job.get_dependencies_statuses() dependencies_finished = dependent_job.dependencies_finished()
self.assertEqual(
dependencies_statuses[0],
(dependency_jobs[1].id, JobStatus.STARTED)
)
self.assertEqual( self.assertFalse(dependencies_finished)
dependencies_statuses[1],
(dependency_jobs[0].id, JobStatus.FINISHED)
)
def test_get_dependencies_statuses_watches_job(self): def test_dependencies_finished_watches_job(self):
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello) dependency_job = queue.enqueue(fixtures.say_hello)
@ -894,9 +878,8 @@ class TestJob(RQTestCase):
with self.testconn.pipeline() as pipeline: with self.testconn.pipeline() as pipeline:
dependent_job.get_dependencies_statuses( dependent_job.dependencies_finished(
pipeline=pipeline, pipeline=pipeline,
watch=True
) )
dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn) dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn)

Loading…
Cancel
Save