diff --git a/tests/test_worker.py b/tests/test_worker.py index 40b84b4..f3032ea 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -5,10 +5,13 @@ from __future__ import (absolute_import, division, print_function, import os from datetime import timedelta from time import sleep +import signal +import time +from multiprocessing import Process from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, - div_by_zero, do_nothing, say_hello, say_pid) + div_by_zero, do_nothing, say_hello, say_pid, long_running_job) from tests.helpers import strip_microseconds from rq import get_failed_queue, Queue, SimpleWorker, Worker @@ -468,3 +471,66 @@ class TestWorker(RQTestCase): worker = Worker(queue, connection=self.testconn) worker.work(burst=True) self.assertEqual(self.testconn.zcard(registry.key), 0) + + +def kill_worker(pid, double_kill): + # wait for the worker to be started over on the main process + time.sleep(0.5) + os.kill(pid, signal.SIGTERM) + if double_kill: + # give the worker time to switch signal handler + time.sleep(0.5) + os.kill(pid, signal.SIGTERM) + + +class TestWorkerShutdown(RQTestCase): + def setUp(self): + # we want tests to fail if signal are ignored and the work remain running, + # so set a signal to kill them after 5 seconds + signal.signal(signal.SIGALRM, self._timeout) + signal.alarm(5) + + def _timeout(self, signal, frame): + raise AssertionError("test still running after 5 seconds, " + "likely the worker wasn't shutdown correctly") + + @slow + def test_idle_worker_warm_shutdown(self): + w = Worker('foo') + self.assertFalse(w._stop_requested) + p = Process(target=kill_worker, args=(os.getpid(), False)) + p.start() + + w.work() + + p.join(1) + self.assertFalse(w._stop_requested) + + @slow + def test_working_worker_warm_shutdown(self): + fooq = Queue('foo') + w = Worker(fooq) + fooq.enqueue(long_running_job, 2) + self.assertFalse(w._stop_requested) + p = Process(target=kill_worker, args=(os.getpid(), False)) + p.start() + + w.work() + + p.join(1) + self.assertTrue(w._stop_requested) + + @slow + def test_working_worker_cold_shutdown(self): + fooq = Queue('foo') + w = Worker(fooq) + fooq.enqueue(long_running_job, 10) + self.assertFalse(w._stop_requested) + p = Process(target=kill_worker, args=(os.getpid(), True)) + p.start() + + self.assertRaises(SystemExit, w.work) + + p.join(1) + self.assertTrue(w._stop_requested) +