Merge pull request #831 from katichev/explicit_cleanup

Replace job id with job instance in the job stack. Remove implicit "cleanup" call from "job.save"
Selwin Ong committed by GitHub
commit 5f5e113790
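In practice this means callers that previously relied on `save()` to also apply the job's TTL must now pair it with an explicit `cleanup()` call. A minimal sketch of the new call-site pattern, assuming a local Redis and rq's `Job.create`/`save`/`cleanup` API as it appears in this diff (the job function and TTL value are illustrative, not taken from the change itself):

    # Sketch only: save() now writes the job hash, and the caller decides how
    # the key's TTL is handled by calling cleanup() explicitly.
    from redis import Redis
    from rq.job import Job

    connection = Redis()
    job = Job.create(func=print, args=('hello',), connection=connection, ttl=5)

    with connection.pipeline() as pipe:
        job.save(pipeline=pipe)                  # persist the hash fields only
        job.cleanup(ttl=job.ttl, pipeline=pipe)  # apply the TTL policy explicitly
        pipe.execute()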

@@ -75,11 +75,10 @@ def get_current_job(connection=None, job_class=None):
     """Returns the Job instance that is currently being executed. If this
     function is invoked from outside a job context, None is returned.
     """
-    job_class = job_class or Job
-    job_id = _job_stack.top
-    if job_id is None:
-        return None
-    return job_class.fetch(job_id, connection=connection)
+    if job_class:
+        warnings.warn("job_class argument for get_current_job is deprecated.",
+                      DeprecationWarning)
+    return _job_stack.top
 
 
 class Job(object):
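Because the stack now holds the live Job instance, `get_current_job()` returns the exact object the worker is executing, so attributes set in memory (for example `meta`) are visible without a fresh fetch, and passing `job_class` only raises a DeprecationWarning. A small usage sketch; the `track_progress` function is hypothetical:

    from rq import get_current_job

    def track_progress(n):
        """Hypothetical job body showing the instance-based job stack."""
        job = get_current_job()           # the pushed instance itself, not a re-fetch
        for i in range(n):
            job.meta['progress'] = i / n  # mutate the instance the worker holds
            job.save_meta()               # persist only the meta dict (see save_meta below)
        return 'done'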
@@ -479,18 +478,18 @@ class Job(object):
     def save(self, pipeline=None, include_meta=True):
         """
-        Persists the current job instance to its corresponding Redis key.
+        Dumps the current job instance to its corresponding Redis key.
 
-        Exclude persisting the `meta` dictionary by setting
+        Exclude saving the `meta` dictionary by setting
         `include_meta=False`. This is useful to prevent clobbering
         user metadata without an expensive `refresh()` call first.
+
+        Redis key persistence may be altered by `cleanup()` method.
         """
         key = self.key
         connection = pipeline if pipeline is not None else self.connection
 
         connection.hmset(key, self.to_dict(include_meta=include_meta))
-        self.cleanup(self.ttl, pipeline=connection)
 
     def save_meta(self):
         """Stores job meta from the job instance to the corresponding Redis key."""
@@ -548,17 +547,15 @@ class Job(object):
         connection.delete(self.key)
         connection.delete(self.dependents_key)
 
     # Job execution
     def perform(self):  # noqa
         """Invokes the job function with the job arguments."""
-        self.connection.persist(self.key)
-        self.ttl = -1
-        _job_stack.push(self.id)
+        _job_stack.push(self)
         try:
             self._result = self._execute()
         finally:
-            assert self.id == _job_stack.pop()
+            assert self is _job_stack.pop()
         return self._result
 
     def _execute(self):
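The assertion switches from comparing ids to comparing identity: with instances on the stack, `is` guarantees the exact object that was pushed comes back, whereas two distinct Job objects can share an id. A toy illustration (not rq code):

    class FakeJob:
        """Stand-in object; only the id attribute matters here."""
        def __init__(self, id):
            self.id = id

    stack = []
    original = FakeJob('abc123')
    stale_copy = FakeJob('abc123')      # same id, different in-memory object

    stack.append(original)
    popped = stack.pop()
    assert popped is original           # identity check: the pushed object itself
    assert popped.id == stale_copy.id and popped is not stale_copy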

@@ -225,6 +225,7 @@ class Queue(object):
                             job.set_status(JobStatus.DEFERRED)
                             job.register_dependency(pipeline=pipe)
                             job.save(pipeline=pipe)
+                            job.cleanup(ttl=job.ttl, pipeline=pipe)
                             pipe.execute()
                             return job
                         break
@@ -298,6 +299,7 @@ class Queue(object):
         if job.timeout is None:
             job.timeout = self.DEFAULT_TIMEOUT
 
         job.save(pipeline=pipe)
+        job.cleanup(ttl=job.ttl, pipeline=pipe)
 
         if self._async:
             self.push_job_id(job.id, pipeline=pipe, at_front=at_front)
@@ -494,6 +496,7 @@ class FailedQueue(Queue):
             job.ended_at = utcnow()
             job.exc_info = exc_info
             job.save(pipeline=pipeline, include_meta=False)
+            job.cleanup(ttl=-1, pipeline=pipeline)  # failed job won't expire
             self.push_job_id(job.id, pipeline=pipeline)
 
             pipeline.execute()

@@ -93,6 +93,7 @@ class StartedJobRegistry(BaseRegistry):
                                                   connection=self.connection)
                         job.set_status(JobStatus.FAILED)
                         job.save(pipeline=pipeline, include_meta=False)
+                        job.cleanup(ttl=-1, pipeline=pipeline)
                         failed_queue.push_job_id(job_id, pipeline=pipeline)
                     except NoSuchJobError:
                         pass

@@ -386,7 +386,7 @@ class TestJob(RQTestCase):
         self.assertEqual(job.get_ttl(), ttl)
         job.save()
         job.perform()
-        self.assertEqual(job.get_ttl(), -1)
+        self.assertEqual(job.get_ttl(), ttl)
         self.assertTrue(job.exists(job.id))
         self.assertEqual(job.result, 'Done sleeping...')

@@ -650,3 +650,16 @@ class TestFailedQueue(RQTestCase):
         job.delete()
         self.assertFalse(job.id in failed_queue.get_job_ids())
+
+    def test_job_in_failed_queue_persists(self):
+        """Make sure failed job key does not expire"""
+        q = Queue('foo')
+        job = q.enqueue(div_by_zero, args=(1,), ttl=5)
+        self.assertEqual(self.testconn.ttl(job.key), 5)
+
+        self.assertRaises(ZeroDivisionError, job.perform)
+        job.set_status(JobStatus.FAILED)
+
+        failed_queue = get_failed_queue()
+        failed_queue.quarantine(job, Exception('Some fake error'))
+        self.assertEqual(self.testconn.ttl(job.key), -1)
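For reference, the TTL values asserted in this test follow Redis semantics: TTL returns the remaining seconds for an expiring key, -1 for a key with no expiry, and -2 for a missing key. A quick redis-py illustration (key names are made up, a local Redis is assumed):

    from redis import Redis

    r = Redis()
    r.set('rq:job:example', 'payload', ex=5)
    print(r.ttl('rq:job:example'))   # 5  -> key expires in 5 seconds
    r.persist('rq:job:example')
    print(r.ttl('rq:job:example'))   # -1 -> key exists with no expiry
    print(r.ttl('rq:job:missing'))   # -2 -> key does not exist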
