|
|
@ -714,3 +714,81 @@ class SimpleWorker(Worker):
|
|
|
|
def execute_job(self, *args, **kwargs):
|
|
|
|
def execute_job(self, *args, **kwargs):
|
|
|
|
"""Execute job in same thread/process, do not fork()"""
|
|
|
|
"""Execute job in same thread/process, do not fork()"""
|
|
|
|
return self.perform_job(*args, **kwargs)
|
|
|
|
return self.perform_job(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ShutDownImminentException(Exception):
|
|
|
|
|
|
|
|
def __init__(self, msg, extra_info):
|
|
|
|
|
|
|
|
self.extra_info = extra_info
|
|
|
|
|
|
|
|
super(ShutDownImminentException, self).__init__(msg)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class HerokuWorker(Worker):
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
Modified version of rq worker which:
|
|
|
|
|
|
|
|
* stops work horses getting killed with SIGTERM
|
|
|
|
|
|
|
|
* sends SIGRTMIN to work horses on SIGTERM to the main process so they can crash as they wish
|
|
|
|
|
|
|
|
Note: coverage doesn't work inside the forked thread so code expected to be processed there has pragma: no cover
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
imminent_shutdown_delay = 8
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main_work_horse(self, job, queue):
|
|
|
|
|
|
|
|
"""Modified entry point which ignores SIGINT and SIGTERM and only handles SIGRTMIN"""
|
|
|
|
|
|
|
|
random.seed()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
signal.signal(signal.SIGRTMIN, self.handle_shutdown_imminent)
|
|
|
|
|
|
|
|
signal.signal(signal.SIGINT, signal.SIG_IGN)
|
|
|
|
|
|
|
|
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
self._is_horse = True
|
|
|
|
|
|
|
|
self.log = logger
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
success = self.perform_job(job, queue)
|
|
|
|
|
|
|
|
os._exit(int(not success))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def request_stop(self, signum, frame):
|
|
|
|
|
|
|
|
"""Stops the current worker loop but waits for child processes to
|
|
|
|
|
|
|
|
end gracefully (warm shutdown).
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
self.log.debug('Got signal {0}'.format(signal_name(signum)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
signal.signal(signal.SIGINT, self.request_force_stop)
|
|
|
|
|
|
|
|
signal.signal(signal.SIGTERM, self.request_force_stop)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# start altered {
|
|
|
|
|
|
|
|
if self.horse_pid != 0:
|
|
|
|
|
|
|
|
self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal')
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
os.kill(self.horse_pid, signal.SIGRTMIN)
|
|
|
|
|
|
|
|
except OSError as e:
|
|
|
|
|
|
|
|
if e.errno != errno.ESRCH:
|
|
|
|
|
|
|
|
self.log.debug('Horse already down')
|
|
|
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
self.log.warning('Warm shut down requested, no horse found')
|
|
|
|
|
|
|
|
# } end altered
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# If shutdown is requested in the middle of a job, wait until
|
|
|
|
|
|
|
|
# finish before shutting down
|
|
|
|
|
|
|
|
if self.get_state() == 'busy':
|
|
|
|
|
|
|
|
self._stop_requested = True
|
|
|
|
|
|
|
|
self.log.debug('Stopping after current horse is finished. '
|
|
|
|
|
|
|
|
'Press Ctrl+C again for a cold shutdown.')
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
raise StopRequested()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_shutdown_imminent(self, signum, frame):
|
|
|
|
|
|
|
|
if self.imminent_shutdown_delay == 0:
|
|
|
|
|
|
|
|
logger.warn('Imminent shutdown, raising ShutDownImminentException immediately')
|
|
|
|
|
|
|
|
self.force_shutdown(signum, frame)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
logger.warn('Imminent shutdown, raising ShutDownImminentException in %d seconds',
|
|
|
|
|
|
|
|
self.imminent_shutdown_delay)
|
|
|
|
|
|
|
|
signal.signal(signal.SIGALRM, self.force_shutdown)
|
|
|
|
|
|
|
|
signal.alarm(self.imminent_shutdown_delay)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
|
|
|
def force_shutdown(signum, frame):
|
|
|
|
|
|
|
|
info = {attr: getattr(frame, attr) for attr in ['f_code', 'f_exc_traceback', 'f_exc_type', 'f_exc_value',
|
|
|
|
|
|
|
|
'f_lasti', 'f_lineno', 'f_locals', 'f_restricted', 'f_trace']}
|
|
|
|
|
|
|
|
logger.warn('raising ShutDownImminentException to cancel job...')
|
|
|
|
|
|
|
|
raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info)
|
|
|
|