From 18ba4658a428c557cc44745445e5c27b66e6c148 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Sat, 4 Jun 2016 13:11:25 +0100 Subject: [PATCH 1/6] adding heroku worker as per #584 --- rq/worker.py | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 831fe03..6c63d11 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -714,3 +714,81 @@ class SimpleWorker(Worker): def execute_job(self, *args, **kwargs): """Execute job in same thread/process, do not fork()""" return self.perform_job(*args, **kwargs) + + +class ShutDownImminentException(Exception): + def __init__(self, msg, extra_info): + self.extra_info = extra_info + super(ShutDownImminentException, self).__init__(msg) + + +class HerokuWorker(Worker): + """ + Modified version of rq worker which: + * stops work horses getting killed with SIGTERM + * sends SIGRTMIN to work horses on SIGTERM to the main process so they can crash as they wish + Note: coverage doesn't work inside the forked thread so code expected to be processed there has pragma: no cover + """ + imminent_shutdown_delay = 8 + + def main_work_horse(self, job, queue): + """Modified entry point which ignores SIGINT and SIGTERM and only handles SIGRTMIN""" + random.seed() + + signal.signal(signal.SIGRTMIN, self.handle_shutdown_imminent) + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + + self._is_horse = True + self.log = logger + + success = self.perform_job(job, queue) + os._exit(int(not success)) + + def request_stop(self, signum, frame): + """Stops the current worker loop but waits for child processes to + end gracefully (warm shutdown). + """ + self.log.debug('Got signal {0}'.format(signal_name(signum))) + + signal.signal(signal.SIGINT, self.request_force_stop) + signal.signal(signal.SIGTERM, self.request_force_stop) + + # start altered { + if self.horse_pid != 0: + self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal') + try: + os.kill(self.horse_pid, signal.SIGRTMIN) + except OSError as e: + if e.errno != errno.ESRCH: + self.log.debug('Horse already down') + raise + else: + self.log.warning('Warm shut down requested, no horse found') + # } end altered + + # If shutdown is requested in the middle of a job, wait until + # finish before shutting down + if self.get_state() == 'busy': + self._stop_requested = True + self.log.debug('Stopping after current horse is finished. ' + 'Press Ctrl+C again for a cold shutdown.') + else: + raise StopRequested() + + def handle_shutdown_imminent(self, signum, frame): + if self.imminent_shutdown_delay == 0: + logger.warn('Imminent shutdown, raising ShutDownImminentException immediately') + self.force_shutdown(signum, frame) + else: + logger.warn('Imminent shutdown, raising ShutDownImminentException in %d seconds', + self.imminent_shutdown_delay) + signal.signal(signal.SIGALRM, self.force_shutdown) + signal.alarm(self.imminent_shutdown_delay) + + @staticmethod + def force_shutdown(signum, frame): + info = {attr: getattr(frame, attr) for attr in ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', + 'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace']} + logger.warn('raising ShutDownImminentException to cancel job...') + raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info) From e2f89b3171aef3cd0d4bd946e181a94b5b0a7904 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Sat, 4 Jun 2016 13:31:07 +0100 Subject: [PATCH 2/6] fix for python 2.6 --- rq/worker.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 6c63d11..c9fe4b4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -730,6 +730,8 @@ class HerokuWorker(Worker): Note: coverage doesn't work inside the forked thread so code expected to be processed there has pragma: no cover """ imminent_shutdown_delay = 8 + frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', 'f_lasti', 'f_lineno', 'f_locals', + 'f_restricted', 'f_trace'] def main_work_horse(self, job, queue): """Modified entry point which ignores SIGINT and SIGTERM and only handles SIGRTMIN""" @@ -786,9 +788,7 @@ class HerokuWorker(Worker): signal.signal(signal.SIGALRM, self.force_shutdown) signal.alarm(self.imminent_shutdown_delay) - @staticmethod - def force_shutdown(signum, frame): - info = {attr: getattr(frame, attr) for attr in ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', - 'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace']} + def force_shutdown(self, signum, frame): + info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties) logger.warn('raising ShutDownImminentException to cancel job...') raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info) From 2b544e5b17537bd200b6aea019b8f8ddf405861d Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Sat, 4 Jun 2016 15:15:27 +0100 Subject: [PATCH 3/6] add tests to HerokuWorker --- rq/worker.py | 42 +++++--------------- tests/fixtures.py | 24 +++++++++++- tests/test_worker.py | 91 +++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 119 insertions(+), 38 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index c9fe4b4..c2fab62 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -371,8 +371,7 @@ class Worker(object): signal.signal(signal.SIGINT, self.request_force_stop) signal.signal(signal.SIGTERM, self.request_force_stop) - msg = 'Warm shut down requested' - self.log.warning(msg) + self.handle_warm_shutdown_request() # If shutdown is requested in the middle of a job, wait until # finish before shutting down and save the request in redis @@ -384,6 +383,9 @@ class Worker(object): else: raise StopRequested() + def handle_warm_shutdown_request(self): + self.log.warning('Warm shut down requested') + def check_for_suspension(self, burst): """Check to see if workers have been suspended by `rq suspend`""" @@ -727,11 +729,10 @@ class HerokuWorker(Worker): Modified version of rq worker which: * stops work horses getting killed with SIGTERM * sends SIGRTMIN to work horses on SIGTERM to the main process so they can crash as they wish - Note: coverage doesn't work inside the forked thread so code expected to be processed there has pragma: no cover """ - imminent_shutdown_delay = 8 - frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', 'f_lasti', 'f_lineno', 'f_locals', - 'f_restricted', 'f_trace'] + imminent_shutdown_delay = 6 + frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', + 'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace'] def main_work_horse(self, job, queue): """Modified entry point which ignores SIGINT and SIGTERM and only handles SIGRTMIN""" @@ -747,36 +748,13 @@ class HerokuWorker(Worker): success = self.perform_job(job, queue) os._exit(int(not success)) - def request_stop(self, signum, frame): - """Stops the current worker loop but waits for child processes to - end gracefully (warm shutdown). - """ - self.log.debug('Got signal {0}'.format(signal_name(signum))) - - signal.signal(signal.SIGINT, self.request_force_stop) - signal.signal(signal.SIGTERM, self.request_force_stop) - - # start altered { + def handle_warm_shutdown_request(self): + """If horse is alive send it SIGRTMIN""" if self.horse_pid != 0: self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal') - try: - os.kill(self.horse_pid, signal.SIGRTMIN) - except OSError as e: - if e.errno != errno.ESRCH: - self.log.debug('Horse already down') - raise + os.kill(self.horse_pid, signal.SIGRTMIN) else: self.log.warning('Warm shut down requested, no horse found') - # } end altered - - # If shutdown is requested in the middle of a job, wait until - # finish before shutting down - if self.get_state() == 'busy': - self._stop_requested = True - self.log.debug('Stopping after current horse is finished. ' - 'Press Ctrl+C again for a cold shutdown.') - else: - raise StopRequested() def handle_shutdown_imminent(self, signum, frame): if self.imminent_shutdown_delay == 0: diff --git a/tests/fixtures.py b/tests/fixtures.py index 2d15d88..6b05ba1 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -9,9 +9,10 @@ from __future__ import (absolute_import, division, print_function, import os import time -from rq import Connection, get_current_job, get_current_connection +from rq import Connection, get_current_job, get_current_connection, Queue from rq.decorators import job from rq.compat import PY2 +from rq.worker import HerokuWorker def say_pid(): @@ -102,3 +103,24 @@ def black_hole(job, *exc_info): def long_running_job(timeout=10): time.sleep(timeout) return 'Done sleeping...' + + +def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay): + """ + Run a simplified heroku worker where perform_job job just creates two files 2 seconds apart + :param sandbox: directory to create files in + :param _imminent_shutdown_delay: delay to use for TestHerokuWorker + :return: + """ + class TestHerokuWorker(HerokuWorker): + imminent_shutdown_delay = _imminent_shutdown_delay + + def perform_job(self, job, queue): + create_file(os.path.join(sandbox, 'started')) + # have to loop here rather than one sleep to avoid holding the GIL and preventing signals being recieved + for i in range(20): + time.sleep(0.1) + create_file(os.path.join(sandbox, 'finished')) + + w = TestHerokuWorker(Queue('dummy')) + w.main_work_horse(None, None) diff --git a/tests/test_worker.py b/tests/test_worker.py index c63f97b..3b4b53e 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import os +import shutil from datetime import timedelta from time import sleep import signal @@ -13,7 +14,7 @@ import subprocess from tests import RQTestCase, slow from tests.fixtures import (create_file, create_file_after_timeout, div_by_zero, do_nothing, say_hello, say_pid, - access_self) + run_dummy_heroku_worker, access_self) from tests.helpers import strip_microseconds from rq import (get_failed_queue, Queue, SimpleWorker, Worker, @@ -23,6 +24,7 @@ from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry from rq.suspension import resume, suspend from rq.utils import utcnow +from rq.worker import HerokuWorker class CustomJob(Job): @@ -473,6 +475,7 @@ class TestWorker(RQTestCase): assert q.count == 0 self.assertEqual(os.path.exists(SENTINEL_FILE), True) + @slow def test_suspend_with_duration(self): q = Queue() for _ in range(5): @@ -575,7 +578,7 @@ def kill_worker(pid, double_kill): os.kill(pid, signal.SIGTERM) -class TestWorkerShutdown(RQTestCase): +class TimeoutTestCase: def setUp(self): # we want tests to fail if signal are ignored and the work remain # running, so set a signal to kill them after X seconds @@ -588,6 +591,8 @@ class TestWorkerShutdown(RQTestCase): "test still running after %i seconds, likely the worker wasn't shutdown correctly" % self.killtimeout ) + +class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): @slow def test_idle_worker_warm_shutdown(self): """worker with no ongoing job receiving single SIGTERM signal and shutting down""" @@ -616,12 +621,12 @@ class TestWorkerShutdown(RQTestCase): w.work() p.join(2) + self.assertFalse(p.is_alive()) self.assertTrue(w._stop_requested) self.assertTrue(os.path.exists(sentinel_file)) - shutdown_requested_date = w.shutdown_requested_date - self.assertIsNotNone(shutdown_requested_date) - self.assertEqual(type(shutdown_requested_date).__name__, 'datetime') + self.assertIsNotNone(w.shutdown_requested_date) + self.assertEqual(type(w.shutdown_requested_date).__name__, 'datetime') @slow def test_working_worker_cold_shutdown(self): @@ -675,3 +680,79 @@ class TestWorkerSubprocess(RQTestCase): subprocess.check_call(['rqworker', '-u', self.redis_url, '-b']) assert get_failed_queue().count == 0 assert q.count == 0 + + +class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): + def setUp(self): + super(HerokuWorkerShutdownTestCase, self).setUp() + self.sandbox = '/tmp/rq_shutdown/' + os.makedirs(self.sandbox) + + def tearDown(self): + shutil.rmtree(self.sandbox, ignore_errors=True) + + @slow + def test_idle_worker_shutdown(self): + """worker with no ongoing job receiving single SIGTERM signal and shutting down""" + w = HerokuWorker('foo') + self.assertFalse(w._stop_requested) + p = Process(target=kill_worker, args=(os.getpid(), False)) + p.start() + + w.work() + + p.join(1) + self.assertFalse(w._stop_requested) + + @slow + def test_immediate_shutdown(self): + """Heroku work horse shutdown with immediate (0 second) kill""" + p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 0)) + p.start() + time.sleep(0.5) + + os.kill(p.pid, signal.SIGRTMIN) + + p.join(2) + self.assertEqual(p.exitcode, 1) + self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) + self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) + + @slow + def test_1_sec_shutdown(self): + """Heroku work horse shutdown with 1 second kill""" + p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 1)) + p.start() + time.sleep(0.5) + + os.kill(p.pid, signal.SIGRTMIN) + time.sleep(0.1) + self.assertEqual(p.exitcode, None) + p.join(2) + self.assertEqual(p.exitcode, 1) + + self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) + self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) + + def test_handle_shutdown_request(self): + """Mutate HerokuWorker so _horse_pid refers to an artificial process and test handle_warm_shutdown_request""" + w = HerokuWorker('foo') + + path = os.path.join(self.sandbox, 'shouldnt_exist') + p = Process(target=create_file_after_timeout, args=(path, 2)) + p.start() + self.assertEqual(p.exitcode, None) + + w._horse_pid = p.pid + w.handle_warm_shutdown_request() + p.join(2) + self.assertEqual(p.exitcode, -34) + self.assertFalse(os.path.exists(path)) + + def test_handle_shutdown_request_no_horse(self): + """Mutate HerokuWorker so _horse_pid refers to non existent process and test handle_warm_shutdown_request""" + w = HerokuWorker('foo') + + w._horse_pid = 19999 + with self.assertRaises(OSError): + w.handle_warm_shutdown_request() From 9f9c8876456f9d4bbd9b36d1340c3a6a61ff173e Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Sat, 4 Jun 2016 15:58:44 +0100 Subject: [PATCH 4/6] better function names and process double SIGRTMIN --- rq/worker.py | 41 +++++++++++++++++++---------------------- tests/test_worker.py | 17 +++++++++++++++++ 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index c2fab62..dfa70cd 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -539,13 +539,7 @@ class Worker(object): # that are different from the worker. random.seed() - # Always ignore Ctrl+C in the work horse, as it might abort the - # currently running job. - # The main worker catches the Ctrl+C and requests graceful shutdown - # after the current work is done. When cold shutdown is requested, it - # kills the current job anyway. - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_DFL) + self.setup_work_horse_signals() self._is_horse = True self.log = logger @@ -556,6 +550,16 @@ class Worker(object): # constrast to the regular sys.exit() os._exit(int(not success)) + def setup_work_horse_signals(self): + """Setup signal handing for the newly spawned work horse.""" + # Always ignore Ctrl+C in the work horse, as it might abort the + # currently running job. + # The main worker catches the Ctrl+C and requests graceful shutdown + # after the current work is done. When cold shutdown is requested, it + # kills the current job anyway. + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + def prepare_job_execution(self, job): """Performs misc bookkeeping like updating states prior to job execution. @@ -734,20 +738,12 @@ class HerokuWorker(Worker): frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', 'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace'] - def main_work_horse(self, job, queue): - """Modified entry point which ignores SIGINT and SIGTERM and only handles SIGRTMIN""" - random.seed() - - signal.signal(signal.SIGRTMIN, self.handle_shutdown_imminent) + def setup_work_horse_signals(self): + """Modified to ignore SIGINT and SIGTERM and only handle SIGRTMIN""" + signal.signal(signal.SIGRTMIN, self.request_stop_sigrtmin) signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) - self._is_horse = True - self.log = logger - - success = self.perform_job(job, queue) - os._exit(int(not success)) - def handle_warm_shutdown_request(self): """If horse is alive send it SIGRTMIN""" if self.horse_pid != 0: @@ -756,17 +752,18 @@ class HerokuWorker(Worker): else: self.log.warning('Warm shut down requested, no horse found') - def handle_shutdown_imminent(self, signum, frame): + def request_stop_sigrtmin(self, signum, frame): if self.imminent_shutdown_delay == 0: logger.warn('Imminent shutdown, raising ShutDownImminentException immediately') - self.force_shutdown(signum, frame) + self.request_force_stop_sigrtmin(signum, frame) else: logger.warn('Imminent shutdown, raising ShutDownImminentException in %d seconds', self.imminent_shutdown_delay) - signal.signal(signal.SIGALRM, self.force_shutdown) + signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin) + signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin) signal.alarm(self.imminent_shutdown_delay) - def force_shutdown(self, signum, frame): + def request_force_stop_sigrtmin(self, signum, frame): info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties) logger.warn('raising ShutDownImminentException to cancel job...') raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info) diff --git a/tests/test_worker.py b/tests/test_worker.py index 3b4b53e..6195dc4 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -734,6 +734,23 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) + @slow + def test_shutdown_double_sigrtmin(self): + """Heroku work horse shutdown with long delay but SIGRTMIN sent twice""" + p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 10)) + p.start() + time.sleep(0.5) + + os.kill(p.pid, signal.SIGRTMIN) + # we have to wait a short while otherwise the second signal wont bet processed. + time.sleep(0.1) + os.kill(p.pid, signal.SIGRTMIN) + p.join(2) + self.assertEqual(p.exitcode, 1) + + self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) + self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) + def test_handle_shutdown_request(self): """Mutate HerokuWorker so _horse_pid refers to an artificial process and test handle_warm_shutdown_request""" w = HerokuWorker('foo') From 0e26db9e08b8bb08f836d87ad79cf1f558553c73 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Sat, 4 Jun 2016 16:05:52 +0100 Subject: [PATCH 5/6] correct wording in docstring and tests --- rq/worker.py | 3 ++- tests/fixtures.py | 9 +++++---- tests/test_worker.py | 19 ++++--------------- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index dfa70cd..235b8af 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -732,7 +732,8 @@ class HerokuWorker(Worker): """ Modified version of rq worker which: * stops work horses getting killed with SIGTERM - * sends SIGRTMIN to work horses on SIGTERM to the main process so they can crash as they wish + * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn + causes the horse to crash `imminent_shutdown_delay` seconds later """ imminent_shutdown_delay = 6 frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', diff --git a/tests/fixtures.py b/tests/fixtures.py index 6b05ba1..7402e06 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -107,17 +107,18 @@ def long_running_job(timeout=10): def run_dummy_heroku_worker(sandbox, _imminent_shutdown_delay): """ - Run a simplified heroku worker where perform_job job just creates two files 2 seconds apart + Run the work horse for a simplified heroku worker where perform_job just + creates two sentinel files 2 seconds apart. :param sandbox: directory to create files in - :param _imminent_shutdown_delay: delay to use for TestHerokuWorker - :return: + :param _imminent_shutdown_delay: delay to use for HerokuWorker """ class TestHerokuWorker(HerokuWorker): imminent_shutdown_delay = _imminent_shutdown_delay def perform_job(self, job, queue): create_file(os.path.join(sandbox, 'started')) - # have to loop here rather than one sleep to avoid holding the GIL and preventing signals being recieved + # have to loop here rather than one sleep to avoid holding the GIL + # and preventing signals being received for i in range(20): time.sleep(0.1) create_file(os.path.join(sandbox, 'finished')) diff --git a/tests/test_worker.py b/tests/test_worker.py index 6195dc4..7df27f0 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -691,19 +691,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): def tearDown(self): shutil.rmtree(self.sandbox, ignore_errors=True) - @slow - def test_idle_worker_shutdown(self): - """worker with no ongoing job receiving single SIGTERM signal and shutting down""" - w = HerokuWorker('foo') - self.assertFalse(w._stop_requested) - p = Process(target=kill_worker, args=(os.getpid(), False)) - p.start() - - w.work() - - p.join(1) - self.assertFalse(w._stop_requested) - @slow def test_immediate_shutdown(self): """Heroku work horse shutdown with immediate (0 second) kill""" @@ -752,7 +739,8 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) def test_handle_shutdown_request(self): - """Mutate HerokuWorker so _horse_pid refers to an artificial process and test handle_warm_shutdown_request""" + """Mutate HerokuWorker so _horse_pid refers to an artificial process + and test handle_warm_shutdown_request""" w = HerokuWorker('foo') path = os.path.join(self.sandbox, 'shouldnt_exist') @@ -767,7 +755,8 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertFalse(os.path.exists(path)) def test_handle_shutdown_request_no_horse(self): - """Mutate HerokuWorker so _horse_pid refers to non existent process and test handle_warm_shutdown_request""" + """Mutate HerokuWorker so _horse_pid refers to non existent process + and test handle_warm_shutdown_request""" w = HerokuWorker('foo') w._horse_pid = 19999 From 7efd036a2c7b5db6d91a3725152747d9bc240ada Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Mon, 13 Jun 2016 18:29:16 +0100 Subject: [PATCH 6/6] move ShutDownImminentException into exceptions.py --- rq/exceptions.py | 6 ++++++ rq/worker.py | 8 +------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 88bbbb0..530733d 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -19,3 +19,9 @@ class UnpickleError(Exception): class DequeueTimeout(Exception): pass + + +class ShutDownImminentException(Exception): + def __init__(self, msg, extra_info): + self.extra_info = extra_info + super(ShutDownImminentException, self).__init__(msg) diff --git a/rq/worker.py b/rq/worker.py index 235b8af..f7c8c63 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -18,7 +18,7 @@ from rq.compat import as_text, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL -from .exceptions import DequeueTimeout +from .exceptions import DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import Queue, get_failed_queue @@ -722,12 +722,6 @@ class SimpleWorker(Worker): return self.perform_job(*args, **kwargs) -class ShutDownImminentException(Exception): - def __init__(self, msg, extra_info): - self.extra_info = extra_info - super(ShutDownImminentException, self).__init__(msg) - - class HerokuWorker(Worker): """ Modified version of rq worker which: