Worker - max_idle_time feature (#1795)

* fix accessing None when dequeued result is None (burst=True, or timeout=None)

* add a test

* implement + tests

* fix if

* adjust test

* merge

* test

* test

* merge master

* take max_idle_time into account for dequeue_timeout

* refactor a bit

* potential bug fix

* tests

* math.ceil

* buffer tests
main
Rony Lutsky 2 years ago committed by GitHub
parent 41406db3eb
commit aedc9b9e06
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -223,6 +223,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--pid', help='Write the process ID number to a file at the specified path') @click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler') @click.option('--disable-default-exception-handler', '-d', is_flag=True, help='Disable RQ\'s default exception handler')
@click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute') @click.option('--max-jobs', type=int, default=None, help='Maximum number of jobs to execute')
@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
@click.argument('queues', nargs=-1) @click.argument('queues', nargs=-1)
@ -246,6 +247,7 @@ def worker(
pid, pid,
disable_default_exception_handler, disable_default_exception_handler,
max_jobs, max_jobs,
max_idle_time,
with_scheduler, with_scheduler,
queues, queues,
log_format, log_format,
@ -317,6 +319,7 @@ def worker(
date_format=date_format, date_format=date_format,
log_format=log_format, log_format=log_format,
max_jobs=max_jobs, max_jobs=max_jobs,
max_idle_time=max_idle_time,
with_scheduler=with_scheduler, with_scheduler=with_scheduler,
) )
except ConnectionError as e: except ConnectionError as e:

