Enhance debug logs (#1752)

* Add debug logs to dequeue

There were reports where a worker wouldn't fetch new jobs.
Since one possible cause is the infinite hanging of the `BLPOP` command,
this adds a couple of logs surrounding the `dequeue_any` function,
that interacts with Redis using `BLPOP`,

* Update worker.py

Fix Typo

* More logs

* Logs

* Update worker.py

* Add debug logs to the queue

* Reviewed logs

* Remove queue count before job push

* New log wording

* Remove logs
main
lowercase00 2 years ago committed by GitHub
parent 0f5f18bec7
commit 3be814c6d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -2,6 +2,7 @@ import uuid
import sys
import warnings
import typing as t
import logging
from collections import namedtuple
from datetime import datetime, timezone
@ -17,7 +18,14 @@ from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
from .job import Job, JobStatus
from .serializers import resolve_serializer
from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow
from .utils import backend_class, get_version, import_attribute, make_colorizer, parse_timeout, utcnow
green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue')
logger = logging.getLogger("rq.queue")
def compact(lst):
@ -78,6 +86,7 @@ class Queue:
self._key = '{0}{1}'.format(prefix, name)
self._default_timeout = parse_timeout(default_timeout) or self.DEFAULT_TIMEOUT
self._is_async = is_async
self.log = logger
if 'async' in kwargs:
self._is_async = kwargs['async']
@ -204,8 +213,13 @@ class Queue:
end = offset + (length - 1)
else:
end = length
return [as_text(job_id) for job_id in
self.connection.lrange(self.key, start, end)]
job_ids = [
as_text(job_id)
for job_id
in self.connection.lrange(self.key, start, end)
]
self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.")
return job_ids
def get_jobs(self, offset: int = 0, length: int = -1):
"""Returns a slice of jobs in the queue."""
@ -293,9 +307,10 @@ class Queue:
'at_front' allows you to push the job onto the front instead of the back of the queue"""
connection = pipeline if pipeline is not None else self.connection
if at_front:
connection.lpush(self.key, job_id)
result = connection.lpush(self.key, job_id)
else:
connection.rpush(self.key, job_id)
result = connection.rpush(self.key, job_id)
self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.")
def create_job(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
@ -682,7 +697,7 @@ class Queue:
return as_text(self.connection.lpop(self.key))
@classmethod
def lpop(cls, queue_keys, timeout, connection: t.Optional['Redis'] = None):
def lpop(cls, queue_keys, timeout: int, connection: t.Optional['Redis'] = None):
"""Helper method. Intermediate method to abstract away from some
Redis API details, where LPOP accepts only a single key, whereas BLPOP
accepts multiple. So if we want the non-blocking LPOP, we need to
@ -699,10 +714,13 @@ class Queue:
if timeout is not None: # blocking variant
if timeout == 0:
raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0')
logger.debug(f"Starting BLPOP operation for queues {green(queue_keys)} with timeout of {timeout}")
result = connection.blpop(queue_keys, timeout)
if result is None:
logger.debug(f"BLPOP Timeout, no jobs found on queues {green(queue_keys)}")
raise DequeueTimeout(timeout, queue_keys)
queue_key, job_id = result
logger.debug(f"Dequeued job {blue(job_id)} from queue {green(queue_key)}")
return queue_key, job_id
else: # non-blocking variant
for queue_key in queue_keys:

@ -60,7 +60,7 @@ green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue')
logger = logging.getLogger(__name__)
logger = logging.getLogger("rq.worker")
class StopRequested(Exception):
@ -181,7 +181,7 @@ class Worker:
if connection is None:
connection = get_current_connection()
self.connection = connection
self.redis_server_version = None
self.job_class = backend_class(self, 'job_class', override=job_class)
@ -690,10 +690,12 @@ class Worker:
if self.should_run_maintenance_tasks:
self.run_maintenance_tasks()
self.log.debug(f"Dequeueing jobs on queues {self._ordered_queues} and timeout {timeout}")
result = self.queue_class.dequeue_any(self._ordered_queues, timeout,
connection=self.connection,
job_class=self.job_class,
serializer=self.serializer)
self.log.debug(f"Dequeued job {result[1]} from {result[0]}")
if result is not None:
job, queue = result
@ -946,7 +948,7 @@ class Worker:
"""Performs misc bookkeeping like updating states prior to
job execution.
"""
self.log.debug(f"Preparing for execution of Job ID {job.id}")
with self.connection.pipeline() as pipeline:
self.set_current_job_id(job.id, pipeline=pipeline)
self.set_current_job_working_time(0, pipeline=pipeline)
@ -957,6 +959,7 @@ class Worker:
job.prepare_for_execution(self.name, pipeline=pipeline)
pipeline.execute()
self.log.debug(f"Job preparation finished.")
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
@ -1084,12 +1087,14 @@ class Worker:
def execute_success_callback(self, job: 'Job', result):
"""Executes success_callback with timeout"""
self.log.debug(f"Running success callbacks for {job.id}")
job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.success_callback(job, self.connection, result)
def execute_failure_callback(self, job):
"""Executes failure_callback with timeout"""
self.log.debug(f"Running failure callbacks for {job.id}")
job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.failure_callback(job, self.connection, *sys.exc_info())
@ -1101,6 +1106,7 @@ class Worker:
push_connection(self.connection)
started_job_registry = queue.started_job_registry
self.log.debug("Started Job Registry set.")
try:
self.prepare_job_execution(job)
@ -1108,7 +1114,9 @@ class Worker:
job.started_at = utcnow()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):
self.log.debug("Performing Job...")
rv = job.perform()
self.log.debug(f"Finished performing Job ID {job.id}")
job.ended_at = utcnow()
@ -1123,6 +1131,7 @@ class Worker:
queue=queue,
started_job_registry=started_job_registry)
except: # NOQA
self.log.debug(f"Job {job.id} raised an exception.")
job.ended_at = utcnow()
exc_info = sys.exc_info()
exc_string = ''.join(traceback.format_exception(*exc_info))
@ -1164,7 +1173,7 @@ class Worker:
def handle_exception(self, job: 'Job', *exc_info):
"""Walks the exception handler stack to delegate exception handling."""
self.log.debug(f"Handling exception for {job.id}.")
exc_string = ''.join(traceback.format_exception(*exc_info))
# If the job cannot be deserialized, it will raise when func_name or

Loading…
Cancel
Save