diff --git a/rq/cli/cli.py b/rq/cli/cli.py index fbfe2bf..55421f6 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -31,13 +31,12 @@ from rq.defaults import ( DEFAULT_MAINTENANCE_TASK_INTERVAL, ) from rq.exceptions import InvalidJobOperationError +from rq.job import JobStatus +from rq.logutils import blue from rq.registry import FailedJobRegistry, clean_registries -from rq.utils import import_attribute, get_call_string, make_colorizer from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended +from rq.utils import import_attribute, get_call_string from rq.worker_registration import clean_worker_registry -from rq.job import JobStatus - -blue = make_colorizer('darkblue') @click.group() diff --git a/rq/job.py b/rq/job.py index 07ec6cb..a0574e6 100644 --- a/rq/job.py +++ b/rq/job.py @@ -158,7 +158,7 @@ class Job: serializer=None, *, on_success: Optional[Union['Callback', Callable[..., Any]]] = None, - on_failure: Optional[Union['Callback', Callable[..., Any]]] = None + on_failure: Optional[Union['Callback', Callable[..., Any]]] = None, ) -> 'Job': """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -239,16 +239,18 @@ class Job: if on_success: if not isinstance(on_success, Callback): - warnings.warn('Passing a `Callable` `on_success` is deprecated, pass `Callback` instead', - DeprecationWarning) + warnings.warn( + 'Passing a `Callable` `on_success` is deprecated, pass `Callback` instead', DeprecationWarning + ) on_success = Callback(on_success) # backward compatibility job._success_callback_name = on_success.name job._success_callback_timeout = on_success.timeout if on_failure: if not isinstance(on_failure, Callback): - warnings.warn('Passing a `Callable` `on_failure` is deprecated, pass `Callback` instead', - DeprecationWarning) + warnings.warn( + 'Passing a `Callable` `on_failure` is deprecated, pass `Callback` instead', DeprecationWarning + ) on_failure = Callback(on_failure) # backward compatibility job._failure_callback_name = on_failure.name job._failure_callback_timeout = on_failure.timeout @@ -302,7 +304,8 @@ class Job: status (JobStatus): The Job Status """ if refresh: - self._status = as_text(self.connection.hget(self.key, 'status')) + status = self.connection.hget(self.key, 'status') + self._status = as_text(status) if status else None return self._status def set_status(self, status: JobStatus, pipeline: Optional['Pipeline'] = None) -> None: @@ -872,9 +875,9 @@ class Job: self.data = raw_data self.created_at = str_to_date(obj.get('created_at')) - self.origin = as_text(obj.get('origin')) + self.origin = as_text(obj.get('origin')) if obj.get('origin') else None self.worker_name = obj.get('worker_name').decode() if obj.get('worker_name') else None - self.description = as_text(obj.get('description')) + self.description = as_text(obj.get('description')) if obj.get('description') else None self.enqueued_at = str_to_date(obj.get('enqueued_at')) self.started_at = str_to_date(obj.get('started_at')) self.ended_at = str_to_date(obj.get('ended_at')) @@ -1360,8 +1363,7 @@ class Job: self.success_callback(self, self.connection, result) def execute_failure_callback(self, death_penalty_class: Type[BaseDeathPenalty], *exc_info): - """Executes failure_callback with possible timeout - """ + """Executes failure_callback with possible timeout""" if not self.failure_callback: return @@ -1369,7 +1371,7 @@ class Job: try: with death_penalty_class(self.failure_callback_timeout, JobTimeoutException, job_id=self.id): self.failure_callback(self, self.connection, *exc_info) - except Exception: # noqa + except Exception: # noqa logger.exception(f'Job {self.id}: error while executing failure callback') raise diff --git a/rq/logutils.py b/rq/logutils.py index 33e0949..b36ece8 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -2,10 +2,109 @@ import logging import sys from typing import Union -from rq.utils import ColorizingStreamHandler from rq.defaults import DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT +class _Colorizer: + def __init__(self): + esc = "\x1b[" + + self.codes = {} + self.codes[""] = "" + self.codes["reset"] = esc + "39;49;00m" + + self.codes["bold"] = esc + "01m" + self.codes["faint"] = esc + "02m" + self.codes["standout"] = esc + "03m" + self.codes["underline"] = esc + "04m" + self.codes["blink"] = esc + "05m" + self.codes["overline"] = esc + "06m" + + dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"] + light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"] + + x = 30 + for d, l in zip(dark_colors, light_colors): + self.codes[d] = esc + "%im" % x + self.codes[l] = esc + "%i;01m" % x + x += 1 + + del d, l, x + + self.codes["darkteal"] = self.codes["turquoise"] + self.codes["darkyellow"] = self.codes["brown"] + self.codes["fuscia"] = self.codes["fuchsia"] + self.codes["white"] = self.codes["bold"] + + if hasattr(sys.stdout, "isatty"): + self.notty = not sys.stdout.isatty() + else: + self.notty = True + + def colorize(self, color_key, text): + if self.notty: + return text + else: + return self.codes[color_key] + text + self.codes["reset"] + + +colorizer = _Colorizer() + + +def make_colorizer(color: str): + """Creates a function that colorizes text with the given color. + + For example:: + + ..codeblock::python + + >>> green = make_colorizer('darkgreen') + >>> red = make_colorizer('red') + >>> + >>> # You can then use: + >>> print("It's either " + green('OK') + ' or ' + red('Oops')) + """ + + def inner(text): + return colorizer.colorize(color, text) + + return inner + + +green = make_colorizer('darkgreen') +yellow = make_colorizer('darkyellow') +blue = make_colorizer('darkblue') +red = make_colorizer('darkred') + + +class ColorizingStreamHandler(logging.StreamHandler): + levels = { + logging.WARNING: yellow, + logging.ERROR: red, + logging.CRITICAL: red, + } + + def __init__(self, exclude=None, *args, **kwargs): + self.exclude = exclude + super().__init__(*args, **kwargs) + + @property + def is_tty(self): + isatty = getattr(self.stream, 'isatty', None) + return isatty and isatty() + + def format(self, record): + message = logging.StreamHandler.format(self, record) + if self.is_tty: + # Don't colorize any traceback + parts = message.split('\n', 1) + parts[0] = " ".join([parts[0].split(" ", 1)[0], parts[0].split(" ", 1)[1]]) + + message = '\n'.join(parts) + + return message + + def setup_loghandlers( level: Union[int, str, None] = None, date_format: str = DEFAULT_LOGGING_DATE_FORMAT, diff --git a/rq/queue.py b/rq/queue.py index 7a2737f..ebfdd47 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -22,13 +22,11 @@ from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError from .job import Job, JobStatus +from .logutils import blue, green, yellow from .types import FunctionReferenceType, JobDependencyType from .serializers import resolve_serializer -from .utils import backend_class, get_version, import_attribute, make_colorizer, parse_timeout, utcnow, compact +from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow, compact -green = make_colorizer('darkgreen') -yellow = make_colorizer('darkyellow') -blue = make_colorizer('darkblue') logger = logging.getLogger("rq.queue") @@ -71,8 +69,11 @@ class Queue: @classmethod def all( - cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, - serializer=None, death_penalty_class: Optional[Type[BaseDeathPenalty]] = None + cls, + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + serializer=None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> List['Queue']: """Returns an iterable of all Queues. @@ -89,8 +90,11 @@ class Queue: def to_queue(queue_key): return cls.from_queue_key( - as_text(queue_key), connection=connection, job_class=job_class, - serializer=serializer, death_penalty_class=death_penalty_class + as_text(queue_key), + connection=connection, + job_class=job_class, + serializer=serializer, + death_penalty_class=death_penalty_class, ) all_registerd_queues = connection.smembers(cls.redis_queues_keys) @@ -99,12 +103,12 @@ class Queue: @classmethod def from_queue_key( - cls, - queue_key: str, - connection: Optional['Redis'] = None, - job_class: Optional[Type['Job']] = None, - serializer: Any = None, - death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, + cls, + queue_key: str, + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> 'Queue': """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Queues by their @@ -126,20 +130,25 @@ class Queue: prefix = cls.redis_queue_namespace_prefix if not queue_key.startswith(prefix): raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) - name = queue_key[len(prefix):] - return cls(name, connection=connection, job_class=job_class, serializer=serializer, - death_penalty_class=death_penalty_class) + name = queue_key[len(prefix) :] + return cls( + name, + connection=connection, + job_class=job_class, + serializer=serializer, + death_penalty_class=death_penalty_class, + ) def __init__( - self, - name: str = 'default', - default_timeout: Optional[int] = None, - connection: Optional['Redis'] = None, - is_async: bool = True, - job_class: Union[str, Type['Job'], None] = None, - serializer: Any = None, - death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty, - **kwargs, + self, + name: str = 'default', + default_timeout: Optional[int] = None, + connection: Optional['Redis'] = None, + is_async: bool = True, + job_class: Union[str, Type['Job'], None] = None, + serializer: Any = None, + death_penalty_class: Type[BaseDeathPenalty] = UnixSignalDeathPenalty, + **kwargs, ): """Initializes a Queue object. @@ -207,6 +216,7 @@ class Queue: @property def scheduler_pid(self) -> int: from rq.scheduler import RQScheduler + pid = self.connection.get(RQScheduler.get_locking_key(self.name)) return int(pid.decode()) if pid is not None else None @@ -444,10 +454,10 @@ class Queue: self.connection.rename(self.key, COMPACT_QUEUE) while True: - job_id = as_text(self.connection.lpop(COMPACT_QUEUE)) + job_id = self.connection.lpop(COMPACT_QUEUE) if job_id is None: break - if self.job_class.exists(job_id, self.connection): + if self.job_class.exists(as_text(job_id), self.connection): self.connection.rpush(self.key, job_id) def push_job_id(self, job_id: str, pipeline: Optional['Pipeline'] = None, at_front: bool = False): @@ -469,23 +479,23 @@ class Queue: self.log.debug('Pushed job %s into %s', blue(job_id), green(self.name)) def create_job( - self, - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, - meta: Optional[Dict] = None, - status: JobStatus = JobStatus.QUEUED, - retry: Optional['Retry'] = None, - *, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + meta: Optional[Dict] = None, + status: JobStatus = JobStatus.QUEUED, + retry: Optional['Retry'] = None, + *, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, ) -> Job: """Creates a job based on parameters given @@ -611,23 +621,23 @@ class Queue: return job def enqueue_call( - self, - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, - at_front: bool = False, - meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, - on_success: Optional[Callable[..., Any]] = None, - on_failure: Optional[Callable[..., Any]] = None, - pipeline: Optional['Pipeline'] = None, + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None, + pipeline: Optional['Pipeline'] = None, ) -> Job: """Creates a job to represent the delayed function call and enqueues it. @@ -678,20 +688,20 @@ class Queue: @staticmethod def prepare_data( - func: 'FunctionReferenceType', - args: Union[Tuple, List, None] = None, - kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, - result_ttl: Optional[int] = None, - ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, - description: Optional[str] = None, - job_id: Optional[str] = None, - at_front: bool = False, - meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, - on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, ) -> EnqueueData: """Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples And can keep this logic within EnqueueData @@ -1003,7 +1013,6 @@ class Queue: return self._enqueue_job(job, pipeline=pipeline, at_front=at_front) return job - def _enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: """Enqueues a job for delayed execution without checking dependencies. @@ -1073,7 +1082,7 @@ class Queue: return job def enqueue_dependents( - self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None + self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None ): """Enqueues all jobs in the given job's dependents set and clears it. @@ -1110,7 +1119,7 @@ class Queue: dependent_job_ids, connection=self.connection, serializer=self.serializer ) if dependent_job - and dependent_job.dependencies_are_met( + and dependent_job.dependencies_are_met( parent_job=job, pipeline=pipe, exclude_job_id=exclude_job_id, @@ -1210,13 +1219,13 @@ class Queue: @classmethod def dequeue_any( - cls, - queues: List['Queue'], - timeout: Optional[int], - connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, - serializer: Any = None, - death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, + cls, + queues: List['Queue'], + timeout: Optional[int], + connection: Optional['Redis'] = None, + job_class: Optional['Job'] = None, + serializer: Any = None, + death_penalty_class: Optional[Type[BaseDeathPenalty]] = None, ) -> Tuple['Job', 'Queue']: """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. @@ -1250,8 +1259,13 @@ class Queue: if result is None: return None queue_key, job_id = map(as_text, result) - queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, - serializer=serializer, death_penalty_class=death_penalty_class) + queue = cls.from_queue_key( + queue_key, + connection=connection, + job_class=job_class, + serializer=serializer, + death_penalty_class=death_penalty_class, + ) try: job = job_class.fetch(job_id, connection=connection, serializer=serializer) except NoSuchJobError: diff --git a/rq/utils.py b/rq/utils.py index b10e262..ca51779 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -26,105 +26,6 @@ from .exceptions import TimeoutFormatError logger = logging.getLogger(__name__) -class _Colorizer: - def __init__(self): - esc = "\x1b[" - - self.codes = {} - self.codes[""] = "" - self.codes["reset"] = esc + "39;49;00m" - - self.codes["bold"] = esc + "01m" - self.codes["faint"] = esc + "02m" - self.codes["standout"] = esc + "03m" - self.codes["underline"] = esc + "04m" - self.codes["blink"] = esc + "05m" - self.codes["overline"] = esc + "06m" - - dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"] - light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"] - - x = 30 - for d, l in zip(dark_colors, light_colors): - self.codes[d] = esc + "%im" % x - self.codes[l] = esc + "%i;01m" % x - x += 1 - - del d, l, x - - self.codes["darkteal"] = self.codes["turquoise"] - self.codes["darkyellow"] = self.codes["brown"] - self.codes["fuscia"] = self.codes["fuchsia"] - self.codes["white"] = self.codes["bold"] - - if hasattr(sys.stdout, "isatty"): - self.notty = not sys.stdout.isatty() - else: - self.notty = True - - def reset_color(self): - return self.codes["reset"] - - def colorize(self, color_key, text): - if self.notty: - return text - else: - return self.codes[color_key] + text + self.codes["reset"] - - -colorizer = _Colorizer() - - -def make_colorizer(color: str): - """Creates a function that colorizes text with the given color. - - For example:: - - ..codeblock::python - - >>> green = make_colorizer('darkgreen') - >>> red = make_colorizer('red') - >>> - >>> # You can then use: - >>> print("It's either " + green('OK') + ' or ' + red('Oops')) - """ - - def inner(text): - return colorizer.colorize(color, text) - - return inner - - -class ColorizingStreamHandler(logging.StreamHandler): - levels = { - logging.WARNING: make_colorizer('darkyellow'), - logging.ERROR: make_colorizer('darkred'), - logging.CRITICAL: make_colorizer('darkred'), - } - - def __init__(self, exclude=None, *args, **kwargs): - self.exclude = exclude - super().__init__(*args, **kwargs) - - @property - def is_tty(self): - isatty = getattr(self.stream, 'isatty', None) - return isatty and isatty() - - def format(self, record): - message = logging.StreamHandler.format(self, record) - if self.is_tty: - colorize = self.levels.get(record.levelno, lambda x: x) - - # Don't colorize any traceback - parts = message.split('\n', 1) - parts[0] = " ".join([parts[0].split(" ", 1)[0], colorize(parts[0].split(" ", 1)[1])]) - - message = '\n'.join(parts) - - return message - - def compact(lst: List[Any]) -> List[Any]: """Excludes `None` values from a list-like object. @@ -137,7 +38,7 @@ def compact(lst: List[Any]) -> List[Any]: return [item for item in lst if item is not None] -def as_text(v: Union[bytes, str]) -> Optional[str]: +def as_text(v: Union[bytes, str]) -> str: """Converts a bytes value to a string using `utf-8`. Args: @@ -149,9 +50,7 @@ def as_text(v: Union[bytes, str]) -> Optional[str]: Returns: value (Optional[str]): Either the decoded string or None """ - if v is None: - return None - elif isinstance(v, bytes): + if isinstance(v, bytes): return v.decode('utf-8') elif isinstance(v, str): return v @@ -181,7 +80,7 @@ def import_attribute(name: str) -> Callable[..., Any]: E.g.: package_a.package_b.module_a.ClassA.my_static_method Thus we remove the bits from the end of the name until we can import it - + Args: name (str): The name (reference) to the path. diff --git a/rq/worker.py b/rq/worker.py index 34c6c95..2b41aad 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -13,8 +13,7 @@ import warnings from datetime import timedelta from enum import Enum from random import shuffle -from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, - Union) +from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, Union from uuid import uuid4 if TYPE_CHECKING: @@ -50,14 +49,23 @@ from .defaults import ( from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus -from .logutils import setup_loghandlers +from .logutils import blue, green, setup_loghandlers, yellow from .queue import Queue from .registry import StartedJobRegistry, clean_registries from .scheduler import RQScheduler from .serializers import resolve_serializer from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty -from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text +from .utils import ( + backend_class, + ensure_list, + get_version, + utcformat, + utcnow, + utcparse, + compact, + as_text, +) from .version import VERSION from .serializers import resolve_serializer @@ -70,10 +78,6 @@ except ImportError: pass -green = make_colorizer('darkgreen') -yellow = make_colorizer('darkyellow') -blue = make_colorizer('darkblue') - logger = logging.getLogger("rq.worker") @@ -112,12 +116,13 @@ class WorkerStatus(str, Enum): IDLE = 'idle' -class Worker: +class BaseWorker: redis_worker_namespace_prefix = 'rq:worker:' redis_workers_keys = worker_registration.REDIS_WORKER_KEYS death_penalty_class = UnixSignalDeathPenalty queue_class = Queue job_class = Job + # `log_result_lifespan` controls whether "Result is kept for XXX seconds" # messages are logged after every job, by default they are. log_result_lifespan = True @@ -182,6 +187,51 @@ class Worker: """ return len(worker_registration.get_keys(queue=queue, connection=connection)) + def get_redis_server_version(self): + """Return Redis server version of connection""" + if not self.redis_server_version: + self.redis_server_version = get_version(self.connection) + return self.redis_server_version + + def validate_queues(self): + """Sanity check for the given queues.""" + for queue in self.queues: + if not isinstance(queue, self.queue_class): + raise TypeError('{0} is not of type {1} or string types'.format(queue, self.queue_class)) + + def queue_names(self) -> List[str]: + """Returns the queue names of this worker's queues. + + Returns: + List[str]: The queue names. + """ + return [queue.name for queue in self.queues] + + def queue_keys(self) -> List[str]: + """Returns the Redis keys representing this worker's queues. + + Returns: + List[str]: The list of strings with queues keys + """ + return [queue.key for queue in self.queues] + + @property + def key(self): + """Returns the worker's Redis hash key.""" + return self.redis_worker_namespace_prefix + self.name + + @property + def pubsub_channel_name(self): + """Returns the worker's Redis hash key.""" + return PUBSUB_CHANNEL_TEMPLATE % self.name + + @property + def supports_redis_streams(self) -> bool: + """Only supported by Redis server >= 5.0 is required.""" + return self.get_redis_server_version() >= (5, 0, 0) + + +class Worker(BaseWorker): @classmethod def find_by_key( cls, @@ -249,7 +299,7 @@ class Worker: disable_default_exception_handler: bool = False, prepare_for_work: bool = True, serializer=None, - work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None + work_horse_killed_handler: Optional[Callable[[Job, int, int, 'struct_rusage'], None]] = None, ): # noqa self.default_result_ttl = default_result_ttl self.worker_ttl = default_worker_ttl @@ -267,8 +317,13 @@ class Worker: self.serializer = resolve_serializer(serializer) queues = [ - self.queue_class(name=q, connection=connection, job_class=self.job_class, - serializer=self.serializer, death_penalty_class=self.death_penalty_class,) + self.queue_class( + name=q, + connection=connection, + job_class=self.job_class, + serializer=self.serializer, + death_penalty_class=self.death_penalty_class, + ) if isinstance(q, str) else q for q in ensure_list(queues) @@ -344,49 +399,6 @@ class Worker: connection.connection_pool.connection_kwargs.update(timeout_config) return connection - def get_redis_server_version(self): - """Return Redis server version of connection""" - if not self.redis_server_version: - self.redis_server_version = get_version(self.connection) - return self.redis_server_version - - def validate_queues(self): - """Sanity check for the given queues.""" - for queue in self.queues: - if not isinstance(queue, self.queue_class): - raise TypeError('{0} is not of type {1} or string types'.format(queue, self.queue_class)) - - def queue_names(self) -> List[str]: - """Returns the queue names of this worker's queues. - - Returns: - List[str]: The queue names. - """ - return [queue.name for queue in self.queues] - - def queue_keys(self) -> List[str]: - """Returns the Redis keys representing this worker's queues. - - Returns: - List[str]: The list of strings with queues keys - """ - return [queue.key for queue in self.queues] - - @property - def key(self): - """Returns the worker's Redis hash key.""" - return self.redis_worker_namespace_prefix + self.name - - @property - def pubsub_channel_name(self): - """Returns the worker's Redis hash key.""" - return PUBSUB_CHANNEL_TEMPLATE % self.name - - @property - def supports_redis_streams(self) -> bool: - """Only supported by Redis server >= 5.0 is required.""" - return self.get_redis_server_version() >= (5, 0, 0) - @property def horse_pid(self): """The horse's process ID. Only available in the worker. Will return @@ -538,7 +550,10 @@ class Worker: job_id (Optional[str): The job id """ connection = pipeline if pipeline is not None else self.connection - return as_text(connection.hget(self.key, 'current_job')) + result = connection.hget(self.key, 'current_job') + if result is None: + return None + return as_text(result) def get_current_job(self) -> Optional['Job']: """Returns the currently executing job instance. @@ -706,7 +721,7 @@ class Worker: return if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN: pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] return if self._dequeue_strategy == DequeueStrategy.RANDOM: shuffle(self._ordered_queues) @@ -716,7 +731,7 @@ class Worker: self, logging_level: str = "INFO", date_format: str = DEFAULT_LOGGING_DATE_FORMAT, - log_format: str = DEFAULT_LOGGING_FORMAT + log_format: str = DEFAULT_LOGGING_FORMAT, ): """Bootstraps the worker. Runs the basic tasks that should run when the worker actually starts working. @@ -781,7 +796,7 @@ class Worker: max_jobs: Optional[int] = None, max_idle_time: Optional[int] = None, with_scheduler: bool = False, - dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT + dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT, ) -> bool: """Starts the work loop. @@ -881,7 +896,9 @@ class Worker: pass self.scheduler._process.join() - def dequeue_job_and_maintain_ttl(self, timeout: Optional[int], max_idle_time: Optional[int] = None) -> Tuple['Job', 'Queue']: + def dequeue_job_and_maintain_ttl( + self, timeout: Optional[int], max_idle_time: Optional[int] = None + ) -> Tuple['Job', 'Queue']: """Dequeues a job while maintaining the TTL. Returns: @@ -1167,10 +1184,7 @@ class Worker: self.log.warning('Moving job to FailedJobRegistry (%s)', exc_string) self.handle_work_horse_killed(job, retpid, ret_val, rusage) - self.handle_job_failure( - job, queue=queue, - exc_string=exc_string - ) + self.handle_job_failure(job, queue=queue, exc_string=exc_string) def execute_job(self, job: 'Job', queue: 'Queue'): """Spawns a work horse to perform the actual work and passes it a job. @@ -1458,9 +1472,7 @@ class Worker: extra.update({'queue': job.origin, 'job_id': job.id}) # func_name - self.log.error( - '[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra - ) + self.log.error('[Job %s]: exception raised while executing (%s)\n' + exc_string, job.id, func_name, extra=extra) for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler) @@ -1598,7 +1610,7 @@ class RoundRobinWorker(Worker): def reorder_queues(self, reference_queue): pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] class RandomWorker(Worker):