diff --git a/rq/cli/cli.py b/rq/cli/cli.py index d0a5eff..da156d3 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -142,10 +142,11 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues): @click.option('--verbose', '-v', is_flag=True, help='Show more output') @click.option('--quiet', '-q', is_flag=True, help='Show less output') @click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN') +@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.argument('queues', nargs=-1) def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl, - verbose, quiet, sentry_dsn, pid, queues): + verbose, quiet, sentry_dsn, exception_handler, pid, queues): """Starts an RQ worker.""" if path: @@ -166,6 +167,9 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, cleanup_ghosts(conn) worker_class = import_attribute(worker_class) queue_class = import_attribute(queue_class) + exception_handlers = [] + for h in exception_handler: + exception_handlers.append(import_attribute(h)) if is_suspended(conn): click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red') @@ -179,7 +183,8 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, connection=conn, default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, - job_class=job_class) + job_class=job_class, + exception_handlers=exception_handlers or None) # Should we configure Sentry? if sentry_dsn: diff --git a/rq/worker.py b/rq/worker.py index e31b146..5d9732c 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -122,8 +122,8 @@ class Worker(object): return worker def __init__(self, queues, name=None, - default_result_ttl=None, connection=None, - exc_handler=None, default_worker_ttl=None, job_class=None): # noqa + default_result_ttl=None, connection=None, exc_handler=None, + exception_handlers=None, default_worker_ttl=None, job_class=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -153,9 +153,19 @@ class Worker(object): # By default, push the "move-to-failed-queue" exception handler onto # the stack - self.push_exc_handler(self.move_to_failed_queue) - if exc_handler is not None: - self.push_exc_handler(exc_handler) + if exception_handlers is None: + self.push_exc_handler(self.move_to_failed_queue) + if exc_handler is not None: + self.push_exc_handler(exc_handler) + warnings.warn( + "use of exc_handler is deprecated, pass a list to exception_handlers instead.", + DeprecationWarning + ) + elif isinstance(exception_handlers, list): + for h in exception_handlers: + self.push_exc_handler(h) + elif exception_handlers is not None: + self.push_exc_handler(exception_handlers) if job_class is not None: if isinstance(job_class, string_types): diff --git a/tests/fixtures.py b/tests/fixtures.py index cdb1822..c1b7783 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -13,6 +13,7 @@ from rq import Connection, get_current_job from rq.decorators import job from rq.compat import PY2 + def say_pid(): return os.getpid() @@ -92,6 +93,11 @@ with Connection(): return x + y +def black_hole(job, *exc_info): + # Don't fall through to default behaviour (moving to failed queue) + return False + + def long_running_job(timeout=10): time.sleep(timeout) return 'Done sleeping...' diff --git a/tests/test_cli.py b/tests/test_cli.py index f1b7fd4..1a812ae 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) from click.testing import CliRunner -from rq import get_failed_queue +from rq import get_failed_queue, Queue from rq.compat import is_python_version from rq.job import Job from rq.cli import main @@ -75,6 +75,25 @@ class TestRQCli(RQTestCase): result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) self.assert_normal_execution(result) + def test_exception_handlers(self): + """rq worker -u -b --exception-handler """ + q = Queue() + failed_q = get_failed_queue() + failed_q.empty() + + runner = CliRunner() + + # If exception handler is not given, failed job goes to FailedQueue + q.enqueue(div_by_zero) + runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) + self.assertEquals(failed_q.count, 1) + + # Black hole exception handler doesn't add failed jobs to FailedQueue + q.enqueue(div_by_zero) + runner.invoke(main, ['worker', '-u', self.redis_url, '-b', + '--exception-handler', 'tests.fixtures.black_hole']) + self.assertEquals(failed_q.count, 1) + def test_suspend_and_resume(self): """rq suspend -u rq resume -u @@ -101,3 +120,4 @@ class TestRQCli(RQTestCase): self.assertEqual(result.exit_code, 1) self.assertIn("Duration must be an integer greater than 1", result.output) + diff --git a/tests/test_worker.py b/tests/test_worker.py index 02dacb7..40b84b4 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -162,7 +162,7 @@ class TestWorker(RQTestCase): job = q.enqueue(div_by_zero) self.assertEquals(q.count, 1) - w = Worker([q], exc_handler=black_hole) + w = Worker([q], exception_handlers=black_hole) w.work(burst=True) # should silently pass # Postconditions