Fix - Use worker TTL for timeout (#1794)

* Use worker TTL for timeout

* add test

* renames

* test

* use dequeue_timeout
main
Rony Lutsky 2 years ago committed by GitHub
parent acdeff385d
commit b69ee10cbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -19,6 +19,6 @@ def cleanup_ghosts(conn=None):
conn = conn if conn else get_current_connection() conn = conn if conn else get_current_connection()
for worker in Worker.all(connection=conn): for worker in Worker.all(connection=conn):
if conn.ttl(worker.key) == -1: if conn.ttl(worker.key) == -1:
ttl = worker.default_worker_ttl ttl = worker.worker_ttl
conn.expire(worker.key, ttl) conn.expire(worker.key, ttl)
logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl)) logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl))

@ -223,7 +223,11 @@ class Worker:
serializer=None, serializer=None,
): # noqa ): # noqa
connection = self._set_connection(connection, default_worker_ttl) self.default_result_ttl = default_result_ttl
self.worker_ttl = default_worker_ttl
self.job_monitoring_interval = job_monitoring_interval
connection = self._set_connection(connection)
self.connection = connection self.connection = connection
self.redis_server_version = None self.redis_server_version = None
@ -246,10 +250,6 @@ class Worker:
self._ordered_queues = self.queues[:] self._ordered_queues = self.queues[:]
self._exc_handlers: List[Callable] = [] self._exc_handlers: List[Callable] = []
self.default_result_ttl = default_result_ttl
self.default_worker_ttl = default_worker_ttl
self.job_monitoring_interval = job_monitoring_interval
self._state: str = 'starting' self._state: str = 'starting'
self._is_horse: bool = False self._is_horse: bool = False
self._horse_pid: int = 0 self._horse_pid: int = 0
@ -296,21 +296,19 @@ class Worker:
elif exception_handlers is not None: elif exception_handlers is not None:
self.push_exc_handler(exception_handlers) self.push_exc_handler(exception_handlers)
def _set_connection(self, connection: Optional['Redis'], default_worker_ttl: int) -> 'Redis': def _set_connection(self, connection: Optional['Redis']) -> 'Redis':
"""Configures the Redis connection to have a socket timeout. """Configures the Redis connection to have a socket timeout.
This should time out the connection in case any specific command hangs at any given time (eg. BLPOP). This should time out the connection in case any specific command hangs at any given time (eg. BLPOP).
If the connection provided already has a `socket_timeout` defined, skips. If the connection provided already has a `socket_timeout` defined, skips.
Args: Args:
connection (Optional[Redis]): The Redis Connection. connection (Optional[Redis]): The Redis Connection.
default_worker_ttl (int): The Default Worker TTL
""" """
if connection is None: if connection is None:
connection = get_current_connection() connection = get_current_connection()
current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout") current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
if current_socket_timeout is None: if current_socket_timeout is None:
timeout = self._get_timeout(default_worker_ttl) + 10 timeout_config = {"socket_timeout": self.connection_timeout}
timeout_config = {"socket_timeout": timeout}
connection.connection_pool.connection_kwargs.update(timeout_config) connection.connection_pool.connection_kwargs.update(timeout_config)
return connection return connection
@ -361,11 +359,13 @@ class Worker:
"""Returns whether or not this is the worker or the work horse.""" """Returns whether or not this is the worker or the work horse."""
return self._is_horse return self._is_horse
def _get_timeout(self, worker_ttl: Optional[int] = None) -> int: @property
timeout = DEFAULT_WORKER_TTL def dequeue_timeout(self) -> int:
if worker_ttl: return max(1, self.worker_ttl - 15)
timeout = worker_ttl
return max(1, timeout - 15) @property
def connection_timeout(self) -> int:
return self.dequeue_timeout + 10
def procline(self, message): def procline(self, message):
"""Changes the current procname for the process. """Changes the current procname for the process.
@ -405,7 +405,7 @@ class Worker:
p.hmset(key, mapping) p.hmset(key, mapping)
worker_registration.register(self, p) worker_registration.register(self, p)
p.expire(key, self.default_worker_ttl + 60) p.expire(key, self.worker_ttl + 60)
p.execute() p.execute()
def register_death(self): def register_death(self):
@ -680,7 +680,7 @@ class Worker:
self.log.info('Worker %s: stopping on request', self.key) self.log.info('Worker %s: stopping on request', self.key)
break break
timeout = None if burst else self._get_timeout() timeout = None if burst else self.dequeue_timeout
result = self.dequeue_job_and_maintain_ttl(timeout) result = self.dequeue_job_and_maintain_ttl(timeout)
if result is None: if result is None:
if burst: if burst:
@ -789,10 +789,10 @@ class Worker:
The next heartbeat should come before this time, or the worker will The next heartbeat should come before this time, or the worker will
die (at least from the monitoring dashboards). die (at least from the monitoring dashboards).
If no timeout is given, the default_worker_ttl will be used to update If no timeout is given, the worker_ttl will be used to update
the expiration time of the worker. the expiration time of the worker.
""" """
timeout = timeout or self.default_worker_ttl + 60 timeout = timeout or self.worker_ttl + 60
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout) connection.expire(self.key, timeout)
connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))

@ -642,6 +642,16 @@ class TestWorker(RQTestCase):
# Put it on the queue with a timeout value # Put it on the queue with a timeout value
self.assertIsNone(w.dequeue_job_and_maintain_ttl(None)) self.assertIsNone(w.dequeue_job_and_maintain_ttl(None))
def test_worker_ttl_param_resolves_timeout(self):
"""Ensures the worker TTL drives both timeouts: dequeue_timeout is worker_ttl minus a hard-coded 15-second gap, and connection_timeout is dequeue_timeout plus 10 seconds."""
q = Queue()
w = Worker([q])
self.assertEqual(w.dequeue_timeout, 405)
self.assertEqual(w.connection_timeout, 415)
w = Worker([q], default_worker_ttl=500)
self.assertEqual(w.dequeue_timeout, 485)
self.assertEqual(w.connection_timeout, 495)
def test_worker_sets_result_ttl(self): def test_worker_sets_result_ttl(self):
"""Ensure that Worker properly sets result_ttl for individual jobs.""" """Ensure that Worker properly sets result_ttl for individual jobs."""
q = Queue() q = Queue()

Loading…
Cancel
Save