|
|
@ -85,6 +85,13 @@ WorkerStatus = enum(
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def move_to_failed_queue(job, *exc_info):
|
|
|
|
|
|
|
|
"""Default exception handler: move the job to the failed queue."""
|
|
|
|
|
|
|
|
exc_string = Worker._get_safe_exception_string(traceback.format_exception(*exc_info))
|
|
|
|
|
|
|
|
failed_queue = get_failed_queue(get_current_connection(), job.__class__)
|
|
|
|
|
|
|
|
failed_queue.quarantine(job, exc_info=exc_string)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class Worker(object):
|
|
|
|
class Worker(object):
|
|
|
|
redis_worker_namespace_prefix = 'rq:worker:'
|
|
|
|
redis_worker_namespace_prefix = 'rq:worker:'
|
|
|
|
redis_workers_keys = 'rq:workers'
|
|
|
|
redis_workers_keys = 'rq:workers'
|
|
|
@ -744,7 +751,7 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
def handle_exception(self, job, *exc_info):
|
|
|
|
def handle_exception(self, job, *exc_info):
|
|
|
|
"""Walks the exception handler stack to delegate exception handling."""
|
|
|
|
"""Walks the exception handler stack to delegate exception handling."""
|
|
|
|
exc_string = self._get_safe_exception_string(
|
|
|
|
exc_string = Worker._get_safe_exception_string(
|
|
|
|
traceback.format_exception_only(*exc_info[:2]) + traceback.format_exception(*exc_info)
|
|
|
|
traceback.format_exception_only(*exc_info[:2]) + traceback.format_exception(*exc_info)
|
|
|
|
)
|
|
|
|
)
|
|
|
|
self.log.error(exc_string, exc_info=True, extra={
|
|
|
|
self.log.error(exc_string, exc_info=True, extra={
|
|
|
@ -768,11 +775,11 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
def move_to_failed_queue(self, job, *exc_info):
|
|
|
|
def move_to_failed_queue(self, job, *exc_info):
|
|
|
|
"""Default exception handler: move the job to the failed queue."""
|
|
|
|
"""Default exception handler: move the job to the failed queue."""
|
|
|
|
exc_string = self._get_safe_exception_string(traceback.format_exception(*exc_info))
|
|
|
|
|
|
|
|
self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
|
|
|
|
self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
|
|
|
|
self.failed_queue.quarantine(job, exc_info=exc_string)
|
|
|
|
move_to_failed_queue(job, *exc_info)
|
|
|
|
|
|
|
|
|
|
|
|
def _get_safe_exception_string(self, exc_strings):
|
|
|
|
@staticmethod
|
|
|
|
|
|
|
|
def _get_safe_exception_string(exc_strings):
|
|
|
|
"""Ensure list of exception strings is decoded on Python 2 and joined as one string safely."""
|
|
|
|
"""Ensure list of exception strings is decoded on Python 2 and joined as one string safely."""
|
|
|
|
if sys.version_info[0] < 3:
|
|
|
|
if sys.version_info[0] < 3:
|
|
|
|
exc_strings = [exc.decode("utf-8") for exc in exc_strings]
|
|
|
|
exc_strings = [exc.decode("utf-8") for exc in exc_strings]
|
|
|
|