Merge pull request #833 from yaniv-g/extract_move_to_failed_queue

Expose move_to_failed_queue from Worker
main
Selwin Ong 8 years ago committed by GitHub
commit 8ebebc9f7c

@ -0,0 +1,12 @@
import traceback
from .connections import get_current_connection
from .queue import get_failed_queue
from .worker import Worker
def move_to_failed_queue(job, *exc_info):
"""Default exception handler: move the job to the failed queue."""
exc_string = Worker._get_safe_exception_string(traceback.format_exception(*exc_info))
failed_queue = get_failed_queue(get_current_connection(), job.__class__)
failed_queue.quarantine(job, exc_info=exc_string)

@ -16,9 +16,7 @@ from datetime import timedelta
from redis import WatchError from redis import WatchError
from rq.compat import as_text, string_types, text_type from .compat import PY2, as_text, string_types, text_type
from .compat import PY2
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL
from .exceptions import DequeueTimeout, ShutDownImminentException from .exceptions import DequeueTimeout, ShutDownImminentException
@ -744,7 +742,7 @@ class Worker(object):
def handle_exception(self, job, *exc_info): def handle_exception(self, job, *exc_info):
"""Walks the exception handler stack to delegate exception handling.""" """Walks the exception handler stack to delegate exception handling."""
exc_string = self._get_safe_exception_string( exc_string = Worker._get_safe_exception_string(
traceback.format_exception_only(*exc_info[:2]) + traceback.format_exception(*exc_info) traceback.format_exception_only(*exc_info[:2]) + traceback.format_exception(*exc_info)
) )
self.log.error(exc_string, exc_info=True, extra={ self.log.error(exc_string, exc_info=True, extra={
@ -768,11 +766,12 @@ class Worker(object):
def move_to_failed_queue(self, job, *exc_info): def move_to_failed_queue(self, job, *exc_info):
"""Default exception handler: move the job to the failed queue.""" """Default exception handler: move the job to the failed queue."""
exc_string = self._get_safe_exception_string(traceback.format_exception(*exc_info))
self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
self.failed_queue.quarantine(job, exc_info=exc_string) from .handlers import move_to_failed_queue
move_to_failed_queue(job, *exc_info)
def _get_safe_exception_string(self, exc_strings): @staticmethod
def _get_safe_exception_string(exc_strings):
"""Ensure list of exception strings is decoded on Python 2 and joined as one string safely.""" """Ensure list of exception strings is decoded on Python 2 and joined as one string safely."""
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
exc_strings = [exc.decode("utf-8") for exc in exc_strings] exc_strings = [exc.decode("utf-8") for exc in exc_strings]

Loading…
Cancel
Save