diff --git a/rq/queue.py b/rq/queue.py index b93ab4e..c9d7283 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -333,7 +333,7 @@ class Queue: else: end = length job_ids = [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)] - self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.") + self.log.debug('Getting jobs for queue %s: %d found.', green(self.name), len(job_ids)) return job_ids def get_jobs(self, offset: int = 0, length: int = -1) -> List['Job']: @@ -452,7 +452,7 @@ class Queue: result = connection.lpush(self.key, job_id) else: result = connection.rpush(self.key, job_id) - self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") + self.log.debug('Pushed job %s into %s, %s job(s) are in queue.', blue(job_id), green(self.name), result) def create_job( self, diff --git a/rq/scheduler.py b/rq/scheduler.py index 6dfd3a4..e49b57e 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -109,7 +109,7 @@ class RQScheduler: """Returns names of queue it successfully acquires lock on""" successful_locks = set() pid = os.getpid() - self.log.debug("Trying to acquire locks for %s", ", ".join(self._queue_names)) + self.log.debug('Trying to acquire locks for %s', ', '.join(self._queue_names)) for name in self._queue_names: if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=self.interval + 60): successful_locks.add(name) @@ -183,7 +183,7 @@ class RQScheduler: def heartbeat(self): """Updates the TTL on scheduler keys and the locks""" - self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks)) + self.log.debug('Scheduler sending heartbeat to %s', ', '.join(self.acquired_locks)) if len(self._queue_names) > 1: with self.connection.pipeline() as pipeline: for name in self._acquired_locks: @@ -195,7 +195,7 @@ class RQScheduler: self.connection.expire(key, self.interval + 60) def stop(self): - self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names)) + self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._queue_names)) self.release_locks() self._status = self.Status.STOPPED @@ -231,13 +231,13 @@ class RQScheduler: def run(scheduler): - scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid()) + scheduler.log.info('Scheduler for %s started with PID %s', ', '.join(scheduler._queue_names), os.getpid()) try: scheduler.work() except: # noqa scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc()) raise - scheduler.log.info("Scheduler with PID %s has stopped", os.getpid()) + scheduler.log.info('Scheduler with PID %d has stopped', os.getpid()) def parse_names(queues_or_names): diff --git a/rq/worker.py b/rq/worker.py index 7852569..3e37425 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -729,7 +729,7 @@ class Worker: """ setup_loghandlers(logging_level, date_format, log_format) self.register_birth() - self.log.info("Worker %s: started, version %s", self.key, VERSION) + self.log.info('Worker %s: started, version %s', self.key, VERSION) self.subscribe() self.set_state(WorkerStatus.STARTED) qnames = self.queue_names() @@ -827,9 +827,9 @@ class Worker: result = self.dequeue_job_and_maintain_ttl(timeout, max_idle_time) if result is None: if burst: - self.log.info("Worker %s: done, quitting", self.key) + self.log.info('Worker %s: done, quitting', self.key) elif max_idle_time is not None: - self.log.info("Worker %s: idle for %d seconds, quitting", self.key, max_idle_time) + self.log.info('Worker %s: idle for %d seconds, quitting', self.key, max_idle_time) break job, queue = result @@ -839,11 +839,11 @@ class Worker: completed_jobs += 1 if max_jobs is not None: if completed_jobs >= max_jobs: - self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs) + self.log.info('Worker %s: finished executing %d jobs, quitting', self.key, completed_jobs) break except redis.exceptions.TimeoutError: - self.log.error(f"Worker {self.key}: Redis connection timeout, quitting...") + self.log.error('Worker %s: Redis connection timeout, quitting...', self.key) break except StopRequested: @@ -905,7 +905,7 @@ class Worker: if timeout is not None and idle_time_left is not None: timeout = min(timeout, idle_time_left) - self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}") + self.log.debug('Dequeueing jobs on queues %s and timeout %d', green(qnames), timeout) result = self.queue_class.dequeue_any( self._ordered_queues, timeout, @@ -916,7 +916,7 @@ class Worker: if result is not None: job, queue = result self.reorder_queues(reference_queue=queue) - self.log.debug(f"Dequeued job {blue(job.id)} from {green(queue.name)}") + self.log.debug('Dequeued job %s from %s', blue(job.id), green(queue.name)) job.redis_server_version = self.get_redis_server_version() if self.log_job_description: self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id) @@ -1154,15 +1154,15 @@ class Worker: elif self._stopped_job_id == job.id: # Work-horse killed deliberately self.log.warning('Job stopped by user, moving job to FailedJobRegistry') - self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.") + self.handle_job_failure(job, queue=queue, exc_string='Job stopped by user, work-horse terminated.') elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: if not job.ended_at: job.ended_at = utcnow() # Unhandled failure: move the job to the failed queue - signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else "" + signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else '' exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; " - self.log.warning(f'Moving job to FailedJobRegistry ({exc_string})') + self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string) self.handle_work_horse_killed(job, retpid, ret_val, rusage) self.handle_job_failure( @@ -1237,7 +1237,7 @@ class Worker: """Performs misc bookkeeping like updating states prior to job execution. """ - self.log.debug(f"Preparing for execution of Job ID {job.id}") + self.log.debug('Preparing for execution of Job ID %s', job.id) with self.connection.pipeline() as pipeline: self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_working_time(0, pipeline=pipeline) @@ -1248,7 +1248,7 @@ class Worker: job.prepare_for_execution(self.name, pipeline=pipeline) pipeline.execute() - self.log.debug(f"Job preparation finished.") + self.log.debug('Job preparation finished.') msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) @@ -1348,7 +1348,7 @@ class Worker: result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: - self.log.debug(f"Saving job {job.id}'s successful execution result") + self.log.debug('Saving job %s\'s successful execution result', job.id) job._handle_success(result_ttl, pipeline=pipeline) job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) @@ -1369,7 +1369,7 @@ class Worker: job (Job): The Job result (Any): The job's result. """ - self.log.debug(f"Running success callbacks for {job.id}") + self.log.debug('Running success callbacks for %s', job.id) job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.success_callback(job, self.connection, result) @@ -1383,7 +1383,7 @@ class Worker: if not job.failure_callback: return - self.log.debug(f"Running failure callbacks for {job.id}") + self.log.debug('Running failure callbacks for %s', job.id) job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.failure_callback(job, self.connection, *exc_info) @@ -1401,7 +1401,7 @@ class Worker: """ push_connection(self.connection) started_job_registry = queue.started_job_registry - self.log.debug("Started Job Registry set.") + self.log.debug('Started Job Registry set.') try: self.prepare_job_execution(job) @@ -1409,9 +1409,9 @@ class Worker: job.started_at = utcnow() timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): - self.log.debug("Performing Job...") + self.log.debug('Performing Job...') rv = job.perform() - self.log.debug(f"Finished performing Job ID {job.id}") + self.log.debug('Finished performing Job ID %s', job.id) job.ended_at = utcnow() @@ -1424,7 +1424,7 @@ class Worker: self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) except: # NOQA - self.log.debug(f"Job {job.id} raised an exception.") + self.log.debug('Job %s raised an exception.', job.id) job.ended_at = utcnow() exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) @@ -1447,8 +1447,7 @@ class Worker: self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id) if rv is not None: - log_result = "{0!r}".format(as_text(str(rv))) - self.log.debug('Result: %s', yellow(log_result)) + self.log.debug('Result: %r', yellow(as_text(str(rv)))) if self.log_result_lifespan: result_ttl = job.get_result_ttl(self.default_result_ttl) @@ -1467,7 +1466,7 @@ class Worker: the other properties are accessed, which will stop exceptions from being properly logged, so we guard against it here. """ - self.log.debug(f"Handling exception for {job.id}.") + self.log.debug('Handling exception for %s.', job.id) exc_string = ''.join(traceback.format_exception(*exc_info)) try: extra = { @@ -1484,7 +1483,9 @@ class Worker: extra.update({'queue': job.origin, 'job_id': job.id}) # func_name - self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra) + self.log.error( + '[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra + ) for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler)