diff --git a/rq/worker.py b/rq/worker.py index 1155c35..5b56a36 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,9 +1,11 @@ import sys import os +import errno import random import time import procname import socket +import signal from pickle import dumps try: from logbook import Logger @@ -63,6 +65,7 @@ class Worker(object): self.rv_ttl = rv_ttl self._state = 'starting' self._is_horse = False + self._stopped = False self.log = Logger('worker') @@ -154,14 +157,46 @@ class Worker(object): state = property(get_state, set_state) + @property + def stopped(self): + return self._stopped + + def install_sigint_handler(self): + def request_force_stop(signum, frame): + """Terminates the application.""" + self.log.warning('Cold shut down.') + raise SystemExit() + + def request_stop(signum, frame): + signal.signal(signal.SIGINT, request_force_stop) + + if self.is_horse: + self.log.debug('Ignoring SIGINT.') + return + + self.log.warning('Warm shut down. Press Ctrl+C again for a cold shutdown.') + + #if self.state == 'idle': + # raise SystemExit() + self._stopped = True + self.log.debug('Stopping after current horse is finished.') + + signal.signal(signal.SIGINT, request_stop) + + def _work(self, quit_when_done=False): """This method starts the work loop. """ + self.install_sigint_handler() + did_work = False self.register_birth() self.state = 'starting' try: while True: + if self.stopped: + self.log.info('Stopping on request.') + break self.state = 'idle' qnames = self.queue_names() self.procline('Listening on %s' % (','.join(qnames))) @@ -171,10 +206,12 @@ class Worker(object): if job is None: break self.state = 'busy' + self.fork_and_perform_job(job) + did_work = True finally: - if not self._is_horse: + if not self.is_horse: self.register_death() return did_work @@ -201,7 +238,7 @@ class Worker(object): self.log = Logger('horse') try: self.perform_job(job) - except Exception, e: + except Exception as e: self.log.exception(e) sys.exit(1) sys.exit(0)