|
|
@ -20,6 +20,8 @@ from .utils import make_colorizer
|
|
|
|
from .exceptions import NoQueueError, UnpickleError
|
|
|
|
from .exceptions import NoQueueError, UnpickleError
|
|
|
|
|
|
|
|
|
|
|
|
green = make_colorizer('darkgreen')
|
|
|
|
green = make_colorizer('darkgreen')
|
|
|
|
|
|
|
|
yellow = make_colorizer('darkyellow')
|
|
|
|
|
|
|
|
blue = make_colorizer('darkblue')
|
|
|
|
|
|
|
|
|
|
|
|
def iterable(x):
|
|
|
|
def iterable(x):
|
|
|
|
return hasattr(x, '__iter__')
|
|
|
|
return hasattr(x, '__iter__')
|
|
|
@ -254,14 +256,13 @@ class Worker(object):
|
|
|
|
self.state = 'idle'
|
|
|
|
self.state = 'idle'
|
|
|
|
qnames = self.queue_names()
|
|
|
|
qnames = self.queue_names()
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.log.info('*** Listening for work on %s...' % (', '.join(qnames)))
|
|
|
|
self.log.info('')
|
|
|
|
|
|
|
|
self.log.info('*** Listening on %s...' % (green(', '.join(qnames))))
|
|
|
|
wait_for_job = not burst
|
|
|
|
wait_for_job = not burst
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
result = Queue.dequeue_any(self.queues, wait_for_job)
|
|
|
|
result = Queue.dequeue_any(self.queues, wait_for_job)
|
|
|
|
if result is None:
|
|
|
|
if result is None:
|
|
|
|
break
|
|
|
|
break
|
|
|
|
job, queue = result
|
|
|
|
|
|
|
|
self.log.info('%s: %s' % (green(queue.name), job))
|
|
|
|
|
|
|
|
except UnpickleError as e:
|
|
|
|
except UnpickleError as e:
|
|
|
|
self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,))
|
|
|
|
self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,))
|
|
|
|
self.log.debug('Data follows:')
|
|
|
|
self.log.debug('Data follows:')
|
|
|
@ -270,6 +271,9 @@ class Worker(object):
|
|
|
|
self.failure_queue.push_job_id(e.job_id)
|
|
|
|
self.failure_queue.push_job_id(e.job_id)
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
job, queue = result
|
|
|
|
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id))
|
|
|
|
|
|
|
|
|
|
|
|
self.state = 'busy'
|
|
|
|
self.state = 'busy'
|
|
|
|
self.fork_and_perform_job(job)
|
|
|
|
self.fork_and_perform_job(job)
|
|
|
|
|
|
|
|
|
|
|
@ -309,10 +313,6 @@ class Worker(object):
|
|
|
|
self.procline('Processing %s from %s since %s' % (
|
|
|
|
self.procline('Processing %s from %s since %s' % (
|
|
|
|
job.func.__name__,
|
|
|
|
job.func.__name__,
|
|
|
|
job.origin, time.time()))
|
|
|
|
job.origin, time.time()))
|
|
|
|
msg = 'Got job %s from %s' % (
|
|
|
|
|
|
|
|
job.description,
|
|
|
|
|
|
|
|
job.origin)
|
|
|
|
|
|
|
|
self.log.info(msg)
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
rv = job.perform()
|
|
|
|
rv = job.perform()
|
|
|
|
except Exception as e:
|
|
|
|
except Exception as e:
|
|
|
@ -333,7 +333,10 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
return False
|
|
|
|
return False
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
self.log.info('Job OK, result = %s' % (rv,))
|
|
|
|
if rv is None:
|
|
|
|
|
|
|
|
self.log.info('Job OK')
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
self.log.info('Job OK, result = %s' % (yellow(rv),))
|
|
|
|
if rv is not None:
|
|
|
|
if rv is not None:
|
|
|
|
p = conn.pipeline()
|
|
|
|
p = conn.pipeline()
|
|
|
|
p.set(job.result, dumps(rv))
|
|
|
|
p.set(job.result, dumps(rv))
|
|
|
|