diff --git a/rq/worker.py b/rq/worker.py
index ec25119..db68ee2 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -94,6 +94,9 @@ class Worker(object):
     death_penalty_class = UnixSignalDeathPenalty
     queue_class = Queue
     job_class = Job
+    # `log_result_lifespan` controls whether "Result is kept for XXX seconds"
+    # messages are logged after every job; by default they are.
+    log_result_lifespan = True
 
     @classmethod
     def all(cls, connection=None, job_class=None, queue_class=None):
@@ -132,7 +135,7 @@ class Worker(object):
                      connection=connection,
                      job_class=job_class,
                      queue_class=queue_class)
-
+
         worker.refresh()
 
         return worker
@@ -253,7 +256,7 @@ class Worker(object):
 
     def register_birth(self):
         """Registers its own birth."""
-        self.log.debug('Registering birth of worker {0}'.format(self.name))
+        self.log.debug('Registering birth of worker %s', self.name)
         if self.connection.exists(self.key) and \
                 not self.connection.hexists(self.key, 'death'):
             msg = 'There exists an active worker named {0!r} already'
@@ -383,8 +386,7 @@ class Worker(object):
 
         # Take down the horse with the worker
         if self.horse_pid:
-            msg = 'Taking down horse {0} with me'.format(self.horse_pid)
-            self.log.debug(msg)
+            self.log.debug('Taking down horse %s with me', self.horse_pid)
             self.kill_horse()
         raise SystemExit()
 
@@ -392,7 +394,7 @@ class Worker(object):
         """Stops the current worker loop but waits for child processes to
         end gracefully (warm shutdown).
         """
-        self.log.debug('Got signal {0}'.format(signal_name(signum)))
+        self.log.debug('Got signal %s', signal_name(signum))
 
         signal.signal(signal.SIGINT, self.request_force_stop)
         signal.signal(signal.SIGTERM, self.request_force_stop)
@@ -451,6 +453,8 @@ class Worker(object):
         self.register_birth()
         self.log.info("RQ worker {0!r} started, version {1}".format(self.key, VERSION))
         self.set_state(WorkerStatus.STARTED)
+        qnames = self.queue_names()
+        self.log.info('*** Listening on %s...', green(', '.join(qnames)))
 
         try:
             while True:
@@ -487,12 +491,11 @@ class Worker(object):
 
     def dequeue_job_and_maintain_ttl(self, timeout):
         result = None
-        qnames = self.queue_names()
+        qnames = ','.join(self.queue_names())
 
         self.set_state(WorkerStatus.IDLE)
-        self.procline('Listening on {0}'.format(','.join(qnames)))
-        self.log.info('')
-        self.log.info('*** Listening on {0}...'.format(green(', '.join(qnames))))
+        self.procline('Listening on ' + qnames)
+        self.log.debug('*** Listening on %s...', green(qnames))
 
         while True:
             self.heartbeat()
@@ -529,7 +532,7 @@ class Worker(object):
             connection.expire(self.key, timeout)
             connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
             self.log.debug('Sent heartbeat to prevent worker timeout. '
-                           'Next one should arrive within {0} seconds.'.format(timeout))
+                           'Next one should arrive within %s seconds.', timeout)
 
     def refresh(self):
         data = self.connection.hmget(
@@ -560,7 +563,7 @@ class Worker(object):
                                  connection=self.connection,
                                  job_class=self.job_class)
                 for queue in queues.split(',')]
-
+
     def increment_failed_job_count(self, pipeline=None):
         connection = pipeline if pipeline is not None else self.connection
         connection.hincrby(self.key, 'failed_job_count', 1)
@@ -765,7 +768,7 @@ class Worker(object):
                                                   self.connection,
                                                   job_class=self.job_class)
 
-        try:
+        try:
             job.started_at = utcnow()
             with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
                 rv = job.perform()
@@ -793,15 +796,16 @@ class Worker(object):
         self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
         if rv is not None:
             log_result = "{0!r}".format(as_text(text_type(rv)))
-            self.log.debug('Result: {0}'.format(yellow(log_result)))
-
-        result_ttl = job.get_result_ttl(self.default_result_ttl)
-        if result_ttl == 0:
-            self.log.info('Result discarded immediately')
-        elif result_ttl > 0:
-            self.log.info('Result is kept for {0} seconds'.format(result_ttl))
-        else:
-            self.log.warning('Result will never expire, clean up result key manually')
+            self.log.debug('Result: %s', yellow(log_result))
+
+        if self.log_result_lifespan:
+            result_ttl = job.get_result_ttl(self.default_result_ttl)
+            if result_ttl == 0:
+                self.log.info('Result discarded immediately')
+            elif result_ttl > 0:
+                self.log.info('Result is kept for {0} seconds'.format(result_ttl))
+            else:
+                self.log.warning('Result will never expire, clean up result key manually')
 
         return True
 
@@ -818,7 +822,7 @@ class Worker(object):
         })
 
         for handler in reversed(self._exc_handlers):
-            self.log.debug('Invoking exception handler {0}'.format(handler))
+            self.log.debug('Invoking exception handler %s', handler)
             fallthrough = handler(job, *exc_info)
 
             # Only handlers with explicit return values should disable further
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 8e44d29..9b8bb81 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -736,6 +736,30 @@ class TestWorker(RQTestCase):
         self.assertEqual(job_check.meta['baz'], 10)
         self.assertEqual(job_check.meta['newinfo'], 'waka')
 
+    @mock.patch('rq.worker.logger.info')
+    def test_log_result_lifespan_true(self, mock_logger_info):
+        """Check that log_result_lifespan = True causes the result lifespan to be logged."""
+        q = Queue()
+
+        w = Worker([q])
+        job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
+        w.perform_job(job, q)
+        mock_logger_info.assert_called_with('Result is kept for 10 seconds')
+        self.assertIn('Result is kept for 10 seconds', [c[0][0] for c in mock_logger_info.call_args_list])
+
+    @mock.patch('rq.worker.logger.info')
+    def test_log_result_lifespan_false(self, mock_logger_info):
+        """Check that log_result_lifespan = False causes the result lifespan not to be logged."""
+        q = Queue()
+
+        class TestWorker(Worker):
+            log_result_lifespan = False
+
+        w = TestWorker([q])
+        job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
+        w.perform_job(job, q)
+        self.assertNotIn('Result is kept for 10 seconds', [c[0][0] for c in mock_logger_info.call_args_list])
+
 
 def kill_worker(pid, double_kill):
     # wait for the worker to be started over on the main process
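Usage note (illustrative, not part of the patch): because `log_result_lifespan` is a plain class attribute, the "Result is kept for XXX seconds" messages can be turned off by subclassing Worker, exactly as the TestWorker subclass in the new test does. A minimal sketch assuming a local Redis; the QuietWorker name is hypothetical:

    from redis import Redis
    from rq import Queue, Worker

    class QuietWorker(Worker):
        # Class attribute introduced by this patch; defaults to True.
        log_result_lifespan = False

    if __name__ == '__main__':
        connection = Redis()
        queue = Queue(connection=connection)
        # Jobs still honor result_ttl; only the log message is suppressed.
        QuietWorker([queue], connection=connection).work()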
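The str.format() to '%s' conversions above also change behavior, not just style: the stdlib logging module interpolates lazy arguments only when a record will actually be emitted, so suppressed DEBUG messages cost nothing to build. A standalone demonstration of the idiom (plain stdlib logging, not RQ code):

    import logging

    logging.basicConfig(level=logging.INFO)
    log = logging.getLogger('rq.worker')

    class Expensive:
        def __str__(self):
            print('__str__ evaluated')
            return '<expensive>'

    # Eager formatting: __str__ runs even though the DEBUG record is dropped.
    log.debug('Got signal {0}'.format(Expensive()))

    # Lazy formatting (what the patch uses): the argument is interpolated
    # only if a DEBUG record would actually be emitted, so __str__ never runs.
    log.debug('Got signal %s', Expensive())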