Propagate logging settings from worker to scheduler (#1366)

main
Chris de Graaf 4 years ago committed by GitHub
parent 676ec9b0ea
commit 5988889e57
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -7,25 +7,18 @@ import traceback
from datetime import datetime from datetime import datetime
from multiprocessing import Process from multiprocessing import Process
from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT
from .job import Job from .job import Job
from .logutils import setup_loghandlers
from .queue import Queue from .queue import Queue
from .registry import ScheduledJobRegistry from .registry import ScheduledJobRegistry
from .utils import current_timestamp, enum from .utils import current_timestamp, enum
from .logutils import setup_loghandlers
from redis import Redis, SSLConnection from redis import Redis, SSLConnection
SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s' SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s'
SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s' SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s'
logger = logging.getLogger(__name__)
setup_loghandlers(
level=logging.INFO,
name="rq.scheduler",
log_format="%(asctime)s: %(message)s",
date_format="%H:%M:%S"
)
class RQScheduler(object): class RQScheduler(object):
# STARTED: scheduler has been started but sleeping # STARTED: scheduler has been started but sleeping
@ -39,7 +32,9 @@ class RQScheduler(object):
STOPPED='stopped' STOPPED='stopped'
) )
def __init__(self, queues, connection, interval=1): def __init__(self, queues, connection, interval=1, logging_level=logging.INFO,
date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT):
self._queue_names = set(parse_names(queues)) self._queue_names = set(parse_names(queues))
self._acquired_locks = set() self._acquired_locks = set()
self._scheduled_job_registries = [] self._scheduled_job_registries = []
@ -54,6 +49,13 @@ class RQScheduler(object):
self._stop_requested = False self._stop_requested = False
self._status = self.Status.STOPPED self._status = self.Status.STOPPED
self._process = None self._process = None
self.log = logging.getLogger(__name__)
setup_loghandlers(
level=logging_level,
name=__name__,
log_format=log_format,
date_format=date_format,
)
@property @property
def connection(self): def connection(self):
@ -83,7 +85,7 @@ class RQScheduler(object):
"""Returns names of queue it successfully acquires lock on""" """Returns names of queue it successfully acquires lock on"""
successful_locks = set() successful_locks = set()
pid = os.getpid() pid = os.getpid()
logger.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) self.log.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
for name in self._queue_names: for name in self._queue_names:
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=60): if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=60):
successful_locks.add(name) successful_locks.add(name)
@ -157,7 +159,8 @@ class RQScheduler(object):
def heartbeat(self): def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks""" """Updates the TTL on scheduler keys and the locks"""
logger.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks)) self.log.debug("Scheduler sending heartbeat to %s",
", ".join(self.acquired_locks))
if len(self._queue_names) > 1: if len(self._queue_names) > 1:
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for name in self._queue_names: for name in self._queue_names:
@ -169,8 +172,8 @@ class RQScheduler(object):
self.connection.expire(key, self.interval + 5) self.connection.expire(key, self.interval + 5)
def stop(self): def stop(self):
logger.info("Scheduler stopping, releasing locks for %s...", self.log.info("Scheduler stopping, releasing locks for %s...",
','.join(self._queue_names)) ','.join(self._queue_names))
keys = [self.get_locking_key(name) for name in self._queue_names] keys = [self.get_locking_key(name) for name in self._queue_names]
self.connection.delete(*keys) self.connection.delete(*keys)
self._status = self.Status.STOPPED self._status = self.Status.STOPPED
@ -201,17 +204,17 @@ class RQScheduler(object):
def run(scheduler): def run(scheduler):
logger.info("Scheduler for %s started with PID %s", scheduler.log.info("Scheduler for %s started with PID %s",
','.join(scheduler._queue_names), os.getpid()) ','.join(scheduler._queue_names), os.getpid())
try: try:
scheduler.work() scheduler.work()
except: # noqa except: # noqa
logger.error( scheduler.log.error(
'Scheduler [PID %s] raised an exception.\n%s', 'Scheduler [PID %s] raised an exception.\n%s',
os.getpid(), traceback.format_exc() os.getpid(), traceback.format_exc()
) )
raise raise
logger.info("Scheduler with PID %s has stopped", os.getpid()) scheduler.log.info("Scheduler with PID %s has stopped", os.getpid())
def parse_names(queues_or_names): def parse_names(queues_or_names):

@ -541,7 +541,9 @@ class Worker(object):
self.log.info('*** Listening on %s...', green(', '.join(qnames))) self.log.info('*** Listening on %s...', green(', '.join(qnames)))
if with_scheduler: if with_scheduler:
self.scheduler = RQScheduler(self.queues, connection=self.connection) self.scheduler = RQScheduler(
self.queues, connection=self.connection, logging_level=logging_level,
date_format=date_format, log_format=log_format)
self.scheduler.acquire_locks() self.scheduler.acquire_locks()
# If lock is acquired, start scheduler # If lock is acquired, start scheduler
if self.scheduler.acquired_locks: if self.scheduler.acquired_locks:

Loading…
Cancel
Save