@ -1140,7 +1140,7 @@ class Queue:
return as_text(self.connection.lpop(self.key)) return as_text(self.connection.lpop(self.key))
@classmethod @classmethod
def lpop(cls, queue_keys: List[str], timeout: int, connection: Optional['Redis'] = None): def lpop(cls, queue_keys: List[str], timeout: Optional[int], connection: Optional['Redis'] = None):
"""Helper method. Intermediate method to abstract away from some """Helper method. Intermediate method to abstract away from some
Redis API details, where LPOP accepts only a single key, whereas BLPOP Redis API details, where LPOP accepts only a single key, whereas BLPOP
accepts multiple. So if we want the non-blocking LPOP, we need to accepts multiple. So if we want the non-blocking LPOP, we need to
@ -1155,7 +1155,7 @@ class Queue:
Args: Args:
queue_keys (_type_): _description_ queue_keys (_type_): _description_
timeout (int): _description_ timeout (Optional[int]): _description_
connection (Optional[Redis], optional): _description_. Defaults to None. connection (Optional[Redis], optional): _description_. Defaults to None.
Raises: Raises:
@ -1188,7 +1188,7 @@ class Queue:
def dequeue_any( def dequeue_any(
cls, cls,
queues: List['Queue'], queues: List['Queue'],
timeout: int, timeout: Optional[int],
connection: Optional['Redis'] = None, connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None, job_class: Optional['Job'] = None,
serializer: Any = None, serializer: Any = None,
@ -1205,7 +1205,7 @@ class Queue:
Args: Args:
queues (List[Queue]): List of queue objects queues (List[Queue]): List of queue objects
timeout (int): Timeout for the LPOP timeout (Optional[int]): Timeout for the LPOP
connection (Optional[Redis], optional): Redis Connection. Defaults to None. connection (Optional[Redis], optional): Redis Connection. Defaults to None.
job_class (Optional[Job], optional): The job classification. Defaults to None. job_class (Optional[Job], optional): The job classification. Defaults to None.
serializer (Any, optional): Serializer to use. Defaults to None. serializer (Any, optional): Serializer to use. Defaults to None.

@ -1,6 +1,7 @@
import contextlib import contextlib
import errno import errno
import logging import logging
import math
import os import os
import random import random
import resource import resource
@ -746,6 +747,7 @@ class Worker:
date_format: str = DEFAULT_LOGGING_DATE_FORMAT, date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT, log_format: str = DEFAULT_LOGGING_FORMAT,
max_jobs: Optional[int] = None, max_jobs: Optional[int] = None,
max_idle_time: Optional[int] = None,
with_scheduler: bool = False, with_scheduler: bool = False,
) -> bool: ) -> bool:
"""Starts the work loop. """Starts the work loop.
@ -753,6 +755,7 @@ class Worker:
Pops and performs all jobs on the current list of queues. When all Pops and performs all jobs on the current list of queues. When all
queues are empty, block and wait for new jobs to arrive on any of the queues are empty, block and wait for new jobs to arrive on any of the
queues, unless `burst` mode is enabled. queues, unless `burst` mode is enabled.
If `max_idle_time` is provided, worker will die when it's idle for more than the provided value.
The return value indicates whether any jobs were processed. The return value indicates whether any jobs were processed.
@ -762,6 +765,7 @@ class Worker:
date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT. date_format (str, optional): Date Format. Defaults to DEFAULT_LOGGING_DATE_FORMAT.
log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT. log_format (str, optional): Log Format. Defaults to DEFAULT_LOGGING_FORMAT.
max_jobs (Optional[int], optional): Max number of jobs. Defaults to None. max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None.
with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False. with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
Returns: Returns:
@ -786,10 +790,12 @@ class Worker:
break break
timeout = None if burst else self.dequeue_timeout timeout = None if burst else self.dequeue_timeout
result = self.dequeue_job_and_maintain_ttl(timeout) result = self.dequeue_job_and_maintain_ttl(timeout, max_idle_time)
if result is None: if result is None:
if burst: if burst:
self.log.info("Worker %s: done, quitting", self.key) self.log.info("Worker %s: done, quitting", self.key)
elif max_idle_time is not None:
self.log.info("Worker %s: idle for %d seconds, quitting", self.key, max_idle_time)
break break
job, queue = result job, queue = result
@ -841,7 +847,7 @@ class Worker:
pass pass
self.scheduler._process.join() self.scheduler._process.join()
def dequeue_job_and_maintain_ttl(self, timeout: int) -> Tuple['Job', 'Queue']: def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']:
"""Dequeues a job while maintaining the TTL. """Dequeues a job while maintaining the TTL.
Returns: Returns:
@ -854,6 +860,8 @@ class Worker:
self.procline('Listening on ' + qnames) self.procline('Listening on ' + qnames)
self.log.debug('*** Listening on %s...', green(qnames)) self.log.debug('*** Listening on %s...', green(qnames))
connection_wait_time = 1.0 connection_wait_time = 1.0
idle_since = utcnow()
idle_time_left = max_idle_time
while True: while True:
try: try:
self.heartbeat() self.heartbeat()
@ -861,6 +869,9 @@ class Worker:
if self.should_run_maintenance_tasks: if self.should_run_maintenance_tasks:
self.run_maintenance_tasks() self.run_maintenance_tasks()
if timeout is not None and idle_time_left is not None:
timeout = min(timeout, idle_time_left)
self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}") self.log.debug(f"Dequeueing jobs on queues {green(qnames)} and timeout {timeout}")
result = self.queue_class.dequeue_any( result = self.queue_class.dequeue_any(
self._ordered_queues, self._ordered_queues,
@ -880,7 +891,11 @@ class Worker:
break break
except DequeueTimeout: except DequeueTimeout:
pass if max_idle_time is not None:
idle_for = (utcnow() - idle_since).total_seconds()
idle_time_left = math.ceil(max_idle_time - idle_for)
if idle_time_left <= 0:
break
except redis.exceptions.ConnectionError as conn_err: except redis.exceptions.ConnectionError as conn_err:
self.log.error( self.log.error(
'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time 'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time

@ -608,6 +608,31 @@ class TestWorker(RQTestCase):
# Should not have created evidence of execution # Should not have created evidence of execution
self.assertEqual(os.path.exists(SENTINEL_FILE), False) self.assertEqual(os.path.exists(SENTINEL_FILE), False)
@slow
def test_max_idle_time(self):
    """Worker gives up dequeueing once it has been idle for max_idle_time seconds."""
    queue = Queue()
    worker = Worker([queue])
    queue.enqueue(say_hello, args=('Frank',))
    # A job is available, so the dequeue must yield a (job, queue) pair.
    self.assertIsNotNone(worker.dequeue_job_and_maintain_ttl(1))

    # Queue drained: idle for 1 second, then give up with None.
    self.assertIsNone(worker.dequeue_job_and_maintain_ttl(1, max_idle_time=1))

    # Idle for 3 seconds — max_idle_time spans several 1-second dequeue rounds.
    started = utcnow()
    self.assertIsNone(worker.dequeue_job_and_maintain_ttl(1, max_idle_time=3))
    self.assertLess((utcnow() - started).total_seconds(), 5)  # 5 for some buffer

    # Idle for only 2 seconds — max_idle_time caps the larger dequeue timeout.
    started = utcnow()
    self.assertIsNone(worker.dequeue_job_and_maintain_ttl(3, max_idle_time=2))
    self.assertLess((utcnow() - started).total_seconds(), 4)  # 4 for some buffer

    # Idle for 3 seconds — less than two full rounds of the 2-second timeout.
    started = utcnow()
    self.assertIsNone(worker.dequeue_job_and_maintain_ttl(2, max_idle_time=3))
    self.assertLess((utcnow() - started).total_seconds(), 5)  # 5 for some buffer
@slow # noqa @slow # noqa
def test_timeouts(self): def test_timeouts(self):
"""Worker kills jobs after timeout.""" """Worker kills jobs after timeout."""
@ -640,7 +665,6 @@ class TestWorker(RQTestCase):
q = Queue() q = Queue()
w = Worker([q]) w = Worker([q])
# Put it on the queue with a timeout value
self.assertIsNone(w.dequeue_job_and_maintain_ttl(None)) self.assertIsNone(w.dequeue_job_and_maintain_ttl(None))
def test_worker_ttl_param_resolves_timeout(self): def test_worker_ttl_param_resolves_timeout(self):

Loading…
Cancel
Save