Delete dependents of job explicitely (#916)

* Initial take on delete_dependents

* Add tests including corner cases

* No need to canel dependents since they are not in a queue yet anyway

* The dependents keys can be deleted in all cases

* Update tests to included saved jobs in the deletion tests

* Correctly use pipeline in cancel method

* Unused connection

* Include dependents into dict format of job

* Add TODO

* Address comments from selwin

* Delete dependents key in redis if delete_dependents is called on its own

* Address recent comments from selwin

* Small change to trigger travis

* Remove TODO referring to canceled job state

* Remove dependent_ids from to_dict

* Address recent comments from selwin
main
Christophe Olinger 7 years ago committed by Selwin Ong
parent 0a0e37bc50
commit a6eb5d37ee

@ -194,6 +194,12 @@ class Job(object):
self._dependency = job self._dependency = job
return job return job
@property
def dependent_ids(self):
"""Returns a list of ids of jobs whose execution depends on this
job's successful execution."""
return list(map(as_text, self.connection.smembers(self.dependents_key)))
@property @property
def func(self): def func(self):
func_name = self.func_name func_name = self.func_name
@ -358,7 +364,7 @@ class Job(object):
@classmethod @classmethod
def dependents_key_for(cls, job_id): def dependents_key_for(cls, job_id):
"""The Redis key that is used to store job hash under.""" """The Redis key that is used to store job dependents hash under."""
return 'rq:job:{0}:dependents'.format(job_id) return 'rq:job:{0}:dependents'.format(job_id)
@property @property
@ -368,7 +374,7 @@ class Job(object):
@property @property
def dependents_key(self): def dependents_key(self):
"""The Redis key that is used to store job hash under.""" """The Redis key that is used to store job dependents hash under."""
return self.dependents_key_for(self.id) return self.dependents_key_for(self.id)
@property @property
@ -513,7 +519,7 @@ class Job(object):
meta = dumps(self.meta) meta = dumps(self.meta)
self.connection.hset(self.key, 'meta', meta) self.connection.hset(self.key, 'meta', meta)
def cancel(self): def cancel(self, pipeline=None):
"""Cancels the given job, which will prevent the job from ever being """Cancels the given job, which will prevent the job from ever being
ran (or inspected). ran (or inspected).
@ -522,16 +528,19 @@ class Job(object):
cancellation. cancellation.
""" """
from .queue import Queue from .queue import Queue
pipeline = self.connection._pipeline() pipeline = pipeline or self.connection._pipeline()
if self.origin: if self.origin:
q = Queue(name=self.origin, connection=self.connection) q = Queue(name=self.origin, connection=self.connection)
q.remove(self, pipeline=pipeline) q.remove(self, pipeline=pipeline)
pipeline.execute() pipeline.execute()
def delete(self, pipeline=None, remove_from_queue=True): def delete(self, pipeline=None, remove_from_queue=True,
"""Cancels the job and deletes the job hash from Redis.""" delete_dependents=False):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well."""
if remove_from_queue: if remove_from_queue:
self.cancel() self.cancel(pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
if self.get_status() == JobStatus.FINISHED: if self.get_status() == JobStatus.FINISHED:
@ -561,9 +570,25 @@ class Job(object):
job_class=self.__class__) job_class=self.__class__)
failed_queue.remove(self, pipeline=pipeline) failed_queue.remove(self, pipeline=pipeline)
if delete_dependents:
self.delete_dependents(pipeline=pipeline)
connection.delete(self.key) connection.delete(self.key)
connection.delete(self.dependents_key) connection.delete(self.dependents_key)
def delete_dependents(self, pipeline=None):
"""Delete jobs depending on this job."""
connection = pipeline if pipeline is not None else self.connection
for dependent_id in self.dependent_ids:
try:
job = Job.fetch(dependent_id, connection=self.connection)
job.delete(pipeline=pipeline,
remove_from_queue=False)
except NoSuchJobError:
# It could be that the dependent job was never saved to redis
pass
connection.delete(self.dependents_key)
# Job execution # Job execution
def perform(self): # noqa def perform(self): # noqa
"""Invokes the job function with the job arguments.""" """Invokes the job function with the job arguments."""

@ -462,16 +462,76 @@ class TestJob(RQTestCase):
job.cleanup(ttl=0) job.cleanup(ttl=0)
self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)
def test_delete(self): def test_job_with_dependents_delete_parent(self):
"""job.delete() deletes itself & dependents mapping from Redis.""" """job.delete() deletes itself from Redis but not dependents.
Wthout a save, the dependent job is never saved into redis. The delete
method will get and pass a NoSuchJobError.
"""
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.say_hello) job = queue.enqueue(fixtures.say_hello)
job2 = Job.create(func=fixtures.say_hello, depends_on=job) job2 = Job.create(func=fixtures.say_hello, depends_on=job)
job2.register_dependency() job2.register_dependency()
job.delete() job.delete()
self.assertFalse(self.testconn.exists(job.key)) self.assertFalse(self.testconn.exists(job.key))
self.assertFalse(self.testconn.exists(job.dependents_key)) self.assertFalse(self.testconn.exists(job.dependents_key))
# By default, dependents are not deleted, but The job is in redis only
# if it was saved!
self.assertFalse(self.testconn.exists(job2.key))
self.assertNotIn(job.id, queue.get_job_ids())
def test_job_with_dependents_delete_parent_with_saved(self):
"""job.delete() deletes itself from Redis but not dependents. If the
dependent job was saved, it will remain in redis."""
queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.say_hello)
job2 = Job.create(func=fixtures.say_hello, depends_on=job)
job2.register_dependency()
job2.save()
job.delete()
self.assertFalse(self.testconn.exists(job.key))
self.assertFalse(self.testconn.exists(job.dependents_key))
# By default, dependents are not deleted, but The job is in redis only
# if it was saved!
self.assertTrue(self.testconn.exists(job2.key))
self.assertNotIn(job.id, queue.get_job_ids())
def test_job_with_dependents_deleteall(self):
"""job.delete() deletes itself from Redis. Dependents need to be
deleted explictely."""
queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.say_hello)
job2 = Job.create(func=fixtures.say_hello, depends_on=job)
job2.register_dependency()
job.delete(delete_dependents=True)
self.assertFalse(self.testconn.exists(job.key))
self.assertFalse(self.testconn.exists(job.dependents_key))
self.assertFalse(self.testconn.exists(job2.key))
self.assertNotIn(job.id, queue.get_job_ids())
def test_job_with_dependents_delete_all_with_saved(self):
"""job.delete() deletes itself from Redis. Dependents need to be
deleted explictely. Without a save, the dependent job is never saved
into redis. The delete method will get and pass a NoSuchJobError.
"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.say_hello)
job2 = Job.create(func=fixtures.say_hello, depends_on=job)
job2.register_dependency()
job2.save()
job.delete(delete_dependents=True)
self.assertFalse(self.testconn.exists(job.key))
self.assertFalse(self.testconn.exists(job.dependents_key))
self.assertFalse(self.testconn.exists(job2.key))
self.assertNotIn(job.id, queue.get_job_ids()) self.assertNotIn(job.id, queue.get_job_ids())
def test_create_job_with_id(self): def test_create_job_with_id(self):

Loading…
Cancel
Save