feat: splits the work loop into a bootstrap method (#1816)

main
lowercase00 2 years ago committed by GitHub
parent 5798cddd04
commit fc6d69529f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -679,36 +679,23 @@ class Worker:
""" """
pass pass
def work( def bootstrap(
self, self,
burst: bool = False,
logging_level: str = "INFO", logging_level: str = "INFO",
date_format: str = DEFAULT_LOGGING_DATE_FORMAT, date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT, log_format: str = DEFAULT_LOGGING_FORMAT,
max_jobs: Optional[int] = None, ):
with_scheduler: bool = False, """Bootstraps the worker.
) -> bool: Runs the basic tasks that should run when the worker actually starts working.
"""Starts the work loop. Used so that new workers can focus on the work loop implementation rather
than the full bootstraping process.
Pops and performs all jobs on the current list of queues. When all
queues are empty, block and wait for new jobs to arrive on any of the
queues, unless `burst` mode is enabled.
The return value indicates whether any jobs were processed.
Args: Args:
burst (bool, optional): Whether to work on burst mode. Defaults to False.
logging_level (str, optional): Logging level to use. Defaults to "INFO". logging_level (str, optional): Logging level to use. Defaults to "INFO".
date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
Returns:
worked (bool): Will return True if any job was processed, False otherwise.
""" """
setup_loghandlers(logging_level, date_format, log_format) setup_loghandlers(logging_level, date_format, log_format)
completed_jobs = 0
self.register_birth() self.register_birth()
self.log.info("Worker %s: started, version %s", self.key, VERSION) self.log.info("Worker %s: started, version %s", self.key, VERSION)
self.subscribe() self.subscribe()
@ -716,7 +703,26 @@ class Worker:
qnames = self.queue_names() qnames = self.queue_names()
self.log.info('*** Listening on %s...', green(', '.join(qnames))) self.log.info('*** Listening on %s...', green(', '.join(qnames)))
if with_scheduler: def _start_scheduler(
self,
burst: bool = False,
logging_level: str = "INFO",
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT,
):
"""Starts the scheduler process.
This is specifically designed to be run by the worker when running the `work()` method.
Instanciates the RQScheduler and tries to acquire a lock.
If the lock is acquired, start scheduler.
If worker is on burst mode just enqueues scheduled jobs and quits,
otherwise, starts the scheduler in a separate process.
Args:
burst (bool, optional): Whether to work on burst mode. Defaults to False.
logging_level (str, optional): Logging level to use. Defaults to "INFO".
date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
"""
self.scheduler = RQScheduler( self.scheduler = RQScheduler(
self.queues, self.queues,
connection=self.connection, connection=self.connection,
@ -726,16 +732,46 @@ class Worker:
serializer=self.serializer, serializer=self.serializer,
) )
self.scheduler.acquire_locks() self.scheduler.acquire_locks()
# If lock is acquired, start scheduler
if self.scheduler.acquired_locks: if self.scheduler.acquired_locks:
# If worker is run on burst mode, enqueue_scheduled_jobs()
# before working. Otherwise, start scheduler in a separate process
if burst: if burst:
self.scheduler.enqueue_scheduled_jobs() self.scheduler.enqueue_scheduled_jobs()
self.scheduler.release_locks() self.scheduler.release_locks()
else: else:
self.scheduler.start() self.scheduler.start()
def work(
self,
burst: bool = False,
logging_level: str = "INFO",
date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT,
max_jobs: Optional[int] = None,
with_scheduler: bool = False,
) -> bool:
"""Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
queues are empty, block and wait for new jobs to arrive on any of the
queues, unless `burst` mode is enabled.
The return value indicates whether any jobs were processed.
Args:
burst (bool, optional): Whether to work on burst mode. Defaults to False.
logging_level (str, optional): Logging level to use. Defaults to "INFO".
date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
Returns:
worked (bool): Will return True if any job was processed, False otherwise.
"""
self.bootstrap(logging_level, date_format, log_format)
completed_jobs = 0
if with_scheduler:
self._start_scheduler(burst, logging_level, date_format, log_format)
self._install_signal_handlers() self._install_signal_handlers()
try: try:
while True: while True:

Loading…
Cancel
Save