diff --git a/rq/exceptions.py b/rq/exceptions.py
index 88bbbb0..530733d 100644
--- a/rq/exceptions.py
+++ b/rq/exceptions.py
@@ -19,3 +19,9 @@ class UnpickleError(Exception):
 
 class DequeueTimeout(Exception):
     pass
+
+
+class ShutDownImminentException(Exception):
+    def __init__(self, msg, extra_info):
+        self.extra_info = extra_info
+        super(ShutDownImminentException, self).__init__(msg)
diff --git a/rq/worker.py b/rq/worker.py
index 831fe03..f7c8c63 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -18,7 +18,7 @@ from rq.compat import as_text, string_types, text_type
 
 from .connections import get_current_connection, push_connection, pop_connection
 from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL
-from .exceptions import DequeueTimeout
+from .exceptions import DequeueTimeout, ShutDownImminentException
 from .job import Job, JobStatus
 from .logutils import setup_loghandlers
 from .queue import Queue, get_failed_queue
@@ -371,8 +371,7 @@ class Worker(object):
         signal.signal(signal.SIGINT, self.request_force_stop)
         signal.signal(signal.SIGTERM, self.request_force_stop)
 
-        msg = 'Warm shut down requested'
-        self.log.warning(msg)
+        self.handle_warm_shutdown_request()
 
         # If shutdown is requested in the middle of a job, wait until
         # finish before shutting down and save the request in redis
@@ -384,6 +383,9 @@ class Worker(object):
         else:
             raise StopRequested()
 
+    def handle_warm_shutdown_request(self):
+        self.log.warning('Warm shut down requested')
+
     def check_for_suspension(self, burst):
         """Check to see if workers have been suspended by `rq suspend`"""
 
@@ -537,13 +539,7 @@ class Worker(object):
         # that are different from the worker.
         random.seed()
 
-        # Always ignore Ctrl+C in the work horse, as it might abort the
-        # currently running job.
-        # The main worker catches the Ctrl+C and requests graceful shutdown
-        # after the current work is done. When cold shutdown is requested, it
-        # kills the current job anyway.
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+        self.setup_work_horse_signals()
 
         self._is_horse = True
         self.log = logger
@@ -554,6 +550,16 @@ class Worker(object):
         # constrast to the regular sys.exit()
         os._exit(int(not success))
 
+    def setup_work_horse_signals(self):
+        """Set up signal handling for the newly spawned work horse."""
+        # Always ignore Ctrl+C in the work horse, as it might abort the
+        # currently running job.
+        # The main worker catches the Ctrl+C and requests graceful shutdown
+        # after the current work is done. When cold shutdown is requested, it
+        # kills the current job anyway.
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
     def prepare_job_execution(self, job):
         """Performs misc bookkeeping like updating states prior to
         job execution.
@@ -714,3 +720,45 @@ class SimpleWorker(Worker):
     def execute_job(self, *args, **kwargs):
         """Execute job in same thread/process, do not fork()"""
         return self.perform_job(*args, **kwargs)
+
+
+class HerokuWorker(Worker):
+    """
+    Modified version of rq worker which:
+    * stops work horses getting killed with SIGTERM
+    * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
+      causes the horse to crash `imminent_shutdown_delay` seconds later
+    """
+    imminent_shutdown_delay = 6
+    frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value',
+                        'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace']
+
+    def setup_work_horse_signals(self):
+        """Modified to ignore SIGINT and SIGTERM and only handle SIGRTMIN"""
+        signal.signal(signal.SIGRTMIN, self.request_stop_sigrtmin)
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+        signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+    def handle_warm_shutdown_request(self):
+        """If horse is alive send it SIGRTMIN"""
+        if self.horse_pid != 0:
+            self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal')
+            os.kill(self.horse_pid, signal.SIGRTMIN)
+        else:
+            self.log.warning('Warm shut down requested, no horse found')
+
+    def request_stop_sigrtmin(self, signum, frame):
+        if self.imminent_shutdown_delay == 0:
+            logger.warn('Imminent shutdown, raising ShutDownImminentException immediately')
+            self.request_force_stop_sigrtmin(signum, frame)
+        else:
+            logger.warn('Imminent shutdown, raising ShutDownImminentException in %d seconds',
+                        self.imminent_shutdown_delay)
+            signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin)
+            signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin)
+            signal.alarm(self.imminent_shutdown_delay)
+
+    def request_force_stop_sigrtmin(self, signum, frame):
+        info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties)
+        logger.warn('raising ShutDownImminentException to cancel job...')
+        raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info)
diff --git a/tests/fixtures.py b/tests/fixtures.py
index 2d15d88..7402e06 100644
--- a/tests/fixtures.py
+++ b/tests/fixtures.py
@@ -9,9 +9,10 @@ from __future__ import (absolute_import, division, print_function,
 import os
 import time
 
-from rq import Connection, get_current_job, get_current_connection
+from rq import Connection, get_current_job, get_current_connection, Queue
 from rq.decorators import job
 from rq.compat import PY2
+from rq.worker import HerokuWorker
 
 
 def say_pid():
@@ -102,3 +103,25 @@ def black_hole(job, *exc_info):
 def long_running_job(timeout=10):
     time.sleep(timeout)
     return 'Done sleeping...'
+
+
+def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay):
+    """
+    Run the work horse for a simplified heroku worker where perform_job just
+    creates two sentinel files 2 seconds apart.
+    :param sandbox: directory to create files in
+    :param _imminent_shutdown_delay: delay to use for HerokuWorker
+    """
+    class TestHerokuWorker(HerokuWorker):
+        imminent_shutdown_delay = _imminent_shutdown_delay
+
+        def perform_job(self, job, queue):
+            create_file(os.path.join(sandbox, 'started'))
+            # have to loop here rather than one sleep to avoid holding the GIL
+            # and preventing signals being received
+            for i in range(20):
+                time.sleep(0.1)
+            create_file(os.path.join(sandbox, 'finished'))
+
+    w = TestHerokuWorker(Queue('dummy'))
+    w.main_work_horse(None, None)
diff --git a/tests/test_worker.py b/tests/test_worker.py
index c63f97b..7df27f0 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
 import os
+import shutil
 from datetime import timedelta
 from time import sleep
 import signal
@@ -13,7 +14,7 @@ import subprocess
 from tests import RQTestCase, slow
 from tests.fixtures import (create_file, create_file_after_timeout,
                             div_by_zero, do_nothing, say_hello, say_pid,
-                            access_self)
+                            run_dummy_heroku_worker, access_self)
 from tests.helpers import strip_microseconds
 
 from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
@@ -23,6 +24,7 @@ from rq.job import Job, JobStatus
 from rq.registry import StartedJobRegistry
 from rq.suspension import resume, suspend
 from rq.utils import utcnow
+from rq.worker import HerokuWorker
 
 
 class CustomJob(Job):
@@ -473,6 +475,7 @@ class TestWorker(RQTestCase):
         assert q.count == 0
         self.assertEqual(os.path.exists(SENTINEL_FILE), True)
 
+    @slow
     def test_suspend_with_duration(self):
         q = Queue()
         for _ in range(5):
@@ -575,7 +578,7 @@ def kill_worker(pid, double_kill):
     os.kill(pid, signal.SIGTERM)
 
 
-class TestWorkerShutdown(RQTestCase):
+class TimeoutTestCase:
     def setUp(self):
         # we want tests to fail if signal are ignored and the work remain
         # running, so set a signal to kill them after X seconds
@@ -588,6 +591,8 @@ class TestWorkerShutdown(RQTestCase):
             "test still running after %i seconds, likely the worker wasn't shutdown correctly" % self.killtimeout
         )
 
+
+class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
     @slow
     def test_idle_worker_warm_shutdown(self):
         """worker with no ongoing job receiving single SIGTERM signal and shutting down"""
@@ -616,12 +621,12 @@ class TestWorkerShutdown(RQTestCase):
         w.work()
 
         p.join(2)
+        self.assertFalse(p.is_alive())
         self.assertTrue(w._stop_requested)
         self.assertTrue(os.path.exists(sentinel_file))
 
-        shutdown_requested_date = w.shutdown_requested_date
-        self.assertIsNotNone(shutdown_requested_date)
-        self.assertEqual(type(shutdown_requested_date).__name__, 'datetime')
+        self.assertIsNotNone(w.shutdown_requested_date)
+        self.assertEqual(type(w.shutdown_requested_date).__name__, 'datetime')
 
     @slow
     def test_working_worker_cold_shutdown(self):
@@ -675,3 +680,85 @@ class TestWorkerSubprocess(RQTestCase):
         subprocess.check_call(['rqworker', '-u', self.redis_url, '-b'])
         assert get_failed_queue().count == 0
         assert q.count == 0
+
+
+class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
+    def setUp(self):
+        super(HerokuWorkerShutdownTestCase, self).setUp()
+        self.sandbox = '/tmp/rq_shutdown/'
+        os.makedirs(self.sandbox)
+
+    def tearDown(self):
+        shutil.rmtree(self.sandbox, ignore_errors=True)
+
+    @slow
+    def test_immediate_shutdown(self):
+        """Heroku work horse shutdown with immediate (0 second) kill"""
+        p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 0))
+        p.start()
+        time.sleep(0.5)
+
+        os.kill(p.pid, signal.SIGRTMIN)
+
+        p.join(2)
+        self.assertEqual(p.exitcode, 1)
+        self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
+        self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
+
+    @slow
+    def test_1_sec_shutdown(self):
+        """Heroku work horse shutdown with 1 second kill"""
+        p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 1))
+        p.start()
+        time.sleep(0.5)
+
+        os.kill(p.pid, signal.SIGRTMIN)
+        time.sleep(0.1)
+        self.assertEqual(p.exitcode, None)
+        p.join(2)
+        self.assertEqual(p.exitcode, 1)
+
+        self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
+        self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
+
+    @slow
+    def test_shutdown_double_sigrtmin(self):
+        """Heroku work horse shutdown with long delay but SIGRTMIN sent twice"""
+        p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 10))
+        p.start()
+        time.sleep(0.5)
+
+        os.kill(p.pid, signal.SIGRTMIN)
+        # we have to wait a short while, otherwise the second signal won't be processed.
+        time.sleep(0.1)
+        os.kill(p.pid, signal.SIGRTMIN)
+        p.join(2)
+        self.assertEqual(p.exitcode, 1)
+
+        self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
+        self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
+
+    def test_handle_shutdown_request(self):
+        """Mutate HerokuWorker so _horse_pid refers to an artificial process
+        and test handle_warm_shutdown_request"""
+        w = HerokuWorker('foo')
+
+        path = os.path.join(self.sandbox, 'shouldnt_exist')
+        p = Process(target=create_file_after_timeout, args=(path, 2))
+        p.start()
+        self.assertEqual(p.exitcode, None)
+
+        w._horse_pid = p.pid
+        w.handle_warm_shutdown_request()
+        p.join(2)
+        self.assertEqual(p.exitcode, -34)
+        self.assertFalse(os.path.exists(path))
+
+    def test_handle_shutdown_request_no_horse(self):
+        """Mutate HerokuWorker so _horse_pid refers to a non-existent process
+        and test handle_warm_shutdown_request"""
+        w = HerokuWorker('foo')
+
+        w._horse_pid = 19999
+        with self.assertRaises(OSError):
+            w.handle_warm_shutdown_request()
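
Usage note (not part of the patch): a minimal sketch of how a project might opt into the new worker class once this lands. The Redis connection and the queue name 'default' below are illustrative assumptions, not taken from the diff.

    from redis import Redis
    from rq import Queue
    from rq.worker import HerokuWorker

    # Assumed setup: a local Redis instance and a single queue named 'default'.
    conn = Redis()
    worker = HerokuWorker([Queue('default', connection=conn)], connection=conn)
    # On SIGTERM (e.g. a Heroku dyno restart) the main process now forwards
    # SIGRTMIN to the work horse instead of letting SIGTERM kill it; the horse
    # raises ShutDownImminentException imminent_shutdown_delay seconds later,
    # so the running job fails with a traceback rather than dying silently.
    worker.work()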