diff --git a/rq/job.py b/rq/job.py index 15e706b..0661f7a 100644 --- a/rq/job.py +++ b/rq/job.py @@ -124,7 +124,7 @@ class Job(object): def set_status(self, status, pipeline=None): self._status = status - connection = pipeline or self.connection + connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'status', self._status) @property @@ -405,7 +405,6 @@ class Job(object): return jobs - @property def result(self): """Returns the return value of the job. @@ -725,4 +724,43 @@ class Job(object): connection.sadd(dependents_key, self.id) connection.sadd(self.dependencies_key, dependency_id) + def get_dependencies_statuses( + self, + watch=False, + pipeline=None + ): + """Returns a list of tuples containing the job ids and status of all + dependencies; e.g: + + [('14462606-09c4-41c2-8bf1-fbd109092318', 'started'), + ('e207328f-d5bc-4ea9-8d61-b449891e3230', 'finished'), ...] + + As a minor optimization allowing callers to more quickly tell if all + dependencies are _FINISHED_, the returned list is sorted by the + `ended_at` timestamp, so those jobs which are not yet finished are at + the start of the list. + """ + + pipe = pipeline if pipeline is not None else self.connection + + if watch: + pipe.watch(self.dependencies_key) + pipe.watch(*[self.redis_job_namespace_prefix + as_text(_id) + for _id in pipe.smembers(self.dependencies_key)]) + + sort_by = self.redis_job_namespace_prefix + '*->ended_at' + get_field = self.redis_job_namespace_prefix + '*->status' + + # Sorting here lexographically works because these dates are stored in + # an ISO 8601 format, so lexographic order is the same as + # chronological order. + dependencies_statuses = [ + (as_text(_id), as_text(status)) + for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by, + get=['#', get_field], alpha=True, groups=True, ) + ] + + return dependencies_statuses + + _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index 4316653..56ec31a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -466,7 +466,8 @@ class Queue(object): pipe.multi() - for dependent in dependent_jobs: + for dependent, dependents_dependencies in dependent_jobs: + registry = DeferredJobRegistry(dependent.origin, self.connection, job_class=self.job_class) diff --git a/tests/test_job.py b/tests/test_job.py index 25c37e2..835dc6f 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -6,8 +6,9 @@ import json import time import queue import zlib -from datetime import datetime +from datetime import datetime, timedelta +import pytest from redis import WatchError from rq.compat import PY2, as_text @@ -17,7 +18,7 @@ from rq.queue import Queue from rq.registry import (DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry, ScheduledJobRegistry) -from rq.utils import utcformat +from rq.utils import utcformat, utcnow from rq.worker import Worker from tests import RQTestCase, fixtures @@ -796,3 +797,137 @@ class TestJob(RQTestCase): self.testconn.set(dependency_job.id, 'somethingelsehappened') pipeline.touch(dependency_job.id) pipeline.execute() + + def test_get_dependencies_statuses_returns_ids_and_statuses(self): + queue = Queue(connection=self.testconn) + + dependency_job_ids = [ + queue.enqueue(fixtures.say_hello).id + for _ in range(5) + ] + + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job._dependency_ids = dependency_job_ids + dependent_job.register_dependency() + + dependencies_statuses = dependent_job.get_dependencies_statuses() + + self.assertSetEqual( + set(dependencies_statuses), + {(_id, JobStatus.QUEUED) for _id in dependency_job_ids} + ) + + def test_get_dependencies_statuses_returns_empty_list_if_no_dependencies(self): + queue = Queue(connection=self.testconn) + + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job.register_dependency() + + dependencies_statuses = dependent_job.get_dependencies_statuses() + + self.assertListEqual( + dependencies_statuses, + [] + ) + + def test_get_dependencies_statuses_returns_ordered_by_end_time(self): + dependency_jobs = [ + Job.create(fixtures.say_hello) + for _ in range(5) + ] + + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job._dependency_ids = [job.id for job in dependency_jobs] + dependent_job.register_dependency() + + now = utcnow() + + for i, job in enumerate(dependency_jobs): + job._status = JobStatus.FINISHED + job.ended_at = now - timedelta(seconds=i) + job.save() + + dependencies_statuses = dependent_job.get_dependencies_statuses() + + self.assertListEqual( + dependencies_statuses, + [(job.id, JobStatus.FINISHED) for job in reversed(dependency_jobs)] + ) + + def test_get_dependencies_statuses_returns_not_finished_job_ordered_first(self): + dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)] + + dependency_jobs[0]._status = JobStatus.FINISHED + dependency_jobs[0].ended_at = utcnow() + dependency_jobs[0].save() + + dependency_jobs[1]._status = JobStatus.STARTED + dependency_jobs[1].ended_at = None + dependency_jobs[1].save() + + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job._dependency_ids = [job.id for job in dependency_jobs] + dependent_job.register_dependency() + + now = utcnow() + + dependencies_statuses = dependent_job.get_dependencies_statuses() + + self.assertEqual( + dependencies_statuses[0], + (dependency_jobs[1].id, JobStatus.STARTED) + ) + + self.assertEqual( + dependencies_statuses[1], + (dependency_jobs[0].id, JobStatus.FINISHED) + ) + + def test_get_dependencies_statuses_watches_job(self): + queue = Queue(connection=self.testconn) + + dependency_job = queue.enqueue(fixtures.say_hello) + + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job._dependency_ids = [dependency_job.id] + dependent_job.register_dependency() + + with self.testconn.pipeline() as pipeline: + + dependent_job.get_dependencies_statuses( + pipeline=pipeline, + watch=True + ) + + dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn) + pipeline.multi() + + with self.assertRaises(WatchError): + pipeline.touch(Job.key_for(dependent_job.id)) + pipeline.execute() + + def test_get_dependencies_statuses_watches_dependency_set(self): + queue = Queue(connection=self.testconn) + + dependency_job = queue.enqueue(fixtures.say_hello) + dependent_job = Job.create(func=fixtures.say_hello) + dependent_job._dependency_ids = [dependency_job.id] + dependent_job.register_dependency() + + with self.testconn.pipeline() as pipeline: + + dependent_job.get_dependencies_statuses( + pipeline=pipeline, + watch=True + ) + + self.testconn.sadd( + dependent_job.dependencies_key, + queue.enqueue(fixtures.say_hello).id, + ) + + pipeline.multi() + + with self.assertRaises(WatchError): + pipeline.touch(Job.key_for(dependent_job.id)) + pipeline.execute()