feat: avoided "zombie" processes after killing work horse (#1348)

* feat: avoided "zombie" processes after killing work horse by setting work horse process group and killing this group

* fixed tests

* tests: added test to check that all workhorse subprocesses are killed

* tests: updated guthub run tests dependencies since they are not using (dev-)requirements.txt

Co-authored-by: Ruslan Mullakhmetov <ruslan@twentythree.net>
main
Ruslan Mullakhmetov 4 years ago committed by GitHub
parent 55dce19a95
commit 9adcd7e50c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -33,7 +33,7 @@ jobs:
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
pip install redis==${{ matrix.redis-py-version }} pip install redis==${{ matrix.redis-py-version }}
pip install pytest pytest-cov sentry-sdk codecov mock pip install pytest pytest-cov sentry-sdk codecov mock psutil
pip install -e . pip install -e .
- name: Test with pytest - name: Test with pytest
@ -43,4 +43,4 @@ jobs:
- name: Upload coverage to Codecov - name: Upload coverage to Codecov
uses: codecov/codecov-action@v1 uses: codecov/codecov-action@v1
with: with:
file: ./coverage.xml file: ./coverage.xml

@ -1,2 +1,3 @@
mock mock
pytest pytest
psutil

@ -394,7 +394,7 @@ class Worker(object):
Kill the horse but catch "No such process" error has the horse could already be dead. Kill the horse but catch "No such process" error has the horse could already be dead.
""" """
try: try:
os.kill(self.horse_pid, sig) os.killpg(os.getpgid(self.horse_pid), sig)
self.log.info('Killed horse pid %s', self.horse_pid) self.log.info('Killed horse pid %s', self.horse_pid)
except OSError as e: except OSError as e:
if e.errno == errno.ESRCH: if e.errno == errno.ESRCH:
@ -699,6 +699,7 @@ class Worker(object):
os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_WORKER_ID'] = self.name
os.environ['RQ_JOB_ID'] = job.id os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0: if child_pid == 0:
os.setsid()
self.main_work_horse(job, queue) self.main_work_horse(job, queue)
os._exit(0) # just in case os._exit(0) # just in case
else: else:

@ -10,6 +10,7 @@ import os
import time import time
import signal import signal
import sys import sys
import subprocess
from rq import Connection, get_current_job, get_current_connection, Queue from rq import Connection, get_current_job, get_current_connection, Queue
from rq.decorators import job from rq.decorators import job
@ -66,6 +67,17 @@ def create_file_after_timeout(path, timeout):
time.sleep(timeout) time.sleep(timeout)
create_file(path) create_file(path)
def create_file_after_timeout_and_setsid(path, timeout):
os.setsid()
create_file_after_timeout(path, timeout)
def launch_process_within_worker_and_store_pid(path, timeout):
p = subprocess.Popen(['sleep', str(timeout)])
with open(path, 'w') as f:
f.write('{}'.format(p.pid))
p.wait()
def access_self(): def access_self():
assert get_current_connection() is not None assert get_current_connection() is not None

@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import json import json
import os import os
import psutil
import shutil import shutil
import signal import signal
import subprocess import subprocess
@ -23,9 +24,10 @@ from mock import Mock
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import ( from tests.fixtures import (
access_self, create_file, create_file_after_timeout, div_by_zero, do_nothing, access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing,
kill_worker, long_running_job, modify_self, modify_self_and_error, kill_worker, long_running_job, modify_self, modify_self_and_error,
run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, raise_exc_mock run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, raise_exc_mock,
launch_process_within_worker_and_store_pid
) )
from rq import Queue, SimpleWorker, Worker, get_current_connection from rq import Queue, SimpleWorker, Worker, get_current_connection
@ -37,7 +39,6 @@ from rq.utils import utcnow
from rq.version import VERSION from rq.version import VERSION
from rq.worker import HerokuWorker, WorkerStatus from rq.worker import HerokuWorker, WorkerStatus
class CustomJob(Job): class CustomJob(Job):
pass pass
@ -1166,12 +1167,16 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
sentinel_file = '/tmp/.rq_sentinel_work_horse_death' sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
if os.path.exists(sentinel_file): if os.path.exists(sentinel_file):
os.remove(sentinel_file) os.remove(sentinel_file)
fooq.enqueue(create_file_after_timeout, sentinel_file, 100) fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100)
job, queue = w.dequeue_job_and_maintain_ttl(5) job, queue = w.dequeue_job_and_maintain_ttl(5)
w.fork_work_horse(job, queue) w.fork_work_horse(job, queue)
job.timeout = 5 job.timeout = 5
w.job_monitoring_interval = 1 w.job_monitoring_interval = 1
now = utcnow() now = utcnow()
time.sleep(1)
with open(sentinel_file) as f:
subprocess_pid = int(f.read().strip())
self.assertTrue(psutil.pid_exists(subprocess_pid))
w.monitor_work_horse(job, queue) w.monitor_work_horse(job, queue)
fudge_factor = 1 fudge_factor = 1
total_time = w.job_monitoring_interval + 65 + fudge_factor total_time = w.job_monitoring_interval + 65 + fudge_factor
@ -1180,6 +1185,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
failed_job_registry = FailedJobRegistry(queue=fooq) failed_job_registry = FailedJobRegistry(queue=fooq)
self.assertTrue(job in failed_job_registry) self.assertTrue(job in failed_job_registry)
self.assertEqual(fooq.count, 0) self.assertEqual(fooq.count, 0)
self.assertFalse(psutil.pid_exists(subprocess_pid))
def schedule_access_self(): def schedule_access_self():
@ -1283,9 +1289,10 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
w = HerokuWorker('foo') w = HerokuWorker('foo')
path = os.path.join(self.sandbox, 'shouldnt_exist') path = os.path.join(self.sandbox, 'shouldnt_exist')
p = Process(target=create_file_after_timeout, args=(path, 2)) p = Process(target=create_file_after_timeout_and_setsid, args=(path, 2))
p.start() p.start()
self.assertEqual(p.exitcode, None) self.assertEqual(p.exitcode, None)
time.sleep(0.1)
w._horse_pid = p.pid w._horse_pid = p.pid
w.handle_warm_shutdown_request() w.handle_warm_shutdown_request()

Loading…
Cancel
Save