diff --git a/rq/worker.py b/rq/worker.py index 0eaae0a..6fdc6ca 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -28,7 +28,6 @@ import redis.exceptions from . import worker_registration from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command -from .utils import as_text from .connections import get_current_connection, push_connection, pop_connection from .defaults import ( @@ -48,9 +47,8 @@ from .registry import StartedJobRegistry, clean_registries from .scheduler import RQScheduler from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty -from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact +from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text from .version import VERSION -from .worker_registration import clean_worker_registry, get_keys from .serializers import resolve_serializer try: @@ -132,7 +130,7 @@ class Worker: elif connection is None: connection = get_current_connection() - worker_keys = get_keys(queue=queue, connection=connection) + worker_keys = worker_registration.get_keys(queue=queue, connection=connection) workers = [ cls.find_by_key( key, connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer @@ -152,7 +150,7 @@ class Worker: Returns: list_keys (List[str]): A list of worker keys """ - return [as_text(key) for key in get_keys(queue=queue, connection=connection)] + return [as_text(key) for key in worker_registration.get_keys(queue=queue, connection=connection)] @classmethod def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> int: @@ -165,7 +163,7 @@ class Worker: Returns: length (int): The queue length. """ - return len(get_keys(queue=queue, connection=connection)) + return len(worker_registration.get_keys(queue=queue, connection=connection)) @classmethod def find_by_key( @@ -818,14 +816,16 @@ class Worker: self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True) break finally: - if not self.is_horse: - if self.scheduler: - self.stop_scheduler() - - self.register_death() - self.unsubscribe() + self.teardown() return bool(completed_jobs) + def teardown(self): + if not self.is_horse: + if self.scheduler: + self.stop_scheduler() + self.register_death() + self.unsubscribe() + def stop_scheduler(self): """Ensure scheduler process is stopped Will send the kill signal to scheduler process, @@ -1471,7 +1471,7 @@ class Worker: if queue.acquire_cleaning_lock(): self.log.info('Cleaning registries for queue: %s', queue.name) clean_registries(queue) - clean_worker_registry(queue) + worker_registration.clean_worker_registry(queue) self.last_cleaned_at = utcnow() @property