diff --git a/rq/job.py b/rq/job.py index 188d47b..fa895fe 100644 --- a/rq/job.py +++ b/rq/job.py @@ -20,7 +20,7 @@ from redis import WatchError from rq.compat import as_text, decode_redis_hash, string_types from .connections import resolve_connection -from .exceptions import DeserializationError, NoSuchJobError +from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError from .local import LocalStack from .serializers import resolve_serializer from .utils import (get_version, import_attribute, parse_timeout, str_to_date, @@ -690,6 +690,8 @@ class Job: Same pipelining behavior as Queue.enqueue_dependents on whether or not a pipeline is passed in. """ + if self.is_canceled: + raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id())) from .registry import CanceledJobRegistry from .queue import Queue pipe = pipeline or self.connection.pipeline() @@ -706,7 +708,10 @@ class Job: if pipeline is None: pipe.watch(self.dependents_key) q.enqueue_dependents(self, pipeline=pipeline) - q.remove(self, pipeline=pipe) + self._remove_from_registries( + pipeline=pipe, + remove_from_queue=True + ) self.set_status(JobStatus.CANCELED, pipeline=pipe) @@ -733,13 +738,7 @@ class Job: """Requeues job.""" return self.failed_job_registry.requeue(self) - def delete(self, pipeline=None, remove_from_queue=True, - delete_dependents=False): - """Cancels the job and deletes the job hash from Redis. Jobs depending - on this job can optionally be deleted as well.""" - - connection = pipeline if pipeline is not None else self.connection - + def _remove_from_registries(self, pipeline=None, remove_from_queue=True): if remove_from_queue: from .queue import Queue q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer) @@ -787,6 +786,15 @@ class Job: serializer=self.serializer) registry.remove(self, pipeline=pipeline) + def delete(self, pipeline=None, remove_from_queue=True, + delete_dependents=False): + """Cancels the job and deletes the job hash from Redis. Jobs depending + on this job can optionally be deleted as well.""" + + connection = pipeline if pipeline is not None else self.connection + + self._remove_from_registries(pipeline=pipeline, remove_from_queue=True) + if delete_dependents: self.delete_dependents(pipeline=pipeline) diff --git a/tests/test_job.py b/tests/test_job.py index 291e439..185c5ec 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -10,7 +10,7 @@ from datetime import datetime, timedelta from redis import WatchError from rq.compat import as_text -from rq.exceptions import DeserializationError, NoSuchJobError +from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError from rq.job import Job, JobStatus, cancel_job, get_current_job from rq.queue import Queue from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry, @@ -812,6 +812,27 @@ class TestJob(RQTestCase): job.delete() self.assertNotIn(job, registry) + def test_create_and_cancel_job_fails_already_canceled(self): + """Ensure job.cancel() fails on already canceld job""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(fixtures.say_hello, job_id='fake_job_id') + self.assertEqual(1, len(queue.get_jobs())) + + # First cancel should be fine + cancel_job(job.id) + self.assertEqual(0, len(queue.get_jobs())) + registry = CanceledJobRegistry(connection=self.testconn, queue=queue) + self.assertIn(job, registry) + self.assertEqual(job.get_status(), JobStatus.CANCELED) + + # Second cancel should fail + self.assertRaisesRegex( + InvalidJobOperation, + r'Cannot cancel already canceled job: fake_job_id', + cancel_job, + job.id + ) + def test_create_and_cancel_job_enqueue_dependents(self): """Ensure job.cancel() works properly with enqueue_dependents=True""" queue = Queue(connection=self.testconn) @@ -832,6 +853,38 @@ class TestJob(RQTestCase): dependency.delete() self.assertNotIn(dependency, registry) + def test_create_and_cancel_job_enqueue_dependents_in_registry(self): + """Ensure job.cancel() works properly with enqueue_dependents=True and when the job is in a registry""" + queue = Queue(connection=self.testconn) + dependency = queue.enqueue(fixtures.raise_exc) + dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency) + + self.assertEqual(1, len(queue.get_jobs())) + self.assertEqual(1, len(queue.deferred_job_registry)) + w = Worker([queue]) + w.work(burst=True, max_jobs=1) + dependency.refresh() + dependent.refresh() + self.assertEqual(0, len(queue.get_jobs())) + self.assertEqual(1, len(queue.deferred_job_registry)) + self.assertEqual(1, len(queue.failed_job_registry)) + cancel_job(dependency.id, enqueue_dependents=True) + dependency.refresh() + dependent.refresh() + self.assertEqual(1, len(queue.get_jobs())) + self.assertEqual(0, len(queue.deferred_job_registry)) + self.assertEqual(0, len(queue.failed_job_registry)) + self.assertEqual(1, len(queue.canceled_job_registry)) + registry = CanceledJobRegistry(connection=self.testconn, queue=queue) + self.assertIn(dependency, registry) + self.assertEqual(dependency.get_status(), JobStatus.CANCELED) + self.assertNotIn(dependency, queue.failed_job_registry) + self.assertIn(dependent, queue.get_jobs()) + self.assertEqual(dependent.get_status(), JobStatus.QUEUED) + # If job is deleted, it's also removed from CanceledJobRegistry + dependency.delete() + self.assertNotIn(dependency, registry) + def test_create_and_cancel_job_enqueue_dependents_with_pipeline(self): """Ensure job.cancel() works properly with enqueue_dependents=True""" queue = Queue(connection=self.testconn)