From b49e039d63b3af95e1544a80d3dd79f59692ccdc Mon Sep 17 00:00:00 2001 From: Yaniv Greenberg Date: Mon, 8 May 2017 18:42:20 +0300 Subject: [PATCH 1/3] Separate move_to_failed_queue from the worker to allow it's use in cli exception handler. --- rq/worker.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index c364b18..c33e5fa 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -85,6 +85,13 @@ WorkerStatus = enum( ) +def move_to_failed_queue(job, *exc_info): + """Default exception handler: move the job to the failed queue.""" + exc_string = Worker._get_safe_exception_string(traceback.format_exception(*exc_info)) + failed_queue = get_failed_queue(get_current_connection(), job.__class__) + failed_queue.quarantine(job, exc_info=exc_string) + + class Worker(object): redis_worker_namespace_prefix = 'rq:worker:' redis_workers_keys = 'rq:workers' @@ -744,7 +751,7 @@ class Worker(object): def handle_exception(self, job, *exc_info): """Walks the exception handler stack to delegate exception handling.""" - exc_string = self._get_safe_exception_string( + exc_string = Worker._get_safe_exception_string( traceback.format_exception_only(*exc_info[:2]) + traceback.format_exception(*exc_info) ) self.log.error(exc_string, exc_info=True, extra={ @@ -768,11 +775,11 @@ class Worker(object): def move_to_failed_queue(self, job, *exc_info): """Default exception handler: move the job to the failed queue.""" - exc_string = self._get_safe_exception_string(traceback.format_exception(*exc_info)) self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) - self.failed_queue.quarantine(job, exc_info=exc_string) + move_to_failed_queue(job, *exc_info) - def _get_safe_exception_string(self, exc_strings): + @staticmethod + def _get_safe_exception_string(exc_strings): """Ensure list of exception strings is decoded on Python 2 and joined as one string safely.""" if sys.version_info[0] < 3: exc_strings = [exc.decode("utf-8") for exc in exc_strings] From 2da4ccd48d730b2aac40850d6f2add891ebaffab Mon Sep 17 00:00:00 2001 From: Yaniv Greenberg Date: Sun, 14 May 2017 11:12:35 +0300 Subject: [PATCH 2/3] Moved move_to_failed_queue to separate file. --- rq/handlers.py | 12 ++++++++++++ rq/worker.py | 12 ++---------- 2 files changed, 14 insertions(+), 10 deletions(-) create mode 100644 rq/handlers.py diff --git a/rq/handlers.py b/rq/handlers.py new file mode 100644 index 0000000..33ac03d --- /dev/null +++ b/rq/handlers.py @@ -0,0 +1,12 @@ +import traceback + +from .connections import get_current_connection +from .queue import get_failed_queue +from .worker import Worker + + +def move_to_failed_queue(job, *exc_info): + """Default exception handler: move the job to the failed queue.""" + exc_string = Worker._get_safe_exception_string(traceback.format_exception(*exc_info)) + failed_queue = get_failed_queue(get_current_connection(), job.__class__) + failed_queue.quarantine(job, exc_info=exc_string) diff --git a/rq/worker.py b/rq/worker.py index c33e5fa..1c3e96a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -16,15 +16,14 @@ from datetime import timedelta from redis import WatchError -from rq.compat import as_text, string_types, text_type - -from .compat import PY2 +from .compat import PY2, as_text, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection from .defaults import DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL from .exceptions import DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import Queue, get_failed_queue +from .handlers import move_to_failed_queue from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries from .suspension import is_suspended from .timeouts import UnixSignalDeathPenalty @@ -85,13 +84,6 @@ WorkerStatus = enum( ) -def move_to_failed_queue(job, *exc_info): - """Default exception handler: move the job to the failed queue.""" - exc_string = Worker._get_safe_exception_string(traceback.format_exception(*exc_info)) - failed_queue = get_failed_queue(get_current_connection(), job.__class__) - failed_queue.quarantine(job, exc_info=exc_string) - - class Worker(object): redis_worker_namespace_prefix = 'rq:worker:' redis_workers_keys = 'rq:workers' From 7dc5d4a936a8b40791752e61508cff87eb0634f6 Mon Sep 17 00:00:00 2001 From: Yaniv Greenberg Date: Sun, 14 May 2017 11:28:59 +0300 Subject: [PATCH 3/3] local commit to avoid circular imports. --- rq/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 1c3e96a..81f13e4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -23,7 +23,6 @@ from .exceptions import DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import Queue, get_failed_queue -from .handlers import move_to_failed_queue from .registry import FinishedJobRegistry, StartedJobRegistry, clean_registries from .suspension import is_suspended from .timeouts import UnixSignalDeathPenalty @@ -768,6 +767,7 @@ class Worker(object): def move_to_failed_queue(self, job, *exc_info): """Default exception handler: move the job to the failed queue.""" self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name)) + from .handlers import move_to_failed_queue move_to_failed_queue(job, *exc_info) @staticmethod