diff --git a/rq/job.py b/rq/job.py index cd4426e..d90a1e8 100644 --- a/rq/job.py +++ b/rq/job.py @@ -501,13 +501,10 @@ class Job(object): without worrying about the internals required to implement job cancellation. """ - from .queue import Queue, get_failed_queue + from .queue import Queue pipeline = self.connection._pipeline() if self.origin: - q = (get_failed_queue(connection=self.connection, - job_class=self.__class__) - if self.is_failed - else Queue(name=self.origin, connection=self.connection)) + q = Queue(name=self.origin, connection=self.connection) q.remove(self, pipeline=pipeline) pipeline.execute() @@ -516,9 +513,38 @@ class Job(object): if remove_from_queue: self.cancel() connection = pipeline if pipeline is not None else self.connection + + if self.get_status() == JobStatus.FINISHED: + from .registry import FinishedJobRegistry + registry = FinishedJobRegistry(self.origin, + connection=self.connection, + job_class=self.__class__) + registry.remove(self, pipeline=pipeline) + + elif self.get_status() == JobStatus.DEFERRED: + from .registry import DeferredJobRegistry + registry = DeferredJobRegistry(self.origin, + connection=self.connection, + job_class=self.__class__) + registry.remove(self, pipeline=pipeline) + + elif self.get_status() == JobStatus.STARTED: + from .registry import StartedJobRegistry + registry = StartedJobRegistry(self.origin, + connection=self.connection, + job_class=self.__class__) + registry.remove(self, pipeline=pipeline) + + elif self.get_status() == JobStatus.FAILED: + from .queue import get_failed_queue + failed_queue = get_failed_queue(connection=self.connection, + job_class=self.__class__) + failed_queue.remove(self, pipeline=pipeline) + connection.delete(self.key) connection.delete(self.dependents_key) + # Job execution def perform(self): # noqa """Invokes the job function with the job arguments.""" diff --git a/tests/test_job.py b/tests/test_job.py index a41a199..660fd8d 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -380,20 +380,6 @@ class TestJob(RQTestCase): job.cleanup(ttl=0) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) - def test_register_dependency(self): - """Ensure dependency registration works properly.""" - origin = 'some_queue' - registry = DeferredJobRegistry(origin, self.testconn) - - job = Job.create(func=fixtures.say_hello, origin=origin) - job._dependency_id = 'id' - job.save() - - self.assertEqual(registry.get_job_ids(), []) - job.register_dependency() - self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id) - self.assertEqual(registry.get_job_ids(), [job.id]) - def test_delete(self): """job.delete() deletes itself & dependents mapping from Redis.""" queue = Queue(connection=self.testconn) diff --git a/tests/test_queue.py b/tests/test_queue.py index 8081313..294fb62 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -633,3 +633,20 @@ class TestFailedQueue(RQTestCase): skip_job = q.enqueue(say_hello, at_front=True) assert q.dequeue() == skip_job assert q.dequeue() == job2 + + def test_job_deletion(self): + """Ensure job.delete() removes itself from FailedQueue.""" + job = Job.create(func=div_by_zero, args=(1, 2, 3)) + job.origin = 'fake' + job.timeout = 200 + job.save() + + job.set_status(JobStatus.FAILED) + + failed_queue = get_failed_queue() + failed_queue.quarantine(job, Exception('Some fake error')) + + self.assertTrue(job.id in failed_queue.get_job_ids()) + + job.delete() + self.assertFalse(job.id in failed_queue.get_job_ids()) diff --git a/tests/test_registry.py b/tests/test_registry.py index 726f79b..3b2885d 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -113,6 +113,21 @@ class TestRegistry(RQTestCase): worker.perform_job(job, queue) self.assertNotIn(job.id, registry.get_job_ids()) + def test_job_deletion(self): + """Ensure job is removed from StartedJobRegistry when deleted.""" + registry = StartedJobRegistry(connection=self.testconn) + queue = Queue(connection=self.testconn) + worker = Worker([queue]) + + job = queue.enqueue(say_hello) + self.assertTrue(job.is_queued) + + worker.prepare_job_execution(job) + self.assertIn(job.id, registry.get_job_ids()) + + job.delete() + self.assertNotIn(job.id, registry.get_job_ids()) + def test_get_job_count(self): """StartedJobRegistry returns the right number of job count.""" timestamp = current_timestamp() + 10 @@ -170,10 +185,15 @@ class TestFinishedJobRegistry(RQTestCase): worker.perform_job(job, queue) self.assertEqual(self.registry.get_job_ids(), [job.id]) + # When job is deleted, it should be removed from FinishedJobRegistry + self.assertEqual(job.get_status(), JobStatus.FINISHED) + job.delete() + self.assertEqual(self.registry.get_job_ids(), []) + # Failed jobs are not put in FinishedJobRegistry failed_job = queue.enqueue(div_by_zero) worker.perform_job(failed_job, queue) - self.assertEqual(self.registry.get_job_ids(), [job.id]) + self.assertEqual(self.registry.get_job_ids(), []) class TestDeferredRegistry(RQTestCase): @@ -192,3 +212,16 @@ class TestDeferredRegistry(RQTestCase): job_ids = [as_text(job_id) for job_id in self.testconn.zrange(self.registry.key, 0, -1)] self.assertEqual(job_ids, [job.id]) + + def test_register_dependency(self): + """Ensure job creation and deletion works properly with DeferredJobRegistry.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello) + job2 = queue.enqueue(say_hello, depends_on=job) + + registry = DeferredJobRegistry(connection=self.testconn) + self.assertEqual(registry.get_job_ids(), [job2.id]) + + # When deleted, job removes itself from DeferredJobRegistry + job2.delete() + self.assertEqual(registry.get_job_ids(), [])