|
|
@ -102,7 +102,7 @@ class Worker(object):
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
prefix = cls.redis_worker_namespace_prefix
|
|
|
|
prefix = cls.redis_worker_namespace_prefix
|
|
|
|
if not worker_key.startswith(prefix):
|
|
|
|
if not worker_key.startswith(prefix):
|
|
|
|
raise ValueError('Not a valid RQ worker key: %s' % (worker_key,))
|
|
|
|
raise ValueError('Not a valid RQ worker key: {0}'.format(worker_key))
|
|
|
|
|
|
|
|
|
|
|
|
if connection is None:
|
|
|
|
if connection is None:
|
|
|
|
connection = get_current_connection()
|
|
|
|
connection = get_current_connection()
|
|
|
@ -166,13 +166,12 @@ class Worker(object):
|
|
|
|
raise NoQueueError('{0} is not a queue'.format(queue))
|
|
|
|
raise NoQueueError('{0} is not a queue'.format(queue))
|
|
|
|
|
|
|
|
|
|
|
|
def process_queue_args(self, queue_args):
|
|
|
|
def process_queue_args(self, queue_args):
|
|
|
|
""" allow for a string, a queue an iterable of strings
|
|
|
|
"""Allow for a string, a queue an iterable of strings or an iterable of queues"""
|
|
|
|
or an iterable of queues"""
|
|
|
|
|
|
|
|
if isinstance(queue_args, text_type):
|
|
|
|
if isinstance(queue_args, text_type):
|
|
|
|
return self.queue_class(name = queue_args)
|
|
|
|
return self.queue_class(name=queue_args)
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type)
|
|
|
|
return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type) else queue_arg
|
|
|
|
else queue_arg for queue_arg in queue_args]
|
|
|
|
for queue_arg in queue_args]
|
|
|
|
|
|
|
|
|
|
|
|
def queue_names(self):
|
|
|
|
def queue_names(self):
|
|
|
|
"""Returns the queue names of this worker's queues."""
|
|
|
|
"""Returns the queue names of this worker's queues."""
|
|
|
@ -193,7 +192,7 @@ class Worker(object):
|
|
|
|
if self._name is None:
|
|
|
|
if self._name is None:
|
|
|
|
hostname = socket.gethostname()
|
|
|
|
hostname = socket.gethostname()
|
|
|
|
shortname, _, _ = hostname.partition('.')
|
|
|
|
shortname, _, _ = hostname.partition('.')
|
|
|
|
self._name = '%s.%s' % (shortname, self.pid)
|
|
|
|
self._name = '{0}.{1}'.format(shortname, self.pid)
|
|
|
|
return self._name
|
|
|
|
return self._name
|
|
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
@property
|
|
|
@ -223,15 +222,15 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
This can be used to make `ps -ef` output more readable.
|
|
|
|
This can be used to make `ps -ef` output more readable.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
setprocname('rq: %s' % (message,))
|
|
|
|
setprocname('rq: {0}'.format(message))
|
|
|
|
|
|
|
|
|
|
|
|
def register_birth(self):
|
|
|
|
def register_birth(self):
|
|
|
|
"""Registers its own birth."""
|
|
|
|
"""Registers its own birth."""
|
|
|
|
self.log.debug('Registering birth of worker %s' % (self.name,))
|
|
|
|
self.log.debug('Registering birth of worker {0}'.format(self.name))
|
|
|
|
if self.connection.exists(self.key) and \
|
|
|
|
if self.connection.exists(self.key) and \
|
|
|
|
not self.connection.hexists(self.key, 'death'):
|
|
|
|
not self.connection.hexists(self.key, 'death'):
|
|
|
|
raise ValueError('There exists an active worker named \'%s\' '
|
|
|
|
msg = 'There exists an active worker named {0!r} already'
|
|
|
|
'already.' % (self.name,))
|
|
|
|
raise ValueError(msg.format(self.name))
|
|
|
|
key = self.key
|
|
|
|
key = self.key
|
|
|
|
queues = ','.join(self.queue_names())
|
|
|
|
queues = ','.join(self.queue_names())
|
|
|
|
with self.connection._pipeline() as p:
|
|
|
|
with self.connection._pipeline() as p:
|
|
|
@ -326,18 +325,18 @@ class Worker(object):
|
|
|
|
def request_force_stop(signum, frame):
|
|
|
|
def request_force_stop(signum, frame):
|
|
|
|
"""Terminates the application (cold shutdown).
|
|
|
|
"""Terminates the application (cold shutdown).
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
self.log.warning('Cold shut down.')
|
|
|
|
self.log.warning('Cold shut down')
|
|
|
|
|
|
|
|
|
|
|
|
# Take down the horse with the worker
|
|
|
|
# Take down the horse with the worker
|
|
|
|
if self.horse_pid:
|
|
|
|
if self.horse_pid:
|
|
|
|
msg = 'Taking down horse %d with me.' % self.horse_pid
|
|
|
|
msg = 'Taking down horse {0} with me'.format(self.horse_pid)
|
|
|
|
self.log.debug(msg)
|
|
|
|
self.log.debug(msg)
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
os.kill(self.horse_pid, signal.SIGKILL)
|
|
|
|
os.kill(self.horse_pid, signal.SIGKILL)
|
|
|
|
except OSError as e:
|
|
|
|
except OSError as e:
|
|
|
|
# ESRCH ("No such process") is fine with us
|
|
|
|
# ESRCH ("No such process") is fine with us
|
|
|
|
if e.errno != errno.ESRCH:
|
|
|
|
if e.errno != errno.ESRCH:
|
|
|
|
self.log.debug('Horse already down.')
|
|
|
|
self.log.debug('Horse already down')
|
|
|
|
raise
|
|
|
|
raise
|
|
|
|
raise SystemExit()
|
|
|
|
raise SystemExit()
|
|
|
|
|
|
|
|
|
|
|
@ -345,12 +344,12 @@ class Worker(object):
|
|
|
|
"""Stops the current worker loop but waits for child processes to
|
|
|
|
"""Stops the current worker loop but waits for child processes to
|
|
|
|
end gracefully (warm shutdown).
|
|
|
|
end gracefully (warm shutdown).
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
self.log.debug('Got signal %s.' % signal_name(signum))
|
|
|
|
self.log.debug('Got signal {0}'.format(signal_name(signum)))
|
|
|
|
|
|
|
|
|
|
|
|
signal.signal(signal.SIGINT, request_force_stop)
|
|
|
|
signal.signal(signal.SIGINT, request_force_stop)
|
|
|
|
signal.signal(signal.SIGTERM, request_force_stop)
|
|
|
|
signal.signal(signal.SIGTERM, request_force_stop)
|
|
|
|
|
|
|
|
|
|
|
|
msg = 'Warm shut down requested.'
|
|
|
|
msg = 'Warm shut down requested'
|
|
|
|
self.log.warning(msg)
|
|
|
|
self.log.warning(msg)
|
|
|
|
|
|
|
|
|
|
|
|
# If shutdown is requested in the middle of a job, wait until
|
|
|
|
# If shutdown is requested in the middle of a job, wait until
|
|
|
@ -374,12 +373,12 @@ class Worker(object):
|
|
|
|
while not self.stopped and is_suspended(self.connection):
|
|
|
|
while not self.stopped and is_suspended(self.connection):
|
|
|
|
|
|
|
|
|
|
|
|
if burst:
|
|
|
|
if burst:
|
|
|
|
self.log.info('Suspended in burst mode -- exiting.'
|
|
|
|
self.log.info('Suspended in burst mode, exiting')
|
|
|
|
'Note: There could still be unperformed jobs on the queue')
|
|
|
|
self.log.info('Note: There could still be unfinished jobs on the queue')
|
|
|
|
raise StopRequested
|
|
|
|
raise StopRequested
|
|
|
|
|
|
|
|
|
|
|
|
if not notified:
|
|
|
|
if not notified:
|
|
|
|
self.log.info('Worker suspended, use "rq resume" command to resume')
|
|
|
|
self.log.info('Worker suspended, run `rq resume` to resume')
|
|
|
|
before_state = self.get_state()
|
|
|
|
before_state = self.get_state()
|
|
|
|
self.set_state(WorkerStatus.SUSPENDED)
|
|
|
|
self.set_state(WorkerStatus.SUSPENDED)
|
|
|
|
notified = True
|
|
|
|
notified = True
|
|
|
@ -402,7 +401,7 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
did_perform_work = False
|
|
|
|
did_perform_work = False
|
|
|
|
self.register_birth()
|
|
|
|
self.register_birth()
|
|
|
|
self.log.info("RQ worker, '%s', started, version %s" % (self.key, VERSION))
|
|
|
|
self.log.info("RQ worker {0!r} started, version %s".format(self.key, VERSION))
|
|
|
|
self.set_state(WorkerStatus.STARTED)
|
|
|
|
self.set_state(WorkerStatus.STARTED)
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
@ -414,7 +413,7 @@ class Worker(object):
|
|
|
|
self.clean_registries()
|
|
|
|
self.clean_registries()
|
|
|
|
|
|
|
|
|
|
|
|
if self.stopped:
|
|
|
|
if self.stopped:
|
|
|
|
self.log.info('Stopping on request.')
|
|
|
|
self.log.info('Stopping on request')
|
|
|
|
break
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
|
|
timeout = None if burst else max(1, self.default_worker_ttl - 60)
|
|
|
|
timeout = None if burst else max(1, self.default_worker_ttl - 60)
|
|
|
@ -422,7 +421,7 @@ class Worker(object):
|
|
|
|
result = self.dequeue_job_and_maintain_ttl(timeout)
|
|
|
|
result = self.dequeue_job_and_maintain_ttl(timeout)
|
|
|
|
if result is None:
|
|
|
|
if result is None:
|
|
|
|
if burst:
|
|
|
|
if burst:
|
|
|
|
self.log.info("RQ worker, '%s', done, quitting." % self.key)
|
|
|
|
self.log.info("RQ worker {0!r} done, quitting".format(self.key))
|
|
|
|
break
|
|
|
|
break
|
|
|
|
except StopRequested:
|
|
|
|
except StopRequested:
|
|
|
|
break
|
|
|
|
break
|
|
|
@ -446,10 +445,9 @@ class Worker(object):
|
|
|
|
qnames = self.queue_names()
|
|
|
|
qnames = self.queue_names()
|
|
|
|
|
|
|
|
|
|
|
|
self.set_state(WorkerStatus.IDLE)
|
|
|
|
self.set_state(WorkerStatus.IDLE)
|
|
|
|
self.procline('Listening on %s' % ','.join(qnames))
|
|
|
|
self.procline('Listening on {0}'.format(','.join(qnames)))
|
|
|
|
self.log.info('')
|
|
|
|
self.log.info('')
|
|
|
|
self.log.info('*** Listening on %s...' %
|
|
|
|
self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames))))
|
|
|
|
green(', '.join(qnames)))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
self.heartbeat()
|
|
|
|
self.heartbeat()
|
|
|
@ -459,7 +457,7 @@ class Worker(object):
|
|
|
|
connection=self.connection)
|
|
|
|
connection=self.connection)
|
|
|
|
if result is not None:
|
|
|
|
if result is not None:
|
|
|
|
job, queue = result
|
|
|
|
job, queue = result
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
self.log.info('{0}: {1} ({2})'.format(green(queue.name),
|
|
|
|
blue(job.description), job.id))
|
|
|
|
blue(job.description), job.id))
|
|
|
|
|
|
|
|
|
|
|
|
break
|
|
|
|
break
|
|
|
@ -497,7 +495,7 @@ class Worker(object):
|
|
|
|
self.main_work_horse(job)
|
|
|
|
self.main_work_horse(job)
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
self._horse_pid = child_pid
|
|
|
|
self._horse_pid = child_pid
|
|
|
|
self.procline('Forked %d at %d' % (child_pid, time.time()))
|
|
|
|
self.procline('Forked {0} at {0}'.format(child_pid, time.time()))
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
self.set_state('busy')
|
|
|
|
self.set_state('busy')
|
|
|
@ -552,9 +550,8 @@ class Worker(object):
|
|
|
|
job.set_status(JobStatus.STARTED, pipeline=pipeline)
|
|
|
|
job.set_status(JobStatus.STARTED, pipeline=pipeline)
|
|
|
|
pipeline.execute()
|
|
|
|
pipeline.execute()
|
|
|
|
|
|
|
|
|
|
|
|
self.procline('Processing %s from %s since %s' % (
|
|
|
|
msg = 'Processing {0} from {1} since {2}'
|
|
|
|
job.func_name,
|
|
|
|
self.procline(msg.format(job.func_name, job.origin, time.time()))
|
|
|
|
job.origin, time.time()))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def perform_job(self, job):
|
|
|
|
def perform_job(self, job):
|
|
|
|
"""Performs the actual work of a job. Will/should only be called
|
|
|
|
"""Performs the actual work of a job. Will/should only be called
|
|
|
@ -599,14 +596,14 @@ class Worker(object):
|
|
|
|
if rv is None:
|
|
|
|
if rv is None:
|
|
|
|
self.log.info('Job OK')
|
|
|
|
self.log.info('Job OK')
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
self.log.info('Job OK, result = %s' % (yellow(text_type(rv)),))
|
|
|
|
self.log.info('Job OK, result = {0!r}'.format(yellow(text_type(rv))))
|
|
|
|
|
|
|
|
|
|
|
|
if result_ttl == 0:
|
|
|
|
if result_ttl == 0:
|
|
|
|
self.log.info('Result discarded immediately.')
|
|
|
|
self.log.info('Result discarded immediately')
|
|
|
|
elif result_ttl > 0:
|
|
|
|
elif result_ttl > 0:
|
|
|
|
self.log.info('Result is kept for %d seconds.' % result_ttl)
|
|
|
|
self.log.info('Result is kept for {0} seconds'.format(result_ttl))
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
self.log.warning('Result will never expire, clean up result key manually.')
|
|
|
|
self.log.warning('Result will never expire, clean up result key manually')
|
|
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
@ -622,7 +619,7 @@ class Worker(object):
|
|
|
|
})
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
for handler in reversed(self._exc_handlers):
|
|
|
|
for handler in reversed(self._exc_handlers):
|
|
|
|
self.log.debug('Invoking exception handler %s' % (handler,))
|
|
|
|
self.log.debug('Invoking exception handler {0}'.format(handler))
|
|
|
|
fallthrough = handler(job, *exc_info)
|
|
|
|
fallthrough = handler(job, *exc_info)
|
|
|
|
|
|
|
|
|
|
|
|
# Only handlers with explicit return values should disable further
|
|
|
|
# Only handlers with explicit return values should disable further
|
|
|
@ -636,7 +633,7 @@ class Worker(object):
|
|
|
|
def move_to_failed_queue(self, job, *exc_info):
|
|
|
|
def move_to_failed_queue(self, job, *exc_info):
|
|
|
|
"""Default exception handler: move the job to the failed queue."""
|
|
|
|
"""Default exception handler: move the job to the failed queue."""
|
|
|
|
exc_string = ''.join(traceback.format_exception(*exc_info))
|
|
|
|
exc_string = ''.join(traceback.format_exception(*exc_info))
|
|
|
|
self.log.warning('Moving job to %s queue.' % self.failed_queue.name)
|
|
|
|
self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
|
|
|
|
self.failed_queue.quarantine(job, exc_info=exc_string)
|
|
|
|
self.failed_queue.quarantine(job, exc_info=exc_string)
|
|
|
|
|
|
|
|
|
|
|
|
def push_exc_handler(self, handler_func):
|
|
|
|
def push_exc_handler(self, handler_func):
|
|
|
|