diff --git a/rq/job.py b/rq/job.py index c45bb07..810e1f6 100644 --- a/rq/job.py +++ b/rq/job.py @@ -724,34 +724,29 @@ class Job(object): connection.sadd(dependents_key, self.id) connection.sadd(self.dependencies_key, dependency_id) - def get_dependencies_statuses( + def dependencies_finished( self, - watch=False, pipeline=None ): - """Returns a list of tuples containing the job ids and status of all - dependencies; e.g: + """Returns a boolean indicating if all of this jobs dependencies are _FINISHED_ - [('14462606-09c4-41c2-8bf1-fbd109092318', 'started'), - ('e207328f-d5bc-4ea9-8d61-b449891e3230', 'finished'), ...] - - As a minor optimization allowing callers to more quickly tell if all - dependencies are _FINISHED_, the returned list is sorted by the - `ended_at` timestamp, so those jobs which are not yet finished are at - the start of the list. + If a pipeline is passed, all dependencies are WATCHed. """ pipe = pipeline if pipeline is not None else self.connection - if watch: - pipe.watch(*[Job.key_for(_id) + if pipeline is not None: + pipe.watch(*[Job.key_for(as_text(_id)) for _id in self.connection.smembers(self.dependencies_key)]) sort_by = self.redis_job_namespace_prefix + '*->ended_at' get_field = self.redis_job_namespace_prefix + '*->status' - # Sorting here lexographically works because these dates are stored in - # an ISO 8601 format, so lexographic order is the same as + # As a minor optimization to more quickly tell if all dependencies + # are _FINISHED_, sort dependencies by the `ended_at` timestamp so + # those jobs which are not yet finished are at the start of the + # list. Sorting here lexographically works because these dates are + # stored in an ISO 8601 format, so lexographic order is the same as # chronological order. dependencies_statuses = [ (as_text(_id), as_text(status)) @@ -759,6 +754,8 @@ class Job(object): get=['#', get_field], alpha=True, groups=True, ) ] - return dependencies_statuses + return all(status == JobStatus.FINISHED + for job_id, status + in dependencies_statuses) _job_stack = LocalStack() diff --git a/rq/queue.py b/rq/queue.py index 57aa3a6..afe7f48 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -461,28 +461,18 @@ class Queue(object): if pipeline is None: pipe.watch(dependents_key) - dependent_jobs = [self.job_class.fetch(as_text(job_id), connection=self.connection) - for job_id in pipe.smembers(dependents_key)] + dependent_job_ids = [as_text(_id) + for _id in pipe.smembers(dependents_key)] - dependencies_statuses = [ - dependent.get_dependencies_statuses(watch=True, pipeline=pipe) - for dependent in dependent_jobs + dependent_jobs = [ + job for job in self.job_class.fetch_many(dependent_job_ids, + connection=self.connection) + if job.dependencies_finished(pipeline=pipe) ] pipe.multi() - for dependent, dependents_dependencies in zip(dependent_jobs, - dependencies_statuses): - - # Enqueue this dependent job only if all of it's _other_ - # dependencies are FINISHED. - if not all( - status == JobStatus.FINISHED - for job_id, status - in dependents_dependencies - ): - continue - + for dependent in dependent_jobs: registry = DeferredJobRegistry(dependent.origin, self.connection, job_class=self.job_class) diff --git a/tests/test_job.py b/tests/test_job.py index e8f965a..394140a 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -798,7 +798,7 @@ class TestJob(RQTestCase): pipeline.touch(dependency_job.id) pipeline.execute() - def test_get_dependencies_statuses_returns_ids_and_statuses(self): + def test_dependencies_finished_returns_false_if_dependencies_queued(self): queue = Queue(connection=self.testconn) dependency_job_ids = [ @@ -810,27 +810,21 @@ class TestJob(RQTestCase): dependent_job._dependency_ids = dependency_job_ids dependent_job.register_dependency() - dependencies_statuses = dependent_job.get_dependencies_statuses() + dependencies_finished = dependent_job.dependencies_finished() - self.assertSetEqual( - set(dependencies_statuses), - {(_id, JobStatus.QUEUED) for _id in dependency_job_ids} - ) + self.assertFalse(dependencies_finished) - def test_get_dependencies_statuses_returns_empty_list_if_no_dependencies(self): + def test_dependencies_finished_returns_true_if_no_dependencies(self): queue = Queue(connection=self.testconn) dependent_job = Job.create(func=fixtures.say_hello) dependent_job.register_dependency() - dependencies_statuses = dependent_job.get_dependencies_statuses() + dependencies_finished = dependent_job.dependencies_finished() - self.assertListEqual( - dependencies_statuses, - [] - ) + self.assertTrue(dependencies_finished) - def test_get_dependencies_statuses_returns_ordered_by_end_time(self): + def test_dependencies_finished_returns_true_if_all_dependencies_finished(self): dependency_jobs = [ Job.create(fixtures.say_hello) for _ in range(5) @@ -842,19 +836,17 @@ class TestJob(RQTestCase): now = utcnow() + # Set ended_at timestamps for i, job in enumerate(dependency_jobs): job._status = JobStatus.FINISHED job.ended_at = now - timedelta(seconds=i) job.save() - dependencies_statuses = dependent_job.get_dependencies_statuses() + dependencies_finished = dependent_job.dependencies_finished() - self.assertListEqual( - dependencies_statuses, - [(job.id, JobStatus.FINISHED) for job in reversed(dependency_jobs)] - ) + self.assertTrue(dependencies_finished) - def test_get_dependencies_statuses_returns_not_finished_job_ordered_first(self): + def test_dependencies_finished_returns_false_if_unfinished_job(self): dependency_jobs = [Job.create(fixtures.say_hello) for _ in range(2)] dependency_jobs[0]._status = JobStatus.FINISHED @@ -871,19 +863,11 @@ class TestJob(RQTestCase): now = utcnow() - dependencies_statuses = dependent_job.get_dependencies_statuses() - - self.assertEqual( - dependencies_statuses[0], - (dependency_jobs[1].id, JobStatus.STARTED) - ) + dependencies_finished = dependent_job.dependencies_finished() - self.assertEqual( - dependencies_statuses[1], - (dependency_jobs[0].id, JobStatus.FINISHED) - ) + self.assertFalse(dependencies_finished) - def test_get_dependencies_statuses_watches_job(self): + def test_dependencies_finished_watches_job(self): queue = Queue(connection=self.testconn) dependency_job = queue.enqueue(fixtures.say_hello) @@ -894,9 +878,8 @@ class TestJob(RQTestCase): with self.testconn.pipeline() as pipeline: - dependent_job.get_dependencies_statuses( + dependent_job.dependencies_finished( pipeline=pipeline, - watch=True ) dependency_job.set_status(JobStatus.FAILED, pipeline=self.testconn)