|
|
@ -100,8 +100,8 @@ class Worker(object):
|
|
|
|
return worker
|
|
|
|
return worker
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, queues, name=None,
|
|
|
|
def __init__(self, queues, name=None,
|
|
|
|
default_result_ttl=DEFAULT_RESULT_TTL, connection=None,
|
|
|
|
default_result_ttl=None, connection=None,
|
|
|
|
exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa
|
|
|
|
exc_handler=None, default_worker_ttl=None): # noqa
|
|
|
|
if connection is None:
|
|
|
|
if connection is None:
|
|
|
|
connection = get_current_connection()
|
|
|
|
connection = get_current_connection()
|
|
|
|
self.connection = connection
|
|
|
|
self.connection = connection
|
|
|
@ -111,8 +111,15 @@ class Worker(object):
|
|
|
|
self.queues = queues
|
|
|
|
self.queues = queues
|
|
|
|
self.validate_queues()
|
|
|
|
self.validate_queues()
|
|
|
|
self._exc_handlers = []
|
|
|
|
self._exc_handlers = []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if default_result_ttl is None:
|
|
|
|
|
|
|
|
default_result_ttl = DEFAULT_RESULT_TTL
|
|
|
|
self.default_result_ttl = default_result_ttl
|
|
|
|
self.default_result_ttl = default_result_ttl
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if default_worker_ttl is None:
|
|
|
|
|
|
|
|
default_worker_ttl = DEFAULT_WORKER_TTL
|
|
|
|
self.default_worker_ttl = default_worker_ttl
|
|
|
|
self.default_worker_ttl = default_worker_ttl
|
|
|
|
|
|
|
|
|
|
|
|
self._state = 'starting'
|
|
|
|
self._state = 'starting'
|
|
|
|
self._is_horse = False
|
|
|
|
self._is_horse = False
|
|
|
|
self._horse_pid = 0
|
|
|
|
self._horse_pid = 0
|
|
|
@ -334,7 +341,7 @@ class Worker(object):
|
|
|
|
if self.stopped:
|
|
|
|
if self.stopped:
|
|
|
|
self.log.info('Stopping on request.')
|
|
|
|
self.log.info('Stopping on request.')
|
|
|
|
break
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
timeout = None if burst else max(1, self.default_worker_ttl - 60)
|
|
|
|
timeout = None if burst else max(1, self.default_worker_ttl - 60)
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
result = self.dequeue_job_and_maintain_ttl(timeout)
|
|
|
|
result = self.dequeue_job_and_maintain_ttl(timeout)
|
|
|
@ -359,21 +366,21 @@ class Worker(object):
|
|
|
|
def dequeue_job_and_maintain_ttl(self, timeout):
|
|
|
|
def dequeue_job_and_maintain_ttl(self, timeout):
|
|
|
|
result = None
|
|
|
|
result = None
|
|
|
|
qnames = self.queue_names()
|
|
|
|
qnames = self.queue_names()
|
|
|
|
|
|
|
|
|
|
|
|
self.set_state('idle')
|
|
|
|
self.set_state('idle')
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.log.info('')
|
|
|
|
self.log.info('')
|
|
|
|
self.log.info('*** Listening on %s...' %
|
|
|
|
self.log.info('*** Listening on %s...' %
|
|
|
|
green(', '.join(qnames)))
|
|
|
|
green(', '.join(qnames)))
|
|
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
self.heartbeat()
|
|
|
|
self.heartbeat()
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
result = Queue.dequeue_any(self.queues, timeout,
|
|
|
|
result = Queue.dequeue_any(self.queues, timeout,
|
|
|
|
connection=self.connection)
|
|
|
|
connection=self.connection)
|
|
|
|
if result is not None:
|
|
|
|
if result is not None:
|
|
|
|
job, queue = result
|
|
|
|
job, queue = result
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
blue(job.description), job.id))
|
|
|
|
blue(job.description), job.id))
|
|
|
|
|
|
|
|
|
|
|
@ -455,9 +462,9 @@ class Worker(object):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
self.set_state('busy')
|
|
|
|
self.set_state('busy')
|
|
|
|
self.set_current_job_id(job.id)
|
|
|
|
self.set_current_job_id(job.id)
|
|
|
|
self.heartbeat((job.timeout or 180) + 60)
|
|
|
|
self.heartbeat((job.timeout or 180) + 60)
|
|
|
|
|
|
|
|
|
|
|
|
self.procline('Processing %s from %s since %s' % (
|
|
|
|
self.procline('Processing %s from %s since %s' % (
|
|
|
|
job.func_name,
|
|
|
|
job.func_name,
|
|
|
|
job.origin, time.time()))
|
|
|
|
job.origin, time.time()))
|
|
|
@ -470,14 +477,14 @@ class Worker(object):
|
|
|
|
# Pickle the result in the same try-except block since we need to
|
|
|
|
# Pickle the result in the same try-except block since we need to
|
|
|
|
# use the same exc handling when pickling fails
|
|
|
|
# use the same exc handling when pickling fails
|
|
|
|
job._result = rv
|
|
|
|
job._result = rv
|
|
|
|
|
|
|
|
|
|
|
|
self.set_current_job_id(None, pipeline=pipeline)
|
|
|
|
self.set_current_job_id(None, pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
result_ttl = job.get_ttl(self.default_result_ttl)
|
|
|
|
result_ttl = job.get_ttl(self.default_result_ttl)
|
|
|
|
if result_ttl != 0:
|
|
|
|
if result_ttl != 0:
|
|
|
|
job.save(pipeline=pipeline)
|
|
|
|
job.save(pipeline=pipeline)
|
|
|
|
job.cleanup(result_ttl, pipeline=pipeline)
|
|
|
|
job.cleanup(result_ttl, pipeline=pipeline)
|
|
|
|
|
|
|
|
|
|
|
|
pipeline.execute()
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
|
|
except Exception:
|
|
|
|
except Exception:
|
|
|
|