Merge pull request #810 from selwin/job-deletion-improvements

job.delete() should clean itself from FailedQueue and other job registries
main
Selwin Ong 8 years ago committed by GitHub
commit 518e4d1726

@ -501,13 +501,10 @@ class Job(object):
without worrying about the internals required to implement job without worrying about the internals required to implement job
cancellation. cancellation.
""" """
from .queue import Queue, get_failed_queue from .queue import Queue
pipeline = self.connection._pipeline() pipeline = self.connection._pipeline()
if self.origin: if self.origin:
q = (get_failed_queue(connection=self.connection, q = Queue(name=self.origin, connection=self.connection)
job_class=self.__class__)
if self.is_failed
else Queue(name=self.origin, connection=self.connection))
q.remove(self, pipeline=pipeline) q.remove(self, pipeline=pipeline)
pipeline.execute() pipeline.execute()
@ -516,9 +513,38 @@ class Job(object):
if remove_from_queue: if remove_from_queue:
self.cancel() self.cancel()
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
if self.get_status() == JobStatus.FINISHED:
from .registry import FinishedJobRegistry
registry = FinishedJobRegistry(self.origin,
connection=self.connection,
job_class=self.__class__)
registry.remove(self, pipeline=pipeline)
elif self.get_status() == JobStatus.DEFERRED:
from .registry import DeferredJobRegistry
registry = DeferredJobRegistry(self.origin,
connection=self.connection,
job_class=self.__class__)
registry.remove(self, pipeline=pipeline)
elif self.get_status() == JobStatus.STARTED:
from .registry import StartedJobRegistry
registry = StartedJobRegistry(self.origin,
connection=self.connection,
job_class=self.__class__)
registry.remove(self, pipeline=pipeline)
elif self.get_status() == JobStatus.FAILED:
from .queue import get_failed_queue
failed_queue = get_failed_queue(connection=self.connection,
job_class=self.__class__)
failed_queue.remove(self, pipeline=pipeline)
connection.delete(self.key) connection.delete(self.key)
connection.delete(self.dependents_key) connection.delete(self.dependents_key)
# Job execution # Job execution
def perform(self): # noqa def perform(self): # noqa
"""Invokes the job function with the job arguments.""" """Invokes the job function with the job arguments."""

@ -380,20 +380,6 @@ class TestJob(RQTestCase):
job.cleanup(ttl=0) job.cleanup(ttl=0)
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
def test_register_dependency(self):
"""Ensure dependency registration works properly."""
origin = 'some_queue'
registry = DeferredJobRegistry(origin, self.testconn)
job = Job.create(func=fixtures.say_hello, origin=origin)
job._dependency_id = 'id'
job.save()
self.assertEqual(registry.get_job_ids(), [])
job.register_dependency()
self.assertEqual(as_text(self.testconn.spop('rq:job:id:dependents')), job.id)
self.assertEqual(registry.get_job_ids(), [job.id])
def test_delete(self): def test_delete(self):
"""job.delete() deletes itself & dependents mapping from Redis.""" """job.delete() deletes itself & dependents mapping from Redis."""
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)

@ -633,3 +633,20 @@ class TestFailedQueue(RQTestCase):
skip_job = q.enqueue(say_hello, at_front=True) skip_job = q.enqueue(say_hello, at_front=True)
assert q.dequeue() == skip_job assert q.dequeue() == skip_job
assert q.dequeue() == job2 assert q.dequeue() == job2
def test_job_deletion(self):
"""Ensure job.delete() removes itself from FailedQueue."""
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.timeout = 200
job.save()
job.set_status(JobStatus.FAILED)
failed_queue = get_failed_queue()
failed_queue.quarantine(job, Exception('Some fake error'))
self.assertTrue(job.id in failed_queue.get_job_ids())
job.delete()
self.assertFalse(job.id in failed_queue.get_job_ids())

@ -113,6 +113,21 @@ class TestRegistry(RQTestCase):
worker.perform_job(job, queue) worker.perform_job(job, queue)
self.assertNotIn(job.id, registry.get_job_ids()) self.assertNotIn(job.id, registry.get_job_ids())
def test_job_deletion(self):
"""Ensure job is removed from StartedJobRegistry when deleted."""
registry = StartedJobRegistry(connection=self.testconn)
queue = Queue(connection=self.testconn)
worker = Worker([queue])
job = queue.enqueue(say_hello)
self.assertTrue(job.is_queued)
worker.prepare_job_execution(job)
self.assertIn(job.id, registry.get_job_ids())
job.delete()
self.assertNotIn(job.id, registry.get_job_ids())
def test_get_job_count(self): def test_get_job_count(self):
"""StartedJobRegistry returns the right number of job count.""" """StartedJobRegistry returns the right number of job count."""
timestamp = current_timestamp() + 10 timestamp = current_timestamp() + 10
@ -170,10 +185,15 @@ class TestFinishedJobRegistry(RQTestCase):
worker.perform_job(job, queue) worker.perform_job(job, queue)
self.assertEqual(self.registry.get_job_ids(), [job.id]) self.assertEqual(self.registry.get_job_ids(), [job.id])
# When job is deleted, it should be removed from FinishedJobRegistry
self.assertEqual(job.get_status(), JobStatus.FINISHED)
job.delete()
self.assertEqual(self.registry.get_job_ids(), [])
# Failed jobs are not put in FinishedJobRegistry # Failed jobs are not put in FinishedJobRegistry
failed_job = queue.enqueue(div_by_zero) failed_job = queue.enqueue(div_by_zero)
worker.perform_job(failed_job, queue) worker.perform_job(failed_job, queue)
self.assertEqual(self.registry.get_job_ids(), [job.id]) self.assertEqual(self.registry.get_job_ids(), [])
class TestDeferredRegistry(RQTestCase): class TestDeferredRegistry(RQTestCase):
@ -192,3 +212,16 @@ class TestDeferredRegistry(RQTestCase):
job_ids = [as_text(job_id) for job_id in job_ids = [as_text(job_id) for job_id in
self.testconn.zrange(self.registry.key, 0, -1)] self.testconn.zrange(self.registry.key, 0, -1)]
self.assertEqual(job_ids, [job.id]) self.assertEqual(job_ids, [job.id])
def test_register_dependency(self):
"""Ensure job creation and deletion works properly with DeferredJobRegistry."""
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
job2 = queue.enqueue(say_hello, depends_on=job)
registry = DeferredJobRegistry(connection=self.testconn)
self.assertEqual(registry.get_job_ids(), [job2.id])
# When deleted, job removes itself from DeferredJobRegistry
job2.delete()
self.assertEqual(registry.get_job_ids(), [])

Loading…
Cancel
Save