diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 8ae68cf..f86e716 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -33,7 +33,7 @@ jobs: run: | python -m pip install --upgrade pip pip install redis==${{ matrix.redis-py-version }} - pip install pytest pytest-cov sentry-sdk codecov mock + pip install pytest pytest-cov sentry-sdk codecov mock psutil pip install -e . - name: Test with pytest @@ -43,4 +43,4 @@ jobs: - name: Upload coverage to Codecov uses: codecov/codecov-action@v1 with: - file: ./coverage.xml \ No newline at end of file + file: ./coverage.xml diff --git a/dev-requirements.txt b/dev-requirements.txt index 615fac9..ccb6eff 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,2 +1,3 @@ mock -pytest \ No newline at end of file +pytest +psutil diff --git a/rq/worker.py b/rq/worker.py index a4e3b25..88c72b0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -394,7 +394,7 @@ class Worker(object): Kill the horse but catch "No such process" error has the horse could already be dead. """ try: - os.kill(self.horse_pid, sig) + os.killpg(os.getpgid(self.horse_pid), sig) self.log.info('Killed horse pid %s', self.horse_pid) except OSError as e: if e.errno == errno.ESRCH: @@ -699,6 +699,7 @@ class Worker(object): os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_JOB_ID'] = job.id if child_pid == 0: + os.setsid() self.main_work_horse(job, queue) os._exit(0) # just in case else: diff --git a/tests/fixtures.py b/tests/fixtures.py index 057b932..5d8e8bc 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -10,6 +10,7 @@ import os import time import signal import sys +import subprocess from rq import Connection, get_current_job, get_current_connection, Queue from rq.decorators import job @@ -66,6 +67,17 @@ def create_file_after_timeout(path, timeout): time.sleep(timeout) create_file(path) +def create_file_after_timeout_and_setsid(path, timeout): + os.setsid() + create_file_after_timeout(path, timeout) + +def launch_process_within_worker_and_store_pid(path, timeout): + + p = subprocess.Popen(['sleep', str(timeout)]) + with open(path, 'w') as f: + f.write('{}'.format(p.pid)) + + p.wait() def access_self(): assert get_current_connection() is not None diff --git a/tests/test_worker.py b/tests/test_worker.py index 4ea8a30..1ab5865 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -4,6 +4,7 @@ from __future__ import (absolute_import, division, print_function, import json import os +import psutil import shutil import signal import subprocess @@ -23,9 +24,10 @@ from mock import Mock from tests import RQTestCase, slow from tests.fixtures import ( - access_self, create_file, create_file_after_timeout, div_by_zero, do_nothing, + access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing, kill_worker, long_running_job, modify_self, modify_self_and_error, - run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, raise_exc_mock + run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, raise_exc_mock, + launch_process_within_worker_and_store_pid ) from rq import Queue, SimpleWorker, Worker, get_current_connection @@ -37,7 +39,6 @@ from rq.utils import utcnow from rq.version import VERSION from rq.worker import HerokuWorker, WorkerStatus - class CustomJob(Job): pass @@ -1166,12 +1167,16 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): sentinel_file = '/tmp/.rq_sentinel_work_horse_death' if os.path.exists(sentinel_file): os.remove(sentinel_file) - fooq.enqueue(create_file_after_timeout, sentinel_file, 100) + fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100) job, queue = w.dequeue_job_and_maintain_ttl(5) w.fork_work_horse(job, queue) job.timeout = 5 w.job_monitoring_interval = 1 now = utcnow() + time.sleep(1) + with open(sentinel_file) as f: + subprocess_pid = int(f.read().strip()) + self.assertTrue(psutil.pid_exists(subprocess_pid)) w.monitor_work_horse(job, queue) fudge_factor = 1 total_time = w.job_monitoring_interval + 65 + fudge_factor @@ -1180,6 +1185,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): failed_job_registry = FailedJobRegistry(queue=fooq) self.assertTrue(job in failed_job_registry) self.assertEqual(fooq.count, 0) + self.assertFalse(psutil.pid_exists(subprocess_pid)) def schedule_access_self(): @@ -1283,9 +1289,10 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): w = HerokuWorker('foo') path = os.path.join(self.sandbox, 'shouldnt_exist') - p = Process(target=create_file_after_timeout, args=(path, 2)) + p = Process(target=create_file_after_timeout_and_setsid, args=(path, 2)) p.start() self.assertEqual(p.exitcode, None) + time.sleep(0.1) w._horse_pid = p.pid w.handle_warm_shutdown_request()