job.cancel() puts job into CanceledJobRegistry. (#1546)

* job.cancel() puts job into CanceledJobRegistry.

* Improve test coverage
Branch: main
Author: Selwin Ong (committed via GitHub, 3 years ago)
Parent: 31dafb9e5c
Commit: 246d52b977

@ -11,6 +11,7 @@ executed and removed right after completion (success or failure).
* `DeferredJobRegistry` Holds deferred jobs (jobs that depend on another job and are waiting for that
  job to finish).
* `ScheduledJobRegistry` Holds scheduled jobs.
* `CanceledJobRegistry` Holds canceled jobs.

You can get the number of jobs in a registry, the ids of the jobs in the registry, and more.
Below is an example using a `StartedJobRegistry`.
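The new `CanceledJobRegistry` supports the same inspection calls; a minimal sketch (assuming a local Redis and the `default` queue):

```python
from redis import Redis
from rq import Queue
from rq.registry import CanceledJobRegistry

redis = Redis()
queue = Queue(connection=redis)

# Count and list the jobs currently sitting in CanceledJobRegistry
registry = CanceledJobRegistry(queue=queue)
print('%d canceled job(s)' % len(registry))
print(registry.get_job_ids())
```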

@ -84,7 +84,7 @@ job = Job.create(count_words_at_url,
})
```

### Retrieving Jobs

All job information is stored in Redis. You can inspect a job and its attributes
by using `Job.fetch()`.
@ -100,7 +100,7 @@ print('Status: %s' % job.get_status())
Some interesting job attributes include:
* `job.get_status(refresh=True)` Possible values are `queued`, `started`,
  `deferred`, `finished`, `stopped`, `scheduled`, `canceled` and `failed`. If `refresh` is
  `True` fresh values are fetched from Redis.
* `job.get_meta(refresh=True)` Returns custom `job.meta` dict containing user
  stored data. If `refresh` is `True` fresh values are fetched from Redis.
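As a hedged sketch of reading these attributes (the job id `my_job_id` is a placeholder):

```python
from redis import Redis
from rq.job import Job

redis = Redis()
job = Job.fetch('my_job_id', connection=redis)

# refresh=True re-reads the value from Redis rather than using the cached copy
status = job.get_status(refresh=True)
if status == 'canceled':  # status value added by this change
    print('job will not be executed')

# user-stored data kept in job.meta
print(job.get_meta(refresh=True))
```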
@ -126,7 +126,7 @@ for job in jobs:
```

## Stopping a Currently Executing Job
_New in version 1.7.0_

You can use `send_stop_job_command()` to tell a worker to immediately stop a currently executing job. A job that's stopped will be sent to [FailedJobRegistry](https://python-rq.org/docs/results/#dealing-with-exceptions).
@ -142,6 +142,35 @@ send_stop_job_command(redis, job_id)
Unlike failed jobs, stopped jobs will *not* be automatically retried if retry is configured. Subclasses of `Worker` which override `handle_job_failure()` should likewise take care to handle jobs with a `stopped` status appropriately.
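One possible way to honor that in a subclass (a sketch only; `notify_failure` is a hypothetical helper, not part of rq):

```python
from rq.job import JobStatus
from rq.worker import Worker


class MyWorker(Worker):
    def handle_job_failure(self, job, *args, **kwargs):
        # Stopped jobs were killed on purpose, so skip custom failure
        # bookkeeping and let the base class record the failure as usual.
        if job.get_status() != JobStatus.STOPPED:
            notify_failure(job)  # hypothetical alerting helper
        return super().handle_job_failure(job, *args, **kwargs)
```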
## Canceling a Job
_New in version 1.10.0_
To prevent a job from being executed, cancel it with `job.cancel()`.
```python
from redis import Redis
from rq.job import Job
from rq.registry import CanceledJobRegistry
redis = Redis()
job = Job.fetch('my_job_id', connection=redis)
job.cancel()
job.get_status() # Job status is CANCELED
registry = CanceledJobRegistry(job.origin, connection=job.connection)
print(job in registry) # Job is in CanceledJobRegistry
```
Canceling a job will:

1. Set the job status to `CANCELED`
2. Remove the job from its queue
3. Put the job into `CanceledJobRegistry`
Note that `job.cancel()` does **not** delete the job itself from Redis. If you want to
delete the job from Redis and reclaim memory, use `job.delete()`.
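A minimal sketch of reclaiming that memory (again assuming a job with the placeholder id `my_job_id`):

```python
from redis import Redis
from rq.job import Job

redis = Redis()
job = Job.fetch('my_job_id', connection=redis)

job.cancel()   # status becomes CANCELED; the job hash stays in Redis
job.delete()   # removes the job hash and its CanceledJobRegistry entry
```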
## Job / Queue Creation with Custom Serializer

When creating a job or queue, you can pass in a custom serializer that will be used for serializing / de-serializing job arguments.
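For example, a sketch using the `JSONSerializer` that ships with rq (any object exposing `dumps`/`loads` should work as well):

```python
from redis import Redis
from rq import Queue
from rq.serializers import JSONSerializer

redis = Redis()
# Jobs enqueued on this queue are serialized with JSON instead of pickle,
# so their arguments must be JSON-serializable.
queue = Queue('default', connection=redis, serializer=JSONSerializer)
queue.enqueue(sum, [1, 2, 3])
```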

@ -38,6 +38,7 @@ class JobStatus(str, Enum):
    DEFERRED = 'deferred'
    SCHEDULED = 'scheduled'
    STOPPED = 'stopped'
    CANCELED = 'canceled'


# Sentinel value to mark that some of our lazily evaluated properties have not
@ -185,6 +186,10 @@ class Job:
    def is_deferred(self):
        return self.get_status() == JobStatus.DEFERRED

    @property
    def is_canceled(self):
        return self.get_status() == JobStatus.CANCELED

    @property
    def is_scheduled(self):
        return self.get_status() == JobStatus.SCHEDULED
@ -679,11 +684,18 @@ class Job:
        without worrying about the internals required to implement job
        cancellation.
        """
        pipeline = pipeline or self.connection.pipeline()
        if self.origin:
            from .registry import CanceledJobRegistry
            from .queue import Queue
            q = Queue(name=self.origin, connection=self.connection)
            q.remove(self, pipeline=pipeline)

            self.set_status(JobStatus.CANCELED, pipeline=pipeline)

            registry = CanceledJobRegistry(self.origin, self.connection, job_class=self.__class__)
            registry.add(self, pipeline=pipeline)

        pipeline.execute()

    def requeue(self):
@ -694,10 +706,14 @@ class Job:
               delete_dependents=False):
        """Cancels the job and deletes the job hash from Redis. Jobs depending
        on this job can optionally be deleted as well."""
        connection = pipeline if pipeline is not None else self.connection

        if remove_from_queue:
            from .queue import Queue
            q = Queue(name=self.origin, connection=self.connection)
            q.remove(self, pipeline=pipeline)

        if self.is_finished:
            from .registry import FinishedJobRegistry
            registry = FinishedJobRegistry(self.origin,
@ -729,6 +745,12 @@ class Job:
        elif self.is_failed:
            self.failed_job_registry.remove(self, pipeline=pipeline)
        elif self.is_canceled:
            from .registry import CanceledJobRegistry
            registry = CanceledJobRegistry(self.origin, connection=self.connection,
                                           job_class=self.__class__)
            registry.remove(self, pipeline=pipeline)

        if delete_dependents:
            self.delete_dependents(pipeline=pipeline)

@ -260,8 +260,7 @@ class Queue:
        job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id

        if pipeline is not None:
            return pipeline.lrem(self.key, 1, job_id)

        return self.connection.lrem(self.key, 1, job_id)
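An illustrative sketch of the pipelined path (queue and job are placeholders):

```python
from redis import Redis
from rq import Queue

redis = Redis()
queue = Queue(connection=redis)
job = queue.enqueue(print, 'hello')

# With the change above, remove() queues the LREM on the supplied pipeline
# and returns immediately; the caller decides when to execute it.
with redis.pipeline() as pipe:
    queue.remove(job, pipeline=pipe)
    pipe.execute()
```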

@ -107,6 +107,28 @@ class BaseRegistry:
        score = self.connection.zscore(self.key, job.id)
        return datetime.utcfromtimestamp(score)

    def requeue(self, job_or_id):
        """Requeues the job with the given job ID."""
        if isinstance(job_or_id, self.job_class):
            job = job_or_id
        else:
            job = self.job_class.fetch(job_or_id, connection=self.connection)

        result = self.connection.zrem(self.key, job.id)
        if not result:
            raise InvalidJobOperation

        with self.connection.pipeline() as pipeline:
            queue = Queue(job.origin, connection=self.connection,
                          job_class=self.job_class)
            job.started_at = None
            job.ended_at = None
            job.exc_info = ''
            job.save()
            job = queue.enqueue_job(job, pipeline=pipeline)
            pipeline.execute()
        return job
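Since `requeue()` now lives on `BaseRegistry`, the familiar failed-job flow keeps working; a sketch:

```python
from redis import Redis
from rq import Queue

redis = Redis()
queue = Queue(connection=redis)

# Re-enqueue every failed job on this queue; requeue() clears started_at,
# ended_at and exc_info before putting the job back on its origin queue.
registry = queue.failed_job_registry
for job_id in registry.get_job_ids():
    registry.requeue(job_id)
```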
class StartedJobRegistry(BaseRegistry):
    """
@ -215,28 +237,6 @@ class FailedJobRegistry(BaseRegistry):
        if not pipeline:
            p.execute()
class DeferredJobRegistry(BaseRegistry):
    """
@ -315,6 +315,13 @@ class ScheduledJobRegistry(BaseRegistry):
        return datetime.fromtimestamp(score, tz=timezone.utc)

class CanceledJobRegistry(BaseRegistry):
    key_template = 'rq:canceled:{0}'

    def get_expired_job_ids(self, timestamp=None):
        raise NotImplementedError
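Because `get_expired_job_ids()` is deliberately unimplemented, canceled jobs are never expired by the cleanup machinery; a small sketch of what that means in practice:

```python
from redis import Redis
from rq import Queue
from rq.registry import CanceledJobRegistry

redis = Redis()
registry = CanceledJobRegistry(queue=Queue(connection=redis))

try:
    registry.get_expired_job_ids()
except NotImplementedError:
    # Canceled jobs stay in this registry until job.delete() removes them.
    pass
```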
def clean_registries(queue):
    """Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue."""
    registry = FinishedJobRegistry(name=queue.name,

@ -12,7 +12,7 @@ from rq.compat import as_text
from rq.exceptions import DeserializationError, NoSuchJobError
from rq.job import Job, JobStatus, cancel_job, get_current_job
from rq.queue import Queue
from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry,
                         FinishedJobRegistry, StartedJobRegistry,
                         ScheduledJobRegistry)
from rq.utils import utcformat, utcnow
@ -797,12 +797,19 @@ class TestJob(RQTestCase):
        self.assertEqual(0, len(queue.get_jobs()))

    def test_create_and_cancel_job(self):
        """Ensure job.cancel() works properly"""
        queue = Queue(connection=self.testconn)
        job = queue.enqueue(fixtures.say_hello)
        self.assertEqual(1, len(queue.get_jobs()))
        cancel_job(job.id)
        self.assertEqual(0, len(queue.get_jobs()))

        registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
        self.assertIn(job, registry)
        self.assertEqual(job.get_status(), JobStatus.CANCELED)

        # If job is deleted, it's also removed from CanceledJobRegistry
        job.delete()
        self.assertNotIn(job, registry)
    def test_dependents_key_for_should_return_prefixed_job_id(self):
        """test redis key to store job dependents hash under"""

@ -10,7 +10,7 @@ from rq.job import Job, JobStatus, requeue_job
from rq.queue import Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import (CanceledJobRegistry, clean_registries, DeferredJobRegistry,
                         FailedJobRegistry, FinishedJobRegistry,
                         StartedJobRegistry)
@ -133,6 +133,10 @@ class TestRegistry(RQTestCase):
        self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
                         ['foo', 'bar'])

        # CanceledJobRegistry does not implement get_expired_job_ids()
        registry = CanceledJobRegistry(connection=self.testconn)
        self.assertRaises(NotImplementedError, registry.get_expired_job_ids)
    def test_cleanup_moves_jobs_to_failed_job_registry(self):
        """Moving expired jobs to FailedJobRegistry."""
        queue = Queue(connection=self.testconn)

@ -580,7 +580,7 @@ class TestWorker(RQTestCase):
        self.assertTrue(job.meta['first_handler'])
        self.assertEqual(job.meta.get('second_handler'), None)

    def test_deleted_jobs_arent_executed(self):
        """Cancelling jobs."""
        SENTINEL_FILE = '/tmp/rq-tests.txt'  # noqa
