From 5caccaabfe213e98a5a03a6f5914662d8c322134 Mon Sep 17 00:00:00 2001 From: Bradley Young Date: Wed, 12 Nov 2014 09:04:40 -0500 Subject: [PATCH 1/6] Adding optional list handling to the exc_handler option in Worker. Adding command line --exception_handler option (with multiple entries allowed) to `rq worker` Added tests for command line options. --- rq/cli/cli.py | 9 +++++++-- rq/worker.py | 5 ++++- tests/fixtures.py | 4 ++++ tests/test_cli.py | 26 +++++++++++++++++++++++++- 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 18979dc..d5fc1a1 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -132,10 +132,11 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues): @click.option('--verbose', '-v', is_flag=True, help='Show more output') @click.option('--quiet', '-q', is_flag=True, help='Show less output') @click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN') +@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True) @click.option('--pid', help='Write the process ID number to a file at the specified path') @click.argument('queues', nargs=-1) def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl, - verbose, quiet, sentry_dsn, pid, queues): + verbose, quiet, sentry_dsn, exception_handler, pid, queues): """Starts an RQ worker.""" if path: @@ -157,6 +158,9 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, cleanup_ghosts(conn) worker_class = import_attribute(worker_class) queue_class = import_attribute(queue_class) + exc_handler = [] + for h in exception_handler: + exc_handler.append(import_attribute(h)) try: queues = [queue_class(queue, connection=conn) for queue in queues] @@ -165,7 +169,8 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, connection=conn, default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, - job_class=job_class) + job_class=job_class, + exc_handler=exc_handler) # Should we configure Sentry? if sentry_dsn: diff --git a/rq/worker.py b/rq/worker.py index bf40a65..55ff452 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -140,7 +140,10 @@ class Worker(object): # By default, push the "move-to-failed-queue" exception handler onto # the stack self.push_exc_handler(self.move_to_failed_queue) - if exc_handler is not None: + if isinstance(exc_handler, list): + for h in exc_handler: + self.push_exc_handler(h) + elif exc_handler is not None: self.push_exc_handler(exc_handler) if job_class is not None: diff --git a/tests/fixtures.py b/tests/fixtures.py index a0e8eba..3ef8482 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -87,3 +87,7 @@ with Connection(): def long_running_job(): time.sleep(10) + +def black_hole(job, *exc_info): + # Don't fall through to default behaviour (moving to failed queue) + return False diff --git a/tests/test_cli.py b/tests/test_cli.py index a92fb34..fad684a 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) from click.testing import CliRunner -from rq import get_failed_queue +from rq import get_failed_queue, Queue from rq.compat import is_python_version from rq.job import Job from rq.cli import main @@ -63,3 +63,27 @@ class TestRQCli(RQTestCase): runner = CliRunner() result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) self.assertEqual(result.exit_code, 0) + + def test_exc_handler(self): + """rq worker -u -b --exception-handler """ + q = Queue() + failed_q = get_failed_queue() + failed_q.empty() + # Preconditions + self.assertEquals(failed_q.count, 0) + self.assertEquals(q.count, 0) + + # Action + job = q.enqueue(div_by_zero) + self.assertEquals(q.count, 1) + + runner = CliRunner() + result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--exception-handler', 'tests.fixtures.black_hole']) + self.assertEqual(result.exit_code, 0) + # Postconditions + self.assertEquals(q.count, 0) + self.assertEquals(failed_q.count, 0) + + # Check the job + job = Job.fetch(job.id) + self.assertEquals(job.is_failed, True) From f575137612239bdbaea0a6a811adf9bc414c0901 Mon Sep 17 00:00:00 2001 From: Bradley Young Date: Sat, 24 Jan 2015 17:09:57 -0500 Subject: [PATCH 2/6] updating worker to have exception_handlers --- rq/worker.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 55ff452..5399160 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -111,7 +111,7 @@ class Worker(object): def __init__(self, queues, name=None, default_result_ttl=None, connection=None, - exc_handler=None, default_worker_ttl=None, job_class=None): # noqa + exception_handlers="default", default_worker_ttl=None, job_class=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -139,12 +139,13 @@ class Worker(object): # By default, push the "move-to-failed-queue" exception handler onto # the stack - self.push_exc_handler(self.move_to_failed_queue) - if isinstance(exc_handler, list): - for h in exc_handler: + if exception_handlers == "default": + self.push_exc_handler(self.move_to_failed_queue) + elif isinstance(exception_handlers, list): + for h in exception_handlers: self.push_exc_handler(h) - elif exc_handler is not None: - self.push_exc_handler(exc_handler) + elif exception_handlers is not None: + self.push_exc_handler(exception_handlers) if job_class is not None: if isinstance(job_class, string_types): From f05d77b92b1a2134a274cda3bcc57b947089c99e Mon Sep 17 00:00:00 2001 From: Bradley Young Date: Mon, 26 Jan 2015 09:54:30 -0500 Subject: [PATCH 3/6] changing "default" for exception_handlers to None; same exact logic can be implemented, but cleaner --- rq/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 89c18c0..dfee230 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -122,7 +122,7 @@ class Worker(object): def __init__(self, queues, name=None, default_result_ttl=None, connection=None, - exception_handlers="default", default_worker_ttl=None, job_class=None): # noqa + exception_handlers=None, default_worker_ttl=None, job_class=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -150,7 +150,7 @@ class Worker(object): # By default, push the "move-to-failed-queue" exception handler onto # the stack - if exception_handlers == "default": + if exception_handlers is None: self.push_exc_handler(self.move_to_failed_queue) elif isinstance(exception_handlers, list): for h in exception_handlers: From c428f955e419cf4e43380b2fe1bbfa0d6c667ec6 Mon Sep 17 00:00:00 2001 From: Bradley Young Date: Tue, 24 Feb 2015 10:31:22 -0500 Subject: [PATCH 4/6] per @selwin, adding a deprecated handler for exc_handler --- rq/worker.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index dfee230..19176a3 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -121,7 +121,7 @@ class Worker(object): return worker def __init__(self, queues, name=None, - default_result_ttl=None, connection=None, + default_result_ttl=None, connection=None, exc_handler=None exception_handlers=None, default_worker_ttl=None, job_class=None): # noqa if connection is None: connection = get_current_connection() @@ -152,6 +152,12 @@ class Worker(object): # the stack if exception_handlers is None: self.push_exc_handler(self.move_to_failed_queue) + if exc_handler is not None: + self.push_exc_handler(exc_handler) + warnings.warn( + "use of exc_handler is deprecated, pass a list to exception_handlers instead.", + DeprecationWarning + ) elif isinstance(exception_handlers, list): for h in exception_handlers: self.push_exc_handler(h) From 04e403e1a360a21c3969310b033dbff72a260574 Mon Sep 17 00:00:00 2001 From: Bradley Young Date: Tue, 24 Feb 2015 10:51:08 -0500 Subject: [PATCH 5/6] dread comma, I adjure thee. --- rq/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 707a1db..a2149c5 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -120,7 +120,7 @@ class Worker(object): return worker def __init__(self, queues, name=None, - default_result_ttl=None, connection=None, exc_handler=None + default_result_ttl=None, connection=None, exc_handler=None, exception_handlers=None, default_worker_ttl=None, job_class=None): # noqa if connection is None: connection = get_current_connection() From 8bfbeac67d96376fb468a1aa24cb1ae7b88ff3eb Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Fri, 3 Jul 2015 17:04:01 +0700 Subject: [PATCH 6/6] Running rqworker without specifiying --exception-handler shouldn't override default behavior. --- rq/cli/cli.py | 2 +- tests/test_cli.py | 27 +++++++++++---------------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 40f528f..da156d3 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -184,7 +184,7 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path, default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, job_class=job_class, - exception_handlers=exception_handlers) + exception_handlers=exception_handlers or None) # Should we configure Sentry? if sentry_dsn: diff --git a/tests/test_cli.py b/tests/test_cli.py index 6744a11..1a812ae 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -80,24 +80,19 @@ class TestRQCli(RQTestCase): q = Queue() failed_q = get_failed_queue() failed_q.empty() - # Preconditions - self.assertEquals(failed_q.count, 0) - self.assertEquals(q.count, 0) - - # Action - job = q.enqueue(div_by_zero) - self.assertEquals(q.count, 1) runner = CliRunner() - result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b', '--exception-handler', 'tests.fixtures.black_hole']) - self.assertEqual(result.exit_code, 0) - # Postconditions - self.assertEquals(q.count, 0) - self.assertEquals(failed_q.count, 0) - - # Check the job - job = Job.fetch(job.id) - self.assertEquals(job.is_failed, True) + + # If exception handler is not given, failed job goes to FailedQueue + q.enqueue(div_by_zero) + runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) + self.assertEquals(failed_q.count, 1) + + # Black hole exception handler doesn't add failed jobs to FailedQueue + q.enqueue(div_by_zero) + runner.invoke(main, ['worker', '-u', self.redis_url, '-b', + '--exception-handler', 'tests.fixtures.black_hole']) + self.assertEquals(failed_q.count, 1) def test_suspend_and_resume(self): """rq suspend -u