From a20deb2c52ae2da77ee2e3aa675a07e4b6a1cf15 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 10:06:45 +0200 Subject: [PATCH 01/10] Add custom exception handling capabilities. This fixes #95. --- rq/worker.py | 46 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 18e52f8..f9466a4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,3 +1,4 @@ +import sys import os import errno import random @@ -104,6 +105,7 @@ class Worker(object): self._name = name self.queues = queues self.validate_queues() + self._exc_handlers = [] self.default_result_ttl = default_result_ttl self._state = 'starting' self._is_horse = False @@ -112,6 +114,10 @@ class Worker(object): self.log = logger self.failed_queue = get_failed_queue(connection=self.connection) + # By default, push the "move-to-failed-queue" exception handler onto + # the stack + self.push_exc_handler(self.move_to_failed_queue) + def validate_queues(self): # noqa """Sanity check for the given queues.""" @@ -387,13 +393,9 @@ class Worker(object): # use the same exc handling when pickling fails pickled_rv = dumps(rv) job._status = Status.FINISHED - except Exception as e: - fq = self.failed_queue - self.log.exception(red(str(e))) - self.log.warning('Moving job to %s queue.' % fq.name) + except: job._status = Status.FAILED - - fq.quarantine(job, exc_info=traceback.format_exc()) + self.handle_exception(job, *sys.exc_info()) return False if rv is None: @@ -423,3 +425,35 @@ class Worker(object): p.execute() return True + + + def handle_exception(self, job, *exc_info): + """Walks the exception handler stack to delegate exception handling.""" + exc_string = ''.join(traceback.format_exception(*exc_info)) + self.log.exception(red(exc_string)) + + for handler in reversed(self._exc_handlers): + self.log.debug('Invoking exception handler %s' % (handler,)) + fallthrough = handler(job, *exc_info) + + # Only handlers with explicit return values should disable further + # exc handling, so interpret a None return value as True. + if fallthrough is None: + fallthrough = True + + if not fallthrough: + break + + def move_to_failed_queue(self, job, *exc_info): + """Default exception handler: move the job to the failed queue.""" + exc_string = ''.join(traceback.format_exception(*exc_info)) + self.log.warning('Moving job to %s queue.' % self.failed_queue.name) + self.failed_queue.quarantine(job, exc_info=exc_string) + + def push_exc_handler(self, handler_func): + """Pushes an exception handler onto the exc hanlder stack.""" + self._exc_handlers.append(handler_func) + + def pop_exc_handler(self, handler_func): + """Pops the latest exception handler off of the exc hanlder stack.""" + return self._exc_handlers.pop() From daa8f38b4bf7be7d06bda6d8e78c9cbef548d20a Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 10:19:04 +0200 Subject: [PATCH 02/10] Improve formatting of the exceptions on the console. --- rq/worker.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index f9466a4..21a9302 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,6 @@ from .version import VERSION green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') -red = make_colorizer('darkred') blue = make_colorizer('darkblue') logger = logging.getLogger(__name__) @@ -429,8 +428,10 @@ class Worker(object): def handle_exception(self, job, *exc_info): """Walks the exception handler stack to delegate exception handling.""" - exc_string = ''.join(traceback.format_exception(*exc_info)) - self.log.exception(red(exc_string)) + exc_string = ''.join( + traceback.format_exception_only(*exc_info[:2]) + + traceback.format_exception(*exc_info)) + self.log.error(exc_string) for handler in reversed(self._exc_handlers): self.log.debug('Invoking exception handler %s' % (handler,)) From f457bd9da2e461ab22179be9881dcf85f816e9e2 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 10:26:37 +0200 Subject: [PATCH 03/10] Remove incorrect arg to pop_exc_handler method. --- rq/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 21a9302..ec21806 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -455,6 +455,6 @@ class Worker(object): """Pushes an exception handler onto the exc hanlder stack.""" self._exc_handlers.append(handler_func) - def pop_exc_handler(self, handler_func): + def pop_exc_handler(self): """Pops the latest exception handler off of the exc hanlder stack.""" return self._exc_handlers.pop() From cdc3a6c7a9929a4826681e2c3b0147b68f0abc65 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 12:07:18 +0200 Subject: [PATCH 04/10] Convenience exc_handler registration in __init__. Allow custom exception handlers to be passed in in the Worker constructor. --- rq/worker.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index ec21806..4b0273a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -95,7 +95,7 @@ class Worker(object): def __init__(self, queues, name=None, default_result_ttl=500, - connection=None): # noqa + connection=None, exc_handler=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -116,6 +116,8 @@ class Worker(object): # By default, push the "move-to-failed-queue" exception handler onto # the stack self.push_exc_handler(self.move_to_failed_queue) + if exc_handler is not None: + self.push_exc_handler(exc_handler) def validate_queues(self): # noqa From 502bf593993c94cd1f3e7269edf61d4730851223 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 12:08:54 +0200 Subject: [PATCH 05/10] Add convenience directive to easily configure Sentry with RQ. --- rq/contrib/__init__.py | 0 rq/contrib/sentry.py | 15 +++++++++++++++ 2 files changed, 15 insertions(+) create mode 100644 rq/contrib/__init__.py create mode 100644 rq/contrib/sentry.py diff --git a/rq/contrib/__init__.py b/rq/contrib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py new file mode 100644 index 0000000..7ffe12b --- /dev/null +++ b/rq/contrib/sentry.py @@ -0,0 +1,15 @@ +def register_sentry(client, worker): + """Given a Raven client and an RQ worker, registers exception handlers + with the worker so exceptions are logged to Sentry. + """ + def send_to_sentry(job, *exc_info): + client.captureException( + exc_info=exc_info, + extra={ + 'job_id': job.id, + 'func': job.func, + 'args': job.args, + 'kwargs': job.kwargs, + }) + + worker.push_exc_handler(send_to_sentry) From db4ec16be8e252576cc6ee25a0d238d88cc887fd Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 12:18:26 +0200 Subject: [PATCH 06/10] Also report job description. --- rq/contrib/sentry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py index 7ffe12b..abaef72 100644 --- a/rq/contrib/sentry.py +++ b/rq/contrib/sentry.py @@ -10,6 +10,7 @@ def register_sentry(client, worker): 'func': job.func, 'args': job.args, 'kwargs': job.kwargs, + 'description': job.description, }) worker.push_exc_handler(send_to_sentry) From db80be4ef7a08b217aa995e8e638a8287403f915 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 12:19:49 +0200 Subject: [PATCH 07/10] Fix typos. --- rq/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 4b0273a..2378614 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -454,9 +454,9 @@ class Worker(object): self.failed_queue.quarantine(job, exc_info=exc_string) def push_exc_handler(self, handler_func): - """Pushes an exception handler onto the exc hanlder stack.""" + """Pushes an exception handler onto the exc handler stack.""" self._exc_handlers.append(handler_func) def pop_exc_handler(self): - """Pops the latest exception handler off of the exc hanlder stack.""" + """Pops the latest exception handler off of the exc handler stack.""" return self._exc_handlers.pop() From 9e22847d13f8817a5d552acfe47bf1bfc9195b4c Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 13:06:41 +0200 Subject: [PATCH 08/10] Update changelog. --- CHANGES.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index bf0eeb7..2baed34 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -19,6 +19,9 @@ - Remove `logbook` dependency (in favor of `logging`) +- Custom exception handlers can now be configured in addition to, or to fully + replace, moving failed jobs to the failed queue. + ### 0.3.0 (August 5th, 2012) From cd05f6550a6c302e85ea7820fb7a8aaa82608b53 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 13:11:01 +0200 Subject: [PATCH 09/10] Link to docs from within changelog. --- CHANGES.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 2baed34..4d2cf95 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -20,7 +20,9 @@ - Remove `logbook` dependency (in favor of `logging`) - Custom exception handlers can now be configured in addition to, or to fully - replace, moving failed jobs to the failed queue. + replace, moving failed jobs to the failed queue. Relevant documentation + [here](http://python-rq.org/docs/exceptions/) and + [here](http://python-rq.org/patterns/sentry/). ### 0.3.0 From 781f3e0460dbf4e91e3350dd0b54508faf7f4a25 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 29 Aug 2012 13:21:48 +0200 Subject: [PATCH 10/10] Add test for custom exc handling. --- rq/job.py | 9 +++++++-- rq/worker.py | 3 ++- tests/test_worker.py | 28 ++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/rq/job.py b/rq/job.py index 84fa3f8..a8c7937 100644 --- a/rq/job.py +++ b/rq/job.py @@ -87,11 +87,16 @@ class Job(object): def func_name(self): return self._func_name - @property - def status(self): + def _get_status(self): self._status = self.connection.hget(self.key, 'status') return self._status + def _set_status(self, status): + self._status = status + self.connection.hset(self.key, 'status', self._status) + + status = property(_get_status, _set_status) + @property def is_finished(self): return self.status == Status.FINISHED diff --git a/rq/worker.py b/rq/worker.py index 2378614..4d5c0b4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -395,7 +395,8 @@ class Worker(object): pickled_rv = dumps(rv) job._status = Status.FINISHED except: - job._status = Status.FAILED + # Use the public setter here, to immediately update Redis + job.status = Status.FAILED self.handle_exception(job, *sys.exc_info()) return False diff --git a/tests/test_worker.py b/tests/test_worker.py index 67cf6e3..e3e706e 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -96,6 +96,34 @@ class TestWorker(RQTestCase): self.assertEquals(job.enqueued_at, enqueued_at_date) self.assertIsNotNone(job.exc_info) # should contain exc_info + def test_custom_exc_handling(self): + """Custom exception handling.""" + def black_hole(job, *exc_info): + # Don't fall through to default behaviour (moving to failed queue) + return False + + q = Queue() + failed_q = get_failed_queue() + + # Preconditions + self.assertEquals(failed_q.count, 0) + self.assertEquals(q.count, 0) + + # Action + job = q.enqueue(div_by_zero) + self.assertEquals(q.count, 1) + + w = Worker([q], exc_handler=black_hole) + w.work(burst=True) # should silently pass + + # Postconditions + self.assertEquals(q.count, 0) + self.assertEquals(failed_q.count, 0) + + # Check the job + job = Job.fetch(job.id) + self.assertEquals(job.is_failed, True) + def test_cancelled_jobs_arent_executed(self): # noqa """Cancelling jobs."""