diff --git a/rq/worker.py b/rq/worker.py index 18e52f8..f9466a4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,3 +1,4 @@ +import sys import os import errno import random @@ -104,6 +105,7 @@ class Worker(object): self._name = name self.queues = queues self.validate_queues() + self._exc_handlers = [] self.default_result_ttl = default_result_ttl self._state = 'starting' self._is_horse = False @@ -112,6 +114,10 @@ class Worker(object): self.log = logger self.failed_queue = get_failed_queue(connection=self.connection) + # By default, push the "move-to-failed-queue" exception handler onto + # the stack + self.push_exc_handler(self.move_to_failed_queue) + def validate_queues(self): # noqa """Sanity check for the given queues.""" @@ -387,13 +393,9 @@ class Worker(object): # use the same exc handling when pickling fails pickled_rv = dumps(rv) job._status = Status.FINISHED - except Exception as e: - fq = self.failed_queue - self.log.exception(red(str(e))) - self.log.warning('Moving job to %s queue.' % fq.name) + except: job._status = Status.FAILED - - fq.quarantine(job, exc_info=traceback.format_exc()) + self.handle_exception(job, *sys.exc_info()) return False if rv is None: @@ -423,3 +425,35 @@ class Worker(object): p.execute() return True + + + def handle_exception(self, job, *exc_info): + """Walks the exception handler stack to delegate exception handling.""" + exc_string = ''.join(traceback.format_exception(*exc_info)) + self.log.exception(red(exc_string)) + + for handler in reversed(self._exc_handlers): + self.log.debug('Invoking exception handler %s' % (handler,)) + fallthrough = handler(job, *exc_info) + + # Only handlers with explicit return values should disable further + # exc handling, so interpret a None return value as True. + if fallthrough is None: + fallthrough = True + + if not fallthrough: + break + + def move_to_failed_queue(self, job, *exc_info): + """Default exception handler: move the job to the failed queue.""" + exc_string = ''.join(traceback.format_exception(*exc_info)) + self.log.warning('Moving job to %s queue.' % self.failed_queue.name) + self.failed_queue.quarantine(job, exc_info=exc_string) + + def push_exc_handler(self, handler_func): + """Pushes an exception handler onto the exc hanlder stack.""" + self._exc_handlers.append(handler_func) + + def pop_exc_handler(self, handler_func): + """Pops the latest exception handler off of the exc hanlder stack.""" + return self._exc_handlers.pop()