diff --git a/CHANGES.md b/CHANGES.md index bf0eeb7..4d2cf95 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -19,6 +19,11 @@ - Remove `logbook` dependency (in favor of `logging`) +- Custom exception handlers can now be configured in addition to, or to fully + replace, moving failed jobs to the failed queue. Relevant documentation + [here](http://python-rq.org/docs/exceptions/) and + [here](http://python-rq.org/patterns/sentry/). + ### 0.3.0 (August 5th, 2012) diff --git a/rq/contrib/__init__.py b/rq/contrib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py new file mode 100644 index 0000000..abaef72 --- /dev/null +++ b/rq/contrib/sentry.py @@ -0,0 +1,16 @@ +def register_sentry(client, worker): + """Given a Raven client and an RQ worker, registers exception handlers + with the worker so exceptions are logged to Sentry. + """ + def send_to_sentry(job, *exc_info): + client.captureException( + exc_info=exc_info, + extra={ + 'job_id': job.id, + 'func': job.func, + 'args': job.args, + 'kwargs': job.kwargs, + 'description': job.description, + }) + + worker.push_exc_handler(send_to_sentry) diff --git a/rq/job.py b/rq/job.py index 84fa3f8..a8c7937 100644 --- a/rq/job.py +++ b/rq/job.py @@ -87,11 +87,16 @@ class Job(object): def func_name(self): return self._func_name - @property - def status(self): + def _get_status(self): self._status = self.connection.hget(self.key, 'status') return self._status + def _set_status(self, status): + self._status = status + self.connection.hset(self.key, 'status', self._status) + + status = property(_get_status, _set_status) + @property def is_finished(self): return self.status == Status.FINISHED diff --git a/rq/worker.py b/rq/worker.py index 18e52f8..4d5c0b4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,3 +1,4 @@ +import sys import os import errno import random @@ -22,7 +23,6 @@ from .version import VERSION green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') -red = make_colorizer('darkred') blue = make_colorizer('darkblue') logger = logging.getLogger(__name__) @@ -95,7 +95,7 @@ class Worker(object): def __init__(self, queues, name=None, default_result_ttl=500, - connection=None): # noqa + connection=None, exc_handler=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -104,6 +104,7 @@ class Worker(object): self._name = name self.queues = queues self.validate_queues() + self._exc_handlers = [] self.default_result_ttl = default_result_ttl self._state = 'starting' self._is_horse = False @@ -112,6 +113,12 @@ class Worker(object): self.log = logger self.failed_queue = get_failed_queue(connection=self.connection) + # By default, push the "move-to-failed-queue" exception handler onto + # the stack + self.push_exc_handler(self.move_to_failed_queue) + if exc_handler is not None: + self.push_exc_handler(exc_handler) + def validate_queues(self): # noqa """Sanity check for the given queues.""" @@ -387,13 +394,10 @@ class Worker(object): # use the same exc handling when pickling fails pickled_rv = dumps(rv) job._status = Status.FINISHED - except Exception as e: - fq = self.failed_queue - self.log.exception(red(str(e))) - self.log.warning('Moving job to %s queue.' % fq.name) - job._status = Status.FAILED - - fq.quarantine(job, exc_info=traceback.format_exc()) + except: + # Use the public setter here, to immediately update Redis + job.status = Status.FAILED + self.handle_exception(job, *sys.exc_info()) return False if rv is None: @@ -423,3 +427,37 @@ class Worker(object): p.execute() return True + + + def handle_exception(self, job, *exc_info): + """Walks the exception handler stack to delegate exception handling.""" + exc_string = ''.join( + traceback.format_exception_only(*exc_info[:2]) + + traceback.format_exception(*exc_info)) + self.log.error(exc_string) + + for handler in reversed(self._exc_handlers): + self.log.debug('Invoking exception handler %s' % (handler,)) + fallthrough = handler(job, *exc_info) + + # Only handlers with explicit return values should disable further + # exc handling, so interpret a None return value as True. + if fallthrough is None: + fallthrough = True + + if not fallthrough: + break + + def move_to_failed_queue(self, job, *exc_info): + """Default exception handler: move the job to the failed queue.""" + exc_string = ''.join(traceback.format_exception(*exc_info)) + self.log.warning('Moving job to %s queue.' % self.failed_queue.name) + self.failed_queue.quarantine(job, exc_info=exc_string) + + def push_exc_handler(self, handler_func): + """Pushes an exception handler onto the exc handler stack.""" + self._exc_handlers.append(handler_func) + + def pop_exc_handler(self): + """Pops the latest exception handler off of the exc handler stack.""" + return self._exc_handlers.pop() diff --git a/tests/test_worker.py b/tests/test_worker.py index 67cf6e3..e3e706e 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -96,6 +96,34 @@ class TestWorker(RQTestCase): self.assertEquals(job.enqueued_at, enqueued_at_date) self.assertIsNotNone(job.exc_info) # should contain exc_info + def test_custom_exc_handling(self): + """Custom exception handling.""" + def black_hole(job, *exc_info): + # Don't fall through to default behaviour (moving to failed queue) + return False + + q = Queue() + failed_q = get_failed_queue() + + # Preconditions + self.assertEquals(failed_q.count, 0) + self.assertEquals(q.count, 0) + + # Action + job = q.enqueue(div_by_zero) + self.assertEquals(q.count, 1) + + w = Worker([q], exc_handler=black_hole) + w.work(burst=True) # should silently pass + + # Postconditions + self.assertEquals(q.count, 0) + self.assertEquals(failed_q.count, 0) + + # Check the job + job = Job.fetch(job.id) + self.assertEquals(job.is_failed, True) + def test_cancelled_jobs_arent_executed(self): # noqa """Cancelling jobs."""