diff --git a/rq/queue.py b/rq/queue.py index 853cd2a..ef650d8 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -2,6 +2,7 @@ import uuid import sys import warnings import typing as t +import logging from collections import namedtuple from datetime import datetime, timezone @@ -17,7 +18,14 @@ from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError from .job import Job, JobStatus from .serializers import resolve_serializer -from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow +from .utils import backend_class, get_version, import_attribute, make_colorizer, parse_timeout, utcnow + + +green = make_colorizer('darkgreen') +yellow = make_colorizer('darkyellow') +blue = make_colorizer('darkblue') + +logger = logging.getLogger("rq.queue") def compact(lst): @@ -78,6 +86,7 @@ class Queue: self._key = '{0}{1}'.format(prefix, name) self._default_timeout = parse_timeout(default_timeout) or self.DEFAULT_TIMEOUT self._is_async = is_async + self.log = logger if 'async' in kwargs: self._is_async = kwargs['async'] @@ -204,8 +213,13 @@ class Queue: end = offset + (length - 1) else: end = length - return [as_text(job_id) for job_id in - self.connection.lrange(self.key, start, end)] + job_ids = [ + as_text(job_id) + for job_id + in self.connection.lrange(self.key, start, end) + ] + self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.") + return job_ids def get_jobs(self, offset: int = 0, length: int = -1): """Returns a slice of jobs in the queue.""" @@ -293,9 +307,10 @@ class Queue: 'at_front' allows you to push the job onto the front instead of the back of the queue""" connection = pipeline if pipeline is not None else self.connection if at_front: - connection.lpush(self.key, job_id) + result = connection.lpush(self.key, job_id) else: - connection.rpush(self.key, job_id) + result = connection.rpush(self.key, job_id) + self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") def create_job(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None, result_ttl=None, ttl=None, failure_ttl=None, @@ -682,7 +697,7 @@ class Queue: return as_text(self.connection.lpop(self.key)) @classmethod - def lpop(cls, queue_keys, timeout, connection: t.Optional['Redis'] = None): + def lpop(cls, queue_keys, timeout: int, connection: t.Optional['Redis'] = None): """Helper method. Intermediate method to abstract away from some Redis API details, where LPOP accepts only a single key, whereas BLPOP accepts multiple. So if we want the non-blocking LPOP, we need to @@ -699,10 +714,13 @@ class Queue: if timeout is not None: # blocking variant if timeout == 0: raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0') + logger.debug(f"Starting BLPOP operation for queues {green(queue_keys)} with timeout of {timeout}") result = connection.blpop(queue_keys, timeout) if result is None: + logger.debug(f"BLPOP Timeout, no jobs found on queues {green(queue_keys)}") raise DequeueTimeout(timeout, queue_keys) queue_key, job_id = result + logger.debug(f"Dequeued job {blue(job_id)} from queue {green(queue_key)}") return queue_key, job_id else: # non-blocking variant for queue_key in queue_keys: diff --git a/rq/worker.py b/rq/worker.py index 7841faa..28ff6b9 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -60,7 +60,7 @@ green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') blue = make_colorizer('darkblue') -logger = logging.getLogger(__name__) +logger = logging.getLogger("rq.worker") class StopRequested(Exception): @@ -181,7 +181,7 @@ class Worker: if connection is None: connection = get_current_connection() self.connection = connection - + self.redis_server_version = None self.job_class = backend_class(self, 'job_class', override=job_class) @@ -690,10 +690,12 @@ class Worker: if self.should_run_maintenance_tasks: self.run_maintenance_tasks() + self.log.debug(f"Dequeueing jobs on queues {self._ordered_queues} and timeout {timeout}") result = self.queue_class.dequeue_any(self._ordered_queues, timeout, connection=self.connection, job_class=self.job_class, serializer=self.serializer) + self.log.debug(f"Dequeued job {result[1]} from {result[0]}") if result is not None: job, queue = result @@ -946,7 +948,7 @@ class Worker: """Performs misc bookkeeping like updating states prior to job execution. """ - + self.log.debug(f"Preparing for execution of Job ID {job.id}") with self.connection.pipeline() as pipeline: self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_working_time(0, pipeline=pipeline) @@ -957,6 +959,7 @@ class Worker: job.prepare_for_execution(self.name, pipeline=pipeline) pipeline.execute() + self.log.debug(f"Job preparation finished.") msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) @@ -1084,12 +1087,14 @@ class Worker: def execute_success_callback(self, job: 'Job', result): """Executes success_callback with timeout""" + self.log.debug(f"Running success callbacks for {job.id}") job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.success_callback(job, self.connection, result) def execute_failure_callback(self, job): """Executes failure_callback with timeout""" + self.log.debug(f"Running failure callbacks for {job.id}") job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.failure_callback(job, self.connection, *sys.exc_info()) @@ -1101,6 +1106,7 @@ class Worker: push_connection(self.connection) started_job_registry = queue.started_job_registry + self.log.debug("Started Job Registry set.") try: self.prepare_job_execution(job) @@ -1108,7 +1114,9 @@ class Worker: job.started_at = utcnow() timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): + self.log.debug("Performing Job...") rv = job.perform() + self.log.debug(f"Finished performing Job ID {job.id}") job.ended_at = utcnow() @@ -1123,6 +1131,7 @@ class Worker: queue=queue, started_job_registry=started_job_registry) except: # NOQA + self.log.debug(f"Job {job.id} raised an exception.") job.ended_at = utcnow() exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) @@ -1164,7 +1173,7 @@ class Worker: def handle_exception(self, job: 'Job', *exc_info): """Walks the exception handler stack to delegate exception handling.""" - + self.log.debug(f"Handling exception for {job.id}.") exc_string = ''.join(traceback.format_exception(*exc_info)) # If the job cannot be deserialized, it will raise when func_name or