From 246d52b977275e1ae4848a56a07bac0266e6595a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sun, 22 Aug 2021 07:19:15 +0700 Subject: [PATCH] job.cancel() puts job into CanceledJobRegistry. (#1546) * job.cancel() puts job into CanceledJobRegistry. * Improve test coverage --- docs/docs/job_registries.md | 1 + docs/docs/jobs.md | 35 ++++++++++++++++++++++--- rq/job.py | 28 +++++++++++++++++--- rq/queue.py | 3 +-- rq/registry.py | 51 +++++++++++++++++++++---------------- tests/test_job.py | 11 ++++++-- tests/test_registry.py | 6 ++++- tests/test_worker.py | 2 +- 8 files changed, 103 insertions(+), 34 deletions(-) diff --git a/docs/docs/job_registries.md b/docs/docs/job_registries.md index 9ec5ca3..5740c6d 100644 --- a/docs/docs/job_registries.md +++ b/docs/docs/job_registries.md @@ -11,6 +11,7 @@ executed and removed right after completion (success or failure). * `DeferredJobRegistry` Holds deferred jobs (jobs that depend on another job and are waiting for that job to finish). * `ScheduledJobRegistry` Holds scheduled jobs. +* `CanceledJobRegistry` Holds canceled jobs. You can get the number of jobs in a registry, the ids of the jobs in the registry, and more. Below is an example using a `StartedJobRegistry`. diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md index 44559bc..91252a4 100644 --- a/docs/docs/jobs.md +++ b/docs/docs/jobs.md @@ -84,7 +84,7 @@ job = Job.create(count_words_at_url, }) ``` -### Retrieving a Job from Redis +### Retrieving Jobs All job information is stored in Redis. You can inspect a job and its attributes by using `Job.fetch()`. @@ -100,7 +100,7 @@ print('Status: %s' % job.get_status()) Some interesting job attributes include: * `job.get_status(refresh=True)` Possible values are `queued`, `started`, - `deferred`, `finished`, `stopped`, `scheduled` and `failed`. If `refresh` is + `deferred`, `finished`, `stopped`, `scheduled`, `canceled` and `failed`. If `refresh` is `True` fresh values are fetched from Redis. 
* `job.get_meta(refresh=True)` Returns custom `job.meta` dict containing user stored data. If `refresh` is `True` fresh values are fetched from Redis. @@ -126,7 +126,7 @@ for job in jobs: ``` ## Stopping a Currently Executing Job -_New in version 1.7.0._ +_New in version 1.7.0_ You can use `send_stop_job_command()` to tell a worker to immediately stop a currently executing job. A job that's stopped will be sent to [FailedJobRegistry](https://python-rq.org/docs/results/#dealing-with-exceptions). @@ -142,6 +142,35 @@ send_stop_job_command(redis, job_id) Unlike failed jobs, stopped jobs will *not* be automatically retried if retry is configured. Subclasses of `Worker` which override `handle_job_failure()` should likewise take care to handle jobs with a `stopped` status appropriately. +## Canceling a Job +_New in version 1.10.0_ + +To prevent a job from running, cancel it with `job.cancel()`. + +```python +from redis import Redis +from rq.job import Job +from rq.registry import CanceledJobRegistry +from rq.queue import Queue + +redis = Redis() +job = Job.fetch('my_job_id', connection=redis) +job.cancel() + +job.get_status() # Job status is CANCELED + +registry = CanceledJobRegistry(job.origin, connection=job.connection) +print(job in registry) # Job is in CanceledJobRegistry +``` + +Canceling a job will: +1. Set job status to `CANCELED` +2. Remove job from queue +3. Put job into `CanceledJobRegistry` + +Note that `job.cancel()` does **not** delete the job itself from Redis. If you want to +delete the job from Redis and reclaim memory, use `job.delete()`. + ## Job / Queue Creation with Custom Serializer When creating a job or queue, you can pass in a custom serializer that will be used for serializing / de-serializing job arguments. 
diff --git a/rq/job.py b/rq/job.py index c8ca211..be308ad 100644 --- a/rq/job.py +++ b/rq/job.py @@ -38,6 +38,7 @@ class JobStatus(str, Enum): DEFERRED = 'deferred' SCHEDULED = 'scheduled' STOPPED = 'stopped' + CANCELED = 'canceled' # Sentinel value to mark that some of our lazily evaluated properties have not @@ -185,6 +186,10 @@ class Job: def is_deferred(self): return self.get_status() == JobStatus.DEFERRED + @property + def is_canceled(self): + return self.get_status() == JobStatus.CANCELED + @property def is_scheduled(self): return self.get_status() == JobStatus.SCHEDULED @@ -679,11 +684,18 @@ class Job: without worrying about the internals required to implement job cancellation. """ - from .queue import Queue pipeline = pipeline or self.connection.pipeline() if self.origin: + from .registry import CanceledJobRegistry + from .queue import Queue + q = Queue(name=self.origin, connection=self.connection) q.remove(self, pipeline=pipeline) + + self.set_status(JobStatus.CANCELED, pipeline=pipeline) + + registry = CanceledJobRegistry(self.origin, self.connection, job_class=self.__class__) + registry.add(self, pipeline=pipeline) pipeline.execute() def requeue(self): @@ -694,10 +706,14 @@ class Job: delete_dependents=False): """Cancels the job and deletes the job hash from Redis. 
Jobs depending on this job can optionally be deleted as well.""" - if remove_from_queue: - self.cancel(pipeline=pipeline) + connection = pipeline if pipeline is not None else self.connection + if remove_from_queue: + from .queue import Queue + q = Queue(name=self.origin, connection=self.connection) + q.remove(self, pipeline=pipeline) + if self.is_finished: from .registry import FinishedJobRegistry registry = FinishedJobRegistry(self.origin, @@ -729,6 +745,12 @@ class Job: elif self.is_failed: self.failed_job_registry.remove(self, pipeline=pipeline) + elif self.is_canceled: + from .registry import CanceledJobRegistry + registry = CanceledJobRegistry(self.origin, connection=self.connection, + job_class=self.__class__) + registry.remove(self, pipeline=pipeline) + if delete_dependents: self.delete_dependents(pipeline=pipeline) diff --git a/rq/queue.py b/rq/queue.py index 99f651f..febc361 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -260,8 +260,7 @@ class Queue: job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id if pipeline is not None: - pipeline.lrem(self.key, 1, job_id) - return + return pipeline.lrem(self.key, 1, job_id) return self.connection.lrem(self.key, 1, job_id) diff --git a/rq/registry.py b/rq/registry.py index 4d55d74..38bdb3c 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -107,6 +107,28 @@ class BaseRegistry: score = self.connection.zscore(self.key, job.id) return datetime.utcfromtimestamp(score) + def requeue(self, job_or_id): + """Requeues the job with the given job ID.""" + if isinstance(job_or_id, self.job_class): + job = job_or_id + else: + job = self.job_class.fetch(job_or_id, connection=self.connection) + + result = self.connection.zrem(self.key, job.id) + if not result: + raise InvalidJobOperation + + with self.connection.pipeline() as pipeline: + queue = Queue(job.origin, connection=self.connection, + job_class=self.job_class) + job.started_at = None + job.ended_at = None + job.exc_info = '' + job.save() + job = 
queue.enqueue_job(job, pipeline=pipeline) + pipeline.execute() + return job + class StartedJobRegistry(BaseRegistry): """ @@ -215,28 +237,6 @@ class FailedJobRegistry(BaseRegistry): if not pipeline: p.execute() - def requeue(self, job_or_id): - """Requeues the job with the given job ID.""" - if isinstance(job_or_id, self.job_class): - job = job_or_id - else: - job = self.job_class.fetch(job_or_id, connection=self.connection) - - result = self.connection.zrem(self.key, job.id) - if not result: - raise InvalidJobOperation - - with self.connection.pipeline() as pipeline: - queue = Queue(job.origin, connection=self.connection, - job_class=self.job_class) - job.started_at = None - job.ended_at = None - job.exc_info = '' - job.save() - job = queue.enqueue_job(job, pipeline=pipeline) - pipeline.execute() - return job - class DeferredJobRegistry(BaseRegistry): """ @@ -315,6 +315,13 @@ class ScheduledJobRegistry(BaseRegistry): return datetime.fromtimestamp(score, tz=timezone.utc) +class CanceledJobRegistry(BaseRegistry): + key_template = 'rq:canceled:{0}' + + def get_expired_job_ids(self, timestamp=None): + raise NotImplementedError + + def clean_registries(queue): """Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue.""" registry = FinishedJobRegistry(name=queue.name, diff --git a/tests/test_job.py b/tests/test_job.py index fb5bef9..e18c07e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -12,7 +12,7 @@ from rq.compat import as_text from rq.exceptions import DeserializationError, NoSuchJobError from rq.job import Job, JobStatus, cancel_job, get_current_job from rq.queue import Queue -from rq.registry import (DeferredJobRegistry, FailedJobRegistry, +from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry, ScheduledJobRegistry) from rq.utils import utcformat, utcnow @@ -797,12 +797,19 @@ class TestJob(RQTestCase): self.assertEqual(0, len(queue.get_jobs())) def 
test_create_and_cancel_job(self): - """test creating and using cancel_job deletes job properly""" + """Ensure job.cancel() works properly""" queue = Queue(connection=self.testconn) job = queue.enqueue(fixtures.say_hello) self.assertEqual(1, len(queue.get_jobs())) cancel_job(job.id) self.assertEqual(0, len(queue.get_jobs())) + registry = CanceledJobRegistry(connection=self.testconn, queue=queue) + self.assertIn(job, registry) + self.assertEqual(job.get_status(), JobStatus.CANCELED) + + # If job is deleted, it's also removed from CanceledJobRegistry + job.delete() + self.assertNotIn(job, registry) def test_dependents_key_for_should_return_prefixed_job_id(self): """test redis key to store job dependents hash under""" diff --git a/tests/test_registry.py b/tests/test_registry.py index 9603bca..07872b8 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -10,7 +10,7 @@ from rq.job import Job, JobStatus, requeue_job from rq.queue import Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.registry import (clean_registries, DeferredJobRegistry, +from rq.registry import (CanceledJobRegistry, clean_registries, DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry) @@ -133,6 +133,10 @@ class TestRegistry(RQTestCase): self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), ['foo', 'bar']) + # CanceledJobRegistry does not implement get_expired_job_ids() + registry = CanceledJobRegistry(connection=self.testconn) + self.assertRaises(NotImplementedError, registry.get_expired_job_ids) + def test_cleanup_moves_jobs_to_failed_job_registry(self): """Moving expired jobs to FailedJobRegistry.""" queue = Queue(connection=self.testconn) diff --git a/tests/test_worker.py b/tests/test_worker.py index 9a4c1e1..327c617 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -580,7 +580,7 @@ class TestWorker(RQTestCase): self.assertTrue(job.meta['first_handler']) 
self.assertEqual(job.meta.get('second_handler'), None) - def test_cancelled_jobs_arent_executed(self): + def test_deleted_jobs_arent_executed(self): """Cancelling jobs.""" SENTINEL_FILE = '/tmp/rq-tests.txt' # noqa