diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 18979dc..d5fc1a1 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -132,10 +132,11 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues): @click.option('--verbose', '-v', is_flag=True, help='Show more output') @click.option('--quiet', '-q', is_flag=True, help='Show less output') @click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN') +@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.argument('queues', nargs=-1) def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl, - verbose, quiet, sentry_dsn, pid, queues): + verbose, quiet, sentry_dsn, exception_handler, pid, queues): """Starts an RQ worker.""" if path: @@ -157,6 +158,9 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, cleanup_ghosts(conn) worker_class = import_attribute(worker_class) queue_class = import_attribute(queue_class) + exc_handler = [] + for h in exception_handler: + exc_handler.append(import_attribute(h)) try: queues = [queue_class(queue, connection=conn) for queue in queues] @@ -165,7 +169,8 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, connection=conn, default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, - job_class=job_class) + job_class=job_class, + exc_handler=exc_handler) # Should we configure Sentry? if sentry_dsn: diff --git a/rq/worker.py b/rq/worker.py index bf40a65..55ff452 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -140,7 +140,10 @@ class Worker(object): # By default, push the "move-to-failed-queue" exception handler onto # the stack self.push_exc_handler(self.move_to_failed_queue) - if exc_handler is not None: + if isinstance(exc_handler, list): + for h in exc_handler: + self.push_exc_handler(h) + elif exc_handler is not None: self.push_exc_handler(exc_handler) if job_class is not None: diff --git a/tests/fixtures.py b/tests/fixtures.py index a0e8eba..3ef8482 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -87,3 +87,7 @@ with Connection(): def long_running_job(): time.sleep(10) + +def black_hole(job, *exc_info): + # Don't fall through to default behaviour (moving to failed queue) + return False diff --git a/tests/test_cli.py b/tests/test_cli.py index a92fb34..fad684a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) from click.testing import CliRunner -from rq import get_failed_queue +from rq import get_failed_queue, Queue from rq.compat import is_python_version from rq.job import Job from rq.cli import main @@ -63,3 +63,27 @@ class TestRQCli(RQTestCase): runner = CliRunner() result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) self.assertEqual(result.exit_code, 0) + + def test_exc_handler(self): + """rq worker -u -b --exception-handler """ + q = Queue() + failed_q = get_failed_queue() + failed_q.empty() + # Preconditions + self.assertEquals(failed_q.count, 0) + self.assertEquals(q.count, 0) + + # Action + job = q.enqueue(div_by_zero) + self.assertEquals(q.count, 1) + + runner = CliRunner() + result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--exception-handler', 'tests.fixtures.black_hole']) + self.assertEqual(result.exit_code, 0) + # Postconditions + self.assertEquals(q.count, 0) + self.assertEquals(failed_q.count, 0) + + # Check the job + job = Job.fetch(job.id) + self.assertEquals(job.is_failed, True)