Raise an exception if a given dependency does not exist

Adapted some tests to the change: the dependency has to be saved first.
main
Stefan Hammer 8 years ago
parent 69b43daa72
commit 301e5c927b

@ -7,6 +7,10 @@ class NoSuchJobError(Exception):
pass pass
class InvalidJobDependency(Exception):
pass
class InvalidJobOperationError(Exception): class InvalidJobOperationError(Exception):
pass pass

@ -9,8 +9,8 @@ from redis import WatchError
from .compat import as_text, string_types, total_ordering from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL from .defaults import DEFAULT_RESULT_TTL
from .exceptions import (DequeueTimeout, InvalidJobOperationError, from .exceptions import (DequeueTimeout, InvalidJobDependency,
NoSuchJobError, UnpickleError) InvalidJobOperationError, NoSuchJobError, UnpickleError)
from .job import Job, JobStatus from .job import Job, JobStatus
from .utils import import_attribute, utcnow from .utils import import_attribute, utcnow
@ -202,6 +202,13 @@ class Queue(object):
while True: while True:
try: try:
pipe.watch(depends_on.key) pipe.watch(depends_on.key)
# If the dependency does not exist, we raise an
# exception. So the caller is able to avoid an orphaned
# job.
if not self.job_class.exists(depends_on.id):
raise InvalidJobDependency('Job {0} does not exist'.format(depends_on.id))
if depends_on.get_status() != JobStatus.FINISHED: if depends_on.get_status() != JobStatus.FINISHED:
pipe.multi() pipe.multi()
job.set_status(JobStatus.DEFERRED) job.set_status(JobStatus.DEFERRED)

@ -7,7 +7,7 @@ from tests.fixtures import (div_by_zero, echo, Number, say_hello,
some_calculation) some_calculation)
from rq import get_failed_queue, Queue from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobDependency, InvalidJobOperationError
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
from rq.worker import Worker from rq.worker import Worker
@ -401,6 +401,7 @@ class TestQueue(RQTestCase):
"""Jobs are enqueued only when their dependencies are finished.""" """Jobs are enqueued only when their dependencies are finished."""
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save()
q = Queue() q = Queue()
job = q.enqueue_call(say_hello, depends_on=parent_job) job = q.enqueue_call(say_hello, depends_on=parent_job)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
@ -417,6 +418,7 @@ class TestQueue(RQTestCase):
def test_enqueue_job_with_dependency_by_id(self): def test_enqueue_job_with_dependency_by_id(self):
"""Can specify job dependency with job object or job id.""" """Can specify job dependency with job object or job id."""
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save()
q = Queue() q = Queue()
q.enqueue_call(say_hello, depends_on=parent_job.id) q.enqueue_call(say_hello, depends_on=parent_job.id)
@ -433,6 +435,7 @@ class TestQueue(RQTestCase):
"""Jobs remember their timeout when enqueued as a dependency.""" """Jobs remember their timeout when enqueued as a dependency."""
# Job with unfinished dependency is not immediately enqueued # Job with unfinished dependency is not immediately enqueued
parent_job = Job.create(func=say_hello) parent_job = Job.create(func=say_hello)
parent_job.save()
q = Queue() q = Queue()
job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123) job = q.enqueue_call(say_hello, depends_on=parent_job, timeout=123)
self.assertEqual(q.job_ids, []) self.assertEqual(q.job_ids, [])
@ -445,6 +448,20 @@ class TestQueue(RQTestCase):
self.assertEqual(q.job_ids, [job.id]) self.assertEqual(q.job_ids, [job.id])
self.assertEqual(job.timeout, 123) self.assertEqual(job.timeout, 123)
def test_enqueue_job_with_invalid_dependency(self):
"""Enqueuing a job fails, if the dependency does not exist at all."""
parent_job = Job.create(func=say_hello)
# without save() the job is not visible to others
q = Queue()
with self.assertRaises(InvalidJobDependency):
q.enqueue_call(say_hello, depends_on=parent_job)
with self.assertRaises(InvalidJobDependency):
q.enqueue_call(say_hello, depends_on=parent_job.id)
self.assertEqual(q.job_ids, [])
class TestFailedQueue(RQTestCase): class TestFailedQueue(RQTestCase):
def test_requeue_job(self): def test_requeue_job(self):

Loading…
Cancel
Save