|
|
@ -11,7 +11,7 @@ import traceback
|
|
|
|
from cPickle import dumps
|
|
|
|
from cPickle import dumps
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
from logbook import Logger
|
|
|
|
from logbook import Logger
|
|
|
|
Logger = Logger # Does nothing except it shuts up pyflakes annoying error
|
|
|
|
Logger = Logger # Does nothing except it shuts up pyflakes annoying error
|
|
|
|
except ImportError:
|
|
|
|
except ImportError:
|
|
|
|
from logging import Logger
|
|
|
|
from logging import Logger
|
|
|
|
from .queue import Queue
|
|
|
|
from .queue import Queue
|
|
|
@ -23,9 +23,11 @@ green = make_colorizer('darkgreen')
|
|
|
|
yellow = make_colorizer('darkyellow')
|
|
|
|
yellow = make_colorizer('darkyellow')
|
|
|
|
blue = make_colorizer('darkblue')
|
|
|
|
blue = make_colorizer('darkblue')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def iterable(x):
|
|
|
|
def iterable(x):
|
|
|
|
return hasattr(x, '__iter__')
|
|
|
|
return hasattr(x, '__iter__')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def compact(l):
|
|
|
|
def compact(l):
|
|
|
|
return [x for x in l if x is not None]
|
|
|
|
return [x for x in l if x is not None]
|
|
|
|
|
|
|
|
|
|
|
@ -33,6 +35,7 @@ _signames = dict((getattr(signal, signame), signame) \
|
|
|
|
for signame in dir(signal) \
|
|
|
|
for signame in dir(signal) \
|
|
|
|
if signame.startswith('SIG') and '_' not in signame)
|
|
|
|
if signame.startswith('SIG') and '_' not in signame)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def signal_name(signum):
|
|
|
|
def signal_name(signum):
|
|
|
|
# Hackety-hack-hack: is there really no better way to reverse lookup the
|
|
|
|
# Hackety-hack-hack: is there really no better way to reverse lookup the
|
|
|
|
# signal name? If you read this and know a way: please provide a patch :)
|
|
|
|
# signal name? If you read this and know a way: please provide a patch :)
|
|
|
@ -55,9 +58,9 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
@classmethod
|
|
|
|
def find_by_key(cls, worker_key):
|
|
|
|
def find_by_key(cls, worker_key):
|
|
|
|
"""Returns a Worker instance, based on the naming conventions for naming
|
|
|
|
"""Returns a Worker instance, based on the naming conventions for
|
|
|
|
the internal Redis keys. Can be used to reverse-lookup Workers by their
|
|
|
|
naming the internal Redis keys. Can be used to reverse-lookup Workers
|
|
|
|
Redis keys.
|
|
|
|
by their Redis keys.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
prefix = cls.redis_worker_namespace_prefix
|
|
|
|
prefix = cls.redis_worker_namespace_prefix
|
|
|
|
name = worker_key[len(prefix):]
|
|
|
|
name = worker_key[len(prefix):]
|
|
|
@ -76,7 +79,7 @@ class Worker(object):
|
|
|
|
return worker
|
|
|
|
return worker
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __init__(self, queues, name=None, rv_ttl=500):
|
|
|
|
def __init__(self, queues, name=None, rv_ttl=500): # noqa
|
|
|
|
if isinstance(queues, Queue):
|
|
|
|
if isinstance(queues, Queue):
|
|
|
|
queues = [queues]
|
|
|
|
queues = [queues]
|
|
|
|
self._name = name
|
|
|
|
self._name = name
|
|
|
@ -91,7 +94,7 @@ class Worker(object):
|
|
|
|
self.failed_queue = Queue('failed')
|
|
|
|
self.failed_queue = Queue('failed')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def validate_queues(self):
|
|
|
|
def validate_queues(self): # noqa
|
|
|
|
"""Sanity check for the given queues."""
|
|
|
|
"""Sanity check for the given queues."""
|
|
|
|
if not iterable(self.queues):
|
|
|
|
if not iterable(self.queues):
|
|
|
|
raise ValueError('Argument queues not iterable.')
|
|
|
|
raise ValueError('Argument queues not iterable.')
|
|
|
@ -108,7 +111,7 @@ class Worker(object):
|
|
|
|
return map(lambda q: q.key, self.queues)
|
|
|
|
return map(lambda q: q.key, self.queues)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@property # noqa
|
|
|
|
def name(self):
|
|
|
|
def name(self):
|
|
|
|
"""Returns the name of the worker, under which it is registered to the
|
|
|
|
"""Returns the name of the worker, under which it is registered to the
|
|
|
|
monitoring system.
|
|
|
|
monitoring system.
|
|
|
@ -152,11 +155,13 @@ class Worker(object):
|
|
|
|
procname.setprocname('rq: %s' % (message,))
|
|
|
|
procname.setprocname('rq: %s' % (message,))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def register_birth(self):
|
|
|
|
def register_birth(self): # noqa
|
|
|
|
"""Registers its own birth."""
|
|
|
|
"""Registers its own birth."""
|
|
|
|
self.log.debug('Registering birth of worker %s' % (self.name,))
|
|
|
|
self.log.debug('Registering birth of worker %s' % (self.name,))
|
|
|
|
if conn.exists(self.key) and not conn.hexists(self.key, 'death'):
|
|
|
|
if conn.exists(self.key) and not conn.hexists(self.key, 'death'):
|
|
|
|
raise ValueError('There exists an active worker named \'%s\' alread.' % (self.name,))
|
|
|
|
raise ValueError(
|
|
|
|
|
|
|
|
'There exists an active worker named \'%s\' '
|
|
|
|
|
|
|
|
'already.' % (self.name,))
|
|
|
|
key = self.key
|
|
|
|
key = self.key
|
|
|
|
now = time.time()
|
|
|
|
now = time.time()
|
|
|
|
queues = ','.join(self.queue_names())
|
|
|
|
queues = ','.join(self.queue_names())
|
|
|
@ -203,7 +208,8 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
# Take down the horse with the worker
|
|
|
|
# Take down the horse with the worker
|
|
|
|
if self.horse_pid:
|
|
|
|
if self.horse_pid:
|
|
|
|
self.log.debug('Taking down horse %d with me.' % self.horse_pid)
|
|
|
|
msg = 'Taking down horse %d with me.' % self.horse_pid
|
|
|
|
|
|
|
|
self.log.debug(msg)
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
os.kill(self.horse_pid, signal.SIGKILL)
|
|
|
|
os.kill(self.horse_pid, signal.SIGKILL)
|
|
|
|
except OSError as e:
|
|
|
|
except OSError as e:
|
|
|
@ -226,7 +232,8 @@ class Worker(object):
|
|
|
|
self.log.debug('Ignoring signal %s.' % signal_name(signum))
|
|
|
|
self.log.debug('Ignoring signal %s.' % signal_name(signum))
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
|
self.log.warning('Warm shut down. Press Ctrl+C again for a cold shutdown.')
|
|
|
|
msg = 'Warm shut down. Press Ctrl+C again for a cold shutdown.'
|
|
|
|
|
|
|
|
self.log.warning(msg)
|
|
|
|
self._stopped = True
|
|
|
|
self._stopped = True
|
|
|
|
self.log.debug('Stopping after current horse is finished.')
|
|
|
|
self.log.debug('Stopping after current horse is finished.')
|
|
|
|
|
|
|
|
|
|
|
@ -234,7 +241,7 @@ class Worker(object):
|
|
|
|
signal.signal(signal.SIGTERM, request_stop)
|
|
|
|
signal.signal(signal.SIGTERM, request_stop)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def work(self, burst=False):
|
|
|
|
def work(self, burst=False): # noqa
|
|
|
|
"""Starts the work loop.
|
|
|
|
"""Starts the work loop.
|
|
|
|
|
|
|
|
|
|
|
|
Pops and performs all jobs on the current list of queues. When all
|
|
|
|
Pops and performs all jobs on the current list of queues. When all
|
|
|
@ -257,14 +264,17 @@ class Worker(object):
|
|
|
|
qnames = self.queue_names()
|
|
|
|
qnames = self.queue_names()
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.log.info('')
|
|
|
|
self.log.info('')
|
|
|
|
self.log.info('*** Listening on %s...' % (green(', '.join(qnames))))
|
|
|
|
self.log.info('*** Listening on %s...' % \
|
|
|
|
|
|
|
|
green(', '.join(qnames)))
|
|
|
|
wait_for_job = not burst
|
|
|
|
wait_for_job = not burst
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
result = Queue.dequeue_any(self.queues, wait_for_job)
|
|
|
|
result = Queue.dequeue_any(self.queues, wait_for_job)
|
|
|
|
if result is None:
|
|
|
|
if result is None:
|
|
|
|
break
|
|
|
|
break
|
|
|
|
except UnpickleError as e:
|
|
|
|
except UnpickleError as e:
|
|
|
|
self.log.warning('*** Ignoring unpickleable data on %s.' % (e.queue.name,))
|
|
|
|
msg = '*** Ignoring unpickleable data on %s.' % \
|
|
|
|
|
|
|
|
green(e.queue.name)
|
|
|
|
|
|
|
|
self.log.warning(msg)
|
|
|
|
self.log.debug('Data follows:')
|
|
|
|
self.log.debug('Data follows:')
|
|
|
|
self.log.debug(e.raw_data)
|
|
|
|
self.log.debug(e.raw_data)
|
|
|
|
self.log.debug('End of unreadable data.')
|
|
|
|
self.log.debug('End of unreadable data.')
|
|
|
@ -272,7 +282,8 @@ class Worker(object):
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
job, queue = result
|
|
|
|
job, queue = result
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id))
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
|
|
|
|
blue(job.description), job.id))
|
|
|
|
|
|
|
|
|
|
|
|
self.state = 'busy'
|
|
|
|
self.state = 'busy'
|
|
|
|
self.fork_and_perform_job(job)
|
|
|
|
self.fork_and_perform_job(job)
|
|
|
@ -301,11 +312,11 @@ class Worker(object):
|
|
|
|
break
|
|
|
|
break
|
|
|
|
except OSError as e:
|
|
|
|
except OSError as e:
|
|
|
|
# In case we encountered an OSError due to EINTR (which is
|
|
|
|
# In case we encountered an OSError due to EINTR (which is
|
|
|
|
# caused by a SIGINT or SIGTERM signal during os.waitpid()),
|
|
|
|
# caused by a SIGINT or SIGTERM signal during
|
|
|
|
# we simply ignore it and enter the next iteration of the
|
|
|
|
# os.waitpid()), we simply ignore it and enter the next
|
|
|
|
# loop, waiting for the child to end. In any other case,
|
|
|
|
# iteration of the loop, waiting for the child to end. In
|
|
|
|
# this is some other unexpected OS error, which we don't
|
|
|
|
# any other case, this is some other unexpected OS error,
|
|
|
|
# want to catch, so we re-raise those ones.
|
|
|
|
# which we don't want to catch, so we re-raise those ones.
|
|
|
|
if e.errno != errno.EINTR:
|
|
|
|
if e.errno != errno.EINTR:
|
|
|
|
raise
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
|
|