|
|
@ -43,9 +43,9 @@ def iterable(x):
|
|
|
|
def compact(l):
|
|
|
|
def compact(l):
|
|
|
|
return [x for x in l if x is not None]
|
|
|
|
return [x for x in l if x is not None]
|
|
|
|
|
|
|
|
|
|
|
|
_signames = dict((getattr(signal, signame), signame) \
|
|
|
|
_signames = dict((getattr(signal, signame), signame)
|
|
|
|
for signame in dir(signal) \
|
|
|
|
for signame in dir(signal)
|
|
|
|
if signame.startswith('SIG') and '_' not in signame)
|
|
|
|
if signame.startswith('SIG') and '_' not in signame)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def signal_name(signum):
|
|
|
|
def signal_name(signum):
|
|
|
@ -68,8 +68,8 @@ class Worker(object):
|
|
|
|
if connection is None:
|
|
|
|
if connection is None:
|
|
|
|
connection = get_current_connection()
|
|
|
|
connection = get_current_connection()
|
|
|
|
reported_working = connection.smembers(cls.redis_workers_keys)
|
|
|
|
reported_working = connection.smembers(cls.redis_workers_keys)
|
|
|
|
workers = [cls.find_by_key(as_text(key), connection) for key in
|
|
|
|
workers = [cls.find_by_key(as_text(key), connection)
|
|
|
|
reported_working]
|
|
|
|
for key in reported_working]
|
|
|
|
return compact(workers)
|
|
|
|
return compact(workers)
|
|
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
@classmethod
|
|
|
@ -95,13 +95,12 @@ class Worker(object):
|
|
|
|
worker._state = connection.hget(worker.key, 'state') or '?'
|
|
|
|
worker._state = connection.hget(worker.key, 'state') or '?'
|
|
|
|
if queues:
|
|
|
|
if queues:
|
|
|
|
worker.queues = [Queue(queue, connection=connection)
|
|
|
|
worker.queues = [Queue(queue, connection=connection)
|
|
|
|
for queue in queues.split(',')]
|
|
|
|
for queue in queues.split(',')]
|
|
|
|
return worker
|
|
|
|
return worker
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, queues, name=None,
|
|
|
|
def __init__(self, queues, name=None,
|
|
|
|
default_result_ttl=DEFAULT_RESULT_TTL, connection=None,
|
|
|
|
default_result_ttl=DEFAULT_RESULT_TTL, connection=None,
|
|
|
|
exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa
|
|
|
|
exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa
|
|
|
|
if connection is None:
|
|
|
|
if connection is None:
|
|
|
|
connection = get_current_connection()
|
|
|
|
connection = get_current_connection()
|
|
|
|
self.connection = connection
|
|
|
|
self.connection = connection
|
|
|
@ -193,9 +192,8 @@ class Worker(object):
|
|
|
|
self.log.debug('Registering birth of worker %s' % (self.name,))
|
|
|
|
self.log.debug('Registering birth of worker %s' % (self.name,))
|
|
|
|
if self.connection.exists(self.key) and \
|
|
|
|
if self.connection.exists(self.key) and \
|
|
|
|
not self.connection.hexists(self.key, 'death'):
|
|
|
|
not self.connection.hexists(self.key, 'death'):
|
|
|
|
raise ValueError(
|
|
|
|
raise ValueError('There exists an active worker named \'%s\' '
|
|
|
|
'There exists an active worker named \'%s\' '
|
|
|
|
'already.' % (self.name,))
|
|
|
|
'already.' % (self.name,))
|
|
|
|
|
|
|
|
key = self.key
|
|
|
|
key = self.key
|
|
|
|
now = time.time()
|
|
|
|
now = time.time()
|
|
|
|
queues = ','.join(self.queue_names())
|
|
|
|
queues = ','.join(self.queue_names())
|
|
|
@ -304,8 +302,8 @@ class Worker(object):
|
|
|
|
qnames = self.queue_names()
|
|
|
|
qnames = self.queue_names()
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.log.info('')
|
|
|
|
self.log.info('')
|
|
|
|
self.log.info('*** Listening on %s...' % \
|
|
|
|
self.log.info('*** Listening on %s...' %
|
|
|
|
green(', '.join(qnames)))
|
|
|
|
green(', '.join(qnames)))
|
|
|
|
timeout = None if burst else max(1, self.default_worker_ttl - 60)
|
|
|
|
timeout = None if burst else max(1, self.default_worker_ttl - 60)
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
result = self.dequeue_job_and_maintain_ttl(timeout)
|
|
|
|
result = self.dequeue_job_and_maintain_ttl(timeout)
|
|
|
@ -324,7 +322,7 @@ class Worker(object):
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
# Use the public setter here, to immediately update Redis
|
|
|
|
job.status = Status.STARTED
|
|
|
|
job.status = Status.STARTED
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
blue(job.description), job.id))
|
|
|
|
blue(job.description), job.id))
|
|
|
|
|
|
|
|
|
|
|
|
self.connection.expire(self.key, (job.timeout or 180) + 60)
|
|
|
|
self.connection.expire(self.key, (job.timeout or 180) + 60)
|
|
|
|
self.fork_and_perform_job(job)
|
|
|
|
self.fork_and_perform_job(job)
|
|
|
@ -336,19 +334,17 @@ class Worker(object):
|
|
|
|
self.register_death()
|
|
|
|
self.register_death()
|
|
|
|
return did_perform_work
|
|
|
|
return did_perform_work
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dequeue_job_and_maintain_ttl(self, timeout):
|
|
|
|
def dequeue_job_and_maintain_ttl(self, timeout):
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
return Queue.dequeue_any(self.queues, timeout,
|
|
|
|
return Queue.dequeue_any(self.queues, timeout,
|
|
|
|
connection=self.connection)
|
|
|
|
connection=self.connection)
|
|
|
|
except DequeueTimeout:
|
|
|
|
except DequeueTimeout:
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
self.log.debug('Sending heartbeat to prevent worker timeout.')
|
|
|
|
self.log.debug('Sending heartbeat to prevent worker timeout.')
|
|
|
|
self.connection.expire(self.key, self.default_worker_ttl)
|
|
|
|
self.connection.expire(self.key, self.default_worker_ttl)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def fork_and_perform_job(self, job):
|
|
|
|
def fork_and_perform_job(self, job):
|
|
|
|
"""Spawns a work horse to perform the actual work and passes it a job.
|
|
|
|
"""Spawns a work horse to perform the actual work and passes it a job.
|
|
|
|
The worker will wait for the work horse and make sure it executes
|
|
|
|
The worker will wait for the work horse and make sure it executes
|
|
|
@ -443,12 +439,10 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def handle_exception(self, job, *exc_info):
|
|
|
|
def handle_exception(self, job, *exc_info):
|
|
|
|
"""Walks the exception handler stack to delegate exception handling."""
|
|
|
|
"""Walks the exception handler stack to delegate exception handling."""
|
|
|
|
exc_string = ''.join(
|
|
|
|
exc_string = ''.join(traceback.format_exception_only(*exc_info[:2]) +
|
|
|
|
traceback.format_exception_only(*exc_info[:2]) +
|
|
|
|
traceback.format_exception(*exc_info))
|
|
|
|
traceback.format_exception(*exc_info))
|
|
|
|
|
|
|
|
self.log.error(exc_string)
|
|
|
|
self.log.error(exc_string)
|
|
|
|
|
|
|
|
|
|
|
|
for handler in reversed(self._exc_handlers):
|
|
|
|
for handler in reversed(self._exc_handlers):
|
|
|
|