diff --git a/rq/job.py b/rq/job.py index 8eaa7a3..3cad5ab 100644 --- a/rq/job.py +++ b/rq/job.py @@ -403,18 +403,17 @@ class Job(object): watch is true, then set WATCH on all the keys of all dependencies. Returned jobs will use self's connection, not the pipeline supplied. + + If a job has been deleted from redis, it is not returned. """ connection = pipeline if pipeline is not None else self.connection if watch and self._dependency_ids: connection.watch(*self._dependency_ids) - jobs = self.fetch_many(self._dependency_ids, connection=self.connection) - - for i, job in enumerate(jobs): - if not job: - raise NoSuchJobError( - 'Dependency {0} does not exist'.format(self._dependency_ids[i])) + jobs = [job for + job in self.fetch_many(self._dependency_ids, connection=self.connection) + if job] return jobs @@ -739,6 +738,12 @@ class Job(object): connection.sadd(dependents_key, self.id) connection.sadd(self.dependencies_key, dependency_id) + @property + def dependencies_job_ids(self): + dependencies = self.connection.smembers(self.dependencies_key) + return [Job.key_for(as_text(_id)) + for _id in dependencies] + def dependencies_are_met( self, pipeline=None, @@ -758,14 +763,15 @@ class Job(object): pipe = pipeline if pipeline is not None else self.connection - dependencies = self.connection.smembers(self.dependencies_key) - if pipeline is not None: - pipe.watch(*[Job.key_for(as_text(_id)) - for _id in dependencies]) + pipe.watch(*self.dependencies_job_ids) sort_by = self.redis_job_namespace_prefix + '*->ended_at' - get_field = self.redis_job_namespace_prefix + '*->status' + get_fields = ( + '#', + self.redis_job_namespace_prefix + '*->created_at', + self.redis_job_namespace_prefix + '*->status' + ) # As a minor optimization to more quickly tell if all dependencies # are _FINISHED_, sort dependencies by the `ended_at` timestamp so @@ -774,16 +780,17 @@ class Job(object): # stored in an ISO 8601 format, so lexographic order is the same as # chronological order. dependencies_statuses = [ - (as_text(_id), as_text(status)) - for _id, status in pipe.sort(name=self.dependencies_key, by=sort_by, - get=['#', get_field], alpha=True, - groups=True, ) + tuple(map(as_text, result)) + for result in pipe.sort(name=self.dependencies_key, by=sort_by, + get=get_fields, alpha=True, + groups=True, ) ] - return all(status == JobStatus.FINISHED - for job_id, status + # if `created_at` is None, then this has been deleted! + return all(status == JobStatus.FINISHED or not created_at + for dependency_id, created_at, status in dependencies_statuses - if job_id not in exclude) + if dependency_id not in exclude) _job_stack = LocalStack() diff --git a/tests/test_job.py b/tests/test_job.py index 6ce717e..0a0f49d 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -8,7 +8,6 @@ import queue import zlib from datetime import datetime, timedelta -import pytest from redis import WatchError from rq.compat import PY2, as_text @@ -774,8 +773,12 @@ class TestJob(RQTestCase): dependency_job.delete() - with self.assertRaises(NoSuchJobError): - dependent_job.fetch_dependencies(pipeline=self.testconn) + self.assertNotIn( + dependent_job.id, + [job.id for job in dependent_job.fetch_dependencies( + pipeline=self.testconn + )] + ) def test_fetch_dependencies_watches(self): queue = Queue(connection=self.testconn) @@ -877,7 +880,6 @@ class TestJob(RQTestCase): dependent_job.register_dependency() with self.testconn.pipeline() as pipeline: - dependent_job.dependencies_are_met( pipeline=pipeline, ) @@ -888,3 +890,25 @@ class TestJob(RQTestCase): with self.assertRaises(WatchError): pipeline.touch(Job.key_for(dependent_job.id)) pipeline.execute() + + def test_can_enqueue_job_if_dependency_is_deleted(self): + queue = Queue(connection=self.testconn) + + dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0) + + w = Worker([queue]) + w.work(burst=True) + + assert queue.enqueue(fixtures.say_hello, depends_on=dependency_job) + + def test_dependents_are_met_if_dependency_is_deleted(self): + queue = Queue(connection=self.testconn) + + dependency_job = queue.enqueue(fixtures.say_hello, result_ttl=0) + dependent_job = queue.enqueue(fixtures.say_hello, depends_on=dependency_job) + + w = Worker([queue]) + w.work(burst=True, max_jobs=1) + + assert dependent_job.get_status() == JobStatus.QUEUED + assert dependent_job.dependencies_are_met() diff --git a/tests/test_queue.py b/tests/test_queue.py index 1d14b3a..79dd646 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -531,20 +531,6 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, [job.id]) self.assertEqual(job.timeout, 123) - def test_enqueue_job_with_invalid_dependency(self): - """Enqueuing a job fails, if the dependency does not exist at all.""" - parent_job = Job.create(func=say_hello) - # without save() the job is not visible to others - - q = Queue() - with self.assertRaises(NoSuchJobError): - q.enqueue_call(say_hello, depends_on=parent_job) - - with self.assertRaises(NoSuchJobError): - q.enqueue_call(say_hello, depends_on=parent_job.id) - - self.assertEqual(q.job_ids, []) - def test_enqueue_job_with_multiple_queued_dependencies(self): parent_jobs = [Job.create(func=say_hello) for _ in range(2)]