better function names and process double SIGRTMIN

main
Samuel Colvin 9 years ago
parent 2b544e5b17
commit 9f9c887645

@ -539,13 +539,7 @@ class Worker(object):
# that are different from the worker. # that are different from the worker.
random.seed() random.seed()
# Always ignore Ctrl+C in the work horse, as it might abort the self.setup_work_horse_signals()
# currently running job.
# The main worker catches the Ctrl+C and requests graceful shutdown
# after the current work is done. When cold shutdown is requested, it
# kills the current job anyway.
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self._is_horse = True self._is_horse = True
self.log = logger self.log = logger
@ -556,6 +550,16 @@ class Worker(object):
# constrast to the regular sys.exit() # constrast to the regular sys.exit()
os._exit(int(not success)) os._exit(int(not success))
def setup_work_horse_signals(self):
"""Setup signal handing for the newly spawned work horse."""
# Always ignore Ctrl+C in the work horse, as it might abort the
# currently running job.
# The main worker catches the Ctrl+C and requests graceful shutdown
# after the current work is done. When cold shutdown is requested, it
# kills the current job anyway.
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
def prepare_job_execution(self, job): def prepare_job_execution(self, job):
"""Performs misc bookkeeping like updating states prior to """Performs misc bookkeeping like updating states prior to
job execution. job execution.
@ -734,20 +738,12 @@ class HerokuWorker(Worker):
frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', frame_properties = ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value',
'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace'] 'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace']
def main_work_horse(self, job, queue): def setup_work_horse_signals(self):
"""Modified entry point which ignores SIGINT and SIGTERM and only handles SIGRTMIN""" """Modified to ignore SIGINT and SIGTERM and only handle SIGRTMIN"""
random.seed() signal.signal(signal.SIGRTMIN, self.request_stop_sigrtmin)
signal.signal(signal.SIGRTMIN, self.handle_shutdown_imminent)
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_IGN) signal.signal(signal.SIGTERM, signal.SIG_IGN)
self._is_horse = True
self.log = logger
success = self.perform_job(job, queue)
os._exit(int(not success))
def handle_warm_shutdown_request(self): def handle_warm_shutdown_request(self):
"""If horse is alive send it SIGRTMIN""" """If horse is alive send it SIGRTMIN"""
if self.horse_pid != 0: if self.horse_pid != 0:
@ -756,17 +752,18 @@ class HerokuWorker(Worker):
else: else:
self.log.warning('Warm shut down requested, no horse found') self.log.warning('Warm shut down requested, no horse found')
def handle_shutdown_imminent(self, signum, frame): def request_stop_sigrtmin(self, signum, frame):
if self.imminent_shutdown_delay == 0: if self.imminent_shutdown_delay == 0:
logger.warn('Imminent shutdown, raising ShutDownImminentException immediately') logger.warn('Imminent shutdown, raising ShutDownImminentException immediately')
self.force_shutdown(signum, frame) self.request_force_stop_sigrtmin(signum, frame)
else: else:
logger.warn('Imminent shutdown, raising ShutDownImminentException in %d seconds', logger.warn('Imminent shutdown, raising ShutDownImminentException in %d seconds',
self.imminent_shutdown_delay) self.imminent_shutdown_delay)
signal.signal(signal.SIGALRM, self.force_shutdown) signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin)
signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin)
signal.alarm(self.imminent_shutdown_delay) signal.alarm(self.imminent_shutdown_delay)
def force_shutdown(self, signum, frame): def request_force_stop_sigrtmin(self, signum, frame):
info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties) info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties)
logger.warn('raising ShutDownImminentException to cancel job...') logger.warn('raising ShutDownImminentException to cancel job...')
raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info) raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info)

@ -734,6 +734,23 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
@slow
def test_shutdown_double_sigrtmin(self):
"""Heroku work horse shutdown with long delay but SIGRTMIN sent twice"""
p = Process(target=run_dummy_heroku_worker, args=(self.sandbox, 10))
p.start()
time.sleep(0.5)
os.kill(p.pid, signal.SIGRTMIN)
# we have to wait a short while otherwise the second signal wont bet processed.
time.sleep(0.1)
os.kill(p.pid, signal.SIGRTMIN)
p.join(2)
self.assertEqual(p.exitcode, 1)
self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
def test_handle_shutdown_request(self): def test_handle_shutdown_request(self):
"""Mutate HerokuWorker so _horse_pid refers to an artificial process and test handle_warm_shutdown_request""" """Mutate HerokuWorker so _horse_pid refers to an artificial process and test handle_warm_shutdown_request"""
w = HerokuWorker('foo') w = HerokuWorker('foo')

Loading…
Cancel
Save