Add missing functionality for CanceledJobRegistry (#1560)

main
Josh Cohen 3 years ago committed by GitHub
parent 9c3afb87e4
commit a3fba1ca1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -256,6 +256,12 @@ class Queue:
from rq.registry import ScheduledJobRegistry from rq.registry import ScheduledJobRegistry
return ScheduledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) return ScheduledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@property
def canceled_job_registry(self):
"""Returns this queue's CanceledJobRegistry."""
from rq.registry import CanceledJobRegistry
return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
def remove(self, job_or_id, pipeline=None): def remove(self, job_or_id, pipeline=None):
"""Removes Job from queue, accepts either a Job instance or ID.""" """Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id

@ -327,6 +327,12 @@ class CanceledJobRegistry(BaseRegistry):
def get_expired_job_ids(self, timestamp=None): def get_expired_job_ids(self, timestamp=None):
raise NotImplementedError raise NotImplementedError
def cleanup(self):
"""This method is only here to prevent errors because this method is
automatically called by `count()` and `get_job_ids()` methods
implemented in BaseRegistry."""
pass
def clean_registries(queue): def clean_registries(queue):
"""Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue.""" """Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue."""

@ -9,7 +9,7 @@ from mock.mock import patch
from rq import Retry, Queue from rq import Retry, Queue
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import (DeferredJobRegistry, FailedJobRegistry, from rq.registry import (CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry,
FinishedJobRegistry, ScheduledJobRegistry, FinishedJobRegistry, ScheduledJobRegistry,
StartedJobRegistry) StartedJobRegistry)
from rq.worker import Worker from rq.worker import Worker
@ -776,6 +776,7 @@ class TestQueue(RQTestCase):
self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue)) self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue)) self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue)) self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
self.assertEqual(queue.canceled_job_registry, CanceledJobRegistry(queue=queue))
def test_getting_registries_with_serializer(self): def test_getting_registries_with_serializer(self):
"""Getting job registries from queue object (with custom serializer)""" """Getting job registries from queue object (with custom serializer)"""
@ -785,6 +786,7 @@ class TestQueue(RQTestCase):
self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue)) self.assertEqual(queue.failed_job_registry, FailedJobRegistry(queue=queue))
self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue)) self.assertEqual(queue.deferred_job_registry, DeferredJobRegistry(queue=queue))
self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue)) self.assertEqual(queue.finished_job_registry, FinishedJobRegistry(queue=queue))
self.assertEqual(queue.canceled_job_registry, CanceledJobRegistry(queue=queue))
# Make sure we don't use default when queue has custom # Make sure we don't use default when queue has custom
self.assertEqual(queue.scheduled_job_registry.serializer, JSONSerializer) self.assertEqual(queue.scheduled_job_registry.serializer, JSONSerializer)
@ -792,6 +794,7 @@ class TestQueue(RQTestCase):
self.assertEqual(queue.failed_job_registry.serializer, JSONSerializer) self.assertEqual(queue.failed_job_registry.serializer, JSONSerializer)
self.assertEqual(queue.deferred_job_registry.serializer, JSONSerializer) self.assertEqual(queue.deferred_job_registry.serializer, JSONSerializer)
self.assertEqual(queue.finished_job_registry.serializer, JSONSerializer) self.assertEqual(queue.finished_job_registry.serializer, JSONSerializer)
self.assertEqual(queue.canceled_job_registry.serializer, JSONSerializer)
def test_enqueue_with_retry(self): def test_enqueue_with_retry(self):
"""Enqueueing with retry_strategy works""" """Enqueueing with retry_strategy works"""

@ -220,6 +220,8 @@ class TestRegistry(RQTestCase):
self.assertEqual(self.registry.count, 2) self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2) self.assertEqual(len(self.registry), 2)
# Make sure
def test_clean_registries(self): def test_clean_registries(self):
"""clean_registries() cleans Started and Finished job registries.""" """clean_registries() cleans Started and Finished job registries."""
@ -290,6 +292,10 @@ class TestFinishedJobRegistry(RQTestCase):
self.registry.cleanup(timestamp + 20) self.registry.cleanup(timestamp + 20)
self.assertEqual(self.registry.get_job_ids(), ['baz']) self.assertEqual(self.registry.get_job_ids(), ['baz'])
# CanceledJobRegistry now implements noop cleanup, should not raise exception
registry = CanceledJobRegistry(connection=self.testconn)
registry.cleanup()
def test_jobs_are_put_in_registry(self): def test_jobs_are_put_in_registry(self):
"""Completed jobs are added to FinishedJobRegistry.""" """Completed jobs are added to FinishedJobRegistry."""
self.assertEqual(self.registry.get_job_ids(), []) self.assertEqual(self.registry.get_job_ids(), [])

Loading…
Cancel
Save