remove implicit cleanup call from job.save

main
Alexey Katichev 8 years ago
parent fbe4cafe2a
commit 3596449cc0

@@ -480,18 +480,18 @@ class Job(object):
     def save(self, pipeline=None, include_meta=True):
         """
-        Persists the current job instance to its corresponding Redis key.
-        Exclude persisting the `meta` dictionary by setting
+        Dumps the current job instance to its corresponding Redis key.
+        Exclude saving the `meta` dictionary by setting
         `include_meta=False`. This is useful to prevent clobbering
         user metadata without an expensive `refresh()` call first.
+
+        Redis key persistence may be altered by `cleanup()` method.
         """
         key = self.key
         connection = pipeline if pipeline is not None else self.connection
         connection.hmset(key, self.to_dict(include_meta=include_meta))
-        self.cleanup(self.ttl, pipeline=connection)

     def save_meta(self):
         """Stores job meta from the job instance to the corresponding Redis key."""
@@ -549,12 +549,10 @@ class Job(object):
         connection.delete(self.key)
         connection.delete(self.dependents_key)

     # Job execution
     def perform(self):  # noqa
         """Invokes the job function with the job arguments."""
         self.connection.persist(self.key)
-        self.ttl = -1
         _job_stack.push(self.id)
         try:
             self._result = self._execute()
@@ -609,9 +607,12 @@ class Job(object):
             self.delete(pipeline=pipeline, remove_from_queue=remove_from_queue)
         elif not ttl:
             return
-        elif ttl > 0:
+        else:
             connection = pipeline if pipeline is not None else self.connection
-            connection.expire(self.key, ttl)
+            if ttl > 0:
+                connection.expire(self.key, ttl)
+            else:
+                connection.persist(self.key)

     def register_dependency(self, pipeline=None):
         """Jobs may have dependencies. Jobs are enqueued only if the job they

@@ -223,6 +223,7 @@ class Queue(object):
                 job.set_status(JobStatus.DEFERRED)
                 job.register_dependency(pipeline=pipe)
                 job.save(pipeline=pipe)
+                job.cleanup(ttl=job.ttl, pipeline=pipe)
                 pipe.execute()
                 return job
             break
@@ -296,6 +297,7 @@ class Queue(object):
         if job.timeout is None:
             job.timeout = self.DEFAULT_TIMEOUT

         job.save(pipeline=pipe)
+        job.cleanup(ttl=job.ttl, pipeline=pipe)

         if self._async:
             self.push_job_id(job.id, pipeline=pipe, at_front=at_front)
@@ -492,6 +494,7 @@ class FailedQueue(Queue):
             job.ended_at = utcnow()
             job.exc_info = exc_info
             job.save(pipeline=pipeline, include_meta=False)
+            job.cleanup(ttl=-1, pipeline=pipeline)  # failed job won't expire
             self.push_job_id(job.id, pipeline=pipeline)
             pipeline.execute()

@@ -93,6 +93,7 @@ class StartedJobRegistry(BaseRegistry):
                                    connection=self.connection)
                     job.set_status(JobStatus.FAILED)
                     job.save(pipeline=pipeline, include_meta=False)
+                    job.cleanup(ttl=-1, pipeline=pipeline)
                     failed_queue.push_job_id(job_id, pipeline=pipeline)
                 except NoSuchJobError:
                     pass

@@ -386,7 +386,7 @@ class TestJob(RQTestCase):
         self.assertEqual(job.get_ttl(), ttl)
         job.save()
         job.perform()
-        self.assertEqual(job.get_ttl(), -1)
+        self.assertEqual(job.get_ttl(), ttl)
         self.assertTrue(job.exists(job.id))
         self.assertEqual(job.result, 'Done sleeping...')

@@ -650,3 +650,15 @@ class TestFailedQueue(RQTestCase):
         job.delete()
         self.assertFalse(job.id in failed_queue.get_job_ids())

+    def test_job_in_failed_queue_persists(self):
+        """Make sure failed job key does not expire"""
+        q = Queue('foo')
+        job = q.enqueue(div_by_zero, args=(1, 2, 3), ttl=5)
+        self.assertEqual(self.testconn.ttl(job.key), 5)
+
+        job.set_status(JobStatus.FAILED)
+
+        failed_queue = get_failed_queue()
+        failed_queue.quarantine(job, Exception('Some fake error'))
+        self.assertEqual(self.testconn.ttl(job.key), -1)

Loading…
Cancel
Save