diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py index 9c0ac1f..33ecf18 100644 --- a/rq/contrib/legacy.py +++ b/rq/contrib/legacy.py @@ -19,6 +19,6 @@ def cleanup_ghosts(conn=None): conn = conn if conn else get_current_connection() for worker in Worker.all(connection=conn): if conn.ttl(worker.key) == -1: - ttl = worker.default_worker_ttl + ttl = worker.worker_ttl conn.expire(worker.key, ttl) logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl)) diff --git a/rq/worker.py b/rq/worker.py index 8720504..3dd7bd6 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -223,7 +223,11 @@ class Worker: serializer=None, ): # noqa - connection = self._set_connection(connection, default_worker_ttl) + self.default_result_ttl = default_result_ttl + self.worker_ttl = default_worker_ttl + self.job_monitoring_interval = job_monitoring_interval + + connection = self._set_connection(connection) self.connection = connection self.redis_server_version = None @@ -246,10 +250,6 @@ class Worker: self._ordered_queues = self.queues[:] self._exc_handlers: List[Callable] = [] - self.default_result_ttl = default_result_ttl - self.default_worker_ttl = default_worker_ttl - self.job_monitoring_interval = job_monitoring_interval - self._state: str = 'starting' self._is_horse: bool = False self._horse_pid: int = 0 @@ -296,21 +296,19 @@ class Worker: elif exception_handlers is not None: self.push_exc_handler(exception_handlers) - def _set_connection(self, connection: Optional['Redis'], default_worker_ttl: int) -> 'Redis': + def _set_connection(self, connection: Optional['Redis']) -> 'Redis': """Configures the Redis connection to have a socket timeout. This should timouet the connection in case any specific command hangs at any given time (eg. BLPOP). If the connection provided already has a `socket_timeout` defined, skips. Args: connection (Optional[Redis]): The Redis Connection. - default_worker_ttl (int): The Default Worker TTL """ if connection is None: connection = get_current_connection() current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout") if current_socket_timeout is None: - timeout = self._get_timeout(default_worker_ttl) + 10 - timeout_config = {"socket_timeout": timeout} + timeout_config = {"socket_timeout": self.connection_timeout} connection.connection_pool.connection_kwargs.update(timeout_config) return connection @@ -361,11 +359,13 @@ class Worker: """Returns whether or not this is the worker or the work horse.""" return self._is_horse - def _get_timeout(self, worker_ttl: Optional[int] = None) -> int: - timeout = DEFAULT_WORKER_TTL - if worker_ttl: - timeout = worker_ttl - return max(1, timeout - 15) + @property + def dequeue_timeout(self) -> int: + return max(1, self.worker_ttl - 15) + + @property + def connection_timeout(self) -> int: + return self.dequeue_timeout + 10 def procline(self, message): """Changes the current procname for the process. @@ -405,7 +405,7 @@ class Worker: p.hmset(key, mapping) worker_registration.register(self, p) - p.expire(key, self.default_worker_ttl + 60) + p.expire(key, self.worker_ttl + 60) p.execute() def register_death(self): @@ -680,7 +680,7 @@ class Worker: self.log.info('Worker %s: stopping on request', self.key) break - timeout = None if burst else self._get_timeout() + timeout = None if burst else self.dequeue_timeout result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: if burst: @@ -789,10 +789,10 @@ class Worker: The next heartbeat should come before this time, or the worker will die (at least from the monitoring dashboards). - If no timeout is given, the default_worker_ttl will be used to update + If no timeout is given, the worker_ttl will be used to update the expiration time of the worker. """ - timeout = timeout or self.default_worker_ttl + 60 + timeout = timeout or self.worker_ttl + 60 connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, timeout) connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) diff --git a/tests/test_worker.py b/tests/test_worker.py index b0bb3d7..cfce473 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -642,6 +642,16 @@ class TestWorker(RQTestCase): # Put it on the queue with a timeout value self.assertIsNone(w.dequeue_job_and_maintain_ttl(None)) + def test_worker_ttl_param_resolves_timeout(self): + """Ensures the worker_ttl param is being considered in the dequeue_timeout and connection_timeout params, takes into account 15 seconds gap (hard coded)""" + q = Queue() + w = Worker([q]) + self.assertEqual(w.dequeue_timeout, 405) + self.assertEqual(w.connection_timeout, 415) + w = Worker([q], default_worker_ttl=500) + self.assertEqual(w.dequeue_timeout, 485) + self.assertEqual(w.connection_timeout, 495) + def test_worker_sets_result_ttl(self): """Ensure that Worker properly sets result_ttl for individual jobs.""" q = Queue()