From bba781d288dd846781d449f343058c4d5cdd94ab Mon Sep 17 00:00:00 2001 From: Rony Lutsky <3050627+ronlut@users.noreply.github.com> Date: Fri, 17 Feb 2023 01:42:41 +0200 Subject: [PATCH] Enhance worker termination logic (#1729) * enhance worker termination logic and allow passing custom exc_info in failure callback * handle ret_val None * fix unbound variable * typing * docs * Update exceptions.md * rename * typing * rename * Update exceptions.md * revert test change --- docs/docs/exceptions.md | 25 +++++++++++++++++++++ rq/worker.py | 48 +++++++++++++++++++++++------------------ tests/test_worker.py | 5 +++-- 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/docs/docs/exceptions.md b/docs/docs/exceptions.md index f757f2c..c036354 100644 --- a/docs/docs/exceptions.md +++ b/docs/docs/exceptions.md @@ -119,3 +119,28 @@ use a custom exception handler that doesn't fall through, for example: def black_hole(job, *exc_info): return False ``` + +## Work Horse Killed Handler +_New in version 1.13.0._ + +In addition to job exception handler(s), RQ supports registering a handler for unexpected workhorse termination. +This handler is called when a workhorse is unexpectedly terminated, for example due to OOM. + +This is how you set a workhorse termination handler to an RQ worker: + +```python +from my_handlers import my_work_horse_killed_handler + +w = Worker([q], work_horse_killed_handler=my_work_horse_killed_handler) +``` + +The handler itself is a function that takes the following parameters: `job`, +`retpid`, `ret_val` and `rusage`: + +```python +from resource import struct_rusage +from rq.job import Job +def my_work_horse_killed_handler(job: Job, retpid: int, ret_val: int, rusage: struct_rusage): + # do your thing here, for example set job.retries_left to 0 + +``` diff --git a/rq/worker.py b/rq/worker.py index 6fdc6ca..6c3e357 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,7 +1,9 @@ +import contextlib import errno import logging import os import random +import resource import signal import socket import sys @@ -231,6 +233,7 @@ class Worker: disable_default_exception_handler: bool = False, prepare_for_work: bool = True, serializer=None, + work_horse_killed_handler: Optional[Callable[[Job, int, int, resource.struct_rusage], None]] = None ): # noqa self.default_result_ttl = default_result_ttl self.worker_ttl = default_worker_ttl @@ -258,6 +261,7 @@ class Worker: self.validate_queues() self._ordered_queues = self.queues[:] self._exc_handlers: List[Callable] = [] + self._work_horse_killed_handler = work_horse_killed_handler self._state: str = 'starting' self._is_horse: bool = False @@ -549,18 +553,14 @@ class Worker: else: raise - def wait_for_horse(self) -> Tuple[Optional[int], Optional[int]]: + def wait_for_horse(self) -> Tuple[Optional[int], Optional[int], Optional[resource.struct_rusage]]: """Waits for the horse process to complete. Uses `0` as argument as to include "any child in the process group of the current process". """ - pid = None - stat = None - try: - pid, stat = os.waitpid(self.horse_pid, 0) - except ChildProcessError: - # ChildProcessError: [Errno 10] No child processes - pass - return pid, stat + pid = stat = rusage = None + with contextlib.suppress(ChildProcessError): # ChildProcessError: [Errno 10] No child processes + pid, stat, rusage = os.wait4(self.horse_pid, 0) + return pid, stat, rusage def request_force_stop(self, signum, frame): """Terminates the application (cold shutdown). @@ -1058,12 +1058,12 @@ class Worker: job (Job): _description_ queue (Queue): _description_ """ - ret_val = None + retpid = ret_val = rusage = None job.started_at = utcnow() while True: try: with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): - retpid, ret_val = self.wait_for_horse() + retpid, ret_val, rusage = self.wait_for_horse() break except HorseMonitorTimeoutException: # Horse has not exited yet and is still running. @@ -1109,14 +1109,14 @@ class Worker: job.ended_at = utcnow() # Unhandled failure: move the job to the failed queue - self.log.warning( - ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format( - ret_val - ) - ) + signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else "" + exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; " + self.log.warning(f'Moving job to FailedJobRegistry ({exc_string})') + self.handle_work_horse_killed(job, retpid, ret_val, rusage) self.handle_job_failure( - job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val + job, queue=queue, + exc_string=exc_string ) def execute_job(self, job: 'Job', queue: 'Queue'): @@ -1323,7 +1323,7 @@ class Worker: with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.success_callback(job, self.connection, result) - def execute_failure_callback(self, job: 'Job'): + def execute_failure_callback(self, job: 'Job', *exc_info): """Executes failure_callback with timeout Args: @@ -1332,7 +1332,7 @@ class Worker: self.log.debug(f"Running failure callbacks for {job.id}") job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): - job.failure_callback(job, self.connection, *sys.exc_info()) + job.failure_callback(job, self.connection, *exc_info) def perform_job(self, job: 'Job', queue: 'Queue') -> bool: """Performs the actual work of a job. Will/should only be called @@ -1377,11 +1377,11 @@ class Worker: if job.failure_callback: try: - self.execute_failure_callback(job) + self.execute_failure_callback(job, *exc_info) except: # noqa - self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True) exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) + self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=exc_info) self.handle_job_failure( job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry @@ -1453,6 +1453,12 @@ class Worker: """Pops the latest exception handler off of the exc handler stack.""" return self._exc_handlers.pop() + def handle_work_horse_killed(self, job, retpid, ret_val, rusage): + if self._work_horse_killed_handler is None: + return + + self._work_horse_killed_handler(job, retpid, ret_val, rusage) + def __eq__(self, other): """Equality does not take the database/connection into account""" if not isinstance(other, self.__class__): diff --git a/tests/test_worker.py b/tests/test_worker.py index cfce473..edbab98 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1224,13 +1224,14 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): w.prepare_job_execution(job) w.fork_work_horse(job, queue) job.timeout = 5 - time.sleep(1) with open(sentinel_file) as f: subprocess_pid = int(f.read().strip()) self.assertTrue(psutil.pid_exists(subprocess_pid)) - w.monitor_work_horse(job, queue) + with mock.patch.object(w, 'handle_work_horse_killed', wraps=w.handle_work_horse_killed) as mocked: + w.monitor_work_horse(job, queue) + self.assertEqual(mocked.call_count, 1) fudge_factor = 1 total_time = w.job_monitoring_interval + 65 + fudge_factor