diff --git a/rq/scheduler.py b/rq/scheduler.py index 1be2e29..561ee5a 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -7,25 +7,18 @@ import traceback from datetime import datetime from multiprocessing import Process +from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT from .job import Job +from .logutils import setup_loghandlers from .queue import Queue from .registry import ScheduledJobRegistry from .utils import current_timestamp, enum -from .logutils import setup_loghandlers from redis import Redis, SSLConnection SCHEDULER_KEY_TEMPLATE = 'rq:scheduler:%s' SCHEDULER_LOCKING_KEY_TEMPLATE = 'rq:scheduler-lock:%s' -logger = logging.getLogger(__name__) -setup_loghandlers( - level=logging.INFO, - name="rq.scheduler", - log_format="%(asctime)s: %(message)s", - date_format="%H:%M:%S" -) - class RQScheduler(object): # STARTED: scheduler has been started but sleeping @@ -39,7 +32,9 @@ class RQScheduler(object): STOPPED='stopped' ) - def __init__(self, queues, connection, interval=1): + def __init__(self, queues, connection, interval=1, logging_level=logging.INFO, + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT): self._queue_names = set(parse_names(queues)) self._acquired_locks = set() self._scheduled_job_registries = [] @@ -54,6 +49,13 @@ class RQScheduler(object): self._stop_requested = False self._status = self.Status.STOPPED self._process = None + self.log = logging.getLogger(__name__) + setup_loghandlers( + level=logging_level, + name=__name__, + log_format=log_format, + date_format=date_format, + ) @property def connection(self): @@ -83,7 +85,7 @@ class RQScheduler(object): """Returns names of queue it successfully acquires lock on""" successful_locks = set() pid = os.getpid() - logger.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) + self.log.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) for name in self._queue_names: if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=60): successful_locks.add(name) @@ -157,7 +159,8 @@ class RQScheduler(object): def heartbeat(self): """Updates the TTL on scheduler keys and the locks""" - logger.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks)) + self.log.debug("Scheduler sending heartbeat to %s", + ", ".join(self.acquired_locks)) if len(self._queue_names) > 1: with self.connection.pipeline() as pipeline: for name in self._queue_names: @@ -169,8 +172,8 @@ class RQScheduler(object): self.connection.expire(key, self.interval + 5) def stop(self): - logger.info("Scheduler stopping, releasing locks for %s...", - ','.join(self._queue_names)) + self.log.info("Scheduler stopping, releasing locks for %s...", + ','.join(self._queue_names)) keys = [self.get_locking_key(name) for name in self._queue_names] self.connection.delete(*keys) self._status = self.Status.STOPPED @@ -201,17 +204,17 @@ class RQScheduler(object): def run(scheduler): - logger.info("Scheduler for %s started with PID %s", - ','.join(scheduler._queue_names), os.getpid()) + scheduler.log.info("Scheduler for %s started with PID %s", + ','.join(scheduler._queue_names), os.getpid()) try: scheduler.work() except: # noqa - logger.error( + scheduler.log.error( 'Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc() ) raise - logger.info("Scheduler with PID %s has stopped", os.getpid()) + scheduler.log.info("Scheduler with PID %s has stopped", os.getpid()) def parse_names(queues_or_names): diff --git a/rq/worker.py b/rq/worker.py index cfbc675..1e2dba5 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -447,7 +447,7 @@ class Worker(object): self.handle_warm_shutdown_request() self._shutdown() - + def _shutdown(self): """ If shutdown is requested in the middle of a job, wait until @@ -504,7 +504,7 @@ class Worker(object): if self.scheduler and not self.scheduler._process: self.scheduler.acquire_locks(auto_start=True) self.clean_registries() - + def subscribe(self): """Subscribe to this worker's channel""" self.log.info('Subscribing to channel %s', self.pubsub_channel_name) @@ -541,7 +541,9 @@ class Worker(object): self.log.info('*** Listening on %s...', green(', '.join(qnames))) if with_scheduler: - self.scheduler = RQScheduler(self.queues, connection=self.connection) + self.scheduler = RQScheduler( + self.queues, connection=self.connection, logging_level=logging_level, + date_format=date_format, log_format=log_format) self.scheduler.acquire_locks() # If lock is acquired, start scheduler if self.scheduler.acquired_locks: