From 92cf6f4696912e717965ce35c5670d5a83ec2d8f Mon Sep 17 00:00:00 2001 From: Aarni Koskela Date: Fri, 28 Apr 2017 03:42:28 +0300 Subject: [PATCH] Add waitpid() return value to workhorse error/log messages (#819) * Refactor the inner part of the monitor_work_horse loop into a separate function * Add waitpid() return value to workhorse error/log messages --- rq/worker.py | 46 ++++++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 22 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 0744a8d..c52c21c 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -544,28 +544,7 @@ class Worker(object): """ while True: try: - _, ret_val = os.waitpid(self._horse_pid, 0) - if ret_val != os.EX_OK: - job_status = job.get_status() - if job_status is None: - # Job completed and its ttl has expired - break - if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: - self.handle_job_failure(job=job) - - # Unhandled failure: move the job to the failed queue - self.log.warning( - 'Moving job to {0!r} queue'.format( - self.failed_queue.name - ) - ) - self.failed_queue.quarantine( - job, - exc_info=( - "Work-horse process " - "was terminated unexpectedly" - ) - ) + self._monitor_work_horse_tick(job) break except OSError as e: # In case we encountered an OSError due to EINTR (which is @@ -577,6 +556,29 @@ class Worker(object): if e.errno != errno.EINTR: raise + def _monitor_work_horse_tick(self, job): + _, ret_val = os.waitpid(self._horse_pid, 0) + if ret_val == os.EX_OK: # The process exited normally. + return + job_status = job.get_status() + if job_status is None: # Job completed and its ttl has expired + return + if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: + self.handle_job_failure(job=job) + + # Unhandled failure: move the job to the failed queue + self.log.warning(( + 'Moving job to {0!r} queue ' + '(work-horse terminated unexpectedly; waitpid returned {1})' + ).format(self.failed_queue.name, ret_val)) + self.failed_queue.quarantine( + job, + exc_info=( + "Work-horse process was terminated unexpectedly " + "(waitpid returned {0})" + ).format(ret_val) + ) + def execute_job(self, job, queue): """Spawns a work horse to perform the actual work and passes it a job. The worker will wait for the work horse and make sure it executes