diff --git a/rq/job.py b/rq/job.py index 0ccf736..cabd3f9 100644 --- a/rq/job.py +++ b/rq/job.py @@ -480,18 +480,18 @@ class Job(object): def save(self, pipeline=None, include_meta=True): """ - Persists the current job instance to its corresponding Redis key. + Dumps the current job instance to its corresponding Redis key. - Exclude persisting the `meta` dictionary by setting + Exclude saving the `meta` dictionary by setting `include_meta=False`. This is useful to prevent clobbering user metadata without an expensive `refresh()` call first. + Redis key persistence may be altered by `cleanup()` method. """ key = self.key connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict(include_meta=include_meta)) - self.cleanup(self.ttl, pipeline=connection) def save_meta(self): """Stores job meta from the job instance to the corresponding Redis key.""" @@ -549,12 +549,10 @@ class Job(object): connection.delete(self.key) connection.delete(self.dependents_key) - # Job execution def perform(self): # noqa """Invokes the job function with the job arguments.""" self.connection.persist(self.key) - self.ttl = -1 _job_stack.push(self.id) try: self._result = self._execute() @@ -609,9 +607,12 @@ class Job(object): self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue) elif not ttl: return - elif ttl > 0: + else: connection = pipeline if pipeline is not None else self.connection - connection.expire(self.key, ttl) + if ttl > 0: + connection.expire(self.key, ttl) + else: + connection.persist(self.key) def register_dependency(self, pipeline=None): """Jobs may have dependencies. Jobs are enqueued only if the job they diff --git a/rq/queue.py b/rq/queue.py index 090997e..bd3a6ec 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -223,6 +223,7 @@ class Queue(object): job.set_status(JobStatus.DEFERRED) job.register_dependency(pipeline=pipe) job.save(pipeline=pipe) + job.cleanup(ttl=job.ttl, pipeline=pipe) pipe.execute() return job break @@ -296,6 +297,7 @@ class Queue(object): if job.timeout is None: job.timeout = self.DEFAULT_TIMEOUT job.save(pipeline=pipe) + job.cleanup(ttl=job.ttl, pipeline=pipe) if self._async: self.push_job_id(job.id, pipeline=pipe, at_front=at_front) @@ -492,6 +494,7 @@ class FailedQueue(Queue): job.ended_at = utcnow() job.exc_info = exc_info job.save(pipeline=pipeline, include_meta=False) + job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire self.push_job_id(job.id, pipeline=pipeline) pipeline.execute() diff --git a/rq/registry.py b/rq/registry.py index 168dd8e..51e6dd0 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -93,6 +93,7 @@ class StartedJobRegistry(BaseRegistry): connection=self.connection) job.set_status(JobStatus.FAILED) job.save(pipeline=pipeline, include_meta=False) + job.cleanup(ttl=-1, pipeline=pipeline) failed_queue.push_job_id(job_id, pipeline=pipeline) except NoSuchJobError: pass diff --git a/tests/test_job.py b/tests/test_job.py index 1491f7f..46d7d39 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -386,7 +386,7 @@ class TestJob(RQTestCase): self.assertEqual(job.get_ttl(), ttl) job.save() job.perform() - self.assertEqual(job.get_ttl(), -1) + self.assertEqual(job.get_ttl(), ttl) self.assertTrue(job.exists(job.id)) self.assertEqual(job.result, 'Done sleeping...') diff --git a/tests/test_queue.py b/tests/test_queue.py index 294fb62..3eea3fb 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -650,3 +650,15 @@ class TestFailedQueue(RQTestCase): job.delete() self.assertFalse(job.id in failed_queue.get_job_ids()) + + def test_job_in_failed_queue_persists(self): + """Make sure failed job key does not expire""" + q = Queue('foo') + job = q.enqueue(div_by_zero, args=(1, 2, 3), ttl=5) + self.assertEqual(self.testconn.ttl(job.key), 5) + + job.set_status(JobStatus.FAILED) + failed_queue = get_failed_queue() + failed_queue.quarantine(job, Exception('Some fake error')) + + self.assertEqual(self.testconn.ttl(job.key), -1)