Fix `job.cancel` to remove job from registries if not in queue (#1564)

* Fix `job.cancel` to remove job from registiries if not in queue

* Remove old queue remove call

* Block the ability to cancel job that are already canceled

* Fix py35 compat

* Rename helper method
main
Josh Cohen 3 years ago committed by GitHub
parent 47110806d1
commit e71fcb952e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,7 +20,7 @@ from redis import WatchError
from rq.compat import as_text, decode_redis_hash, string_types
from .connections import resolve_connection
from .exceptions import DeserializationError, NoSuchJobError
from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
from .local import LocalStack
from .serializers import resolve_serializer
from .utils import (get_version, import_attribute, parse_timeout, str_to_date,
@ -690,6 +690,8 @@ class Job:
Same pipelining behavior as Queue.enqueue_dependents on whether or not a pipeline is passed in.
"""
if self.is_canceled:
raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id()))
from .registry import CanceledJobRegistry
from .queue import Queue
pipe = pipeline or self.connection.pipeline()
@ -706,7 +708,10 @@ class Job:
if pipeline is None:
pipe.watch(self.dependents_key)
q.enqueue_dependents(self, pipeline=pipeline)
q.remove(self, pipeline=pipe)
self._remove_from_registries(
pipeline=pipe,
remove_from_queue=True
)
self.set_status(JobStatus.CANCELED, pipeline=pipe)
@ -733,13 +738,7 @@ class Job:
"""Requeues job."""
return self.failed_job_registry.requeue(self)
def delete(self, pipeline=None, remove_from_queue=True,
delete_dependents=False):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well."""
connection = pipeline if pipeline is not None else self.connection
def _remove_from_registries(self, pipeline=None, remove_from_queue=True):
if remove_from_queue:
from .queue import Queue
q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer)
@ -787,6 +786,15 @@ class Job:
serializer=self.serializer)
registry.remove(self, pipeline=pipeline)
def delete(self, pipeline=None, remove_from_queue=True,
delete_dependents=False):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well."""
connection = pipeline if pipeline is not None else self.connection
self._remove_from_registries(pipeline=pipeline, remove_from_queue=True)
if delete_dependents:
self.delete_dependents(pipeline=pipeline)

@ -10,7 +10,7 @@ from datetime import datetime, timedelta
from redis import WatchError
from rq.compat import as_text
from rq.exceptions import DeserializationError, NoSuchJobError
from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
from rq.job import Job, JobStatus, cancel_job, get_current_job
from rq.queue import Queue
from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry,
@ -812,6 +812,27 @@ class TestJob(RQTestCase):
job.delete()
self.assertNotIn(job, registry)
def test_create_and_cancel_job_fails_already_canceled(self):
"""Ensure job.cancel() fails on already canceld job"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.say_hello, job_id='fake_job_id')
self.assertEqual(1, len(queue.get_jobs()))
# First cancel should be fine
cancel_job(job.id)
self.assertEqual(0, len(queue.get_jobs()))
registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
self.assertIn(job, registry)
self.assertEqual(job.get_status(), JobStatus.CANCELED)
# Second cancel should fail
self.assertRaisesRegex(
InvalidJobOperation,
r'Cannot cancel already canceled job: fake_job_id',
cancel_job,
job.id
)
def test_create_and_cancel_job_enqueue_dependents(self):
"""Ensure job.cancel() works properly with enqueue_dependents=True"""
queue = Queue(connection=self.testconn)
@ -832,6 +853,38 @@ class TestJob(RQTestCase):
dependency.delete()
self.assertNotIn(dependency, registry)
def test_create_and_cancel_job_enqueue_dependents_in_registry(self):
"""Ensure job.cancel() works properly with enqueue_dependents=True and when the job is in a registry"""
queue = Queue(connection=self.testconn)
dependency = queue.enqueue(fixtures.raise_exc)
dependent = queue.enqueue(fixtures.say_hello, depends_on=dependency)
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(1, len(queue.deferred_job_registry))
w = Worker([queue])
w.work(burst=True, max_jobs=1)
dependency.refresh()
dependent.refresh()
self.assertEqual(0, len(queue.get_jobs()))
self.assertEqual(1, len(queue.deferred_job_registry))
self.assertEqual(1, len(queue.failed_job_registry))
cancel_job(dependency.id, enqueue_dependents=True)
dependency.refresh()
dependent.refresh()
self.assertEqual(1, len(queue.get_jobs()))
self.assertEqual(0, len(queue.deferred_job_registry))
self.assertEqual(0, len(queue.failed_job_registry))
self.assertEqual(1, len(queue.canceled_job_registry))
registry = CanceledJobRegistry(connection=self.testconn, queue=queue)
self.assertIn(dependency, registry)
self.assertEqual(dependency.get_status(), JobStatus.CANCELED)
self.assertNotIn(dependency, queue.failed_job_registry)
self.assertIn(dependent, queue.get_jobs())
self.assertEqual(dependent.get_status(), JobStatus.QUEUED)
# If job is deleted, it's also removed from CanceledJobRegistry
dependency.delete()
self.assertNotIn(dependency, registry)
def test_create_and_cancel_job_enqueue_dependents_with_pipeline(self):
"""Ensure job.cancel() works properly with enqueue_dependents=True"""
queue = Queue(connection=self.testconn)

Loading…
Cancel
Save