improve logging in worker.py (#902)

* improve logging in worker

* tests for log_result_lifespan
Branch: main
Samuel Colvin authored, committed by Selwin Ong
parent f500186f3d
commit df571e14fd
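The recurring change below swaps eager `str.format()` calls for the logging module's lazy `%`-style arguments: the message is only interpolated when a handler actually emits the record, so filtered-out messages cost almost nothing. A minimal stdlib-only illustration of the difference (not part of the commit):

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger('rq.worker')

    # Eager: format() runs even though DEBUG records are filtered out here.
    log.debug('Got signal {0}'.format('SIGTERM'))

    # Lazy: the %s interpolation is skipped entirely for filtered records.
    log.debug('Got signal %s', 'SIGTERM')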

rq/worker.py

@@ -94,6 +94,9 @@ class Worker(object):
     death_penalty_class = UnixSignalDeathPenalty
     queue_class = Queue
     job_class = Job
+    # `log_result_lifespan` controls whether "Result is kept for XXX seconds"
+    # messages are logged after every job; by default they are.
+    log_result_lifespan = True
 
     @classmethod
     def all(cls, connection=None, job_class=None, queue_class=None):
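Downstream projects can opt out of the per-job TTL messages by overriding the new class attribute; a minimal sketch (the `QuietWorker` name and the connection setup are illustrative, not part of the commit):

    from redis import Redis
    from rq import Queue, Worker

    class QuietWorker(Worker):
        # Suppress the per-job "Result is kept for XXX seconds" messages.
        log_result_lifespan = False

    redis = Redis()
    worker = QuietWorker([Queue(connection=redis)], connection=redis)
    worker.work()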
@@ -132,7 +135,7 @@ class Worker(object):
                      connection=connection,
                      job_class=job_class,
                      queue_class=queue_class)
 
         worker.refresh()
 
         return worker
@ -253,7 +256,7 @@ class Worker(object):
def register_birth(self): def register_birth(self):
"""Registers its own birth.""" """Registers its own birth."""
self.log.debug('Registering birth of worker {0}'.format(self.name)) self.log.debug('Registering birth of worker %s', self.name)
if self.connection.exists(self.key) and \ if self.connection.exists(self.key) and \
not self.connection.hexists(self.key, 'death'): not self.connection.hexists(self.key, 'death'):
msg = 'There exists an active worker named {0!r} already' msg = 'There exists an active worker named {0!r} already'
@@ -383,8 +386,7 @@ class Worker(object):
 
         # Take down the horse with the worker
         if self.horse_pid:
-            msg = 'Taking down horse {0} with me'.format(self.horse_pid)
-            self.log.debug(msg)
+            self.log.debug('Taking down horse %s with me', self.horse_pid)
             self.kill_horse()
         raise SystemExit()
@@ -392,7 +394,7 @@ class Worker(object):
         """Stops the current worker loop but waits for child processes to
         end gracefully (warm shutdown).
         """
-        self.log.debug('Got signal {0}'.format(signal_name(signum)))
+        self.log.debug('Got signal %s', signal_name(signum))
 
         signal.signal(signal.SIGINT, self.request_force_stop)
         signal.signal(signal.SIGTERM, self.request_force_stop)
@@ -451,6 +453,8 @@ class Worker(object):
         self.register_birth()
         self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
         self.set_state(WorkerStatus.STARTED)
+        qnames = self.queue_names()
+        self.log.info('*** Listening on %s...', green(', '.join(qnames)))
 
         try:
             while True:
@@ -487,12 +491,11 @@ class Worker(object):
 
     def dequeue_job_and_maintain_ttl(self, timeout):
         result = None
-        qnames = self.queue_names()
+        qnames = ','.join(self.queue_names())
 
         self.set_state(WorkerStatus.IDLE)
-        self.procline('Listening on {0}'.format(','.join(qnames)))
-        self.log.info('')
-        self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames))))
+        self.procline('Listening on ' + qnames)
+        self.log.debug('*** Listening on %s...', green(qnames))
 
         while True:
             self.heartbeat()
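Note the demotion in this hunk: the repeating "*** Listening on ..." message is now logged once at INFO from work() (previous hunk) and at DEBUG on every dequeue here. Anyone who depended on the per-dequeue message can lower the logger's level; a sketch, assuming the module logger keeps the `rq.worker` name that the tests below patch:

    import logging

    logging.basicConfig()
    logging.getLogger('rq.worker').setLevel(logging.DEBUG)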
@ -529,7 +532,7 @@ class Worker(object):
connection.expire(self.key, timeout) connection.expire(self.key, timeout)
connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
self.log.debug('Sent heartbeat to prevent worker timeout. ' self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout)) 'Next one should arrive within %s seconds.', timeout)
def refresh(self): def refresh(self):
data = self.connection.hmget( data = self.connection.hmget(
@@ -560,7 +563,7 @@ class Worker(object):
                              connection=self.connection,
                              job_class=self.job_class)
                        for queue in queues.split(',')]
 
     def increment_failed_job_count(self, pipeline=None):
         connection = pipeline if pipeline is not None else self.connection
         connection.hincrby(self.key, 'failed_job_count', 1)
@@ -765,7 +768,7 @@ class Worker(object):
                                          self.connection,
                                          job_class=self.job_class)
 
         try:
             job.started_at = utcnow()
             with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
                 rv = job.perform()
@@ -793,15 +796,16 @@ class Worker(object):
 
         self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
         if rv is not None:
             log_result = "{0!r}".format(as_text(text_type(rv)))
-            self.log.debug('Result: {0}'.format(yellow(log_result)))
+            self.log.debug('Result: %s', yellow(log_result))
 
-        result_ttl = job.get_result_ttl(self.default_result_ttl)
-        if result_ttl == 0:
-            self.log.info('Result discarded immediately')
-        elif result_ttl > 0:
-            self.log.info('Result is kept for {0} seconds'.format(result_ttl))
-        else:
-            self.log.warning('Result will never expire, clean up result key manually')
+        if self.log_result_lifespan:
+            result_ttl = job.get_result_ttl(self.default_result_ttl)
+            if result_ttl == 0:
+                self.log.info('Result discarded immediately')
+            elif result_ttl > 0:
+                self.log.info('Result is kept for {0} seconds'.format(result_ttl))
+            else:
+                self.log.warning('Result will never expire, clean up result key manually')
 
         return True
@ -818,7 +822,7 @@ class Worker(object):
}) })
for handler in reversed(self._exc_handlers): for handler in reversed(self._exc_handlers):
self.log.debug('Invoking exception handler {0}'.format(handler)) self.log.debug('Invoking exception handler %s', handler)
fallthrough = handler(job, *exc_info) fallthrough = handler(job, *exc_info)
# Only handlers with explicit return values should disable further # Only handlers with explicit return values should disable further

tests/test_worker.py

@@ -736,6 +736,30 @@ class TestWorker(RQTestCase):
         self.assertEqual(job_check.meta['baz'], 10)
         self.assertEqual(job_check.meta['newinfo'], 'waka')
 
+    @mock.patch('rq.worker.logger.info')
+    def test_log_result_lifespan_true(self, mock_logger_info):
+        """Check that log_result_lifespan = True logs the result lifespan."""
+        q = Queue()
+        w = Worker([q])
+        job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
+        w.perform_job(job, q)
+        mock_logger_info.assert_called_with('Result is kept for 10 seconds')
+        self.assertIn('Result is kept for 10 seconds', [c[0][0] for c in mock_logger_info.call_args_list])
+
+    @mock.patch('rq.worker.logger.info')
+    def test_log_result_lifespan_false(self, mock_logger_info):
+        """Check that log_result_lifespan = False suppresses the result lifespan message."""
+        q = Queue()
+
+        class TestWorker(Worker):
+            log_result_lifespan = False
+
+        w = TestWorker([q])
+        job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
+        w.perform_job(job, q)
+        self.assertNotIn('Result is kept for 10 seconds', [c[0][0] for c in mock_logger_info.call_args_list])
+
 
 def kill_worker(pid, double_kill):
     # wait for the worker to be started over on the main process
