|
|
@ -530,7 +530,7 @@ class Worker(object):
|
|
|
|
break
|
|
|
|
break
|
|
|
|
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
|
|
|
|
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
|
|
|
|
with self.connection._pipeline() as pipeline:
|
|
|
|
with self.connection._pipeline() as pipeline:
|
|
|
|
self.handle_current_job_failure(
|
|
|
|
self.handle_job_failure(
|
|
|
|
job=job,
|
|
|
|
job=job,
|
|
|
|
pipeline=pipeline
|
|
|
|
pipeline=pipeline
|
|
|
|
)
|
|
|
|
)
|
|
|
@ -611,7 +611,7 @@ class Worker(object):
|
|
|
|
msg = 'Processing {0} from {1} since {2}'
|
|
|
|
msg = 'Processing {0} from {1} since {2}'
|
|
|
|
self.procline(msg.format(job.func_name, job.origin, time.time()))
|
|
|
|
self.procline(msg.format(job.func_name, job.origin, time.time()))
|
|
|
|
|
|
|
|
|
|
|
|
def handle_current_job_failure(
|
|
|
|
def handle_job_failure(
|
|
|
|
self,
|
|
|
|
self,
|
|
|
|
job,
|
|
|
|
job,
|
|
|
|
started_job_registry=None,
|
|
|
|
started_job_registry=None,
|
|
|
@ -672,7 +672,7 @@ class Worker(object):
|
|
|
|
pipeline.execute()
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
except Exception:
|
|
|
|
self.handle_current_job_failure(
|
|
|
|
self.handle_job_failure(
|
|
|
|
job=job,
|
|
|
|
job=job,
|
|
|
|
started_job_registry=started_job_registry,
|
|
|
|
started_job_registry=started_job_registry,
|
|
|
|
pipeline=pipeline
|
|
|
|
pipeline=pipeline
|
|
|
|