diff --git a/rq/worker.py b/rq/worker.py index c2fab62..dfa70cd 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -539,13 +539,7 @@ class Worker(object): # that are different from the worker. random.seed() - # Always ignore Ctrl+C in the work horse, as it might abort the - # currently running job. - # The main worker catches the Ctrl+C and requests graceful shutdown - # after the current work is done. When cold shutdown is requested, it - # kills the current job anyway. - signal.signal(signal.SIGINT, signal.SIG_IGN) - signal.signal(signal.SIGTERM, signal.SIG_DFL) + self.setup_work_horse_signals() self._is_horse = True self.log = logger @@ -556,6 +550,16 @@ class Worker(object): # constrast to the regular sys.exit() os._exit(int(not success)) + def setup_work_horse_signals(self): + """Setup signal handing for the newly spawned work horse.""" + # Always ignore Ctrl+C in the work horse, as it might abort the + # currently running job. + # The main worker catches the Ctrl+C and requests graceful shutdown + # after the current work is done. When cold shutdown is requested, it + # kills the current job anyway. + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + def prepare_job_execution(self, job): """Performs misc bookkeeping like updating states prior to job execution. @@ -734,20 +738,12 @@ class HerokuWorker(Worker): frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', 'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace'] - def main_work_horse(self, job, queue): - """Modified entry point which ignores SIGINT and SIGTERM and only handles SIGRTMIN""" - random.seed() - - signal.signal(signal.SIGRTMIN, self.handle_shutdown_imminent) + def setup_work_horse_signals(self): + """Modified to ignore SIGINT and SIGTERM and only handle SIGRTMIN""" + signal.signal(signal.SIGRTMIN, self.request_stop_sigrtmin) signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN) - self._is_horse = True - self.log = logger - - success = self.perform_job(job, queue) - os._exit(int(not success)) - def handle_warm_shutdown_request(self): """If horse is alive send it SIGRTMIN""" if self.horse_pid != 0: @@ -756,17 +752,18 @@ class HerokuWorker(Worker): else: self.log.warning('Warm shut down requested, no horse found') - def handle_shutdown_imminent(self, signum, frame): + def request_stop_sigrtmin(self, signum, frame): if self.imminent_shutdown_delay == 0: logger.warn('Imminent shutdown, raising ShutDownImminentException immediately') - self.force_shutdown(signum, frame) + self.request_force_stop_sigrtmin(signum, frame) else: logger.warn('Imminent shutdown, raising ShutDownImminentException in %d seconds', self.imminent_shutdown_delay) - signal.signal(signal.SIGALRM, self.force_shutdown) + signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin) + signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin) signal.alarm(self.imminent_shutdown_delay) - def force_shutdown(self, signum, frame): + def request_force_stop_sigrtmin(self, signum, frame): info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties) logger.warn('raising ShutDownImminentException to cancel job...') raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info) diff --git a/tests/test_worker.py b/tests/test_worker.py index 3b4b53e..6195dc4 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -734,6 +734,23 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) + @slow + def test_shutdown_double_sigrtmin(self): + """Heroku work horse shutdown with long delay but SIGRTMIN sent twice""" + p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 10)) + p.start() + time.sleep(0.5) + + os.kill(p.pid, signal.SIGRTMIN) + # we have to wait a short while otherwise the second signal wont bet processed. + time.sleep(0.1) + os.kill(p.pid, signal.SIGRTMIN) + p.join(2) + self.assertEqual(p.exitcode, 1) + + self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) + self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) + def test_handle_shutdown_request(self): """Mutate HerokuWorker so _horse_pid refers to an artificial process and test handle_warm_shutdown_request""" w = HerokuWorker('foo')