diff --git a/rq/worker.py b/rq/worker.py index 831fe03..6c63d11 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -714,3 +714,81 @@ class SimpleWorker(Worker): def execute_job(self, *args, **kwargs): """Execute job in same thread/process, do not fork()""" return self.perform_job(*args, **kwargs) + + +class ShutDownImminentException(Exception): + def __init__(self, msg, extra_info): + self.extra_info = extra_info + super(ShutDownImminentException, self).__init__(msg) + + +class HerokuWorker(Worker): + """ + Modified version of rq worker which: + * stops work horses getting killed with SIGTERM + * sends SIGRTMIN to work horses on SIGTERM to the main process so they can crash as they wish + Note: coverage doesn't work inside the forked thread so code expected to be processed there has pragma: no cover + """ + imminent_shutdown_delay = 8 + + def main_work_horse(self, job, queue): + """Modified entry point which ignores SIGINT and SIGTERM and only handles SIGRTMIN""" + random.seed() + + signal.signal(signal.SIGRTMIN, self.handle_shutdown_imminent) + signal.signal(signal.SIGINT, signal.SIG_IGN) + signal.signal(signal.SIGTERM, signal.SIG_IGN) + + self._is_horse = True + self.log = logger + + success = self.perform_job(job, queue) + os._exit(int(not success)) + + def request_stop(self, signum, frame): + """Stops the current worker loop but waits for child processes to + end gracefully (warm shutdown). + """ + self.log.debug('Got signal {0}'.format(signal_name(signum))) + + signal.signal(signal.SIGINT, self.request_force_stop) + signal.signal(signal.SIGTERM, self.request_force_stop) + + # start altered { + if self.horse_pid != 0: + self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal') + try: + os.kill(self.horse_pid, signal.SIGRTMIN) + except OSError as e: + if e.errno != errno.ESRCH: + self.log.debug('Horse already down') + raise + else: + self.log.warning('Warm shut down requested, no horse found') + # } end altered + + # If shutdown is requested in the middle of a job, wait until + # finish before shutting down + if self.get_state() == 'busy': + self._stop_requested = True + self.log.debug('Stopping after current horse is finished. ' + 'Press Ctrl+C again for a cold shutdown.') + else: + raise StopRequested() + + def handle_shutdown_imminent(self, signum, frame): + if self.imminent_shutdown_delay == 0: + logger.warn('Imminent shutdown, raising ShutDownImminentException immediately') + self.force_shutdown(signum, frame) + else: + logger.warn('Imminent shutdown, raising ShutDownImminentException in %d seconds', + self.imminent_shutdown_delay) + signal.signal(signal.SIGALRM, self.force_shutdown) + signal.alarm(self.imminent_shutdown_delay) + + @staticmethod + def force_shutdown(signum, frame): + info = {attr: getattr(frame, attr) for attr in ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value', + 'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace']} + logger.warn('raising ShutDownImminentException to cancel job...') + raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info)