refactor: new teardown worker method (#1820)

* refactor: new teardown worker method

* refactor: consistent worker_registration usage

* refactor: consistent utils import
main
lowercase00 2 years ago committed by GitHub
parent 5192b1d2e4
commit a985445486
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -28,7 +28,6 @@ import redis.exceptions
from . import worker_registration from . import worker_registration
from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
from .utils import as_text
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
from .defaults import ( from .defaults import (
@ -48,9 +47,8 @@ from .registry import StartedJobRegistry, clean_registries
from .scheduler import RQScheduler from .scheduler import RQScheduler
from .suspension import is_suspended from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text
from .version import VERSION from .version import VERSION
from .worker_registration import clean_worker_registry, get_keys
from .serializers import resolve_serializer from .serializers import resolve_serializer
try: try:
@ -132,7 +130,7 @@ class Worker:
elif connection is None: elif connection is None:
connection = get_current_connection() connection = get_current_connection()
worker_keys = get_keys(queue=queue, connection=connection) worker_keys = worker_registration.get_keys(queue=queue, connection=connection)
workers = [ workers = [
cls.find_by_key( cls.find_by_key(
key, connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer key, connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer
@ -152,7 +150,7 @@ class Worker:
Returns: Returns:
list_keys (List[str]): A list of worker keys list_keys (List[str]): A list of worker keys
""" """
return [as_text(key) for key in get_keys(queue=queue, connection=connection)] return [as_text(key) for key in worker_registration.get_keys(queue=queue, connection=connection)]
@classmethod @classmethod
def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> int: def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> int:
@ -165,7 +163,7 @@ class Worker:
Returns: Returns:
length (int): The queue length. length (int): The queue length.
""" """
return len(get_keys(queue=queue, connection=connection)) return len(worker_registration.get_keys(queue=queue, connection=connection))
@classmethod @classmethod
def find_by_key( def find_by_key(
@ -818,13 +816,15 @@ class Worker:
self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True) self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True)
break break
finally: finally:
self.teardown()
return bool(completed_jobs)
def teardown(self):
if not self.is_horse: if not self.is_horse:
if self.scheduler: if self.scheduler:
self.stop_scheduler() self.stop_scheduler()
self.register_death() self.register_death()
self.unsubscribe() self.unsubscribe()
return bool(completed_jobs)
def stop_scheduler(self): def stop_scheduler(self):
"""Ensure scheduler process is stopped """Ensure scheduler process is stopped
@ -1471,7 +1471,7 @@ class Worker:
if queue.acquire_cleaning_lock(): if queue.acquire_cleaning_lock():
self.log.info('Cleaning registries for queue: %s', queue.name) self.log.info('Cleaning registries for queue: %s', queue.name)
clean_registries(queue) clean_registries(queue)
clean_worker_registry(queue) worker_registration.clean_worker_registry(queue)
self.last_cleaned_at = utcnow() self.last_cleaned_at = utcnow()
@property @property

Loading…
Cancel
Save