diff --git a/docs/contrib/index.md b/docs/contrib/index.md index f503812..d7576b9 100644 --- a/docs/contrib/index.md +++ b/docs/contrib/index.md @@ -55,9 +55,12 @@ Any job ID that is encountered by a worker for which no job hash is found in Redis is simply ignored. This makes it easy to cancel jobs by simply removing the job hash. In Python: +```python from rq import cancel_job cancel_job('2eafc1e6-48c2-464b-a0ff-88fd199d039c') +``` Note that it is irrelevant on which queue the job resides. When a worker eventually pops the job ID from the queue and notes that the Job hash does not exist (anymore), it simply discards the job ID and continues with the next. + diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md index 91252a4..54b993a 100644 --- a/docs/docs/jobs.md +++ b/docs/docs/jobs.md @@ -171,6 +171,17 @@ Canceling a job will remove: Note that `job.cancel()` does **not** delete the job itself from Redis. If you want to delete the job from Redis and reclaim memory, use `job.delete()`. +Note: if you want to enqueue the dependents of the job you +are trying to cancel use the following: + +```python +from rq import cancel_job +cancel_job( + '2eafc1e6-48c2-464b-a0ff-88fd199d039c', + enqueue_dependents=True +) +``` + ## Job / Queue Creation with Custom Serializer When creating a job or queue, you can pass in a custom serializer that will be used for serializing / de-serializing job arguments. diff --git a/rq/job.py b/rq/job.py index ebb87c1..0b128b4 100644 --- a/rq/job.py +++ b/rq/job.py @@ -16,6 +16,8 @@ from enum import Enum from functools import partial from uuid import uuid4 +from redis import WatchError + from rq.compat import as_text, decode_redis_hash, string_types from .connections import resolve_connection from .exceptions import DeserializationError, NoSuchJobError @@ -46,11 +48,11 @@ class JobStatus(str, Enum): UNEVALUATED = object() -def cancel_job(job_id, connection=None, serializer=None): +def cancel_job(job_id, connection=None, serializer=None, enqueue_dependents=False): """Cancels the job with the given job ID, preventing execution. Discards any job info (i.e. it can't be requeued later). """ - Job.fetch(job_id, connection=connection, serializer=serializer).cancel() + Job.fetch(job_id, connection=connection, serializer=serializer).cancel(enqueue_dependents=enqueue_dependents) def get_current_job(connection=None, job_class=None): @@ -676,32 +678,56 @@ class Job: meta = self.serializer.dumps(self.meta) self.connection.hset(self.key, 'meta', meta) - def cancel(self, pipeline=None): + def cancel(self, pipeline=None, enqueue_dependents=False): """Cancels the given job, which will prevent the job from ever being ran (or inspected). This method merely exists as a high-level API call to cancel jobs without worrying about the internals required to implement job cancellation. - """ - pipeline = pipeline or self.connection.pipeline() - if self.origin: - from .registry import CanceledJobRegistry - from .queue import Queue - q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer) - q.remove(self, pipeline=pipeline) - - self.set_status(JobStatus.CANCELED, pipeline=pipeline) + You can enqueue the jobs dependents optionally, + Same pipelining behavior as Queue.enqueue_dependents on whether or not a pipeline is passed in. + """ - registry = CanceledJobRegistry( - self.origin, - self.connection, - job_class=self.__class__, - serializer=self.serializer - ) - registry.add(self, pipeline=pipeline) - pipeline.execute() + from .registry import CanceledJobRegistry + from .queue import Queue + pipe = pipeline or self.connection.pipeline() + while True: + try: + q = Queue( + name=self.origin, + connection=self.connection, + job_class=self.__class__, + serializer=self.serializer + ) + if enqueue_dependents: + # Only WATCH if no pipeline passed, otherwise caller is responsible + if pipeline is None: + pipe.watch(self.dependents_key) + q.enqueue_dependents(self, pipeline=pipeline) + q.remove(self, pipeline=pipe) + + self.set_status(JobStatus.CANCELED, pipeline=pipe) + + registry = CanceledJobRegistry( + self.origin, + self.connection, + job_class=self.__class__, + serializer=self.serializer + ) + registry.add(self, pipeline=pipe) + if pipeline is None: + pipe.execute() + break + except WatchError: + if pipeline is None: + continue + else: + # if the pipeline comes from the caller, we re-raise the + # exception as it it the responsibility of the caller to + # handle it + raise def requeue(self): """Requeues job.""" diff --git a/tests/test_job.py b/tests/test_job.py index cdf0dae..291e439 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -812,6 +812,54 @@ class TestJob(RQTestCase): job.delete() self.assertNotIn(job, registry) + def test_create_and_cancel_job_enqueue_dependents(self): + """Ensure job.cancel() works properly with enqueue_dependents=True""" + queue = Queue(connection=self.testconn) + dependency = queue.enqueue(fixtures.say_hello) + dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency) + + self.assertEqual(1, len(queue.get_jobs())) + self.assertEqual(1, len(queue.deferred_job_registry)) + cancel_job(dependency.id, enqueue_dependents=True) + self.assertEqual(1, len(queue.get_jobs())) + self.assertEqual(0, len(queue.deferred_job_registry)) + registry = CanceledJobRegistry(connection=self.testconn, queue=queue) + self.assertIn(dependency, registry) + self.assertEqual(dependency.get_status(), JobStatus.CANCELED) + self.assertIn(dependent, queue.get_jobs()) + self.assertEqual(dependent.get_status(), JobStatus.QUEUED) + # If job is deleted, it's also removed from CanceledJobRegistry + dependency.delete() + self.assertNotIn(dependency, registry) + + def test_create_and_cancel_job_enqueue_dependents_with_pipeline(self): + """Ensure job.cancel() works properly with enqueue_dependents=True""" + queue = Queue(connection=self.testconn) + dependency = queue.enqueue(fixtures.say_hello) + dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency) + + self.assertEqual(1, len(queue.get_jobs())) + self.assertEqual(1, len(queue.deferred_job_registry)) + self.testconn.set('some:key', b'some:value') + + with self.testconn.pipeline() as pipe: + pipe.watch('some:key') + self.assertEqual(self.testconn.get('some:key'), b'some:value') + dependency.cancel(pipeline=pipe, enqueue_dependents=True) + pipe.set('some:key', b'some:other:value') + pipe.execute() + self.assertEqual(self.testconn.get('some:key'), b'some:other:value') + self.assertEqual(1, len(queue.get_jobs())) + self.assertEqual(0, len(queue.deferred_job_registry)) + registry = CanceledJobRegistry(connection=self.testconn, queue=queue) + self.assertIn(dependency, registry) + self.assertEqual(dependency.get_status(), JobStatus.CANCELED) + self.assertIn(dependent, queue.get_jobs()) + self.assertEqual(dependent.get_status(), JobStatus.QUEUED) + # If job is deleted, it's also removed from CanceledJobRegistry + dependency.delete() + self.assertNotIn(dependency, registry) + def test_create_and_cancel_job_with_serializer(self): """test creating and using cancel_job (with serializer) deletes job properly""" queue = Queue(connection=self.testconn, serializer=JSONSerializer)