|
|
|
@ -544,28 +544,7 @@ class Worker(object):
|
|
|
|
|
"""
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
_, ret_val = os.waitpid(self._horse_pid, 0)
|
|
|
|
|
if ret_val != os.EX_OK:
|
|
|
|
|
job_status = job.get_status()
|
|
|
|
|
if job_status is None:
|
|
|
|
|
# Job completed and its ttl has expired
|
|
|
|
|
break
|
|
|
|
|
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
|
|
|
|
|
self.handle_job_failure(job=job)
|
|
|
|
|
|
|
|
|
|
# Unhandled failure: move the job to the failed queue
|
|
|
|
|
self.log.warning(
|
|
|
|
|
'Moving job to {0!r} queue'.format(
|
|
|
|
|
self.failed_queue.name
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
self.failed_queue.quarantine(
|
|
|
|
|
job,
|
|
|
|
|
exc_info=(
|
|
|
|
|
"Work-horse process "
|
|
|
|
|
"was terminated unexpectedly"
|
|
|
|
|
)
|
|
|
|
|
)
|
|
|
|
|
self._monitor_work_horse_tick(job)
|
|
|
|
|
break
|
|
|
|
|
except OSError as e:
|
|
|
|
|
# In case we encountered an OSError due to EINTR (which is
|
|
|
|
@ -577,6 +556,29 @@ class Worker(object):
|
|
|
|
|
if e.errno != errno.EINTR:
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
def _monitor_work_horse_tick(self, job):
|
|
|
|
|
_, ret_val = os.waitpid(self._horse_pid, 0)
|
|
|
|
|
if ret_val == os.EX_OK: # The process exited normally.
|
|
|
|
|
return
|
|
|
|
|
job_status = job.get_status()
|
|
|
|
|
if job_status is None: # Job completed and its ttl has expired
|
|
|
|
|
return
|
|
|
|
|
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
|
|
|
|
|
self.handle_job_failure(job=job)
|
|
|
|
|
|
|
|
|
|
# Unhandled failure: move the job to the failed queue
|
|
|
|
|
self.log.warning((
|
|
|
|
|
'Moving job to {0!r} queue '
|
|
|
|
|
'(work-horse terminated unexpectedly; waitpid returned {1})'
|
|
|
|
|
).format(self.failed_queue.name, ret_val))
|
|
|
|
|
self.failed_queue.quarantine(
|
|
|
|
|
job,
|
|
|
|
|
exc_info=(
|
|
|
|
|
"Work-horse process was terminated unexpectedly "
|
|
|
|
|
"(waitpid returned {0})"
|
|
|
|
|
).format(ret_val)
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def execute_job(self, job, queue):
|
|
|
|
|
"""Spawns a work horse to perform the actual work and passes it a job.
|
|
|
|
|
The worker will wait for the work horse and make sure it executes
|
|
|
|
|