diff --git a/rq/job.py b/rq/job.py index eb83250..5fd744e 100644 --- a/rq/job.py +++ b/rq/job.py @@ -194,6 +194,12 @@ class Job(object): self._dependency = job return job + @property + def dependent_ids(self): + """Returns a list of ids of jobs whose execution depends on this + job's successful execution.""" + return list(map(as_text, self.connection.smembers(self.dependents_key))) + @property def func(self): func_name = self.func_name @@ -358,7 +364,7 @@ class Job(object): @classmethod def dependents_key_for(cls, job_id): - """The Redis key that is used to store job hash under.""" + """The Redis key that is used to store job dependents hash under.""" return 'rq:job:{0}:dependents'.format(job_id) @property @@ -368,7 +374,7 @@ class Job(object): @property def dependents_key(self): - """The Redis key that is used to store job hash under.""" + """The Redis key that is used to store job dependents hash under.""" return self.dependents_key_for(self.id) @property @@ -513,7 +519,7 @@ class Job(object): meta = dumps(self.meta) self.connection.hset(self.key, 'meta', meta) - def cancel(self): + def cancel(self, pipeline=None): """Cancels the given job, which will prevent the job from ever being ran (or inspected). @@ -522,16 +528,19 @@ class Job(object): cancellation. """ from .queue import Queue - pipeline = self.connection._pipeline() + pipeline = pipeline or self.connection._pipeline() if self.origin: q = Queue(name=self.origin, connection=self.connection) q.remove(self, pipeline=pipeline) pipeline.execute() - def delete(self, pipeline=None, remove_from_queue=True): - """Cancels the job and deletes the job hash from Redis.""" + def delete(self, pipeline=None, remove_from_queue=True, + delete_dependents=False): + """Cancels the job and deletes the job hash from Redis. Jobs depending + on this job can optionally be deleted as well.""" + if remove_from_queue: - self.cancel() + self.cancel(pipeline=pipeline) connection = pipeline if pipeline is not None else self.connection if self.get_status() == JobStatus.FINISHED: @@ -561,9 +570,25 @@ class Job(object): job_class=self.__class__) failed_queue.remove(self, pipeline=pipeline) + if delete_dependents: + self.delete_dependents(pipeline=pipeline) + connection.delete(self.key) connection.delete(self.dependents_key) + def delete_dependents(self, pipeline=None): + """Delete jobs depending on this job.""" + connection = pipeline if pipeline is not None else self.connection + for dependent_id in self.dependent_ids: + try: + job = Job.fetch(dependent_id, connection=self.connection) + job.delete(pipeline=pipeline, + remove_from_queue=False) + except NoSuchJobError: + # It could be that the dependent job was never saved to redis + pass + connection.delete(self.dependents_key) + # Job execution def perform(self): # noqa """Invokes the job function with the job arguments.""" diff --git a/tests/test_job.py b/tests/test_job.py index b3512a4..f12566e 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -462,16 +462,76 @@ class TestJob(RQTestCase): job.cleanup(ttl=0) self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn) - def test_delete(self): - """job.delete() deletes itself & dependents mapping from Redis.""" + def test_job_with_dependents_delete_parent(self): + """job.delete() deletes itself from Redis but not dependents. + Wthout a save, the dependent job is never saved into redis. The delete + method will get and pass a NoSuchJobError. + """ queue = Queue(connection=self.testconn) job = queue.enqueue(fixtures.say_hello) job2 = Job.create(func=fixtures.say_hello, depends_on=job) job2.register_dependency() + job.delete() self.assertFalse(self.testconn.exists(job.key)) self.assertFalse(self.testconn.exists(job.dependents_key)) + # By default, dependents are not deleted, but The job is in redis only + # if it was saved! + self.assertFalse(self.testconn.exists(job2.key)) + + self.assertNotIn(job.id, queue.get_job_ids()) + + def test_job_with_dependents_delete_parent_with_saved(self): + """job.delete() deletes itself from Redis but not dependents. If the + dependent job was saved, it will remain in redis.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(fixtures.say_hello) + job2 = Job.create(func=fixtures.say_hello, depends_on=job) + job2.register_dependency() + job2.save() + + job.delete() + self.assertFalse(self.testconn.exists(job.key)) + self.assertFalse(self.testconn.exists(job.dependents_key)) + + # By default, dependents are not deleted, but The job is in redis only + # if it was saved! + self.assertTrue(self.testconn.exists(job2.key)) + + self.assertNotIn(job.id, queue.get_job_ids()) + + def test_job_with_dependents_deleteall(self): + """job.delete() deletes itself from Redis. Dependents need to be + deleted explictely.""" + queue = Queue(connection=self.testconn) + job = queue.enqueue(fixtures.say_hello) + job2 = Job.create(func=fixtures.say_hello, depends_on=job) + job2.register_dependency() + + job.delete(delete_dependents=True) + self.assertFalse(self.testconn.exists(job.key)) + self.assertFalse(self.testconn.exists(job.dependents_key)) + self.assertFalse(self.testconn.exists(job2.key)) + + self.assertNotIn(job.id, queue.get_job_ids()) + + def test_job_with_dependents_delete_all_with_saved(self): + """job.delete() deletes itself from Redis. Dependents need to be + deleted explictely. Without a save, the dependent job is never saved + into redis. The delete method will get and pass a NoSuchJobError. + """ + queue = Queue(connection=self.testconn) + job = queue.enqueue(fixtures.say_hello) + job2 = Job.create(func=fixtures.say_hello, depends_on=job) + job2.register_dependency() + job2.save() + + job.delete(delete_dependents=True) + self.assertFalse(self.testconn.exists(job.key)) + self.assertFalse(self.testconn.exists(job.dependents_key)) + self.assertFalse(self.testconn.exists(job2.key)) + self.assertNotIn(job.id, queue.get_job_ids()) def test_create_job_with_id(self):