diff --git a/rq/exceptions.py b/rq/exceptions.py index 530733d..ebb6b3b 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -7,6 +7,10 @@ class NoSuchJobError(Exception): pass +class InvalidJobDependency(Exception): + pass + + class InvalidJobOperationError(Exception): pass diff --git a/rq/queue.py b/rq/queue.py index e5d3df8..1b8aacc 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -9,8 +9,8 @@ from redis import WatchError from .compat import as_text, string_types, total_ordering from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL -from .exceptions import (DequeueTimeout, InvalidJobOperationError, - NoSuchJobError, UnpickleError) +from .exceptions import (DequeueTimeout, InvalidJobDependency, + InvalidJobOperationError, NoSuchJobError, UnpickleError) from .job import Job, JobStatus from .utils import import_attribute, utcnow @@ -202,6 +202,13 @@ class Queue(object): while True: try: pipe.watch(depends_on.key) + + # If the dependency does not exist, we raise an + # exception. So the caller is able to avoid an orphaned + # job. + if not self.job_class.exists(depends_on.id): + raise InvalidJobDependency('Job {0} does not exist'.format(depends_on.id)) + if depends_on.get_status() != JobStatus.FINISHED: pipe.multi() job.set_status(JobStatus.DEFERRED) diff --git a/tests/test_queue.py b/tests/test_queue.py index b62fddc..2edb163 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -7,7 +7,7 @@ from tests.fixtures import (div_by_zero, echo, Number, say_hello, some_calculation) from rq import get_failed_queue, Queue -from rq.exceptions import InvalidJobOperationError +from rq.exceptions import InvalidJobDependency, InvalidJobOperationError from rq.job import Job, JobStatus from rq.registry import DeferredJobRegistry from rq.worker import Worker @@ -401,6 +401,7 @@ class TestQueue(RQTestCase): """Jobs are enqueued only when their dependencies are finished.""" # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) + parent_job.save() q = Queue() job = q.enqueue_call(say_hello, depends_on=parent_job) self.assertEqual(q.job_ids, []) @@ -417,6 +418,7 @@ class TestQueue(RQTestCase): def test_enqueue_job_with_dependency_by_id(self): """Can specify job dependency with job object or job id.""" parent_job = Job.create(func=say_hello) + parent_job.save() q = Queue() q.enqueue_call(say_hello, depends_on=parent_job.id) @@ -433,6 +435,7 @@ class TestQueue(RQTestCase): """Jobs remember their timeout when enqueued as a dependency.""" # Job with unfinished dependency is not immediately enqueued parent_job = Job.create(func=say_hello) + parent_job.save() q = Queue() job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123) self.assertEqual(q.job_ids, []) @@ -445,6 +448,20 @@ class TestQueue(RQTestCase): self.assertEqual(q.job_ids, [job.id]) self.assertEqual(job.timeout, 123) + def test_enqueue_job_with_invalid_dependency(self): + """Enqueuing a job fails, if the dependency does not exist at all.""" + parent_job = Job.create(func=say_hello) + # without save() the job is not visible to others + + q = Queue() + with self.assertRaises(InvalidJobDependency): + q.enqueue_call(say_hello, depends_on=parent_job) + + with self.assertRaises(InvalidJobDependency): + q.enqueue_call(say_hello, depends_on=parent_job.id) + + self.assertEqual(q.job_ids, []) + class TestFailedQueue(RQTestCase): def test_requeue_job(self):