From f9d58979227dd6498545ff62c0d3378475a6db4e Mon Sep 17 00:00:00 2001 From: Yannis Spiliopoulos Date: Wed, 25 May 2016 15:24:34 -0400 Subject: [PATCH] Solves issue 702 In order to solve issue 702 we have to check whether a work-horse terminated unexpectedly (by inspecting the exit code of the work-horse process). If it exited unexpectedly we check if the job has either been marked as finished, failed or other valid states. If it's not in any valid state we mark it as failed and move it to the failed queue. Since the process was terminated unexpectedly (think OOM) we do not have any exception context and we can't run any custom exception handlers. There is still a chance that the job will finish successfully but the work-horse process will be killed before the job is marked as finished and we will erroneously mark it as failed. The users should take care to write idempotent jobs. --- rq/worker.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 01145cb..efeaf92 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -523,6 +523,25 @@ class Worker(object): while True: try: _, ret_val = os.waitpid(self._horse_pid, 0) + if not (ret_val == os.EX_OK): + job_status = job.get_status() + if job_status is None: + # Job completed and its ttl has expired + break + if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: + with self.connection._pipeline() as pipeline: + job.set_status(JobStatus.FAILED, pipeline=pipeline) + started_job_registry = StartedJobRegistry(job.origin, self.connection) + started_job_registry.remove(job, pipeline=pipeline) + self.set_current_job_id(None, pipeline=pipeline) + try: + pipeline.execute() + except Exception: + pass + self.move_to_failed_queue_unhandled( + job, + "Work-horse proccess was terminated unexpectedly" + ) break except OSError as e: # In case we encountered an OSError due to EINTR (which is @@ -690,6 +709,11 @@ class Worker(object): self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) self.failed_queue.quarantine(job, exc_info=exc_string) + def move_to_failed_queue_unhandled(self, job, message): + """Unhandled failure default handler: move the job to the failed queue.""" + self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) + self.failed_queue.quarantine(job, exc_info=message) + def push_exc_handler(self, handler_func): """Pushes an exception handler onto the exc handler stack.""" self._exc_handlers.append(handler_func)