Enhance worker termination logic (#1729)

* enhance worker termination logic and allow passing custom exc_info in failure callback

* handle ret_val None

* fix unbound variable

* typing

* docs

* Update exceptions.md

* rename

* typing

* rename

* Update exceptions.md

* revert test change
main
Rony Lutsky 2 years ago committed by GitHub
parent a985445486
commit bba781d288
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -119,3 +119,28 @@ use a custom exception handler that doesn't fall through, for example:
def black_hole(job, *exc_info):
return False
```
## Work Horse Killed Handler
_New in version 1.13.0._
In addition to job exception handler(s), RQ supports registering a handler for unexpected workhorse termination.
This handler is called when a workhorse is unexpectedly terminated, for example due to OOM.
This is how you set a workhorse termination handler to an RQ worker:
```python
from my_handlers import my_work_horse_killed_handler
w = Worker([q], work_horse_killed_handler=my_work_horse_killed_handler)
```
The handler itself is a function that takes the following parameters: `job`,
`retpid`, `ret_val` and `rusage`:
```python
from resource import struct_rusage
from rq.job import Job
def my_work_horse_killed_handler(job: Job, retpid: int, ret_val: int, rusage: struct_rusage):
# do your thing here, for example set job.retries_left to 0
```

@ -1,7 +1,9 @@
import contextlib
import errno
import logging
import os
import random
import resource
import signal
import socket
import sys
@ -231,6 +233,7 @@ class Worker:
disable_default_exception_handler: bool = False,
prepare_for_work: bool = True,
serializer=None,
work_horse_killed_handler: Optional[Callable[[Job, int, int, resource.struct_rusage], None]] = None
): # noqa
self.default_result_ttl = default_result_ttl
self.worker_ttl = default_worker_ttl
@ -258,6 +261,7 @@ class Worker:
self.validate_queues()
self._ordered_queues = self.queues[:]
self._exc_handlers: List[Callable] = []
self._work_horse_killed_handler = work_horse_killed_handler
self._state: str = 'starting'
self._is_horse: bool = False
@ -549,18 +553,14 @@ class Worker:
else:
raise
def wait_for_horse(self) -> Tuple[Optional[int], Optional[int]]:
def wait_for_horse(self) -> Tuple[Optional[int], Optional[int], Optional[resource.struct_rusage]]:
"""Waits for the horse process to complete.
Uses `0` as argument as to include "any child in the process group of the current process".
"""
pid = None
stat = None
try:
pid, stat = os.waitpid(self.horse_pid, 0)
except ChildProcessError:
# ChildProcessError: [Errno 10] No child processes
pass
return pid, stat
pid = stat = rusage = None
with contextlib.suppress(ChildProcessError): # ChildProcessError: [Errno 10] No child processes
pid, stat, rusage = os.wait4(self.horse_pid, 0)
return pid, stat, rusage
def request_force_stop(self, signum, frame):
"""Terminates the application (cold shutdown).
@ -1058,12 +1058,12 @@ class Worker:
job (Job): _description_
queue (Queue): _description_
"""
ret_val = None
retpid = ret_val = rusage = None
job.started_at = utcnow()
while True:
try:
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
retpid, ret_val = self.wait_for_horse()
retpid, ret_val, rusage = self.wait_for_horse()
break
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
@ -1109,14 +1109,14 @@ class Worker:
job.ended_at = utcnow()
# Unhandled failure: move the job to the failed queue
self.log.warning(
('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format(
ret_val
)
)
signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ""
exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
self.log.warning(f'Moving job to FailedJobRegistry ({exc_string})')
self.handle_work_horse_killed(job, retpid, ret_val, rusage)
self.handle_job_failure(
job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val
job, queue=queue,
exc_string=exc_string
)
def execute_job(self, job: 'Job', queue: 'Queue'):
@ -1323,7 +1323,7 @@ class Worker:
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.success_callback(job, self.connection, result)
def execute_failure_callback(self, job: 'Job'):
def execute_failure_callback(self, job: 'Job', *exc_info):
"""Executes failure_callback with timeout
Args:
@ -1332,7 +1332,7 @@ class Worker:
self.log.debug(f"Running failure callbacks for {job.id}")
job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.failure_callback(job, self.connection, *sys.exc_info())
job.failure_callback(job, self.connection, *exc_info)
def perform_job(self, job: 'Job', queue: 'Queue') -> bool:
"""Performs the actual work of a job. Will/should only be called
@ -1377,11 +1377,11 @@ class Worker:
if job.failure_callback:
try:
self.execute_failure_callback(job)
self.execute_failure_callback(job, *exc_info)
except: # noqa
self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True)
exc_info = sys.exc_info()
exc_string = ''.join(traceback.format_exception(*exc_info))
self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=exc_info)
self.handle_job_failure(
job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry
@ -1453,6 +1453,12 @@ class Worker:
"""Pops the latest exception handler off of the exc handler stack."""
return self._exc_handlers.pop()
def handle_work_horse_killed(self, job, retpid, ret_val, rusage):
if self._work_horse_killed_handler is None:
return
self._work_horse_killed_handler(job, retpid, ret_val, rusage)
def __eq__(self, other):
"""Equality does not take the database/connection into account"""
if not isinstance(other, self.__class__):

@ -1224,13 +1224,14 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w.prepare_job_execution(job)
w.fork_work_horse(job, queue)
job.timeout = 5
time.sleep(1)
with open(sentinel_file) as f:
subprocess_pid = int(f.read().strip())
self.assertTrue(psutil.pid_exists(subprocess_pid))
with mock.patch.object(w, 'handle_work_horse_killed', wraps=w.handle_work_horse_killed) as mocked:
w.monitor_work_horse(job, queue)
self.assertEqual(mocked.call_count, 1)
fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor

Loading…
Cancel
Save