diff --git a/rq/command.py b/rq/command.py index b98082c..0f8ef6e 100644 --- a/rq/command.py +++ b/rq/command.py @@ -15,9 +15,16 @@ from rq.job import Job PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s' -def send_command(connection: 'Redis', worker_name: str, command, **kwargs): +def send_command(connection: 'Redis', worker_name: str, command: str, **kwargs): """ - Use connection' pubsub mechanism to send a command + Sends a command to a worker. + A command is just a string, availble commands are: + - `shutdown`: Shuts down a worker + - `kill-horse`: Command for the worker to kill the current working horse + - `stop-job`: A command for the worker to stop the currently running job + + The command string will be parsed into a dictionary and send to a PubSub Topic. + Workers listen to the PubSub, and `handle` the specific command. Args: connection (Redis): A Redis Connection @@ -41,7 +48,7 @@ def parse_payload(payload: Dict[Any, Any]) -> Dict[Any, Any]: def send_shutdown_command(connection: 'Redis', worker_name: str): """ - Sends a shutdown command to the pubsub topic. + Sends a command to shutdown a worker. Args: connection (Redis): A Redis Connection @@ -77,7 +84,7 @@ def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None): def handle_command(worker: 'Worker', payload: Dict[Any, Any]): - """Parses payload and routes commands + """Parses payload and routes commands to the worker. Args: worker (Worker): The worker to use diff --git a/rq/connections.py b/rq/connections.py index fd88099..c5ebc20 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,5 +1,5 @@ from contextlib import contextmanager -import typing as t +from typing import Optional import warnings from redis import Redis @@ -11,16 +11,21 @@ class NoRedisConnectionException(Exception): @contextmanager -def Connection(connection: t.Optional['Redis'] = None): # noqa +def Connection(connection: Optional['Redis'] = None): # noqa """The context manager for handling connections in a clean way. It will push the connection to the LocalStack, and pop the connection when leaving the context + Example: - ..codeblock:python:: - with Connection(): - w = Worker() - w.work() + ..codeblock:python:: + + with Connection(): + w = Worker() + w.work() + + This method is deprecated on version 1.12.0 and will be removed in the future. + Pass the connection to the worker explicitly to handle Redis Connections. Args: connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None. @@ -41,7 +46,7 @@ def Connection(connection: t.Optional['Redis'] = None): # noqa def push_connection(redis: 'Redis'): """ - Pushes the given connection on the stack. + Pushes the given connection to the stack. Args: redis (Redis): A Redis connection @@ -59,13 +64,13 @@ def pop_connection() -> 'Redis': return _connection_stack.pop() -def use_connection(redis: t.Optional['Redis'] = None): +def use_connection(redis: Optional['Redis'] = None): """ Clears the stack and uses the given connection. Protects against mixed use of use_connection() and stacked connection contexts. Args: - redis (t.Optional[Redis], optional): A Redis Connection. Defaults to None. + redis (Optional[Redis], optional): A Redis Connection. Defaults to None. """ assert len(_connection_stack) <= 1, \ 'You should not mix Connection contexts with use_connection()' @@ -87,13 +92,13 @@ def get_current_connection() -> 'Redis': return _connection_stack.top -def resolve_connection(connection: t.Optional['Redis'] = None) -> 'Redis': +def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis': """ Convenience function to resolve the given or the current connection. Raises an exception if it cannot resolve a connection now. Args: - connection (t.Optional[Redis], optional): A Redis connection. Defaults to None. + connection (Optional[Redis], optional): A Redis connection. Defaults to None. Raises: NoRedisConnectionException: If connection couldn't be resolved. diff --git a/rq/job.py b/rq/job.py index cfd7418..9104747 100644 --- a/rq/job.py +++ b/rq/job.py @@ -556,8 +556,8 @@ class Job: self._success_callback = UNEVALUATED self._failure_callback_name = None self._failure_callback = UNEVALUATED - self.description = None - self.origin = None + self.description: Optional[str] = None + self.origin: Optional[str] = None self.enqueued_at: Optional[datetime] = None self.started_at: Optional[datetime] = None self.ended_at: Optional[datetime] = None @@ -570,9 +570,9 @@ class Job: self.worker_name: Optional[str] = None self._status = None self._dependency_ids: List[str] = [] - self.meta = {} + self.meta: Optional[Dict] = {} self.serializer = resolve_serializer(serializer) - self.retries_left = None + self.retries_left: Optional[int] = None self.retry_intervals: Optional[List[int]] = None self.redis_server_version: Optional[Tuple[int, int, int]] = None self.last_heartbeat: Optional[datetime] = None diff --git a/rq/logutils.py b/rq/logutils.py index b3f78b0..36a404d 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -1,13 +1,23 @@ import logging import sys +from typing import Union from rq.utils import ColorizingStreamHandler from rq.defaults import (DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) -def setup_loghandlers(level=None, date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, name='rq.worker'): +def setup_loghandlers(level: Union[int, str, None] = None, date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT, name: str = 'rq.worker'): + """Sets up a log handler. + + Args: + level (Union[int, str, None], optional): The log level. + Access an integer level (10-50) or a string level ("info", "debug" etc). Defaults to None. + date_format (str, optional): The date format to use. Defaults to DEFAULT_LOGGING_DATE_FORMAT ('%H:%M:%S'). + log_format (str, optional): The log format to use. Defaults to DEFAULT_LOGGING_FORMAT ('%(asctime)s %(message)s'). + name (str, optional): The looger name. Defaults to 'rq.worker'. + """ logger = logging.getLogger(name) if not _has_effective_handler(logger): @@ -27,12 +37,15 @@ def setup_loghandlers(level=None, date_format=DEFAULT_LOGGING_DATE_FORMAT, logger.setLevel(level if isinstance(level, int) else level.upper()) -def _has_effective_handler(logger): +def _has_effective_handler(logger) -> bool: """ Checks if a logger has a handler that will catch its messages in its logger hierarchy. - :param `logging.Logger` logger: The logger to be checked. - :return: True if a handler is found for the logger, False otherwise. - :rtype: bool + + Args: + logger (logging.Logger): The logger to be checked. + + Returns: + is_configured (bool): True if a handler is found for the logger, False otherwise. """ while True: if logger.handlers: diff --git a/rq/queue.py b/rq/queue.py index 08da3b1..54b72a0 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,23 +1,24 @@ import uuid import sys import warnings -import typing as t import logging from collections import namedtuple -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta from functools import total_ordering - +from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Type, Union from redis import WatchError -if t.TYPE_CHECKING: +if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline + from .job import Retry from .utils import as_text from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError from .job import Job, JobStatus +from .types import FunctionReferenceType, JobDependencyType from .serializers import resolve_serializer from .utils import backend_class, get_version, import_attribute, make_colorizer, parse_timeout, utcnow, compact @@ -43,14 +44,22 @@ class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout" @total_ordering class Queue: - job_class: t.Type['Job'] = Job + job_class: Type['Job'] = Job DEFAULT_TIMEOUT: int = 180 # Default timeout seconds. redis_queue_namespace_prefix: str = 'rq:queue:' redis_queues_keys: str = 'rq:queues' @classmethod - def all(cls, connection: t.Optional['Redis'] = None, job_class: t.Optional[t.Type['Job']] = None, serializer=None): + def all(cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None) -> List['Queue']: """Returns an iterable of all Queues. + + Args: + connection (Optional[Redis], optional): The Redis Connection. Defaults to None. + job_class (Optional[Job], optional): The Job class to use. Defaults to None. + serializer (optional): The serializer to use. Defaults to None. + + Returns: + queues (List[Queue]): A list of all queues. """ connection = resolve_connection(connection) @@ -59,16 +68,28 @@ class Queue: connection=connection, job_class=job_class, serializer=serializer) - return [to_queue(rq_key) - for rq_key in connection.smembers(cls.redis_queues_keys) - if rq_key] + all_registerd_queues = connection.smembers(cls.redis_queues_keys) + all_queues = [to_queue(rq_key) for rq_key in all_registerd_queues if rq_key] + return all_queues @classmethod - def from_queue_key(cls, queue_key, connection: t.Optional['Redis'] = None, - job_class: t.Optional[t.Type['Job']] = None, serializer=None): + def from_queue_key(cls, queue_key: str, connection: Optional['Redis'] = None, + job_class: Optional['Job'] = None, serializer: Any = None) -> 'Queue': """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Queues by their Redis keys. + + Args: + queue_key (str): The queue key + connection (Optional[Redis], optional): Redis connection. Defaults to None. + job_class (Optional[Job], optional): Job class. Defaults to None. + serializer (Any, optional): Serializer. Defaults to None. + + Raises: + ValueError: If the queue_key doesn't start with the defined prefix + + Returns: + queue (Queue): The Queue object """ prefix = cls.redis_queue_namespace_prefix if not queue_key.startswith(prefix): @@ -76,8 +97,19 @@ class Queue: name = queue_key[len(prefix):] return cls(name, connection=connection, job_class=job_class, serializer=serializer) - def __init__(self, name='default', default_timeout=None, connection: t.Optional['Redis'] = None, - is_async=True, job_class=None, serializer=None, **kwargs): + def __init__(self, name: str = 'default', default_timeout: Optional[int] = None, connection: Optional['Redis'] = None, + is_async: bool = True, job_class: Union[str, Type['Job'], None] = None, serializer: Any = None, **kwargs): + """Initializes a Queue object. + + Args: + name (str, optional): The queue name. Defaults to 'default'. + default_timeout (Optional[int], optional): Queue's default timeout. Defaults to None. + connection (Optional[Redis], optional): Redis connection. Defaults to None. + is_async (bool, optional): Whether jobs should run "async" (using the worker). + If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True. + job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None. + serializer (Any, optional): Serializer. Defaults to None. + """ self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix self.name = name @@ -97,7 +129,7 @@ class Queue: self.job_class = job_class self.serializer = resolve_serializer(serializer) - self.redis_server_version = None + self.redis_server_version: Optional[Tuple[int, int, int]] = None def __len__(self): return self.count @@ -111,8 +143,12 @@ class Queue: def __iter__(self): yield self - def get_redis_server_version(self): - """Return Redis server version of connection""" + def get_redis_server_version(self) -> Tuple[int, int, int]: + """Return Redis server version of connection + + Returns: + redis_version (Tuple): A tuple with the parsed Redis version (eg: (5,0,0)) + """ if not self.redis_server_version: self.redis_server_version = get_version(self.connection) return self.redis_server_version @@ -127,14 +163,28 @@ class Queue: """Redis key used to indicate this queue has been cleaned.""" return 'rq:clean_registries:%s' % self.name - def acquire_cleaning_lock(self): + def acquire_cleaning_lock(self) -> bool: """Returns a boolean indicating whether a lock to clean this queue is acquired. A lock expires in 899 seconds (15 minutes - 1 second) + + Returns: + lock_acquired (bool) """ - return self.connection.set(self.registry_cleaning_key, 1, nx=1, ex=899) + lock_acquired = self.connection.set(self.registry_cleaning_key, 1, nx=1, ex=899) + if not lock_acquired: + return False + return lock_acquired def empty(self): - """Removes all messages on the queue.""" + """Removes all messages on the queue. + This is currently being done using a Lua script, + which iterates all queue messages and deletes the jobs and it's dependents. + It registers the Lua script and calls it. + Even though is currently being returned, this is not strictly necessary. + + Returns: + script (...): The Lua Script is called. + """ script = """ local prefix = "{0}" local q = KEYS[1] @@ -156,7 +206,11 @@ class Queue: return script(keys=[self.key]) def delete(self, delete_jobs: bool = True): - """Deletes the queue. If delete_jobs is true it removes all the associated messages on the queue first.""" + """Deletes the queue. + + Args: + delete_jobs (bool): If true, removes all the associated messages on the queue first. + """ if delete_jobs: self.empty() @@ -165,8 +219,12 @@ class Queue: pipeline.delete(self._key) pipeline.execute() - def is_empty(self): - """Returns whether the current queue is empty.""" + def is_empty(self) -> bool: + """Returns whether the current queue is empty. + + Returns: + is_empty (bool): Whether the queue is empty + """ return self.count == 0 @property @@ -174,7 +232,17 @@ class Queue: """Returns whether the current queue is async.""" return bool(self._is_async) - def fetch_job(self, job_id: str): + def fetch_job(self, job_id: str) -> Optional['Job']: + """Fetch a single job by Job ID. + If the job key is not found, will run the `remove` method, to exclude the key. + If the job has the same name as as the current job origin, returns the Job + + Args: + job_id (str): The Job ID + + Returns: + job (Optional[Job]): The job if found + """ try: job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer) except NoSuchJobError: @@ -183,13 +251,19 @@ class Queue: if job.origin == self.name: return job - def get_job_position(self, job_or_id: t.Union[Job, str]): + def get_job_position(self, job_or_id: Union['Job', str]) -> Optional[int]: """Returns the position of a job within the queue Using Redis before 6.0.6 and redis-py before 3.5.4 has a complexity of worse than O(N) and should not be used for very long job queues. Redis and redis-py version afterwards should support the LPOS command handling job positions within Redis c implementation. + + Args: + job_or_id (Union[Job, str]): The Job instance or Job ID + + Returns: + _type_: _description_ """ job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id @@ -204,8 +278,16 @@ class Queue: return self.job_ids.index(job_id) return None - def get_job_ids(self, offset: int = 0, length: int = -1): - """Returns a slice of job IDs in the queue.""" + def get_job_ids(self, offset: int = 0, length: int = -1) -> List[str]: + """Returns a slice of job IDs in the queue. + + Args: + offset (int, optional): The offset. Defaults to 0. + length (int, optional): The slice length. Defaults to -1 (last element). + + Returns: + _type_: _description_ + """ start = offset if length >= 0: end = offset + (length - 1) @@ -219,18 +301,26 @@ class Queue: self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.") return job_ids - def get_jobs(self, offset: int = 0, length: int = -1): - """Returns a slice of jobs in the queue.""" + def get_jobs(self, offset: int = 0, length: int = -1) -> List['Job']: + """Returns a slice of jobs in the queue. + + Args: + offset (int, optional): The offset. Defaults to 0. + length (int, optional): The slice length. Defaults to -1. + + Returns: + _type_: _description_ + """ job_ids = self.get_job_ids(offset, length) return compact([self.fetch_job(job_id) for job_id in job_ids]) @property - def job_ids(self) -> t.List[str]: + def job_ids(self) -> List[str]: """Returns a list of all job IDS in the queue.""" return self.get_job_ids() @property - def jobs(self) -> t.List['Job']: + def jobs(self) -> List['Job']: """Returns a list of all (valid) jobs in the queue.""" return self.get_jobs() @@ -276,8 +366,16 @@ class Queue: from rq.registry import CanceledJobRegistry return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) - def remove(self, job_or_id: t.Union['Job', str], pipeline: t.Optional['Pipeline'] = None): - """Removes Job from queue, accepts either a Job instance or ID.""" + def remove(self, job_or_id: Union['Job', str], pipeline: Optional['Pipeline'] = None): + """Removes Job from queue, accepts either a Job instance or ID. + + Args: + job_or_id (Union[Job, str]): The Job instance or Job ID string. + pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + + Returns: + _type_: _description_ + """ job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id if pipeline is not None: @@ -286,8 +384,8 @@ class Queue: return self.connection.lrem(self.key, 1, job_id) def compact(self): - """Removes all "dead" jobs from the queue by cycling through it, while - guaranteeing FIFO semantics. + """Removes all "dead" jobs from the queue by cycling through it, + while guaranteeing FIFO semantics. """ COMPACT_QUEUE = '{0}_compact:{1}'.format( self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa @@ -300,9 +398,15 @@ class Queue: if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id: str, pipeline: t.Optional['Pipeline'] = None, at_front=False): + def push_job_id(self, job_id: str, pipeline: Optional['Pipeline'] = None, at_front: bool = False): """Pushes a job ID on the corresponding Redis queue. - 'at_front' allows you to push the job onto the front instead of the back of the queue""" + 'at_front' allows you to push the job onto the front instead of the back of the queue + + Args: + job_id (str): The Job ID + pipeline (Optional[Pipeline], optional): The Redis Pipeline to use. Defaults to None. + at_front (bool, optional): Whether to push the job to front of the queue. Defaults to False. + """ connection = pipeline if pipeline is not None else self.connection if at_front: result = connection.lpush(self.key, job_id) @@ -310,12 +414,38 @@ class Queue: result = connection.rpush(self.key, job_id) self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") - def create_job(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None, - result_ttl=None, ttl=None, failure_ttl=None, - description=None, depends_on=None, job_id=None, - meta=None, status=JobStatus.QUEUED, retry=None, *, - on_success=None, on_failure=None) -> Job: - """Creates a job based on parameters given.""" + def create_job(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType']=None, + job_id: Optional[str] = None, meta: Optional[Dict] = None, status: JobStatus = JobStatus.QUEUED, + retry: Optional['Retry'] = None, *, on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None) -> Job: + """Creates a job based on parameters given + + Args: + func (FunctionReferenceType): The function referce: a callable or the path. + args (Union[Tuple, List, None], optional): The `*args` to pass to the function. Defaults to None. + kwargs (Optional[Dict], optional): The `**kwargs` to pass to the function. Defaults to None. + timeout (Optional[int], optional): Function timeout. Defaults to None. + result_ttl (Optional[int], optional): Result time to live. Defaults to None. + ttl (Optional[int], optional): Time to live. Defaults to None. + failure_ttl (Optional[int], optional): Failure time to live. Defaults to None. + description (Optional[str], optional): The description. Defaults to None. + depends_on (Optional[JobDependencyType], optional): The job dependencies. Defaults to None. + job_id (Optional[str], optional): Job ID. Defaults to None. + meta (Optional[Dict], optional): Job metadata. Defaults to None. + status (JobStatus, optional): Job status. Defaults to JobStatus.QUEUED. + retry (Optional[Retry], optional): The Retry Object. Defaults to None. + on_success (Optional[Callable], optional): On success callable. Defaults to None. + on_failure (Optional[Callable], optional): On failure callable. Defaults to None. + + Raises: + ValueError: If the timeout is 0 + ValueError: If the job TTL is 0 or negative + + Returns: + Job: The created job + """ timeout = parse_timeout(timeout) if timeout is None: @@ -345,14 +475,22 @@ class Queue: return job - def setup_dependencies(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None): - # If a _dependent_ job depends on any unfinished job, register all the - # _dependent_ job's dependencies instead of enqueueing it. - # - # `Job#fetch_dependencies` sets WATCH on all dependencies. If - # WatchError is raised in the when the pipeline is executed, that means - # something else has modified either the set of dependencies or the - # status of one of them. In this case, we simply retry. + def setup_dependencies(self, job: 'Job', pipeline: Optional['Pipeline'] = None) -> 'Job': + """If a _dependent_ job depends on any unfinished job, register all the + _dependent_ job's dependencies instead of enqueueing it. + + `Job#fetch_dependencies` sets WATCH on all dependencies. If + WatchError is raised in the when the pipeline is executed, that means + something else has modified either the set of dependencies or the + status of one of them. In this case, we simply retry. + + Args: + job (Job): The job + pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + + Returns: + job (Job): The Job + """ if len(job._dependency_ids) > 0: orig_status = job.get_status(refresh=False) pipe = pipeline if pipeline is not None else self.connection.pipeline() @@ -397,16 +535,39 @@ class Queue: pipeline.multi() # Ensure pipeline in multi mode before returning to caller return job - def enqueue_call(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None, - result_ttl=None, ttl=None, failure_ttl=None, description=None, - depends_on=None, job_id: str = None, at_front: bool = False, meta=None, - retry=None, on_success=None, on_failure=None, pipeline=None) -> Job: + def enqueue_call(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, at_front: bool = False, meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None, pipeline: Optional['Pipeline'] = None) -> Job: """Creates a job to represent the delayed function call and enqueues it. It is much like `.enqueue()`, except that it takes the function's args and kwargs as explicit arguments. Any kwargs passed to this function contain options for RQ itself. - """ + + Args: + func (FunctionReferenceType): The reference to the function + args (Union[Tuple, List, None], optional): THe `*args` to pass to the function. Defaults to None. + kwargs (Optional[Dict], optional): THe `**kwargs` to pass to the function. Defaults to None. + timeout (Optional[int], optional): Function timeout. Defaults to None. + result_ttl (Optional[int], optional): Result time to live. Defaults to None. + ttl (Optional[int], optional): Time to live. Defaults to None. + failure_ttl (Optional[int], optional): Failure time to live. Defaults to None. + description (Optional[str], optional): The job description. Defaults to None. + depends_on (Optional[JobDependencyType], optional): The job dependencies. Defaults to None. + job_id (Optional[str], optional): The job ID. Defaults to None. + at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False. + meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None. + retry (Optional[Retry], optional): Retry object. Defaults to None. + on_success (Optional[Callable[..., Any]], optional): Callable for on success. Defaults to None. + on_failure (Optional[Callable[..., Any]], optional): Callable for on failure. Defaults to None. + pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + + Returns: + Job: The enqueued Job + """ job = self.create_job( func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, @@ -425,12 +586,33 @@ class Queue: return job @staticmethod - def prepare_data(func, args=None, kwargs=None, timeout=None, - result_ttl=None, ttl=None, failure_ttl=None, - description=None, job_id=None, - at_front=False, meta=None, retry=None, on_success=None, on_failure=None) -> EnqueueData: - # Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples - # And can keep this logic within EnqueueData + def prepare_data(func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, description: Optional[str] = None, job_id: Optional[str] = None, + at_front: bool = False, meta: Optional[Dict] = None, retry: Optional['Retry'] = None, + on_success: Optional[Callable] = None, on_failure: Optional[Callable] = None) -> EnqueueData: + """Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples + And can keep this logic within EnqueueData + + Args: + func (FunctionReferenceType): The reference to the function + args (Union[Tuple, List, None], optional): THe `*args` to pass to the function. Defaults to None. + kwargs (Optional[Dict], optional): THe `**kwargs` to pass to the function. Defaults to None. + timeout (Optional[int], optional): Function timeout. Defaults to None. + result_ttl (Optional[int], optional): Result time to live. Defaults to None. + ttl (Optional[int], optional): Time to live. Defaults to None. + failure_ttl (Optional[int], optional): Failure time to live. Defaults to None. + description (Optional[str], optional): The job description. Defaults to None. + job_id (Optional[str], optional): The job ID. Defaults to None. + at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False. + meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None. + retry (Optional[Retry], optional): Retry object. Defaults to None. + on_success (Optional[Callable[..., Any]], optional): Callable for on success. Defaults to None. + on_failure (Optional[Callable[..., Any]], optional): Callable for on failure. Defaults to None. + + Returns: + EnqueueData: The EnqueueData + """ return EnqueueData( func, args, kwargs, timeout, result_ttl, ttl, failure_ttl, @@ -438,10 +620,16 @@ class Queue: at_front, meta, retry, on_success, on_failure ) - def enqueue_many(self, job_datas, pipeline: t.Optional['Pipeline'] = None) -> t.List[Job]: - """ - Creates multiple jobs (created via `Queue.prepare_data` calls) + def enqueue_many(self, job_datas: List['EnqueueData'], pipeline: Optional['Pipeline'] = None) -> List[Job]: + """Creates multiple jobs (created via `Queue.prepare_data` calls) to represent the delayed function calls and enqueues them. + + Args: + job_datas (List['EnqueueData']): A List of job data + pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + + Returns: + List[Job]: A list of enqueued jobs """ pipe = pipeline if pipeline is not None else self.connection.pipeline() jobs = [ @@ -467,6 +655,14 @@ class Queue: return jobs def run_job(self, job: 'Job') -> Job: + """Run the job + + Args: + job (Job): The job to run + + Returns: + Job: _description_ + """ job.perform() job.set_status(JobStatus.FINISHED) job.save(include_meta=False) @@ -474,7 +670,7 @@ class Queue: return job @classmethod - def parse_args(cls, f: t.Union[t.Callable[..., t.Any], str], *args, **kwargs): + def parse_args(cls, f: 'FunctionReferenceType', *args, **kwargs): """ Parses arguments passed to `queue.enqueue()` and `queue.enqueue_at()` @@ -484,6 +680,11 @@ class Queue: * A reference to an object's instance method * A string, representing the location of a function (must be meaningful to the import context of the workers) + + Args: + f (FunctionReferenceType): The function reference + args (*args): function args + kwargs (*kwargs): function kargs """ if not isinstance(f, str) and f.__module__ == '__main__': raise ValueError('Functions from the __main__ module cannot be processed ' @@ -514,9 +715,18 @@ class Queue: depends_on, job_id, at_front, meta, retry, on_success, on_failure, pipeline, args, kwargs) - def enqueue(self, f, *args, **kwargs): - """Creates a job to represent the delayed function call and enqueues it.""" + def enqueue(self, f: 'FunctionReferenceType', *args, **kwargs) -> 'Job': + """Creates a job to represent the delayed function call and enqueues it. + Receives the same parameters accepted by the `enqueue_call` method. + + Args: + f (FunctionReferenceType): The function reference + args (*args): function args + kwargs (*kwargs): function kargs + Returns: + job (Job): The created Job + """ (f, timeout, description, result_ttl, ttl, failure_ttl, depends_on, job_id, at_front, meta, retry, on_success, on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) @@ -530,8 +740,15 @@ class Queue: ) def enqueue_at(self, datetime: datetime, f, *args, **kwargs): - """Schedules a job to be enqueued at specified time""" + """Schedules a job to be enqueued at specified time + Args: + datetime (datetime): _description_ + f (_type_): _description_ + + Returns: + _type_: _description_ + """ (f, timeout, description, result_ttl, ttl, failure_ttl, depends_on, job_id, at_front, meta, retry, on_success, on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) @@ -544,8 +761,17 @@ class Queue: job.enqueue_at_front = True return self.schedule_job(job, datetime, pipeline=pipeline) - def schedule_job(self, job: 'Job', datetime: datetime, pipeline: t.Optional['Pipeline'] = None): - """Puts job on ScheduledJobRegistry""" + def schedule_job(self, job: 'Job', datetime: datetime, pipeline: Optional['Pipeline'] = None): + """Puts job on ScheduledJobRegistry + + Args: + job (Job): _description_ + datetime (datetime): _description_ + pipeline (Optional[Pipeline], optional): _description_. Defaults to None. + + Returns: + _type_: _description_ + """ from .registry import ScheduledJobRegistry registry = ScheduledJobRegistry(queue=self) @@ -559,15 +785,31 @@ class Queue: pipe.execute() return job - def enqueue_in(self, time_delta, func, *args, **kwargs): - """Schedules a job to be executed in a given `timedelta` object""" + def enqueue_in(self, time_delta: timedelta, func: 'FunctionReferenceType', *args, **kwargs) -> 'Job': + """Schedules a job to be executed in a given `timedelta` object + + Args: + time_delta (timedelta): The timedelta object + func (FunctionReferenceType): The function reference + + Returns: + job (Job): The enqueued Job + """ return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs) - def enqueue_job(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, at_front: bool = False) -> Job: + def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: """Enqueues a job for delayed execution. If Queue is instantiated with is_async=False, job is executed immediately. + + Args: + job (Job): The job to enqueue + pipeline (Optional[Pipeline], optional): The Redis pipeline to use. Defaults to None. + at_front (bool, optional): Whether should enqueue at the front of the queue. Defaults to False. + + Returns: + Job: The enqued job """ pipe = pipeline if pipeline is not None else self.connection.pipeline() @@ -596,6 +838,14 @@ class Queue: return job def run_sync(self, job: 'Job') -> 'Job': + """Run a job synchronously, meaning on the same process the method was called. + + Args: + job (Job): The job to run + + Returns: + Job: The job instance + """ with self.connection.pipeline() as pipeline: job.prepare_for_execution('sync', pipeline) @@ -611,12 +861,17 @@ class Queue: return job - def enqueue_dependents(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, exclude_job_id=None): + def enqueue_dependents(self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None): """Enqueues all jobs in the given job's dependents set and clears it. When called without a pipeline, this method uses WATCH/MULTI/EXEC. If you pass a pipeline, only MULTI is called. The rest is up to the caller. + + Args: + job (Job): The Job to enqueue the dependents + pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + exclude_job_id (Optional[str], optional): Whether to exclude the job id. Defaults to None. """ from .registry import DeferredJobRegistry @@ -690,12 +945,16 @@ class Queue: # handle it raise - def pop_job_id(self): - """Pops a given job ID from this Redis queue.""" + def pop_job_id(self) -> Optional[str]: + """Pops a given job ID from this Redis queue. + + Returns: + job_id (str): The job id + """ return as_text(self.connection.lpop(self.key)) @classmethod - def lpop(cls, queue_keys, timeout: int, connection: t.Optional['Redis'] = None): + def lpop(cls, queue_keys: List[str], timeout: int, connection: Optional['Redis'] = None): """Helper method. Intermediate method to abstract away from some Redis API details, where LPOP accepts only a single key, whereas BLPOP accepts multiple. So if we want the non-blocking LPOP, we need to @@ -707,6 +966,18 @@ class Queue: The timeout parameter is interpreted as follows: None - non-blocking (return immediately) > 0 - maximum number of seconds to block + + Args: + queue_keys (_type_): _description_ + timeout (int): _description_ + connection (Optional[Redis], optional): _description_. Defaults to None. + + Raises: + ValueError: If timeout of 0 was passed + DequeueTimeout: BLPOP Timeout + + Returns: + _type_: _description_ """ connection = resolve_connection(connection) if timeout is not None: # blocking variant @@ -728,8 +999,8 @@ class Queue: return None @classmethod - def dequeue_any(cls, queues, timeout, connection: t.Optional['Redis'] = None, - job_class: t.Optional[t.Type['Job']] = None, serializer=None): + def dequeue_any(cls, queues: List['Queue'], timeout: int, connection: Optional['Redis'] = None, + job_class: Optional['Job'] = None, serializer: Any = None) -> Tuple['Job', 'Queue']: """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. @@ -739,8 +1010,21 @@ class Queue: None. See the documentation of cls.lpop for the interpretation of timeout. + + Args: + queues (List[Queue]): List of queue objects + timeout (int): Timeout for the LPOP + connection (Optional[Redis], optional): Redis Connection. Defaults to None. + job_class (Optional[Job], optional): The job classification. Defaults to None. + serializer (Any, optional): Serializer to use. Defaults to None. + + Raises: + e: Any exception + + Returns: + job, queue (Tuple[Job, Queue]): A tuple of Job, Queue """ - job_class = backend_class(cls, 'job_class', override=job_class) + job_class: Job = backend_class(cls, 'job_class', override=job_class) while True: queue_keys = [q.key for q in queues] diff --git a/rq/registry.py b/rq/registry.py index d3c141d..f581776 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,10 +1,10 @@ -import typing as t import calendar from rq.serializers import resolve_serializer import time from datetime import datetime, timedelta, timezone +from typing import TYPE_CHECKING, Any, List, Optional, Type, Union -if t.TYPE_CHECKING: +if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline @@ -26,8 +26,8 @@ class BaseRegistry: job_class = Job key_template = 'rq:registry:{0}' - def __init__(self, name='default', connection: t.Optional['Redis'] = None, - job_class: t.Optional[t.Type['Job']] = None, queue=None, serializer=None): + def __init__(self, name: str = 'default', connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, queue: Optional['Queue'] = None, serializer: Any = None): if queue: self.name = queue.name self.connection = resolve_connection(queue.connection) @@ -50,7 +50,7 @@ class BaseRegistry: self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs ) - def __contains__(self, item: t.Union[str, 'Job']): + def __contains__(self, item: Union[str, 'Job']): """ Returns a boolean indicating registry contains the given job instance or job id. @@ -64,19 +64,26 @@ class BaseRegistry: return self.connection.zscore(self.key, job_id) is not None @property - def count(self): - """Returns the number of jobs in this registry""" + def count(self) -> int: + """Returns the number of jobs in this registry + + Returns: + int: _description_ + """ self.cleanup() return self.connection.zcard(self.key) - def add(self, job: 'Job', ttl=0, pipeline: t.Optional['Pipeline'] = None, xx: bool = False): + def add(self, job: 'Job', ttl=0, pipeline: Optional['Pipeline'] = None, xx: bool = False) -> int: """Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf Args: job (Job): The Job to add ttl (int, optional): The time to live. Defaults to 0. - pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. xx (bool, optional): .... Defaults to False. + + Returns: + result (int): The ZADD command result """ score = ttl if ttl < 0 else current_timestamp() + ttl if score == -1: @@ -86,12 +93,12 @@ class BaseRegistry: return self.connection.zadd(self.key, {job.id: score}, xx=xx) - def remove(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, delete_job: bool = False): + def remove(self, job: 'Job', pipeline: Optional['Pipeline'] = None, delete_job: bool = False): """Removes job from registry and deletes it if `delete_job == True` Args: job (Job): The Job to remove from the registry - pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. delete_job (bool, optional): If should delete the job.. Defaults to False. """ connection = pipeline if pipeline is not None else self.connection @@ -105,7 +112,7 @@ class BaseRegistry: job_instance.delete() return result - def get_expired_job_ids(self, timestamp: t.Optional[float] = None): + def get_expired_job_ids(self, timestamp: Optional[float] = None): """Returns job ids whose score are less than current timestamp. Returns ids for jobs with an expiry time earlier than timestamp, @@ -113,11 +120,19 @@ class BaseRegistry: time if unspecified. """ score = timestamp if timestamp is not None else current_timestamp() - return [as_text(job_id) for job_id in - self.connection.zrangebyscore(self.key, 0, score)] + expired_jobs = self.connection.zrangebyscore(self.key, 0, score) + return [as_text(job_id) for job_id in expired_jobs] def get_job_ids(self, start: int = 0, end: int = -1): - """Returns list of all job ids.""" + """Returns list of all job ids. + + Args: + start (int, optional): _description_. Defaults to 0. + end (int, optional): _description_. Defaults to -1. + + Returns: + _type_: _description_ + """ self.cleanup() return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)] @@ -135,11 +150,11 @@ class BaseRegistry: score = self.connection.zscore(self.key, job.id) return datetime.utcfromtimestamp(score) - def requeue(self, job_or_id: t.Union['Job', str], at_front: bool = False) -> 'Job': + def requeue(self, job_or_id: Union['Job', str], at_front: bool = False) -> 'Job': """Requeues the job with the given job ID. Args: - job_or_id (t.Union['Job', str]): The Job or the Job ID + job_or_id (Union['Job', str]): The Job or the Job ID at_front (bool, optional): If the Job should be put at the front of the queue. Defaults to False. Raises: @@ -182,7 +197,7 @@ class StartedJobRegistry(BaseRegistry): """ key_template = 'rq:wip:{0}' - def cleanup(self, timestamp: t.Optional[float] = None): + def cleanup(self, timestamp: Optional[float] = None): """Remove expired jobs from registry and add them to FailedJobRegistry. Removes jobs with an expiry time earlier than timestamp, specified as @@ -233,7 +248,7 @@ class FinishedJobRegistry(BaseRegistry): """ key_template = 'rq:finished:{0}' - def cleanup(self, timestamp: t.Optional[float] = None): + def cleanup(self, timestamp: Optional[float] = None): """Remove expired jobs from registry. Removes jobs with an expiry time earlier than timestamp, specified as @@ -250,7 +265,7 @@ class FailedJobRegistry(BaseRegistry): """ key_template = 'rq:failed:{0}' - def cleanup(self, timestamp: t.Optional[float] = None): + def cleanup(self, timestamp: Optional[float] = None): """Remove expired jobs from registry. Removes jobs with an expiry time earlier than timestamp, specified as @@ -260,7 +275,7 @@ class FailedJobRegistry(BaseRegistry): score = timestamp if timestamp is not None else current_timestamp() self.connection.zremrangebyscore(self.key, 0, score) - def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: t.Optional['Pipeline'] = None, + def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: Optional['Pipeline'] = None, _save_exc_to_job: bool = False): """ Adds a job to a registry with expiry time of now + ttl. @@ -310,7 +325,7 @@ class ScheduledJobRegistry(BaseRegistry): # make sense in this context self.get_jobs_to_enqueue = self.get_expired_job_ids - def schedule(self, job: 'Job', scheduled_datetime, pipeline: t.Optional['Pipeline'] = None): + def schedule(self, job: 'Job', scheduled_datetime, pipeline: Optional['Pipeline'] = None): """ Adds job to registry, scored by its execution time (in UTC). If datetime has no tzinfo, it will assume localtimezone. @@ -329,20 +344,43 @@ class ScheduledJobRegistry(BaseRegistry): implemented in BaseRegistry.""" pass - def remove_jobs(self, timestamp: t.Optional[datetime] = None, pipeline: t.Optional['Pipeline'] = None): - """Remove jobs whose timestamp is in the past from registry.""" + def remove_jobs(self, timestamp: Optional[datetime] = None, pipeline: Optional['Pipeline'] = None): + """Remove jobs whose timestamp is in the past from registry. + + Args: + timestamp (Optional[datetime], optional): The timestamp. Defaults to None. + pipeline (Optional[Pipeline], optional): The Redis pipeline. Defaults to None. + """ connection = pipeline if pipeline is not None else self.connection score = timestamp if timestamp is not None else current_timestamp() return connection.zremrangebyscore(self.key, 0, score) - def get_jobs_to_schedule(self, timestamp: t.Optional[datetime] = None, chunk_size: int = 1000): - """Get's a list of job IDs that should be scheduled.""" + def get_jobs_to_schedule(self, timestamp: Optional[datetime] = None, chunk_size: int = 1000) -> List[str]: + """Get's a list of job IDs that should be scheduled. + + Args: + timestamp (Optional[datetime], optional): _description_. Defaults to None. + chunk_size (int, optional): _description_. Defaults to 1000. + + Returns: + jobs (List[str]): A list of Job ids + """ score = timestamp if timestamp is not None else current_timestamp() - return [as_text(job_id) for job_id in - self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)] + jobs_to_schedule = self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size) + return [as_text(job_id) for job_id in jobs_to_schedule] - def get_scheduled_time(self, job_or_id: t.Union['Job', str]): - """Returns datetime (UTC) at which job is scheduled to be enqueued""" + def get_scheduled_time(self, job_or_id: Union['Job', str]) -> datetime: + """Returns datetime (UTC) at which job is scheduled to be enqueued + + Args: + job_or_id (Union[Job, str]): The Job instance or Job ID + + Raises: + NoSuchJobError: If the job was not found + + Returns: + datetime (datetime): The scheduled time as datetime object + """ if isinstance(job_or_id, self.job_class): job_id = job_or_id.id else: @@ -358,7 +396,7 @@ class ScheduledJobRegistry(BaseRegistry): class CanceledJobRegistry(BaseRegistry): key_template = 'rq:canceled:{0}' - def get_expired_job_ids(self, timestamp: t.Optional[datetime] = None): + def get_expired_job_ids(self, timestamp: Optional[datetime] = None): raise NotImplementedError def cleanup(self): @@ -368,8 +406,12 @@ class CanceledJobRegistry(BaseRegistry): pass -def clean_registries(queue): - """Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue.""" +def clean_registries(queue: 'Queue'): + """Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue. + + Args: + queue (Queue): The queue to clean + """ registry = FinishedJobRegistry(name=queue.name, connection=queue.connection, job_class=queue.job_class, diff --git a/rq/results.py b/rq/results.py index cf85a5b..0df131a 100644 --- a/rq/results.py +++ b/rq/results.py @@ -25,7 +25,7 @@ class Result(object): def __init__(self, job_id: str, type: Type, connection: Redis, id: Optional[str] = None, created_at: Optional[datetime] = None, return_value: Optional[Any] = None, - exc_string: Optional[str] = None, serializer=None): + exc_string: Optional[str] = None, serializer=None): self.return_value = return_value self.exc_string = exc_string self.type = type diff --git a/rq/suspension.py b/rq/suspension.py index c49dd15..77df9b8 100644 --- a/rq/suspension.py +++ b/rq/suspension.py @@ -1,6 +1,6 @@ -import typing as t +from typing import TYPE_CHECKING, Optional -if t.TYPE_CHECKING: +if TYPE_CHECKING: from redis import Redis from rq.worker import Worker @@ -8,30 +8,30 @@ if t.TYPE_CHECKING: WORKERS_SUSPENDED = 'rq:suspended' -def is_suspended(connection: 'Redis', worker: t.Optional['Worker'] = None): +def is_suspended(connection: 'Redis', worker: Optional['Worker'] = None): """Checks whether a Worker is suspendeed on a given connection + PS: pipeline returns a list of responses + Ref: https://github.com/andymccurdy/redis-py#pipelines Args: connection (Redis): The Redis Connection - worker (t.Optional[Worker], optional): The Worker. Defaults to None. + worker (Optional[Worker], optional): The Worker. Defaults to None. """ with connection.pipeline() as pipeline: if worker is not None: worker.heartbeat(pipeline=pipeline) pipeline.exists(WORKERS_SUSPENDED) - # pipeline returns a list of responses - # https://github.com/andymccurdy/redis-py#pipelines return pipeline.execute()[-1] -def suspend(connection: 'Redis', ttl: int = None): +def suspend(connection: 'Redis', ttl: Optional[int] = None): """ Suspends. TTL of 0 will invalidate right away. Args: connection (Redis): The Redis connection to use.. - ttl (int): time to live in seconds. Defaults to `None` + ttl (Optional[int], optional): time to live in seconds. Defaults to `None` """ connection.set(WORKERS_SUSPENDED, 1) if ttl is not None: diff --git a/rq/timeouts.py b/rq/timeouts.py index 0e4c5d5..c9f1e44 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -92,7 +92,8 @@ class TimerDeathPenalty(BaseDeathPenalty): self._exception.__init__ = init_with_message def new_timer(self): - """Returns a new timer since timers can only be used once.""" + """Returns a new timer since timers can only be used once. + """ return threading.Timer(self._timeout, self.handle_death_penalty) def handle_death_penalty(self): @@ -110,11 +111,13 @@ class TimerDeathPenalty(BaseDeathPenalty): raise SystemError("PyThreadState_SetAsyncExc failed") def setup_death_penalty(self): - """Starts the timer.""" + """Starts the timer. + """ self._timer = self.new_timer() self._timer.start() def cancel_death_penalty(self): - """Cancels the timer.""" + """Cancels the timer. + """ self._timer.cancel() self._timer = None diff --git a/rq/utils.py b/rq/utils.py index 7f9e90b..8993487 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -12,10 +12,11 @@ import logging import numbers import sys import datetime as dt -import typing as t from collections.abc import Iterable +from typing import TYPE_CHECKING, Dict, List, Optional, Any, Callable, Tuple, Union -if t.TYPE_CHECKING: + +if TYPE_CHECKING: from redis import Redis from redis.exceptions import ResponseError @@ -125,7 +126,7 @@ class ColorizingStreamHandler(logging.StreamHandler): return message -def compact(lst: t.List[t.Any]) -> t.List[t.Any]: +def compact(lst: List[Any]) -> List[Any]: """Excludes `None` values from a list-like object. Args: @@ -137,7 +138,18 @@ def compact(lst: t.List[t.Any]) -> t.List[t.Any]: return [item for item in lst if item is not None] -def as_text(v): +def as_text(v: Union[bytes, str]) -> Optional[str]: + """Converts a bytes value to a string using `utf-8`. + + Args: + v (Union[bytes, str]): The value (bytes or string) + + Raises: + ValueError: If the value is not bytes or string + + Returns: + value (Optional[str]): Either the decoded string or None + """ if v is None: return None elif isinstance(v, bytes): @@ -148,7 +160,7 @@ def as_text(v): raise ValueError('Unknown type %r' % type(v)) -def decode_redis_hash(h) -> t.Dict[str, t.Any]: +def decode_redis_hash(h) -> Dict[str, Any]: """Decodes the Redis hash, ensuring that keys are strings Most importantly, decodes bytes strings, ensuring the dict has str keys. @@ -156,12 +168,12 @@ def decode_redis_hash(h) -> t.Dict[str, t.Any]: h (Dict[Any, Any]): The Redis hash Returns: - Dict[str, t.Any]: The decoded Redis data (Dictionary) + Dict[str, Any]: The decoded Redis data (Dictionary) """ return dict((as_text(k), h[k]) for k in h) -def import_attribute(name: str) -> t.Callable[..., t.Any]: +def import_attribute(name: str) -> Callable[..., Any]: """Returns an attribute from a dotted path name. Example: `path.to.func`. When the attribute we look for is a staticmethod, module name in its @@ -181,7 +193,7 @@ def import_attribute(name: str) -> t.Callable[..., t.Any]: ValueError: If no module is found or invalid attribute name. Returns: - t.Any: An attribute (normally a Callable) + Any: An attribute (normally a Callable) """ name_bits = name.split('.') module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]] @@ -218,7 +230,8 @@ def utcnow(): def now(): - """Return now in UTC""" + """Return now in UTC + """ return datetime.datetime.now(datetime.timezone.utc) @@ -237,9 +250,8 @@ def utcparse(string: str) -> dt.datetime: return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%SZ') -def first(iterable: t.Iterable, default=None, key=None): - """ - Return first element of `iterable` that evaluates true, else return None +def first(iterable: Iterable, default=None, key=None): + """Return first element of `iterable` that evaluates true, else return None (or an optional default value). >>> first([0, False, None, [], (), 42]) @@ -263,6 +275,13 @@ def first(iterable: t.Iterable, default=None, key=None): >>> first([1, 1, 3, 4, 5], key=lambda x: x % 2 == 0) 4 + Args: + iterable (t.Iterable): _description_ + default (_type_, optional): _description_. Defaults to None. + key (_type_, optional): _description_. Defaults to None. + + Returns: + _type_: _description_ """ if key is None: for el in iterable: @@ -276,26 +295,51 @@ def first(iterable: t.Iterable, default=None, key=None): return default -def is_nonstring_iterable(obj: t.Any) -> bool: - """Returns whether the obj is an iterable, but not a string""" - return isinstance(obj, Iterable) and not isinstance(obj, str) +def is_nonstring_iterable(obj: Any) -> bool: + """Returns whether the obj is an iterable, but not a string + Args: + obj (Any): _description_ -def ensure_list(obj: t.Any) -> t.List: + Returns: + bool: _description_ """ - When passed an iterable of objects, does nothing, otherwise, it returns + return isinstance(obj, Iterable) and not isinstance(obj, str) + + +def ensure_list(obj: Any) -> List: + """When passed an iterable of objects, does nothing, otherwise, it returns a list with just that object in it. + + Args: + obj (Any): _description_ + + Returns: + List: _description_ """ return obj if is_nonstring_iterable(obj) else [obj] def current_timestamp() -> int: - """Returns current UTC timestamp""" + """Returns current UTC timestamp + + Returns: + int: _description_ + """ return calendar.timegm(datetime.datetime.utcnow().utctimetuple()) def backend_class(holder, default_name, override=None): - """Get a backend class using its default attribute name or an override""" + """Get a backend class using its default attribute name or an override + + Args: + holder (_type_): _description_ + default_name (_type_): _description_ + override (_type_, optional): _description_. Defaults to None. + + Returns: + _type_: _description_ + """ if override is None: return getattr(holder, default_name) elif isinstance(override, str): @@ -304,15 +348,16 @@ def backend_class(holder, default_name, override=None): return override -def str_to_date(date_str: t.Optional[str]) -> t.Union[dt.datetime, t.Any]: +def str_to_date(date_str: Optional[str]) -> Union[dt.datetime, Any]: if not date_str: return else: return utcparse(date_str.decode()) -def parse_timeout(timeout: t.Any): - """Transfer all kinds of timeout format to an integer representing seconds""" +def parse_timeout(timeout: Any): + """Transfer all kinds of timeout format to an integer representing seconds + """ if not isinstance(timeout, numbers.Integral) and timeout is not None: try: timeout = int(timeout) @@ -329,7 +374,7 @@ def parse_timeout(timeout: t.Any): return timeout -def get_version(connection: 'Redis') -> t.Tuple[int, int, int]: +def get_version(connection: 'Redis') -> Tuple[int, int, int]: """ Returns tuple of Redis server version. This function also correctly handles 4 digit redis server versions. @@ -354,15 +399,23 @@ def get_version(connection: 'Redis') -> t.Tuple[int, int, int]: def ceildiv(a, b): - """Ceiling division. Returns the ceiling of the quotient of a division operation""" + """Ceiling division. Returns the ceiling of the quotient of a division operation + + Args: + a (_type_): _description_ + b (_type_): _description_ + + Returns: + _type_: _description_ + """ return -(-a // b) -def split_list(a_list: t.List[t.Any], segment_size: int): +def split_list(a_list: List[Any], segment_size: int): """Splits a list into multiple smaller lists having size `segment_size` Args: - a_list (t.List[t.Any]): A list to split + a_list (List[Any]): A list to split segment_size (int): The segment size to split into Yields: @@ -372,12 +425,12 @@ def split_list(a_list: t.List[t.Any], segment_size: int): yield a_list[i:i + segment_size] -def truncate_long_string(data: str, max_length: t.Optional[int] = None) -> str: +def truncate_long_string(data: str, max_length: Optional[int] = None) -> str: """Truncate arguments with representation longer than max_length Args: data (str): The data to truncate - max_length (t.Optional[int], optional): The max length. Defaults to None. + max_length (Optional[int], optional): The max length. Defaults to None. Returns: truncated (str): The truncated string @@ -387,8 +440,8 @@ def truncate_long_string(data: str, max_length: t.Optional[int] = None) -> str: return (data[:max_length] + '...') if len(data) > max_length else data -def get_call_string(func_name: t.Optional[str], args: t.Any, kwargs: t.Dict[t.Any, t.Any], - max_length: t.Optional[int] = None) -> t.Optional[str]: +def get_call_string(func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], + max_length: Optional[int] = None) -> Optional[str]: """ Returns a string representation of the call, formatted as a regular Python function invocation statement. If max_length is not None, truncate @@ -396,8 +449,8 @@ def get_call_string(func_name: t.Optional[str], args: t.Any, kwargs: t.Dict[t.An Args: func_name (str): The funtion name - args (t.Any): The function arguments - kwargs (t.Dict[t.Any, t.Any]): The function kwargs + args (Any): The function arguments + kwargs (Dict[Any, Any]): The function kwargs max_length (int, optional): The max length. Defaults to None. Returns: diff --git a/rq/worker.py b/rq/worker.py index 6d01371..49c53fd 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -8,9 +8,11 @@ import sys import time import traceback import warnings -import typing as t -if t.TYPE_CHECKING: + +from typing import TYPE_CHECKING, Type, List, Dict, Any + +if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline @@ -112,13 +114,16 @@ class Worker: @classmethod def all( cls, - connection: t.Optional['Redis'] = None, - job_class: t.Type['Job'] = None, - queue_class: t.Optional[t.Type['Queue']] = None, - queue: t.Optional['Queue'] = None, + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + queue_class: Optional[Type['Queue']] = None, + queue: Optional['Queue'] = None, serializer=None - ) -> t.List['Worker']: + ) -> List['Worker']: """Returns an iterable of all Workers. + + Returns: + workers (List[Worker]): A list of workers """ if queue: connection = queue.connection @@ -134,18 +139,35 @@ class Worker: return compact(workers) @classmethod - def all_keys(cls, connection: t.Optional['Redis'] = None, queue: t.Optional['Queue'] = None): + def all_keys(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> List[str]: + """List of worker keys + + Args: + connection (Optional[Redis], optional): A Redis Connection. Defaults to None. + queue (Optional[Queue], optional): The Queue. Defaults to None. + + Returns: + list_keys (List[str]): A list of worker keys + """ return [as_text(key) for key in get_keys(queue=queue, connection=connection)] @classmethod - def count(cls, connection: t.Optional['Redis'] = None, queue: t.Optional['Queue'] = None): - """Returns the number of workers by queue or connection""" + def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None): + """Returns the number of workers by queue or connection + + Args: + connection (Optional['Redis'], optional): _description_. Defaults to None. + queue (Optional['Queue'], optional): _description_. Defaults to None. + + Returns: + _type_: _description_ + """ return len(get_keys(queue=queue, connection=connection)) @classmethod - def find_by_key(cls, worker_key: str, connection: t.Optional['Redis'] = None, job_class: t.Type['Job'] = None, - queue_class: t.Type['Queue'] = None, serializer=None): + def find_by_key(cls, worker_key: str, connection: Optional['Redis'] = None, job_class: Type['Job'] = None, + queue_class: Type['Queue'] = None, serializer=None): """Returns a Worker instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Workers by their Redis keys. @@ -168,9 +190,9 @@ class Worker: return worker - def __init__(self, queues, name: t.Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL, - connection: t.Optional['Redis'] = None, exc_handler=None, exception_handlers=None, - default_worker_ttl=DEFAULT_WORKER_TTL, job_class: t.Type['Job'] = None, + def __init__(self, queues, name: Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL, + connection: Optional['Redis'] = None, exc_handler=None, exception_handlers=None, + default_worker_ttl=DEFAULT_WORKER_TTL, job_class: Type['Job'] = None, queue_class=None, log_job_description: bool = True, job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, disable_default_exception_handler: bool = False, @@ -217,7 +239,7 @@ class Worker: self.total_working_time: int = 0 self.current_job_working_time: int = 0 self.birth_date = None - self.scheduler: t.Optional[RQScheduler] = None + self.scheduler: Optional[RQScheduler] = None self.pubsub = None self.pubsub_thread = None @@ -384,7 +406,7 @@ class Worker: if death_timestamp is not None: return utcparse(as_text(death_timestamp)) - def set_state(self, state, pipeline: t.Optional['Pipeline'] = None): + def set_state(self, state, pipeline: Optional['Pipeline'] = None): self._state = state connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'state', state) @@ -410,12 +432,12 @@ class Worker: state = property(_get_state, _set_state) - def set_current_job_working_time(self, current_job_working_time, pipeline: t.Optional['Pipeline'] = None): + def set_current_job_working_time(self, current_job_working_time, pipeline: Optional['Pipeline'] = None): self.current_job_working_time = current_job_working_time connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'current_job_working_time', current_job_working_time) - def set_current_job_id(self, job_id: t.Optional[str] = None, pipeline: t.Optional['Pipeline'] = None): + def set_current_job_id(self, job_id: Optional[str] = None, pipeline: Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection if job_id is None: @@ -423,7 +445,7 @@ class Worker: else: connection.hset(self.key, 'current_job', job_id) - def get_current_job_id(self, pipeline: t.Optional['Pipeline'] = None): + def get_current_job_id(self, pipeline: Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection return as_text(connection.hget(self.key, 'current_job')) @@ -719,7 +741,7 @@ class Worker: self.heartbeat() return result - def heartbeat(self, timeout=None, pipeline: t.Optional['Pipeline'] = None): + def heartbeat(self, timeout=None, pipeline: Optional['Pipeline'] = None): """Specifies a new worker timeout, typically by extending the expiration time of the worker, effectively making this a "heartbeat" to not expire the worker until the timeout passes. @@ -777,11 +799,11 @@ class Worker: job_class=self.job_class, serializer=self.serializer) for queue in queues.split(',')] - def increment_failed_job_count(self, pipeline: t.Optional['Pipeline'] = None): + def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection connection.hincrby(self.key, 'failed_job_count', 1) - def increment_successful_job_count(self, pipeline: t.Optional['Pipeline'] = None): + def increment_successful_job_count(self, pipeline: Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection connection.hincrby(self.key, 'successful_job_count', 1) @@ -962,7 +984,7 @@ class Worker: self.procline(msg.format(job.func_name, job.origin, time.time())) def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, - exc_string=''): + exc_string=''): """ Handles the failure or an executing job by: 1. Setting the job status to failed diff --git a/rq/worker_registration.py b/rq/worker_registration.py index c1e1181..94c41a1 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -1,6 +1,6 @@ -import typing as t +from typing import Optional, TYPE_CHECKING, Any, Set -if t.TYPE_CHECKING: +if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline from .worker import Worker @@ -15,13 +15,13 @@ REDIS_WORKER_KEYS = 'rq:workers' MAX_KEYS = 1000 -def register(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None): +def register(worker: 'Worker', pipeline: Optional['Pipeline'] = None): """ Store worker key in Redis so we can easily discover active workers. Args: worker (Worker): The Worker - pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. + pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None. """ connection = pipeline if pipeline is not None else worker.connection connection.sadd(worker.redis_workers_keys, worker.key) @@ -30,12 +30,12 @@ def register(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None): connection.sadd(redis_key, worker.key) -def unregister(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None): +def unregister(worker: 'Worker', pipeline: Optional['Pipeline'] = None): """Remove Worker key from Redis Args: worker (Worker): The Worker - pipeline (t.Optional[Pipeline], optional): Redis Pipeline. Defaults to None. + pipeline (Optional[Pipeline], optional): Redis Pipeline. Defaults to None. """ if pipeline is None: connection = worker.connection.pipeline() @@ -51,12 +51,12 @@ def unregister(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None): connection.execute() -def get_keys(queue: t.Optional['Queue'] = None, connection: t.Optional['Redis'] = None) -> t.Set[t.Any]: +def get_keys(queue: Optional['Queue'] = None, connection: Optional['Redis'] = None) -> Set[Any]: """Returns a list of worker keys for a given queue. Args: - queue (t.Optional['Queue'], optional): The Queue. Defaults to None. - connection (t.Optional['Redis'], optional): The Redis Connection. Defaults to None. + queue (Optional['Queue'], optional): The Queue. Defaults to None. + connection (Optional['Redis'], optional): The Redis Connection. Defaults to None. Raises: ValueError: If no Queue or Connection is provided.