Create get_dependencies_statuses method on Job

This method shall be used in Queue#enqueue_dependendents to determine if all of a dependents' dependencies have been _FINISHED_.
main
Thomas Matecki 5 years ago
parent 0dd9ff0ec9
commit ee215a1853

@ -124,7 +124,7 @@ class Job(object):
def set_status(self, status, pipeline=None): def set_status(self, status, pipeline=None):
self._status = status self._status = status
connection = pipeline or self.connection connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'status', self._status) connection.hset(self.key, 'status', self._status)
@property @property
@ -405,7 +405,6 @@ class Job(object):
return jobs return jobs
@property @property
def result(self): def result(self):
"""Returns the return value of the job. """Returns the return value of the job.
@ -725,4 +724,43 @@ class Job(object):
connection.sadd(dependents_key, self.id) connection.sadd(dependents_key, self.id)
connection.sadd(self.dependencies_key, dependency_id) connection.sadd(self.dependencies_key, dependency_id)
def get_dependencies_statuses(
self,
watch=False,
pipeline=None
):
"""Returns a list of tuples containing the job ids and status of all
dependencies; e.g:
[('14462606-09c4-41c2-8bf1-fbd109092318', 'started'),
('e207328f-d5bc-4ea9-8d61-b449891e3230', 'finished'), ...]
As a minor optimization allowing callers to more quickly tell if all
dependencies are _FINISHED_, the returned list is sorted by the
`ended_at` timestamp, so those jobs which are not yet finished are at
the start of the list.
"""
pipe = pipeline if pipeline is not None else self.connection
if watch:
pipe.watch(self.dependencies_key)
pipe.watch(*[self.redis_job_namespace_prefix + as_text(_id)
for _id in pipe.smembers(self.dependencies_key)])
sort_by = self.redis_job_namespace_prefix + '*->ended_at'
get_field = self.redis_job_namespace_prefix + '*->status'
# Sorting here lexographically works because these dates are stored in
# an ISO 8601 format, so lexographic order is the same as
# chronological order.
dependencies_statuses = [
(as_text(_id), as_text(status))
for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by,
get=['#', get_field], alpha=True, groups=True, )
]
return dependencies_statuses
_job_stack = LocalStack() _job_stack = LocalStack()

@ -466,7 +466,8 @@ class Queue(object):
pipe.multi() pipe.multi()
for dependent in dependent_jobs: for dependent, dependents_dependencies in dependent_jobs:
registry = DeferredJobRegistry(dependent.origin, registry = DeferredJobRegistry(dependent.origin,
self.connection, self.connection,
job_class=self.job_class) job_class=self.job_class)

@ -6,8 +6,9 @@ import json
import time import time
import queue import queue
import zlib import zlib
from datetime import datetime from datetime import datetime, timedelta
import pytest
from redis import WatchError from redis import WatchError
from rq.compat import PY2, as_text from rq.compat import PY2, as_text
@ -17,7 +18,7 @@ from rq.queue import Queue
from rq.registry import (DeferredJobRegistry, FailedJobRegistry, from rq.registry import (DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, StartedJobRegistry, FinishedJobRegistry, StartedJobRegistry,
ScheduledJobRegistry) ScheduledJobRegistry)
from rq.utils import utcformat from rq.utils import utcformat, utcnow
from rq.worker import Worker from rq.worker import Worker
from tests import RQTestCase, fixtures from tests import RQTestCase, fixtures
@ -796,3 +797,137 @@ class TestJob(RQTestCase):
self.testconn.set(dependency_job.id, 'somethingelsehappened') self.testconn.set(dependency_job.id, 'somethingelsehappened')
pipeline.touch(dependency_job.id) pipeline.touch(dependency_job.id)
pipeline.execute() pipeline.execute()
def test_get_dependencies_statuses_returns_ids_and_statuses(self):
queue = Queue(connection=self.testconn)
dependency_job_ids = [
queue.enqueue(fixtures.say_hello).id
for _ in range(5)
]
dependent_job = Job.create(func=fixtures.say_hello)
dependent_job._dependency_ids = dependency_job_ids
dependent_job.register_dependency()
dependencies_statuses = dependent_job.get_dependencies_statuses()
self.assertSetEqual(
set(dependencies_statuses),
{(_id, JobStatus.QUEUED) for _id in dependency_job_ids}
)
def test_get_dependencies_statuses_returns_empty_list_if_no_dependencies(self):
queue = Queue(connection=self.testconn)
dependent_job = Job.create(func=fixtures.say_hello)
dependent_job.register_dependency()
dependencies_statuses = dependent_job.get_dependencies_statuses()
self.assertListEqual(
dependencies_statuses,
[]
)
def test_get_dependencies_statuses_returns_ordered_by_end_time(self):
dependency_jobs = [
Job.create(fixtures.say_hello)
for _ in range(5)
]
dependent_job = Job.create(func=fixtures.say_hello)
dependent_job._dependency_ids = [job.id for job in dependency_jobs]
dependent_job.register_dependency()
now = utcnow()
for i, job in enumerate(dependency_jobs):
job._status = JobStatus.FINISHED
job.ended_at = now - timedelta(seconds=i)
job.save()
dependencies_statuses = dependent_job.get_dependencies_statuses()
self.assertListEqual(
dependencies_statuses,
[(job.id, JobStatus.FINISHED) for job in reversed(dependency_jobs)]
)
def test_get_dependencies_statuses_returns_not_finished_job_ordered_first(self):
dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)]
dependency_jobs[0]._status = JobStatus.FINISHED
dependency_jobs[0].ended_at = utcnow()
dependency_jobs[0].save()
dependency_jobs[1]._status = JobStatus.STARTED
dependency_jobs[1].ended_at = None
dependency_jobs[1].save()
dependent_job = Job.create(func=fixtures.say_hello)
dependent_job._dependency_ids = [job.id for job in dependency_jobs]
dependent_job.register_dependency()
now = utcnow()
dependencies_statuses = dependent_job.get_dependencies_statuses()
self.assertEqual(
dependencies_statuses[0],
(dependency_jobs[1].id, JobStatus.STARTED)
)
self.assertEqual(
dependencies_statuses[1],
(dependency_jobs[0].id, JobStatus.FINISHED)
)
def test_get_dependencies_statuses_watches_job(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello)
dependent_job = Job.create(func=fixtures.say_hello)
dependent_job._dependency_ids = [dependency_job.id]
dependent_job.register_dependency()
with self.testconn.pipeline() as pipeline:
dependent_job.get_dependencies_statuses(
pipeline=pipeline,
watch=True
)
dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn)
pipeline.multi()
with self.assertRaises(WatchError):
pipeline.touch(Job.key_for(dependent_job.id))
pipeline.execute()
def test_get_dependencies_statuses_watches_dependency_set(self):
queue = Queue(connection=self.testconn)
dependency_job = queue.enqueue(fixtures.say_hello)
dependent_job = Job.create(func=fixtures.say_hello)
dependent_job._dependency_ids = [dependency_job.id]
dependent_job.register_dependency()
with self.testconn.pipeline() as pipeline:
dependent_job.get_dependencies_statuses(
pipeline=pipeline,
watch=True
)
self.testconn.sadd(
dependent_job.dependencies_key,
queue.enqueue(fixtures.say_hello).id,
)
pipeline.multi()
with self.assertRaises(WatchError):
pipeline.touch(Job.key_for(dependent_job.id))
pipeline.execute()

Loading…
Cancel
Save