|
|
|
@ -430,7 +430,7 @@ class Worker(object):
|
|
|
|
|
raise StopRequested()
|
|
|
|
|
|
|
|
|
|
def handle_warm_shutdown_request(self):
|
|
|
|
|
self.log.warning('Warm shut down requested')
|
|
|
|
|
self.log.info('Warm shut down requested')
|
|
|
|
|
|
|
|
|
|
def check_for_suspension(self, burst):
|
|
|
|
|
"""Check to see if workers have been suspended by `rq suspend`"""
|
|
|
|
@ -469,7 +469,7 @@ class Worker(object):
|
|
|
|
|
self._install_signal_handlers()
|
|
|
|
|
did_perform_work = False
|
|
|
|
|
self.register_birth()
|
|
|
|
|
self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
|
|
|
|
|
self.log.info("RQ worker %r started, version %s", self.key, VERSION)
|
|
|
|
|
self.set_state(WorkerStatus.STARTED)
|
|
|
|
|
qnames = self.queue_names()
|
|
|
|
|
self.log.info('*** Listening on %s...', green(', '.join(qnames)))
|
|
|
|
@ -491,7 +491,7 @@ class Worker(object):
|
|
|
|
|
result = self.dequeue_job_and_maintain_ttl(timeout)
|
|
|
|
|
if result is None:
|
|
|
|
|
if burst:
|
|
|
|
|
self.log.info("RQ worker {0!r} done, quitting".format(self.key))
|
|
|
|
|
self.log.info("RQ worker %r done, quitting", self.key)
|
|
|
|
|
break
|
|
|
|
|
|
|
|
|
|
job, queue = result
|
|
|
|
@ -526,12 +526,12 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
job, queue = result
|
|
|
|
|
if self.log_job_description:
|
|
|
|
|
self.log.info('{0}: {1} ({2})'.format(green(queue.name),
|
|
|
|
|
blue(job.description),
|
|
|
|
|
job.id))
|
|
|
|
|
self.log.info('%s: %s (%s)', green(queue.name),
|
|
|
|
|
blue(job.description),
|
|
|
|
|
job.id)
|
|
|
|
|
else:
|
|
|
|
|
self.log.info('{0}:{1}'.format(green(queue.name),
|
|
|
|
|
job.id))
|
|
|
|
|
self.log.info('%s:%s', green(queue.name),
|
|
|
|
|
job.id)
|
|
|
|
|
|
|
|
|
|
break
|
|
|
|
|
except DequeueTimeout:
|
|
|
|
@ -652,9 +652,9 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
# Unhandled failure: move the job to the failed queue
|
|
|
|
|
self.log.warning((
|
|
|
|
|
'Moving job to {0!r} queue '
|
|
|
|
|
'(work-horse terminated unexpectedly; waitpid returned {1})'
|
|
|
|
|
).format(self.failed_queue.name, ret_val))
|
|
|
|
|
'Moving job to %r queue '
|
|
|
|
|
'(work-horse terminated unexpectedly; waitpid returned %s)'
|
|
|
|
|
), self.failed_queue.name, ret_val)
|
|
|
|
|
self.failed_queue.quarantine(
|
|
|
|
|
job,
|
|
|
|
|
exc_info=(
|
|
|
|
@ -827,7 +827,7 @@ class Worker(object):
|
|
|
|
|
finally:
|
|
|
|
|
pop_connection()
|
|
|
|
|
|
|
|
|
|
self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
|
|
|
|
|
self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id)
|
|
|
|
|
if rv is not None:
|
|
|
|
|
log_result = "{0!r}".format(as_text(text_type(rv)))
|
|
|
|
|
self.log.debug('Result: %s', yellow(log_result))
|
|
|
|
@ -837,9 +837,9 @@ class Worker(object):
|
|
|
|
|
if result_ttl == 0:
|
|
|
|
|
self.log.info('Result discarded immediately')
|
|
|
|
|
elif result_ttl > 0:
|
|
|
|
|
self.log.info('Result is kept for {0} seconds'.format(result_ttl))
|
|
|
|
|
self.log.info('Result is kept for %s seconds', result_ttl)
|
|
|
|
|
else:
|
|
|
|
|
self.log.warning('Result will never expire, clean up result key manually')
|
|
|
|
|
self.log.info('Result will never expire, clean up result key manually')
|
|
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
@ -869,7 +869,7 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
def move_to_failed_queue(self, job, *exc_info):
|
|
|
|
|
"""Default exception handler: move the job to the failed queue."""
|
|
|
|
|
self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
|
|
|
|
|
self.log.warning('Moving job to %r queue', self.failed_queue.name)
|
|
|
|
|
from .handlers import move_to_failed_queue
|
|
|
|
|
move_to_failed_queue(job, *exc_info)
|
|
|
|
|
|
|
|
|
@ -904,7 +904,7 @@ class Worker(object):
|
|
|
|
|
def clean_registries(self):
|
|
|
|
|
"""Runs maintenance jobs on each Queue's registries."""
|
|
|
|
|
for queue in self.queues:
|
|
|
|
|
self.log.info('Cleaning registries for queue: {0}'.format(queue.name))
|
|
|
|
|
self.log.info('Cleaning registries for queue: %s', queue.name)
|
|
|
|
|
clean_registries(queue)
|
|
|
|
|
self.last_cleaned_at = utcnow()
|
|
|
|
|
|
|
|
|
@ -952,7 +952,7 @@ class HerokuWorker(Worker):
|
|
|
|
|
def handle_warm_shutdown_request(self):
|
|
|
|
|
"""If horse is alive send it SIGRTMIN"""
|
|
|
|
|
if self.horse_pid != 0:
|
|
|
|
|
self.log.warning('Warm shut down requested, sending horse SIGRTMIN signal')
|
|
|
|
|
self.log.info('Warm shut down requested, sending horse SIGRTMIN signal')
|
|
|
|
|
self.kill_horse(sig=signal.SIGRTMIN)
|
|
|
|
|
else:
|
|
|
|
|
self.log.warning('Warm shut down requested, no horse found')
|
|
|
|
|