diff --git a/docs/docs/connections.md b/docs/docs/connections.md index 7e87b3a..8e563d9 100644 --- a/docs/docs/connections.md +++ b/docs/docs/connections.md @@ -142,3 +142,23 @@ SENTINEL: {'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379) 'DB': 2, 'MASTER_NAME': 'master'} ``` + + +### Timeout + +To avoid potential issues with hanging Redis commands, specifically the blocking `BLPOP` command, +RQ automatically sets a `socket_timeout` value that is 10 seconds higher than the `default_worker_ttl`. + +If you prefer to manually set the `socket_timeout` value, +make sure that the value being set is higher than the `default_worker_ttl` (which is 420 by default). + +```python +from redis import Redis +from rq import Queue + +conn = Redis('localhost', 6379, socket_timeout=500) +q = Queue(connection=conn) +``` + +Setting a `socket_timeout` with a lower value than the `default_worker_ttl` will cause a `TimeoutError` +since it will interrupt the worker while it gets new jobs from the queue. diff --git a/rq/worker.py b/rq/worker.py index 49c53fd..35355c7 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -197,10 +197,9 @@ class Worker: job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, disable_default_exception_handler: bool = False, prepare_for_work: bool = True, serializer=None): # noqa - if connection is None: - connection = get_current_connection() - self.connection = connection + connection = self._set_connection(connection, default_worker_ttl) + self.connection = connection self.redis_server_version = None self.job_class = backend_class(self, 'job_class', override=job_class) @@ -281,6 +280,24 @@ class Worker: elif exception_handlers is not None: self.push_exc_handler(exception_handlers) + def _set_connection(self, connection: Optional['Redis'], default_worker_ttl: int) -> 'Redis': + """Configures the Redis connection to have a socket timeout. + This should timouet the connection in case any specific command hangs at any given time (eg. BLPOP). + If the connection provided already has a `socket_timeout` defined, skips. + + Args: + connection (Optional[Redis]): The Redis Connection. + default_worker_ttl (int): The Default Worker TTL + """ + if connection is None: + connection = get_current_connection() + current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout") + if current_socket_timeout is None: + timeout = self._get_timeout(default_worker_ttl) + 10 + timeout_config = {"socket_timeout": timeout} + connection.connection_pool.connection_kwargs.update(timeout_config) + return connection + def get_redis_server_version(self): """Return Redis server version of connection""" if not self.redis_server_version: @@ -328,6 +345,12 @@ class Worker: """Returns whether or not this is the worker or the work horse.""" return self._is_horse + def _get_timeout(self, worker_ttl: Optional[int] = None) -> int: + timeout = DEFAULT_WORKER_TTL + if worker_ttl: + timeout = worker_ttl + return max(1, timeout - 15) + def procline(self, message): """Changes the current procname for the process. @@ -626,7 +649,6 @@ class Worker: self.scheduler.start() self._install_signal_handlers() - try: while True: try: @@ -639,7 +661,7 @@ class Worker: self.log.info('Worker %s: stopping on request', self.key) break - timeout = None if burst else max(1, self.default_worker_ttl - 15) + timeout = None if burst else self._get_timeout() result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: if burst: @@ -660,6 +682,10 @@ class Worker: ) break + except redis.exceptions.TimeoutError: + self.log.error(f"Worker {self.key}: Redis connection timeout, quitting...") + break + except StopRequested: break diff --git a/tests/test_worker.py b/tests/test_worker.py index 1977c93..e001a95 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1049,6 +1049,13 @@ class TestWorker(RQTestCase): w.dequeue_job_and_maintain_ttl(10) self.assertNotIn("Frank", mock_logger_info.call_args[0][2]) + def test_worker_configures_socket_timeout(self): + """Ensures that the worker correctly updates Redis client connection to have a socket_timeout""" + q = Queue() + _ = Worker([q]) + connection_kwargs = q.connection.connection_pool.connection_kwargs + self.assertEqual(connection_kwargs["socket_timeout"], 415) + def test_worker_version(self): q = Queue() w = Worker([q])