|
|
@ -1,3 +1,4 @@
|
|
|
|
|
|
|
|
import sys
|
|
|
|
import os
|
|
|
|
import os
|
|
|
|
import errno
|
|
|
|
import errno
|
|
|
|
import random
|
|
|
|
import random
|
|
|
@ -22,7 +23,6 @@ from .version import VERSION
|
|
|
|
|
|
|
|
|
|
|
|
green = make_colorizer('darkgreen')
|
|
|
|
green = make_colorizer('darkgreen')
|
|
|
|
yellow = make_colorizer('darkyellow')
|
|
|
|
yellow = make_colorizer('darkyellow')
|
|
|
|
red = make_colorizer('darkred')
|
|
|
|
|
|
|
|
blue = make_colorizer('darkblue')
|
|
|
|
blue = make_colorizer('darkblue')
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
@ -95,7 +95,7 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, queues, name=None, default_result_ttl=500,
|
|
|
|
def __init__(self, queues, name=None, default_result_ttl=500,
|
|
|
|
connection=None): # noqa
|
|
|
|
connection=None, exc_handler=None): # noqa
|
|
|
|
if connection is None:
|
|
|
|
if connection is None:
|
|
|
|
connection = get_current_connection()
|
|
|
|
connection = get_current_connection()
|
|
|
|
self.connection = connection
|
|
|
|
self.connection = connection
|
|
|
@ -104,6 +104,7 @@ class Worker(object):
|
|
|
|
self._name = name
|
|
|
|
self._name = name
|
|
|
|
self.queues = queues
|
|
|
|
self.queues = queues
|
|
|
|
self.validate_queues()
|
|
|
|
self.validate_queues()
|
|
|
|
|
|
|
|
self._exc_handlers = []
|
|
|
|
self.default_result_ttl = default_result_ttl
|
|
|
|
self.default_result_ttl = default_result_ttl
|
|
|
|
self._state = 'starting'
|
|
|
|
self._state = 'starting'
|
|
|
|
self._is_horse = False
|
|
|
|
self._is_horse = False
|
|
|
@ -112,6 +113,12 @@ class Worker(object):
|
|
|
|
self.log = logger
|
|
|
|
self.log = logger
|
|
|
|
self.failed_queue = get_failed_queue(connection=self.connection)
|
|
|
|
self.failed_queue = get_failed_queue(connection=self.connection)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# By default, push the "move-to-failed-queue" exception handler onto
|
|
|
|
|
|
|
|
# the stack
|
|
|
|
|
|
|
|
self.push_exc_handler(self.move_to_failed_queue)
|
|
|
|
|
|
|
|
if exc_handler is not None:
|
|
|
|
|
|
|
|
self.push_exc_handler(exc_handler)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_queues(self): # noqa
|
|
|
|
def validate_queues(self): # noqa
|
|
|
|
"""Sanity check for the given queues."""
|
|
|
|
"""Sanity check for the given queues."""
|
|
|
@ -387,13 +394,10 @@ class Worker(object):
|
|
|
|
# use the same exc handling when pickling fails
|
|
|
|
# use the same exc handling when pickling fails
|
|
|
|
pickled_rv = dumps(rv)
|
|
|
|
pickled_rv = dumps(rv)
|
|
|
|
job._status = Status.FINISHED
|
|
|
|
job._status = Status.FINISHED
|
|
|
|
except Exception as e:
|
|
|
|
except:
|
|
|
|
fq = self.failed_queue
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
self.log.exception(red(str(e)))
|
|
|
|
job.status = Status.FAILED
|
|
|
|
self.log.warning('Moving job to %s queue.' % fq.name)
|
|
|
|
self.handle_exception(job, *sys.exc_info())
|
|
|
|
job._status = Status.FAILED
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
fq.quarantine(job, exc_info=traceback.format_exc())
|
|
|
|
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
if rv is None:
|
|
|
|
if rv is None:
|
|
|
@ -423,3 +427,37 @@ class Worker(object):
|
|
|
|
p.execute()
|
|
|
|
p.execute()
|
|
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_exception(self, job, *exc_info):
|
|
|
|
|
|
|
|
"""Walks the exception handler stack to delegate exception handling."""
|
|
|
|
|
|
|
|
exc_string = ''.join(
|
|
|
|
|
|
|
|
traceback.format_exception_only(*exc_info[:2]) +
|
|
|
|
|
|
|
|
traceback.format_exception(*exc_info))
|
|
|
|
|
|
|
|
self.log.error(exc_string)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for handler in reversed(self._exc_handlers):
|
|
|
|
|
|
|
|
self.log.debug('Invoking exception handler %s' % (handler,))
|
|
|
|
|
|
|
|
fallthrough = handler(job, *exc_info)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Only handlers with explicit return values should disable further
|
|
|
|
|
|
|
|
# exc handling, so interpret a None return value as True.
|
|
|
|
|
|
|
|
if fallthrough is None:
|
|
|
|
|
|
|
|
fallthrough = True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if not fallthrough:
|
|
|
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def move_to_failed_queue(self, job, *exc_info):
|
|
|
|
|
|
|
|
"""Default exception handler: move the job to the failed queue."""
|
|
|
|
|
|
|
|
exc_string = ''.join(traceback.format_exception(*exc_info))
|
|
|
|
|
|
|
|
self.log.warning('Moving job to %s queue.' % self.failed_queue.name)
|
|
|
|
|
|
|
|
self.failed_queue.quarantine(job, exc_info=exc_string)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def push_exc_handler(self, handler_func):
|
|
|
|
|
|
|
|
"""Pushes an exception handler onto the exc handler stack."""
|
|
|
|
|
|
|
|
self._exc_handlers.append(handler_func)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def pop_exc_handler(self):
|
|
|
|
|
|
|
|
"""Pops the latest exception handler off of the exc handler stack."""
|
|
|
|
|
|
|
|
return self._exc_handlers.pop()
|
|
|
|