|
|
|
@ -474,13 +474,15 @@ class Worker(object):
|
|
|
|
|
"""Performs misc bookkeeping like updating states prior to
|
|
|
|
|
job execution.
|
|
|
|
|
"""
|
|
|
|
|
timeout = (job.timeout or 180) + 60
|
|
|
|
|
|
|
|
|
|
with self.connection._pipeline() as pipeline:
|
|
|
|
|
timeout = (job.timeout or 180) + 60
|
|
|
|
|
self.set_state('busy', pipeline=pipeline)
|
|
|
|
|
self.set_current_job_id(job.id, pipeline=pipeline)
|
|
|
|
|
self.heartbeat(timeout, pipeline=pipeline)
|
|
|
|
|
working_queue = WorkingQueue(job.origin, self.connection)
|
|
|
|
|
working_queue.add(job, timeout, pipeline=pipeline)
|
|
|
|
|
job.set_status(Status.STARTED, pipeline=pipeline)
|
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
self.procline('Processing %s from %s since %s' % (
|
|
|
|
@ -497,7 +499,6 @@ class Worker(object):
|
|
|
|
|
working_queue = WorkingQueue(job.origin, self.connection)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
job.set_status(Status.STARTED)
|
|
|
|
|
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
|
|
|
|
|
rv = job.perform()
|
|
|
|
|
|
|
|
|
@ -518,9 +519,10 @@ class Worker(object):
|
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
|
job.set_status(Status.FAILED)
|
|
|
|
|
working_queue.remove(job)
|
|
|
|
|
job.set_status(Status.FAILED, pipeline=pipeline)
|
|
|
|
|
working_queue.remove(job, pipeline=pipeline)
|
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
self.handle_exception(job, *sys.exc_info())
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|