Merge pull request #557 from selwin/exception_handling

Exception handling
main
Selwin Ong 10 years ago
commit cdcea84105

@ -142,10 +142,11 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
@click.option('--verbose', '-v', is_flag=True, help='Show more output') @click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output') @click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN') @click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN')
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
@click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.argument('queues', nargs=-1) @click.argument('queues', nargs=-1)
def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl, def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl,
verbose, quiet, sentry_dsn, pid, queues): verbose, quiet, sentry_dsn, exception_handler, pid, queues):
"""Starts an RQ worker.""" """Starts an RQ worker."""
if path: if path:
@ -166,6 +167,9 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
cleanup_ghosts(conn) cleanup_ghosts(conn)
worker_class = import_attribute(worker_class) worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class) queue_class = import_attribute(queue_class)
exception_handlers = []
for h in exception_handler:
exception_handlers.append(import_attribute(h))
if is_suspended(conn): if is_suspended(conn):
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red') click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
@ -179,7 +183,8 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
connection=conn, connection=conn,
default_worker_ttl=worker_ttl, default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl, default_result_ttl=results_ttl,
job_class=job_class) job_class=job_class,
exception_handlers=exception_handlers or None)
# Should we configure Sentry? # Should we configure Sentry?
if sentry_dsn: if sentry_dsn:

@ -122,8 +122,8 @@ class Worker(object):
return worker return worker
def __init__(self, queues, name=None, def __init__(self, queues, name=None,
default_result_ttl=None, connection=None, default_result_ttl=None, connection=None, exc_handler=None,
exc_handler=None, default_worker_ttl=None, job_class=None): # noqa exception_handlers=None, default_worker_ttl=None, job_class=None): # noqa
if connection is None: if connection is None:
connection = get_current_connection() connection = get_current_connection()
self.connection = connection self.connection = connection
@ -153,9 +153,19 @@ class Worker(object):
# By default, push the "move-to-failed-queue" exception handler onto # By default, push the "move-to-failed-queue" exception handler onto
# the stack # the stack
if exception_handlers is None:
self.push_exc_handler(self.move_to_failed_queue) self.push_exc_handler(self.move_to_failed_queue)
if exc_handler is not None: if exc_handler is not None:
self.push_exc_handler(exc_handler) self.push_exc_handler(exc_handler)
warnings.warn(
"use of exc_handler is deprecated, pass a list to exception_handlers instead.",
DeprecationWarning
)
elif isinstance(exception_handlers, list):
for h in exception_handlers:
self.push_exc_handler(h)
elif exception_handlers is not None:
self.push_exc_handler(exception_handlers)
if job_class is not None: if job_class is not None:
if isinstance(job_class, string_types): if isinstance(job_class, string_types):

@ -13,6 +13,7 @@ from rq import Connection, get_current_job
from rq.decorators import job from rq.decorators import job
from rq.compat import PY2 from rq.compat import PY2
def say_pid(): def say_pid():
return os.getpid() return os.getpid()
@ -92,6 +93,11 @@ with Connection():
return x + y return x + y
def black_hole(job, *exc_info):
# Don't fall through to default behaviour (moving to failed queue)
return False
def long_running_job(timeout=10): def long_running_job(timeout=10):
time.sleep(timeout) time.sleep(timeout)
return 'Done sleeping...' return 'Done sleeping...'

@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
from click.testing import CliRunner from click.testing import CliRunner
from rq import get_failed_queue from rq import get_failed_queue, Queue
from rq.compat import is_python_version from rq.compat import is_python_version
from rq.job import Job from rq.job import Job
from rq.cli import main from rq.cli import main
@ -75,6 +75,25 @@ class TestRQCli(RQTestCase):
result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b']) result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
self.assert_normal_execution(result) self.assert_normal_execution(result)
def test_exception_handlers(self):
"""rq worker -u <url> -b --exception-handler <handler>"""
q = Queue()
failed_q = get_failed_queue()
failed_q.empty()
runner = CliRunner()
# If exception handler is not given, failed job goes to FailedQueue
q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
self.assertEquals(failed_q.count, 1)
# Black hole exception handler doesn't add failed jobs to FailedQueue
q.enqueue(div_by_zero)
runner.invoke(main, ['worker', '-u', self.redis_url, '-b',
'--exception-handler', 'tests.fixtures.black_hole'])
self.assertEquals(failed_q.count, 1)
def test_suspend_and_resume(self): def test_suspend_and_resume(self):
"""rq suspend -u <url> """rq suspend -u <url>
rq resume -u <url> rq resume -u <url>
@ -101,3 +120,4 @@ class TestRQCli(RQTestCase):
self.assertEqual(result.exit_code, 1) self.assertEqual(result.exit_code, 1)
self.assertIn("Duration must be an integer greater than 1", result.output) self.assertIn("Duration must be an integer greater than 1", result.output)

@ -162,7 +162,7 @@ class TestWorker(RQTestCase):
job = q.enqueue(div_by_zero) job = q.enqueue(div_by_zero)
self.assertEquals(q.count, 1) self.assertEquals(q.count, 1)
w = Worker([q], exc_handler=black_hole) w = Worker([q], exception_handlers=black_hole)
w.work(burst=True) # should silently pass w.work(burst=True) # should silently pass
# Postconditions # Postconditions

Loading…
Cancel
Save