|
|
@ -416,28 +416,28 @@ class Worker(object):
|
|
|
|
job.func_name,
|
|
|
|
job.func_name,
|
|
|
|
job.origin, time.time()))
|
|
|
|
job.origin, time.time()))
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
with self.connection.pipeline() as pipeline:
|
|
|
|
with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT):
|
|
|
|
try:
|
|
|
|
rv = job.perform()
|
|
|
|
with death_penalty_after(job.timeout or Queue.DEFAULT_TIMEOUT):
|
|
|
|
|
|
|
|
rv = job.perform()
|
|
|
|
# Pickle the result in the same try-except block since we need to
|
|
|
|
|
|
|
|
# use the same exc handling when pickling fails
|
|
|
|
# Pickle the result in the same try-except block since we need to
|
|
|
|
job._result = rv
|
|
|
|
# use the same exc handling when pickling fails
|
|
|
|
job._status = Status.FINISHED
|
|
|
|
job._result = rv
|
|
|
|
job.ended_at = utcnow()
|
|
|
|
job._status = Status.FINISHED
|
|
|
|
|
|
|
|
job.ended_at = utcnow()
|
|
|
|
result_ttl = job.get_ttl(self.default_result_ttl)
|
|
|
|
|
|
|
|
pipeline = self.connection._pipeline()
|
|
|
|
result_ttl = job.get_ttl(self.default_result_ttl)
|
|
|
|
if result_ttl != 0:
|
|
|
|
if result_ttl != 0:
|
|
|
|
job.save(pipeline=pipeline)
|
|
|
|
job.save(pipeline=pipeline)
|
|
|
|
job.cleanup(result_ttl, pipeline=pipeline)
|
|
|
|
job.cleanup(result_ttl, pipeline=pipeline)
|
|
|
|
pipeline.execute()
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
|
|
except:
|
|
|
|
except Exception:
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
job.status = Status.FAILED
|
|
|
|
job.status = Status.FAILED
|
|
|
|
self.handle_exception(job, *sys.exc_info())
|
|
|
|
self.handle_exception(job, *sys.exc_info())
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
if rv is None:
|
|
|
|
if rv is None:
|
|
|
|
self.log.info('Job OK')
|
|
|
|
self.log.info('Job OK')
|
|
|
|