Add custom exception handling capabilities.

This fixes #95.
main
Vincent Driessen 13 years ago
parent a017756748
commit a20deb2c52

@ -1,3 +1,4 @@
import sys
import os
import errno
import random
@ -104,6 +105,7 @@ class Worker(object):
self._name = name
self.queues = queues
self.validate_queues()
self._exc_handlers = []
self.default_result_ttl = default_result_ttl
self._state = 'starting'
self._is_horse = False
@ -112,6 +114,10 @@ class Worker(object):
self.log = logger
self.failed_queue = get_failed_queue(connection=self.connection)
# By default, push the "move-to-failed-queue" exception handler onto
# the stack
self.push_exc_handler(self.move_to_failed_queue)
def validate_queues(self): # noqa
"""Sanity check for the given queues."""
@ -387,13 +393,9 @@ class Worker(object):
# use the same exc handling when pickling fails
pickled_rv = dumps(rv)
job._status = Status.FINISHED
except Exception as e:
fq = self.failed_queue
self.log.exception(red(str(e)))
self.log.warning('Moving job to %s queue.' % fq.name)
except:
job._status = Status.FAILED
fq.quarantine(job, exc_info=traceback.format_exc())
self.handle_exception(job, *sys.exc_info())
return False
if rv is None:
@ -423,3 +425,35 @@ class Worker(object):
p.execute()
return True
def handle_exception(self, job, *exc_info):
"""Walks the exception handler stack to delegate exception handling."""
exc_string = ''.join(traceback.format_exception(*exc_info))
self.log.exception(red(exc_string))
for handler in reversed(self._exc_handlers):
self.log.debug('Invoking exception handler %s' % (handler,))
fallthrough = handler(job, *exc_info)
# Only handlers with explicit return values should disable further
# exc handling, so interpret a None return value as True.
if fallthrough is None:
fallthrough = True
if not fallthrough:
break
def move_to_failed_queue(self, job, *exc_info):
"""Default exception handler: move the job to the failed queue."""
exc_string = ''.join(traceback.format_exception(*exc_info))
self.log.warning('Moving job to %s queue.' % self.failed_queue.name)
self.failed_queue.quarantine(job, exc_info=exc_string)
def push_exc_handler(self, handler_func):
"""Pushes an exception handler onto the exc hanlder stack."""
self._exc_handlers.append(handler_func)
def pop_exc_handler(self, handler_func):
"""Pops the latest exception handler off of the exc hanlder stack."""
return self._exc_handlers.pop()

Loading…
Cancel
Save