|
|
@ -530,10 +530,10 @@ class Worker(object):
|
|
|
|
break
|
|
|
|
break
|
|
|
|
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
|
|
|
|
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
|
|
|
|
with self.connection._pipeline() as pipeline:
|
|
|
|
with self.connection._pipeline() as pipeline:
|
|
|
|
job.set_status(JobStatus.FAILED, pipeline=pipeline)
|
|
|
|
self.handle_current_job_failure(
|
|
|
|
started_job_registry = StartedJobRegistry(job.origin, self.connection)
|
|
|
|
job=job,
|
|
|
|
started_job_registry.remove(job, pipeline=pipeline)
|
|
|
|
pipeline=pipeline
|
|
|
|
self.set_current_job_id(None, pipeline=pipeline)
|
|
|
|
)
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
pipeline.execute()
|
|
|
|
pipeline.execute()
|
|
|
|
except Exception:
|
|
|
|
except Exception:
|
|
|
@ -611,6 +611,27 @@ class Worker(object):
|
|
|
|
msg = 'Processing {0} from {1} since {2}'
|
|
|
|
msg = 'Processing {0} from {1} since {2}'
|
|
|
|
self.procline(msg.format(job.func_name, job.origin, time.time()))
|
|
|
|
self.procline(msg.format(job.func_name, job.origin, time.time()))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_current_job_failure(
|
|
|
|
|
|
|
|
self,
|
|
|
|
|
|
|
|
job,
|
|
|
|
|
|
|
|
started_job_registry=None,
|
|
|
|
|
|
|
|
pipeline=None
|
|
|
|
|
|
|
|
):
|
|
|
|
|
|
|
|
"""Handles the failure or an executing job by:
|
|
|
|
|
|
|
|
1. Setting the job status to failed
|
|
|
|
|
|
|
|
2. Removing the job from the started_job_registry
|
|
|
|
|
|
|
|
3. Setting the workers current job to None
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if started_job_registry is None:
|
|
|
|
|
|
|
|
started_job_registry = StartedJobRegistry(
|
|
|
|
|
|
|
|
job.origin,
|
|
|
|
|
|
|
|
self.connection
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
job.set_status(JobStatus.FAILED, pipeline=pipeline)
|
|
|
|
|
|
|
|
started_job_registry.remove(job, pipeline=pipeline)
|
|
|
|
|
|
|
|
self.set_current_job_id(None, pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
def perform_job(self, job, queue):
|
|
|
|
def perform_job(self, job, queue):
|
|
|
|
"""Performs the actual work of a job. Will/should only be called
|
|
|
|
"""Performs the actual work of a job. Will/should only be called
|
|
|
|
inside the work horse's process.
|
|
|
|
inside the work horse's process.
|
|
|
@ -651,9 +672,11 @@ class Worker(object):
|
|
|
|
pipeline.execute()
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
except Exception:
|
|
|
|
job.set_status(JobStatus.FAILED, pipeline=pipeline)
|
|
|
|
self.handle_current_job_failure(
|
|
|
|
started_job_registry.remove(job, pipeline=pipeline)
|
|
|
|
job=job,
|
|
|
|
self.set_current_job_id(None, pipeline=pipeline)
|
|
|
|
started_job_registry=started_job_registry,
|
|
|
|
|
|
|
|
pipeline=pipeline
|
|
|
|
|
|
|
|
)
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
pipeline.execute()
|
|
|
|
pipeline.execute()
|
|
|
|
except Exception:
|
|
|
|
except Exception:
|
|
|
|