diff --git a/rq/worker.py b/rq/worker.py index 01145cb..efeaf92 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -523,6 +523,25 @@ class Worker(object): while True: try: _, ret_val = os.waitpid(self._horse_pid, 0) + if not (ret_val == os.EX_OK): + job_status = job.get_status() + if job_status is None: + # Job completed and its ttl has expired + break + if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: + with self.connection._pipeline() as pipeline: + job.set_status(JobStatus.FAILED, pipeline=pipeline) + started_job_registry = StartedJobRegistry(job.origin, self.connection) + started_job_registry.remove(job, pipeline=pipeline) + self.set_current_job_id(None, pipeline=pipeline) + try: + pipeline.execute() + except Exception: + pass + self.move_to_failed_queue_unhandled( + job, + "Work-horse proccess was terminated unexpectedly" + ) break except OSError as e: # In case we encountered an OSError due to EINTR (which is @@ -690,6 +709,11 @@ class Worker(object): self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) self.failed_queue.quarantine(job, exc_info=exc_string) + def move_to_failed_queue_unhandled(self, job, message): + """Unhandled failure default handler: move the job to the failed queue.""" + self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) + self.failed_queue.quarantine(job, exc_info=message) + def push_exc_handler(self, handler_func): """Pushes an exception handler onto the exc handler stack.""" self._exc_handlers.append(handler_func)