diff --git a/rq/job.py b/rq/job.py index 8b20474..da57e1d 100644 --- a/rq/job.py +++ b/rq/job.py @@ -75,11 +75,10 @@ def get_current_job(connection=None, job_class=None): """Returns the Job instance that is currently being executed. If this function is invoked from outside a job context, None is returned. """ - job_class = job_class or Job - job_id = _job_stack.top - if job_id is None: - return None - return job_class.fetch(job_id, connection=connection) + if job_class: + warnings.warn("job_class argument for get_current_job is deprecated.", + DeprecationWarning) + return _job_stack.top class Job(object): @@ -479,18 +478,18 @@ class Job(object): def save(self, pipeline=None, include_meta=True): """ - Persists the current job instance to its corresponding Redis key. + Dumps the current job instance to its corresponding Redis key. - Exclude persisting the `meta` dictionary by setting + Exclude saving the `meta` dictionary by setting `include_meta=False`. This is useful to prevent clobbering user metadata without an expensive `refresh()` call first. + Redis key persistence may be altered by `cleanup()` method. """ key = self.key connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict(include_meta=include_meta)) - self.cleanup(self.ttl, pipeline=connection) def save_meta(self): """Stores job meta from the job instance to the corresponding Redis key.""" @@ -548,17 +547,15 @@ class Job(object): connection.delete(self.key) connection.delete(self.dependents_key) - # Job execution def perform(self): # noqa """Invokes the job function with the job arguments.""" self.connection.persist(self.key) - self.ttl = -1 - _job_stack.push(self.id) + _job_stack.push(self) try: self._result = self._execute() finally: - assert self.id == _job_stack.pop() + assert self is _job_stack.pop() return self._result def _execute(self): diff --git a/rq/queue.py b/rq/queue.py index cbef351..e4b6a47 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -225,6 +225,7 @@ class Queue(object): job.set_status(JobStatus.DEFERRED) job.register_dependency(pipeline=pipe) job.save(pipeline=pipe) + job.cleanup(ttl=job.ttl, pipeline=pipe) pipe.execute() return job break @@ -298,6 +299,7 @@ class Queue(object): if job.timeout is None: job.timeout = self.DEFAULT_TIMEOUT job.save(pipeline=pipe) + job.cleanup(ttl=job.ttl, pipeline=pipe) if self._async: self.push_job_id(job.id, pipeline=pipe, at_front=at_front) @@ -494,6 +496,7 @@ class FailedQueue(Queue): job.ended_at = utcnow() job.exc_info = exc_info job.save(pipeline=pipeline, include_meta=False) + job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire self.push_job_id(job.id, pipeline=pipeline) pipeline.execute() diff --git a/rq/registry.py b/rq/registry.py index 168dd8e..51e6dd0 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -93,6 +93,7 @@ class StartedJobRegistry(BaseRegistry): connection=self.connection) job.set_status(JobStatus.FAILED) job.save(pipeline=pipeline, include_meta=False) + job.cleanup(ttl=-1, pipeline=pipeline) failed_queue.push_job_id(job_id, pipeline=pipeline) except NoSuchJobError: pass diff --git a/tests/test_job.py b/tests/test_job.py index 1491f7f..46d7d39 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -386,7 +386,7 @@ class TestJob(RQTestCase): self.assertEqual(job.get_ttl(), ttl) job.save() job.perform() - self.assertEqual(job.get_ttl(), -1) + self.assertEqual(job.get_ttl(), ttl) self.assertTrue(job.exists(job.id)) self.assertEqual(job.result, 'Done sleeping...') diff --git a/tests/test_queue.py b/tests/test_queue.py index 294fb62..7129452 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -650,3 +650,16 @@ class TestFailedQueue(RQTestCase): job.delete() self.assertFalse(job.id in failed_queue.get_job_ids()) + + def test_job_in_failed_queue_persists(self): + """Make sure failed job key does not expire""" + q = Queue('foo') + job = q.enqueue(div_by_zero, args=(1,), ttl=5) + self.assertEqual(self.testconn.ttl(job.key), 5) + + self.assertRaises(ZeroDivisionError, job.perform) + job.set_status(JobStatus.FAILED) + failed_queue = get_failed_queue() + failed_queue.quarantine(job, Exception('Some fake error')) + + self.assertEqual(self.testconn.ttl(job.key), -1)