Queue Docstrings (#1782)

* Docstrings

* Add commands docstrings

* More docstrings

* Fix Result.Type error

* Remove unfinished docstrings
main
lowercase00 2 years ago committed by GitHub
parent 271e7a8727
commit 436250d36b

@ -15,9 +15,16 @@ from rq.job import Job
PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'
def send_command(connection: 'Redis', worker_name: str, command, **kwargs):
def send_command(connection: 'Redis', worker_name: str, command: str, **kwargs):
"""
Use connection' pubsub mechanism to send a command
Sends a command to a worker.
A command is just a string; the available commands are:
- `shutdown`: Shuts down a worker
- `kill-horse`: Command for the worker to kill the current working horse
- `stop-job`: A command for the worker to stop the currently running job
The command string will be parsed into a dictionary and sent to a PubSub Topic.
Workers listen to the PubSub, and `handle` the specific command.
Args:
connection (Redis): A Redis Connection
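For reference, a minimal usage sketch of these command helpers (the worker name and job ID are hypothetical placeholders):

```python
from redis import Redis

from rq.command import send_shutdown_command, send_stop_job_command

connection = Redis()

# Ask a worker (by name) to shut down after finishing its current job
send_shutdown_command(connection, 'my-worker')

# Ask whichever worker is running the job to stop it
send_stop_job_command(connection, job_id='my-job-id')
```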
@ -41,7 +48,7 @@ def parse_payload(payload: Dict[Any, Any]) -> Dict[Any, Any]:
def send_shutdown_command(connection: 'Redis', worker_name: str):
"""
Sends a shutdown command to the pubsub topic.
Sends a command to shutdown a worker.
Args:
connection (Redis): A Redis Connection
@ -77,7 +84,7 @@ def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None):
def handle_command(worker: 'Worker', payload: Dict[Any, Any]):
"""Parses payload and routes commands
"""Parses payload and routes commands to the worker.
Args:
worker (Worker): The worker to use

@ -1,5 +1,5 @@
from contextlib import contextmanager
import typing as t
from typing import Optional
import warnings
from redis import Redis
@ -11,16 +11,21 @@ class NoRedisConnectionException(Exception):
@contextmanager
def Connection(connection: t.Optional['Redis'] = None): # noqa
def Connection(connection: Optional['Redis'] = None): # noqa
"""The context manager for handling connections in a clean way.
It will push the connection to the LocalStack, and pop the connection
when leaving the context
Example:
..codeblock:python::
with Connection():
w = Worker()
w.work()
.. code-block:: python
with Connection():
w = Worker()
w.work()
This method is deprecated as of version 1.12.0 and will be removed in the future.
Pass the connection to the worker explicitly to handle Redis Connections.
Args:
connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None.
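A sketch of the explicit style that the deprecation note points to, passing the connection directly instead of relying on the `Connection` context manager:

```python
from redis import Redis
from rq import Queue, Worker

redis = Redis()
queue = Queue('default', connection=redis)
worker = Worker([queue], connection=redis)
worker.work()
```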
@ -41,7 +46,7 @@ def Connection(connection: t.Optional['Redis'] = None): # noqa
def push_connection(redis: 'Redis'):
"""
Pushes the given connection on the stack.
Pushes the given connection to the stack.
Args:
redis (Redis): A Redis connection
@ -59,13 +64,13 @@ def pop_connection() -> 'Redis':
return _connection_stack.pop()
def use_connection(redis: t.Optional['Redis'] = None):
def use_connection(redis: Optional['Redis'] = None):
"""
Clears the stack and uses the given connection. Protects against mixed
use of use_connection() and stacked connection contexts.
Args:
redis (t.Optional[Redis], optional): A Redis Connection. Defaults to None.
redis (Optional[Redis], optional): A Redis Connection. Defaults to None.
"""
assert len(_connection_stack) <= 1, \
'You should not mix Connection contexts with use_connection()'
@ -87,13 +92,13 @@ def get_current_connection() -> 'Redis':
return _connection_stack.top
def resolve_connection(connection: t.Optional['Redis'] = None) -> 'Redis':
def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis':
"""
Convenience function to resolve the given or the current connection.
Raises an exception if it cannot resolve a connection now.
Args:
connection (t.Optional[Redis], optional): A Redis connection. Defaults to None.
connection (Optional[Redis], optional): A Redis connection. Defaults to None.
Raises:
NoRedisConnectionException: If connection couldn't be resolved.

@ -556,8 +556,8 @@ class Job:
self._success_callback = UNEVALUATED
self._failure_callback_name = None
self._failure_callback = UNEVALUATED
self.description = None
self.origin = None
self.description: Optional[str] = None
self.origin: Optional[str] = None
self.enqueued_at: Optional[datetime] = None
self.started_at: Optional[datetime] = None
self.ended_at: Optional[datetime] = None
@ -570,9 +570,9 @@ class Job:
self.worker_name: Optional[str] = None
self._status = None
self._dependency_ids: List[str] = []
self.meta = {}
self.meta: Optional[Dict] = {}
self.serializer = resolve_serializer(serializer)
self.retries_left = None
self.retries_left: Optional[int] = None
self.retry_intervals: Optional[List[int]] = None
self.redis_server_version: Optional[Tuple[int, int, int]] = None
self.last_heartbeat: Optional[datetime] = None

@ -1,13 +1,23 @@
import logging
import sys
from typing import Union
from rq.utils import ColorizingStreamHandler
from rq.defaults import (DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_DATE_FORMAT)
def setup_loghandlers(level=None, date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT, name='rq.worker'):
def setup_loghandlers(level: Union[int, str, None] = None, date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT, name: str = 'rq.worker'):
"""Sets up a log handler.
Args:
level (Union[int, str, None], optional): The log level.
Accepts an integer level (10-50) or a string level ("info", "debug", etc.). Defaults to None.
date_format (str, optional): The date format to use. Defaults to DEFAULT_LOGGING_DATE_FORMAT ('%H:%M:%S').
log_format (str, optional): The log format to use. Defaults to DEFAULT_LOGGING_FORMAT ('%(asctime)s %(message)s').
name (str, optional): The logger name. Defaults to 'rq.worker'.
"""
logger = logging.getLogger(name)
if not _has_effective_handler(logger):
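A quick sketch of typical calls; note the hunk below shows that string levels are upper-cased internally:

```python
import logging

from rq.logutils import setup_loghandlers

setup_loghandlers('debug')        # string level, upper-cased internally
setup_loghandlers(logging.INFO)   # or an integer level
```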
@ -27,12 +37,15 @@ def setup_loghandlers(level=None, date_format=DEFAULT_LOGGING_DATE_FORMAT,
logger.setLevel(level if isinstance(level, int) else level.upper())
def _has_effective_handler(logger):
def _has_effective_handler(logger) -> bool:
"""
Checks if a logger has a handler that will catch its messages in its logger hierarchy.
:param `logging.Logger` logger: The logger to be checked.
:return: True if a handler is found for the logger, False otherwise.
:rtype: bool
Args:
logger (logging.Logger): The logger to be checked.
Returns:
is_configured (bool): True if a handler is found for the logger, False otherwise.
"""
while True:
if logger.handlers:

@ -1,23 +1,24 @@
import uuid
import sys
import warnings
import typing as t
import logging
from collections import namedtuple
from datetime import datetime, timezone
from datetime import datetime, timezone, timedelta
from functools import total_ordering
from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Type, Union
from redis import WatchError
if t.TYPE_CHECKING:
if TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
from .job import Retry
from .utils import as_text
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
from .exceptions import DequeueTimeout, NoSuchJobError
from .job import Job, JobStatus
from .types import FunctionReferenceType, JobDependencyType
from .serializers import resolve_serializer
from .utils import backend_class, get_version, import_attribute, make_colorizer, parse_timeout, utcnow, compact
@ -43,14 +44,22 @@ class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout"
@total_ordering
class Queue:
job_class: t.Type['Job'] = Job
job_class: Type['Job'] = Job
DEFAULT_TIMEOUT: int = 180 # Default timeout seconds.
redis_queue_namespace_prefix: str = 'rq:queue:'
redis_queues_keys: str = 'rq:queues'
@classmethod
def all(cls, connection: t.Optional['Redis'] = None, job_class: t.Optional[t.Type['Job']] = None, serializer=None):
def all(cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None) -> List['Queue']:
"""Returns an iterable of all Queues.
Args:
connection (Optional[Redis], optional): The Redis Connection. Defaults to None.
job_class (Optional[Job], optional): The Job class to use. Defaults to None.
serializer (optional): The serializer to use. Defaults to None.
Returns:
queues (List[Queue]): A list of all queues.
"""
connection = resolve_connection(connection)
@ -59,16 +68,28 @@ class Queue:
connection=connection,
job_class=job_class, serializer=serializer)
return [to_queue(rq_key)
for rq_key in connection.smembers(cls.redis_queues_keys)
if rq_key]
all_registered_queues = connection.smembers(cls.redis_queues_keys)
all_queues = [to_queue(rq_key) for rq_key in all_registered_queues if rq_key]
return all_queues
@classmethod
def from_queue_key(cls, queue_key, connection: t.Optional['Redis'] = None,
job_class: t.Optional[t.Type['Job']] = None, serializer=None):
def from_queue_key(cls, queue_key: str, connection: Optional['Redis'] = None,
job_class: Optional[Type['Job']] = None, serializer: Any = None) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
Redis keys.
Args:
queue_key (str): The queue key
connection (Optional[Redis], optional): Redis connection. Defaults to None.
job_class (Optional[Job], optional): Job class. Defaults to None.
serializer (Any, optional): Serializer. Defaults to None.
Raises:
ValueError: If the queue_key doesn't start with the defined prefix
Returns:
queue (Queue): The Queue object
"""
prefix = cls.redis_queue_namespace_prefix
if not queue_key.startswith(prefix):
@ -76,8 +97,19 @@ class Queue:
name = queue_key[len(prefix):]
return cls(name, connection=connection, job_class=job_class, serializer=serializer)
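A short reverse-lookup sketch based on the key convention above:

```python
from redis import Redis
from rq import Queue

q = Queue.from_queue_key('rq:queue:default', connection=Redis())
assert q.name == 'default'  # the 'rq:queue:' prefix is stripped off
```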
def __init__(self, name='default', default_timeout=None, connection: t.Optional['Redis'] = None,
is_async=True, job_class=None, serializer=None, **kwargs):
def __init__(self, name: str = 'default', default_timeout: Optional[int] = None, connection: Optional['Redis'] = None,
is_async: bool = True, job_class: Union[str, Type['Job'], None] = None, serializer: Any = None, **kwargs):
"""Initializes a Queue object.
Args:
name (str, optional): The queue name. Defaults to 'default'.
default_timeout (Optional[int], optional): Queue's default timeout. Defaults to None.
connection (Optional[Redis], optional): Redis connection. Defaults to None.
is_async (bool, optional): Whether jobs should run "async" (using the worker).
If `is_async` is False, jobs will run in the same process from which they were enqueued. Defaults to True.
job_class (Union[str, Type['Job'], None], optional): Job class or a string referencing the Job class path. Defaults to None.
serializer (Any, optional): Serializer. Defaults to None.
"""
self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix
self.name = name
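A sketch of the `is_async=False` behavior documented above, handy in tests because jobs execute inline in the calling process:

```python
from redis import Redis
from rq import Queue

q = Queue('default', connection=Redis(), is_async=False)
job = q.enqueue(len, [1, 2, 3])  # runs immediately, no worker involved
assert job.result == 3
```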
@ -97,7 +129,7 @@ class Queue:
self.job_class = job_class
self.serializer = resolve_serializer(serializer)
self.redis_server_version = None
self.redis_server_version: Optional[Tuple[int, int, int]] = None
def __len__(self):
return self.count
@ -111,8 +143,12 @@ class Queue:
def __iter__(self):
yield self
def get_redis_server_version(self):
"""Return Redis server version of connection"""
def get_redis_server_version(self) -> Tuple[int, int, int]:
"""Return Redis server version of connection
Returns:
redis_version (Tuple[int, int, int]): A tuple with the parsed Redis version (e.g. (5, 0, 0))
"""
if not self.redis_server_version:
self.redis_server_version = get_version(self.connection)
return self.redis_server_version
@ -127,14 +163,28 @@ class Queue:
"""Redis key used to indicate this queue has been cleaned."""
return 'rq:clean_registries:%s' % self.name
def acquire_cleaning_lock(self):
def acquire_cleaning_lock(self) -> bool:
"""Returns a boolean indicating whether a lock to clean this queue
is acquired. A lock expires in 899 seconds (15 minutes - 1 second)
Returns:
lock_acquired (bool)
"""
return self.connection.set(self.registry_cleaning_key, 1, nx=1, ex=899)
lock_acquired = self.connection.set(self.registry_cleaning_key, 1, nx=1, ex=899)
if not lock_acquired:
return False
return lock_acquired
def empty(self):
"""Removes all messages on the queue."""
"""Removes all messages on the queue.
This is currently being done using a Lua script,
which iterates over all queue messages and deletes the jobs and their dependents.
It registers the Lua script and calls it.
Even though the result is currently being returned, this is not strictly necessary.
Returns:
script (Any): The result of the Lua script call.
"""
script = """
local prefix = "{0}"
local q = KEYS[1]
@ -156,7 +206,11 @@ class Queue:
return script(keys=[self.key])
def delete(self, delete_jobs: bool = True):
"""Deletes the queue. If delete_jobs is true it removes all the associated messages on the queue first."""
"""Deletes the queue.
Args:
delete_jobs (bool): If true, removes all the associated messages on the queue first.
"""
if delete_jobs:
self.empty()
@ -165,8 +219,12 @@ class Queue:
pipeline.delete(self._key)
pipeline.execute()
def is_empty(self):
"""Returns whether the current queue is empty."""
def is_empty(self) -> bool:
"""Returns whether the current queue is empty.
Returns:
is_empty (bool): Whether the queue is empty
"""
return self.count == 0
@property
@ -174,7 +232,17 @@ class Queue:
"""Returns whether the current queue is async."""
return bool(self._is_async)
def fetch_job(self, job_id: str):
def fetch_job(self, job_id: str) -> Optional['Job']:
"""Fetch a single job by Job ID.
If the job key is not found, runs the `remove` method to drop the job ID from the queue.
If the job's origin matches this queue's name, returns the Job.
Args:
job_id (str): The Job ID
Returns:
job (Optional[Job]): The job if found
"""
try:
job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer)
except NoSuchJobError:
@ -183,13 +251,19 @@ class Queue:
if job.origin == self.name:
return job
def get_job_position(self, job_or_id: t.Union[Job, str]):
def get_job_position(self, job_or_id: Union['Job', str]) -> Optional[int]:
"""Returns the position of a job within the queue
With Redis before 6.0.6 and redis-py before 3.5.4 this has a complexity
worse than O(N) and should not be used for very long job queues. Later
Redis and redis-py versions support the LPOS command, which handles job
positions within Redis' C implementation.
Args:
job_or_id (Union[Job, str]): The Job instance or Job ID
Returns:
position (Optional[int]): The job's position in the queue, or None if not found.
"""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
@ -204,8 +278,16 @@ class Queue:
return self.job_ids.index(job_id)
return None
def get_job_ids(self, offset: int = 0, length: int = -1):
"""Returns a slice of job IDs in the queue."""
def get_job_ids(self, offset: int = 0, length: int = -1) -> List[str]:
"""Returns a slice of job IDs in the queue.
Args:
offset (int, optional): The offset. Defaults to 0.
length (int, optional): The slice length. Defaults to -1 (last element).
Returns:
ids (List[str]): A list of job IDs.
"""
start = offset
if length >= 0:
end = offset + (length - 1)
@ -219,18 +301,26 @@ class Queue:
self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.")
return job_ids
def get_jobs(self, offset: int = 0, length: int = -1):
"""Returns a slice of jobs in the queue."""
def get_jobs(self, offset: int = 0, length: int = -1) -> List['Job']:
"""Returns a slice of jobs in the queue.
Args:
offset (int, optional): The offset. Defaults to 0.
length (int, optional): The slice length. Defaults to -1.
Returns:
jobs (List[Job]): A list of jobs.
"""
job_ids = self.get_job_ids(offset, length)
return compact([self.fetch_job(job_id) for job_id in job_ids])
@property
def job_ids(self) -> t.List[str]:
def job_ids(self) -> List[str]:
"""Returns a list of all job IDS in the queue."""
return self.get_job_ids()
@property
def jobs(self) -> t.List['Job']:
def jobs(self) -> List['Job']:
"""Returns a list of all (valid) jobs in the queue."""
return self.get_jobs()
@ -276,8 +366,16 @@ class Queue:
from rq.registry import CanceledJobRegistry
return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
def remove(self, job_or_id: t.Union['Job', str], pipeline: t.Optional['Pipeline'] = None):
"""Removes Job from queue, accepts either a Job instance or ID."""
def remove(self, job_or_id: Union['Job', str], pipeline: Optional['Pipeline'] = None):
"""Removes Job from queue, accepts either a Job instance or ID.
Args:
job_or_id (Union[Job, str]): The Job instance or Job ID string.
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
Returns:
int: The number of entries removed from the queue (the result of Redis LREM).
"""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
if pipeline is not None:
@ -286,8 +384,8 @@ class Queue:
return self.connection.lrem(self.key, 1, job_id)
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
guaranteeing FIFO semantics.
"""Removes all "dead" jobs from the queue by cycling through it,
while guaranteeing FIFO semantics.
"""
COMPACT_QUEUE = '{0}_compact:{1}'.format(
self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa
@ -300,9 +398,15 @@ class Queue:
if self.job_class.exists(job_id, self.connection):
self.connection.rpush(self.key, job_id)
def push_job_id(self, job_id: str, pipeline: t.Optional['Pipeline'] = None, at_front=False):
def push_job_id(self, job_id: str, pipeline: Optional['Pipeline'] = None, at_front: bool = False):
"""Pushes a job ID on the corresponding Redis queue.
'at_front' allows you to push the job onto the front instead of the back of the queue"""
'at_front' allows you to push the job onto the front instead of the back of the queue
Args:
job_id (str): The Job ID
pipeline (Optional[Pipeline], optional): The Redis Pipeline to use. Defaults to None.
at_front (bool, optional): Whether to push the job to front of the queue. Defaults to False.
"""
connection = pipeline if pipeline is not None else self.connection
if at_front:
result = connection.lpush(self.key, job_id)
@ -310,12 +414,38 @@ class Queue:
result = connection.rpush(self.key, job_id)
self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.")
def create_job(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
meta=None, status=JobStatus.QUEUED, retry=None, *,
on_success=None, on_failure=None) -> Job:
"""Creates a job based on parameters given."""
def create_job(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None,
timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None,
failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType']=None,
job_id: Optional[str] = None, meta: Optional[Dict] = None, status: JobStatus = JobStatus.QUEUED,
retry: Optional['Retry'] = None, *, on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None) -> Job:
"""Creates a job based on parameters given
Args:
func (FunctionReferenceType): The function reference: a callable or the path to one.
args (Union[Tuple, List, None], optional): The `*args` to pass to the function. Defaults to None.
kwargs (Optional[Dict], optional): The `**kwargs` to pass to the function. Defaults to None.
timeout (Optional[int], optional): Function timeout. Defaults to None.
result_ttl (Optional[int], optional): Result time to live. Defaults to None.
ttl (Optional[int], optional): Time to live. Defaults to None.
failure_ttl (Optional[int], optional): Failure time to live. Defaults to None.
description (Optional[str], optional): The description. Defaults to None.
depends_on (Optional[JobDependencyType], optional): The job dependencies. Defaults to None.
job_id (Optional[str], optional): Job ID. Defaults to None.
meta (Optional[Dict], optional): Job metadata. Defaults to None.
status (JobStatus, optional): Job status. Defaults to JobStatus.QUEUED.
retry (Optional[Retry], optional): The Retry Object. Defaults to None.
on_success (Optional[Callable], optional): On success callable. Defaults to None.
on_failure (Optional[Callable], optional): On failure callable. Defaults to None.
Raises:
ValueError: If the timeout is 0
ValueError: If the job TTL is 0 or negative
Returns:
Job: The created job
"""
timeout = parse_timeout(timeout)
if timeout is None:
@ -345,14 +475,22 @@ class Queue:
return job
def setup_dependencies(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None):
# If a _dependent_ job depends on any unfinished job, register all the
# _dependent_ job's dependencies instead of enqueueing it.
#
# `Job#fetch_dependencies` sets WATCH on all dependencies. If
# WatchError is raised in the when the pipeline is executed, that means
# something else has modified either the set of dependencies or the
# status of one of them. In this case, we simply retry.
def setup_dependencies(self, job: 'Job', pipeline: Optional['Pipeline'] = None) -> 'Job':
"""If a _dependent_ job depends on any unfinished job, register all the
_dependent_ job's dependencies instead of enqueueing it.
`Job#fetch_dependencies` sets WATCH on all dependencies. If
WatchError is raised when the pipeline is executed, that means
something else has modified either the set of dependencies or the
status of one of them. In this case, we simply retry.
Args:
job (Job): The job
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
Returns:
job (Job): The Job
"""
if len(job._dependency_ids) > 0:
orig_status = job.get_status(refresh=False)
pipe = pipeline if pipeline is not None else self.connection.pipeline()
@ -397,16 +535,39 @@ class Queue:
pipeline.multi() # Ensure pipeline in multi mode before returning to caller
return job
def enqueue_call(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None, description=None,
depends_on=None, job_id: str = None, at_front: bool = False, meta=None,
retry=None, on_success=None, on_failure=None, pipeline=None) -> Job:
def enqueue_call(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None,
timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None,
failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None, at_front: bool = False, meta: Optional[Dict] = None,
retry: Optional['Retry'] = None, on_success: Optional[Callable[..., Any]] = None,
on_failure: Optional[Callable[..., Any]] = None, pipeline: Optional['Pipeline'] = None) -> Job:
"""Creates a job to represent the delayed function call and enqueues it.
It is much like `.enqueue()`, except that it takes the function's args
and kwargs as explicit arguments. Any kwargs passed to this function
contain options for RQ itself.
"""
Args:
func (FunctionReferenceType): The reference to the function
args (Union[Tuple, List, None], optional): The `*args` to pass to the function. Defaults to None.
kwargs (Optional[Dict], optional): The `**kwargs` to pass to the function. Defaults to None.
timeout (Optional[int], optional): Function timeout. Defaults to None.
result_ttl (Optional[int], optional): Result time to live. Defaults to None.
ttl (Optional[int], optional): Time to live. Defaults to None.
failure_ttl (Optional[int], optional): Failure time to live. Defaults to None.
description (Optional[str], optional): The job description. Defaults to None.
depends_on (Optional[JobDependencyType], optional): The job dependencies. Defaults to None.
job_id (Optional[str], optional): The job ID. Defaults to None.
at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False.
meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None.
retry (Optional[Retry], optional): Retry object. Defaults to None.
on_success (Optional[Callable[..., Any]], optional): Callable for on success. Defaults to None.
on_failure (Optional[Callable[..., Any]], optional): Callable for on failure. Defaults to None.
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
Returns:
Job: The enqueued Job
"""
job = self.create_job(
func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl,
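A sketch of the explicit-kwargs style this method provides ('my_module.add' is a hypothetical importable function):

```python
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())
job = q.enqueue_call(
    func='my_module.add',  # a dotted path or a callable
    args=(2, 3),
    result_ttl=500,        # keep the result for 500 seconds
    job_id='add-2-3',
)
```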
@ -425,12 +586,33 @@ class Queue:
return job
@staticmethod
def prepare_data(func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, job_id=None,
at_front=False, meta=None, retry=None, on_success=None, on_failure=None) -> EnqueueData:
# Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
# And can keep this logic within EnqueueData
def prepare_data(func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None,
timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None,
failure_ttl: Optional[int] = None, description: Optional[str] = None, job_id: Optional[str] = None,
at_front: bool = False, meta: Optional[Dict] = None, retry: Optional['Retry'] = None,
on_success: Optional[Callable] = None, on_failure: Optional[Callable] = None) -> EnqueueData:
"""Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
And can keep this logic within EnqueueData
Args:
func (FunctionReferenceType): The reference to the function
args (Union[Tuple, List, None], optional): The `*args` to pass to the function. Defaults to None.
kwargs (Optional[Dict], optional): The `**kwargs` to pass to the function. Defaults to None.
timeout (Optional[int], optional): Function timeout. Defaults to None.
result_ttl (Optional[int], optional): Result time to live. Defaults to None.
ttl (Optional[int], optional): Time to live. Defaults to None.
failure_ttl (Optional[int], optional): Failure time to live. Defaults to None.
description (Optional[str], optional): The job description. Defaults to None.
job_id (Optional[str], optional): The job ID. Defaults to None.
at_front (bool, optional): Whether to enqueue the job at the front. Defaults to False.
meta (Optional[Dict], optional): Metadata to attach to the job. Defaults to None.
retry (Optional[Retry], optional): Retry object. Defaults to None.
on_success (Optional[Callable[..., Any]], optional): Callable for on success. Defaults to None.
on_failure (Optional[Callable[..., Any]], optional): Callable for on failure. Defaults to None.
Returns:
EnqueueData: The EnqueueData
"""
return EnqueueData(
func, args, kwargs, timeout,
result_ttl, ttl, failure_ttl,
@ -438,10 +620,16 @@ class Queue:
at_front, meta, retry, on_success, on_failure
)
def enqueue_many(self, job_datas, pipeline: t.Optional['Pipeline'] = None) -> t.List[Job]:
"""
Creates multiple jobs (created via `Queue.prepare_data` calls)
def enqueue_many(self, job_datas: List['EnqueueData'], pipeline: Optional['Pipeline'] = None) -> List[Job]:
"""Creates multiple jobs (created via `Queue.prepare_data` calls)
to represent the delayed function calls and enqueues them.
Args:
job_datas (List['EnqueueData']): A List of job data
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
Returns:
List[Job]: A list of enqueued jobs
"""
pipe = pipeline if pipeline is not None else self.connection.pipeline()
jobs = [
@ -467,6 +655,14 @@ class Queue:
return jobs
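A sketch combining `prepare_data` with `enqueue_many` to enqueue a batch on a single pipeline ('my_module.add' is again hypothetical):

```python
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())
with q.connection.pipeline() as pipe:
    jobs = q.enqueue_many(
        [
            Queue.prepare_data('my_module.add', (1, 2)),
            Queue.prepare_data('my_module.add', (3, 4), job_id='add-3-4'),
        ],
        pipeline=pipe,
    )
    pipe.execute()
```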
def run_job(self, job: 'Job') -> Job:
"""Run the job
Args:
job (Job): The job to run
Returns:
Job: The job after execution, with status set to FINISHED and the result saved.
"""
job.perform()
job.set_status(JobStatus.FINISHED)
job.save(include_meta=False)
@ -474,7 +670,7 @@ class Queue:
return job
@classmethod
def parse_args(cls, f: t.Union[t.Callable[..., t.Any], str], *args, **kwargs):
def parse_args(cls, f: 'FunctionReferenceType', *args, **kwargs):
"""
Parses arguments passed to `queue.enqueue()` and `queue.enqueue_at()`
@ -484,6 +680,11 @@ class Queue:
* A reference to an object's instance method
* A string, representing the location of a function (must be
meaningful to the import context of the workers)
Args:
f (FunctionReferenceType): The function reference
args (*args): The function's positional arguments
kwargs (**kwargs): The function's keyword arguments
"""
if not isinstance(f, str) and f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed '
@ -514,9 +715,18 @@ class Queue:
depends_on, job_id, at_front, meta, retry, on_success, on_failure,
pipeline, args, kwargs)
def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues it."""
def enqueue(self, f: 'FunctionReferenceType', *args, **kwargs) -> 'Job':
"""Creates a job to represent the delayed function call and enqueues it.
Receives the same parameters accepted by the `enqueue_call` method.
Args:
f (FunctionReferenceType): The function reference
args (*args): The function's positional arguments
kwargs (**kwargs): The function's keyword arguments
Returns:
job (Job): The created Job
"""
(f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, retry, on_success,
on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
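A minimal sketch of the common path; per the `__main__` restriction mentioned above, `add` must be importable from a module:

```python
from redis import Redis
from rq import Queue

from my_module import add  # hypothetical module; __main__ functions are rejected

q = Queue(connection=Redis())
job = q.enqueue(add, 2, 3, result_ttl=500)
```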
@ -530,8 +740,15 @@ class Queue:
)
def enqueue_at(self, datetime: datetime, f, *args, **kwargs):
"""Schedules a job to be enqueued at specified time"""
"""Schedules a job to be enqueued at specified time
Args:
datetime (datetime): The time at which the job should be enqueued
f (FunctionReferenceType): The function reference
Returns:
job (Job): The job that was scheduled
"""
(f, timeout, description, result_ttl, ttl, failure_ttl,
depends_on, job_id, at_front, meta, retry, on_success, on_failure,
pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs)
@ -544,8 +761,17 @@ class Queue:
job.enqueue_at_front = True
return self.schedule_job(job, datetime, pipeline=pipeline)
def schedule_job(self, job: 'Job', datetime: datetime, pipeline: t.Optional['Pipeline'] = None):
"""Puts job on ScheduledJobRegistry"""
def schedule_job(self, job: 'Job', datetime: datetime, pipeline: Optional['Pipeline'] = None):
"""Puts job on ScheduledJobRegistry
Args:
job (Job): The job to schedule
datetime (datetime): The UTC time at which the job should be enqueued
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
Returns:
job (Job): The scheduled job
"""
from .registry import ScheduledJobRegistry
registry = ScheduledJobRegistry(queue=self)
@ -559,15 +785,31 @@ class Queue:
pipe.execute()
return job
def enqueue_in(self, time_delta, func, *args, **kwargs):
"""Schedules a job to be executed in a given `timedelta` object"""
def enqueue_in(self, time_delta: timedelta, func: 'FunctionReferenceType', *args, **kwargs) -> 'Job':
"""Schedules a job to be executed in a given `timedelta` object
Args:
time_delta (timedelta): The timedelta object
func (FunctionReferenceType): The function reference
Returns:
job (Job): The enqueued Job
"""
return self.enqueue_at(datetime.now(timezone.utc) + time_delta,
func, *args, **kwargs)
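Both scheduling variants, sketched under the same assumptions; the job lands in the ScheduledJobRegistry, and a worker started with `--with-scheduler` enqueues it when due:

```python
from datetime import datetime, timedelta, timezone
from redis import Redis
from rq import Queue

q = Queue(connection=Redis())
q.enqueue_at(datetime(2030, 1, 1, tzinfo=timezone.utc), 'my_module.add', 2, 3)
q.enqueue_in(timedelta(minutes=10), 'my_module.add', 2, 3)
```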
def enqueue_job(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, at_front: bool = False) -> Job:
def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
"""Enqueues a job for delayed execution.
If Queue is instantiated with is_async=False, job is executed immediately.
Args:
job (Job): The job to enqueue
pipeline (Optional[Pipeline], optional): The Redis pipeline to use. Defaults to None.
at_front (bool, optional): Whether the job should be enqueued at the front of the queue. Defaults to False.
Returns:
Job: The enqueued job
"""
pipe = pipeline if pipeline is not None else self.connection.pipeline()
@ -596,6 +838,14 @@ class Queue:
return job
def run_sync(self, job: 'Job') -> 'Job':
"""Run a job synchronously, meaning on the same process the method was called.
Args:
job (Job): The job to run
Returns:
Job: The job instance
"""
with self.connection.pipeline() as pipeline:
job.prepare_for_execution('sync', pipeline)
@ -611,12 +861,17 @@ class Queue:
return job
def enqueue_dependents(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, exclude_job_id=None):
def enqueue_dependents(self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None):
"""Enqueues all jobs in the given job's dependents set and clears it.
When called without a pipeline, this method uses WATCH/MULTI/EXEC.
If you pass a pipeline, only MULTI is called. The rest is up to the
caller.
Args:
job (Job): The Job to enqueue the dependents
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
exclude_job_id (Optional[str], optional): A job ID to exclude from enqueueing. Defaults to None.
"""
from .registry import DeferredJobRegistry
@ -690,12 +945,16 @@ class Queue:
# handle it
raise
def pop_job_id(self):
"""Pops a given job ID from this Redis queue."""
def pop_job_id(self) -> Optional[str]:
"""Pops a given job ID from this Redis queue.
Returns:
job_id (Optional[str]): The job ID, or None if the queue is empty.
"""
return as_text(self.connection.lpop(self.key))
@classmethod
def lpop(cls, queue_keys, timeout: int, connection: t.Optional['Redis'] = None):
def lpop(cls, queue_keys: List[str], timeout: int, connection: Optional['Redis'] = None):
"""Helper method. Intermediate method to abstract away from some
Redis API details, where LPOP accepts only a single key, whereas BLPOP
accepts multiple. So if we want the non-blocking LPOP, we need to
@ -707,6 +966,18 @@ class Queue:
The timeout parameter is interpreted as follows:
None - non-blocking (return immediately)
> 0 - maximum number of seconds to block
Args:
queue_keys (List[str]): The queues' Redis keys
timeout (int): The blocking timeout in seconds; None for the non-blocking variant
connection (Optional[Redis], optional): The Redis Connection. Defaults to None.
Raises:
ValueError: If timeout of 0 was passed
DequeueTimeout: BLPOP Timeout
Returns:
result (Optional[Tuple[str, str]]): A (queue_key, job_id) tuple, or None if no job was found.
"""
connection = resolve_connection(connection)
if timeout is not None: # blocking variant
@ -728,8 +999,8 @@ class Queue:
return None
@classmethod
def dequeue_any(cls, queues, timeout, connection: t.Optional['Redis'] = None,
job_class: t.Optional[t.Type['Job']] = None, serializer=None):
def dequeue_any(cls, queues: List['Queue'], timeout: int, connection: Optional['Redis'] = None,
job_class: Optional[Type['Job']] = None, serializer: Any = None) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.
@ -739,8 +1010,21 @@ class Queue:
None.
See the documentation of cls.lpop for the interpretation of timeout.
Args:
queues (List[Queue]): List of queue objects
timeout (int): Timeout for the LPOP
connection (Optional[Redis], optional): Redis Connection. Defaults to None.
job_class (Optional[Type[Job]], optional): The Job class to use. Defaults to None.
serializer (Any, optional): Serializer to use. Defaults to None.
Raises:
Exception: Any exception raised while fetching a job is re-raised
Returns:
job, queue (Tuple[Job, Queue]): A tuple of Job, Queue
"""
job_class = backend_class(cls, 'job_class', override=job_class)
job_class: Type[Job] = backend_class(cls, 'job_class', override=job_class)
while True:
queue_keys = [q.key for q in queues]

@ -1,10 +1,10 @@
import typing as t
import calendar
from rq.serializers import resolve_serializer
import time
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, List, Optional, Type, Union
if t.TYPE_CHECKING:
if TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
@ -26,8 +26,8 @@ class BaseRegistry:
job_class = Job
key_template = 'rq:registry:{0}'
def __init__(self, name='default', connection: t.Optional['Redis'] = None,
job_class: t.Optional[t.Type['Job']] = None, queue=None, serializer=None):
def __init__(self, name: str = 'default', connection: Optional['Redis'] = None,
job_class: Optional[Type['Job']] = None, queue: Optional['Queue'] = None, serializer: Any = None):
if queue:
self.name = queue.name
self.connection = resolve_connection(queue.connection)
@ -50,7 +50,7 @@ class BaseRegistry:
self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs
)
def __contains__(self, item: t.Union[str, 'Job']):
def __contains__(self, item: Union[str, 'Job']):
"""
Returns a boolean indicating registry contains the given
job instance or job id.
@ -64,19 +64,26 @@ class BaseRegistry:
return self.connection.zscore(self.key, job_id) is not None
@property
def count(self):
"""Returns the number of jobs in this registry"""
def count(self) -> int:
"""Returns the number of jobs in this registry
Returns:
int: The number of jobs in this registry
"""
self.cleanup()
return self.connection.zcard(self.key)
def add(self, job: 'Job', ttl=0, pipeline: t.Optional['Pipeline'] = None, xx: bool = False):
def add(self, job: 'Job', ttl=0, pipeline: Optional['Pipeline'] = None, xx: bool = False) -> int:
"""Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf
Args:
job (Job): The Job to add
ttl (int, optional): The time to live. Defaults to 0.
pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
xx (bool, optional): Only update the score of elements that already exist (Redis ZADD XX flag). Defaults to False.
Returns:
result (int): The ZADD command result
"""
score = ttl if ttl < 0 else current_timestamp() + ttl
if score == -1:
@ -86,12 +93,12 @@ class BaseRegistry:
return self.connection.zadd(self.key, {job.id: score}, xx=xx)
def remove(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, delete_job: bool = False):
def remove(self, job: 'Job', pipeline: Optional['Pipeline'] = None, delete_job: bool = False):
"""Removes job from registry and deletes it if `delete_job == True`
Args:
job (Job): The Job to remove from the registry
pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
delete_job (bool, optional): Whether the job should also be deleted. Defaults to False.
"""
connection = pipeline if pipeline is not None else self.connection
@ -105,7 +112,7 @@ class BaseRegistry:
job_instance.delete()
return result
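A sketch of typical registry usage built on these primitives, requeueing everything in a queue's FailedJobRegistry:

```python
from redis import Redis
from rq import Queue
from rq.registry import FailedJobRegistry

registry = FailedJobRegistry(queue=Queue(connection=Redis()))
for job_id in registry.get_job_ids():
    registry.requeue(job_id)  # put the failed job back onto its queue
```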
def get_expired_job_ids(self, timestamp: t.Optional[float] = None):
def get_expired_job_ids(self, timestamp: Optional[float] = None):
"""Returns job ids whose score are less than current timestamp.
Returns ids for jobs with an expiry time earlier than timestamp,
@ -113,11 +120,19 @@ class BaseRegistry:
time if unspecified.
"""
score = timestamp if timestamp is not None else current_timestamp()
return [as_text(job_id) for job_id in
self.connection.zrangebyscore(self.key, 0, score)]
expired_jobs = self.connection.zrangebyscore(self.key, 0, score)
return [as_text(job_id) for job_id in expired_jobs]
def get_job_ids(self, start: int = 0, end: int = -1):
"""Returns list of all job ids."""
"""Returns list of all job ids.
Args:
start (int, optional): The start index. Defaults to 0.
end (int, optional): The end index. Defaults to -1 (the last element).
Returns:
ids (List[str]): A list of job IDs
"""
self.cleanup()
return [as_text(job_id) for job_id in
self.connection.zrange(self.key, start, end)]
@ -135,11 +150,11 @@ class BaseRegistry:
score = self.connection.zscore(self.key, job.id)
return datetime.utcfromtimestamp(score)
def requeue(self, job_or_id: t.Union['Job', str], at_front: bool = False) -> 'Job':
def requeue(self, job_or_id: Union['Job', str], at_front: bool = False) -> 'Job':
"""Requeues the job with the given job ID.
Args:
job_or_id (t.Union['Job', str]): The Job or the Job ID
job_or_id (Union['Job', str]): The Job or the Job ID
at_front (bool, optional): If the Job should be put at the front of the queue. Defaults to False.
Raises:
@ -182,7 +197,7 @@ class StartedJobRegistry(BaseRegistry):
"""
key_template = 'rq:wip:{0}'
def cleanup(self, timestamp: t.Optional[float] = None):
def cleanup(self, timestamp: Optional[float] = None):
"""Remove expired jobs from registry and add them to FailedJobRegistry.
Removes jobs with an expiry time earlier than timestamp, specified as
@ -233,7 +248,7 @@ class FinishedJobRegistry(BaseRegistry):
"""
key_template = 'rq:finished:{0}'
def cleanup(self, timestamp: t.Optional[float] = None):
def cleanup(self, timestamp: Optional[float] = None):
"""Remove expired jobs from registry.
Removes jobs with an expiry time earlier than timestamp, specified as
@ -250,7 +265,7 @@ class FailedJobRegistry(BaseRegistry):
"""
key_template = 'rq:failed:{0}'
def cleanup(self, timestamp: t.Optional[float] = None):
def cleanup(self, timestamp: Optional[float] = None):
"""Remove expired jobs from registry.
Removes jobs with an expiry time earlier than timestamp, specified as
@ -260,7 +275,7 @@ class FailedJobRegistry(BaseRegistry):
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)
def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: t.Optional['Pipeline'] = None,
def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: Optional['Pipeline'] = None,
_save_exc_to_job: bool = False):
"""
Adds a job to a registry with expiry time of now + ttl.
@ -310,7 +325,7 @@ class ScheduledJobRegistry(BaseRegistry):
# make sense in this context
self.get_jobs_to_enqueue = self.get_expired_job_ids
def schedule(self, job: 'Job', scheduled_datetime, pipeline: t.Optional['Pipeline'] = None):
def schedule(self, job: 'Job', scheduled_datetime, pipeline: Optional['Pipeline'] = None):
"""
Adds job to registry, scored by its execution time (in UTC).
If datetime has no tzinfo, it will assume the local timezone.
@ -329,20 +344,43 @@ class ScheduledJobRegistry(BaseRegistry):
implemented in BaseRegistry."""
pass
def remove_jobs(self, timestamp: t.Optional[datetime] = None, pipeline: t.Optional['Pipeline'] = None):
"""Remove jobs whose timestamp is in the past from registry."""
def remove_jobs(self, timestamp: Optional[datetime] = None, pipeline: Optional['Pipeline'] = None):
"""Remove jobs whose timestamp is in the past from registry.
Args:
timestamp (Optional[datetime], optional): The timestamp. Defaults to None.
pipeline (Optional[Pipeline], optional): The Redis pipeline. Defaults to None.
"""
connection = pipeline if pipeline is not None else self.connection
score = timestamp if timestamp is not None else current_timestamp()
return connection.zremrangebyscore(self.key, 0, score)
def get_jobs_to_schedule(self, timestamp: t.Optional[datetime] = None, chunk_size: int = 1000):
"""Get's a list of job IDs that should be scheduled."""
def get_jobs_to_schedule(self, timestamp: Optional[datetime] = None, chunk_size: int = 1000) -> List[str]:
"""Get's a list of job IDs that should be scheduled.
Args:
timestamp (Optional[datetime], optional): _description_. Defaults to None.
chunk_size (int, optional): _description_. Defaults to 1000.
Returns:
jobs (List[str]): A list of Job ids
"""
score = timestamp if timestamp is not None else current_timestamp()
return [as_text(job_id) for job_id in
self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)]
jobs_to_schedule = self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)
return [as_text(job_id) for job_id in jobs_to_schedule]
def get_scheduled_time(self, job_or_id: t.Union['Job', str]):
"""Returns datetime (UTC) at which job is scheduled to be enqueued"""
def get_scheduled_time(self, job_or_id: Union['Job', str]) -> datetime:
"""Returns datetime (UTC) at which job is scheduled to be enqueued
Args:
job_or_id (Union[Job, str]): The Job instance or Job ID
Raises:
NoSuchJobError: If the job was not found
Returns:
datetime (datetime): The scheduled time as datetime object
"""
if isinstance(job_or_id, self.job_class):
job_id = job_or_id.id
else:
@ -358,7 +396,7 @@ class ScheduledJobRegistry(BaseRegistry):
class CanceledJobRegistry(BaseRegistry):
key_template = 'rq:canceled:{0}'
def get_expired_job_ids(self, timestamp: t.Optional[datetime] = None):
def get_expired_job_ids(self, timestamp: Optional[datetime] = None):
raise NotImplementedError
def cleanup(self):
@ -368,8 +406,12 @@ class CanceledJobRegistry(BaseRegistry):
pass
def clean_registries(queue):
"""Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue."""
def clean_registries(queue: 'Queue'):
"""Cleans StartedJobRegistry, FinishedJobRegistry and FailedJobRegistry of a queue.
Args:
queue (Queue): The queue to clean
"""
registry = FinishedJobRegistry(name=queue.name,
connection=queue.connection,
job_class=queue.job_class,

@ -25,7 +25,7 @@ class Result(object):
def __init__(self, job_id: str, type: Type, connection: Redis, id: Optional[str] = None,
created_at: Optional[datetime] = None, return_value: Optional[Any] = None,
exc_string: Optional[str] = None, serializer=None):
exc_string: Optional[str] = None, serializer=None):
self.return_value = return_value
self.exc_string = exc_string
self.type = type

@ -1,6 +1,6 @@
import typing as t
from typing import TYPE_CHECKING, Optional
if t.TYPE_CHECKING:
if TYPE_CHECKING:
from redis import Redis
from rq.worker import Worker
@ -8,30 +8,30 @@ if t.TYPE_CHECKING:
WORKERS_SUSPENDED = 'rq:suspended'
def is_suspended(connection: 'Redis', worker: t.Optional['Worker'] = None):
def is_suspended(connection: 'Redis', worker: Optional['Worker'] = None):
"""Checks whether a Worker is suspendeed on a given connection
PS: pipeline returns a list of responses
Ref: https://github.com/andymccurdy/redis-py#pipelines
Args:
connection (Redis): The Redis Connection
worker (t.Optional[Worker], optional): The Worker. Defaults to None.
worker (Optional[Worker], optional): The Worker. Defaults to None.
"""
with connection.pipeline() as pipeline:
if worker is not None:
worker.heartbeat(pipeline=pipeline)
pipeline.exists(WORKERS_SUSPENDED)
# pipeline returns a list of responses
# https://github.com/andymccurdy/redis-py#pipelines
return pipeline.execute()[-1]
def suspend(connection: 'Redis', ttl: int = None):
def suspend(connection: 'Redis', ttl: Optional[int] = None):
"""
Suspends workers.
A TTL of 0 will invalidate the suspension right away.
Args:
connection (Redis): The Redis connection to use.
ttl (int): time to live in seconds. Defaults to `None`
ttl (Optional[int], optional): time to live in seconds. Defaults to `None`
"""
connection.set(WORKERS_SUSPENDED, 1)
if ttl is not None:
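A usage sketch of the suspension helpers; `resume` also lives in this module and simply deletes the suspension key:

```python
from redis import Redis

from rq.suspension import is_suspended, resume, suspend

conn = Redis()
suspend(conn, ttl=60)      # workers stop picking up jobs; auto-resumes after 60s
assert is_suspended(conn)
resume(conn)               # lift the suspension immediately
```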

@ -92,7 +92,8 @@ class TimerDeathPenalty(BaseDeathPenalty):
self._exception.__init__ = init_with_message
def new_timer(self):
"""Returns a new timer since timers can only be used once."""
"""Returns a new timer since timers can only be used once.
"""
return threading.Timer(self._timeout, self.handle_death_penalty)
def handle_death_penalty(self):
@ -110,11 +111,13 @@ class TimerDeathPenalty(BaseDeathPenalty):
raise SystemError("PyThreadState_SetAsyncExc failed")
def setup_death_penalty(self):
"""Starts the timer."""
"""Starts the timer.
"""
self._timer = self.new_timer()
self._timer.start()
def cancel_death_penalty(self):
"""Cancels the timer."""
"""Cancels the timer.
"""
self._timer.cancel()
self._timer = None

@ -12,10 +12,11 @@ import logging
import numbers
import sys
import datetime as dt
import typing as t
from collections.abc import Iterable
from typing import TYPE_CHECKING, Dict, List, Optional, Any, Callable, Tuple, Union
if t.TYPE_CHECKING:
if TYPE_CHECKING:
from redis import Redis
from redis.exceptions import ResponseError
@ -125,7 +126,7 @@ class ColorizingStreamHandler(logging.StreamHandler):
return message
def compact(lst: t.List[t.Any]) -> t.List[t.Any]:
def compact(lst: List[Any]) -> List[Any]:
"""Excludes `None` values from a list-like object.
Args:
@ -137,7 +138,18 @@ def compact(lst: t.List[t.Any]) -> t.List[t.Any]:
return [item for item in lst if item is not None]
def as_text(v):
def as_text(v: Union[bytes, str]) -> Optional[str]:
"""Converts a bytes value to a string using `utf-8`.
Args:
v (Union[bytes, str]): The value (bytes or string)
Raises:
ValueError: If the value is not bytes or string
Returns:
value (Optional[str]): Either the decoded string or None
"""
if v is None:
return None
elif isinstance(v, bytes):
@ -148,7 +160,7 @@ def as_text(v):
raise ValueError('Unknown type %r' % type(v))
def decode_redis_hash(h) -> t.Dict[str, t.Any]:
def decode_redis_hash(h) -> Dict[str, Any]:
"""Decodes the Redis hash, ensuring that keys are strings
Most importantly, decodes bytes strings, ensuring the dict has str keys.
@ -156,12 +168,12 @@ def decode_redis_hash(h) -> t.Dict[str, t.Any]:
h (Dict[Any, Any]): The Redis hash
Returns:
Dict[str, t.Any]: The decoded Redis data (Dictionary)
Dict[str, Any]: The decoded Redis data (Dictionary)
"""
return dict((as_text(k), h[k]) for k in h)
def import_attribute(name: str) -> t.Callable[..., t.Any]:
def import_attribute(name: str) -> Callable[..., Any]:
"""Returns an attribute from a dotted path name. Example: `path.to.func`.
When the attribute we look for is a staticmethod, module name in its
@ -181,7 +193,7 @@ def import_attribute(name: str) -> t.Callable[..., t.Any]:
ValueError: If no module is found or invalid attribute name.
Returns:
t.Any: An attribute (normally a Callable)
Any: An attribute (normally a Callable)
"""
name_bits = name.split('.')
module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]]
@ -218,7 +230,8 @@ def utcnow():
def now():
"""Return now in UTC"""
"""Return now in UTC
"""
return datetime.datetime.now(datetime.timezone.utc)
@ -237,9 +250,8 @@ def utcparse(string: str) -> dt.datetime:
return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%SZ')
def first(iterable: t.Iterable, default=None, key=None):
"""
Return first element of `iterable` that evaluates true, else return None
def first(iterable: Iterable, default=None, key=None):
"""Return first element of `iterable` that evaluates true, else return None
(or an optional default value).
>>> first([0, False, None, [], (), 42])
@ -263,6 +275,13 @@ def first(iterable: t.Iterable, default=None, key=None):
>>> first([1, 1, 3, 4, 5], key=lambda x: x % 2 == 0)
4
Args:
iterable (Iterable): The iterable to search
default (Any, optional): The value to return when no element evaluates true. Defaults to None.
key (Optional[Callable], optional): A predicate applied to each element. Defaults to None.
Returns:
Any: The first matching element, else the default
"""
if key is None:
for el in iterable:
@ -276,26 +295,51 @@ def first(iterable: t.Iterable, default=None, key=None):
return default
def is_nonstring_iterable(obj: t.Any) -> bool:
"""Returns whether the obj is an iterable, but not a string"""
def is_nonstring_iterable(obj: Any) -> bool:
"""Returns whether the obj is an iterable, but not a string
Args:
obj (Any): The object to check
Returns:
bool: Whether the object is an iterable and not a string
"""
return isinstance(obj, Iterable) and not isinstance(obj, str)
def ensure_list(obj: t.Any) -> t.List:
def ensure_list(obj: Any) -> List:
"""When passed an iterable of objects, does nothing, otherwise, it returns
a list with just that object in it.
Args:
obj (Any): The object, iterable or not
Returns:
List: The iterable itself, or a list wrapping the object
"""
return obj if is_nonstring_iterable(obj) else [obj]
def current_timestamp() -> int:
"""Returns current UTC timestamp"""
"""Returns current UTC timestamp
Returns:
int: The current UTC timestamp, in seconds since the Unix epoch
"""
return calendar.timegm(datetime.datetime.utcnow().utctimetuple())
def backend_class(holder, default_name, override=None):
"""Get a backend class using its default attribute name or an override"""
"""Get a backend class using its default attribute name or an override
Args:
holder (Any): The object whose attribute holds the default backend class
default_name (str): The attribute name of the default backend class
override (Optional[Union[str, Type]], optional): A class, or a dotted path to one, that overrides the default. Defaults to None.
Returns:
Type: The backend class
"""
if override is None:
return getattr(holder, default_name)
elif isinstance(override, str):
@ -304,15 +348,16 @@ def backend_class(holder, default_name, override=None):
return override
def str_to_date(date_str: t.Optional[str]) -> t.Union[dt.datetime, t.Any]:
def str_to_date(date_str: Optional[str]) -> Union[dt.datetime, Any]:
if not date_str:
return
else:
return utcparse(date_str.decode())
def parse_timeout(timeout: t.Any):
"""Transfer all kinds of timeout format to an integer representing seconds"""
def parse_timeout(timeout: Any):
"""Transfer all kinds of timeout format to an integer representing seconds
"""
if not isinstance(timeout, numbers.Integral) and timeout is not None:
try:
timeout = int(timeout)
@ -329,7 +374,7 @@ def parse_timeout(timeout: t.Any):
return timeout
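The accepted formats, sketched; the string-with-unit branch sits in the part of the function elided by this hunk:

```python
from rq.utils import parse_timeout

parse_timeout(180)    # -> 180 (integers pass through)
parse_timeout('180')  # -> 180 (numeric strings are converted)
parse_timeout('3m')   # -> 180 (unit suffixes: 'd', 'h', 'm', 's')
```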
def get_version(connection: 'Redis') -> t.Tuple[int, int, int]:
def get_version(connection: 'Redis') -> Tuple[int, int, int]:
"""
Returns tuple of Redis server version.
This function also correctly handles 4 digit redis server versions.
@ -354,15 +399,23 @@ def get_version(connection: 'Redis') -> t.Tuple[int, int, int]:
def ceildiv(a, b):
"""Ceiling division. Returns the ceiling of the quotient of a division operation"""
"""Ceiling division. Returns the ceiling of the quotient of a division operation
Args:
a (int): The dividend
b (int): The divisor
Returns:
int: The ceiling of a divided by b
"""
return -(-a // b)
def split_list(a_list: t.List[t.Any], segment_size: int):
def split_list(a_list: List[Any], segment_size: int):
"""Splits a list into multiple smaller lists having size `segment_size`
Args:
a_list (t.List[t.Any]): A list to split
a_list (List[Any]): A list to split
segment_size (int): The segment size to split into
Yields:
@ -372,12 +425,12 @@ def split_list(a_list: t.List[t.Any], segment_size: int):
yield a_list[i:i + segment_size]
def truncate_long_string(data: str, max_length: t.Optional[int] = None) -> str:
def truncate_long_string(data: str, max_length: Optional[int] = None) -> str:
"""Truncate arguments with representation longer than max_length
Args:
data (str): The data to truncate
max_length (t.Optional[int], optional): The max length. Defaults to None.
max_length (Optional[int], optional): The max length. Defaults to None.
Returns:
truncated (str): The truncated string
@ -387,8 +440,8 @@ def truncate_long_string(data: str, max_length: t.Optional[int] = None) -> str:
return (data[:max_length] + '...') if len(data) > max_length else data
def get_call_string(func_name: t.Optional[str], args: t.Any, kwargs: t.Dict[t.Any, t.Any],
max_length: t.Optional[int] = None) -> t.Optional[str]:
def get_call_string(func_name: Optional[str], args: Any, kwargs: Dict[Any, Any],
max_length: Optional[int] = None) -> Optional[str]:
"""
Returns a string representation of the call, formatted as a regular
Python function invocation statement. If max_length is not None, truncate
@ -396,8 +449,8 @@ def get_call_string(func_name: t.Optional[str], args: t.Any, kwargs: t.Dict[t.An
Args:
func_name (str): The function name
args (t.Any): The function arguments
kwargs (t.Dict[t.Any, t.Any]): The function kwargs
args (Any): The function arguments
kwargs (Dict[Any, Any]): The function kwargs
max_length (int, optional): The max length. Defaults to None.
Returns:

@ -8,9 +8,11 @@ import sys
import time
import traceback
import warnings
import typing as t
if t.TYPE_CHECKING:
from typing import TYPE_CHECKING, Type, List, Dict, Any, Optional
if TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
@ -112,13 +114,16 @@ class Worker:
@classmethod
def all(
cls,
connection: t.Optional['Redis'] = None,
job_class: t.Type['Job'] = None,
queue_class: t.Optional[t.Type['Queue']] = None,
queue: t.Optional['Queue'] = None,
connection: Optional['Redis'] = None,
job_class: Optional[Type['Job']] = None,
queue_class: Optional[Type['Queue']] = None,
queue: Optional['Queue'] = None,
serializer=None
) -> t.List['Worker']:
) -> List['Worker']:
"""Returns an iterable of all Workers.
Returns:
workers (List[Worker]): A list of workers
"""
if queue:
connection = queue.connection
@ -134,18 +139,35 @@ class Worker:
return compact(workers)
@classmethod
def all_keys(cls, connection: t.Optional['Redis'] = None, queue: t.Optional['Queue'] = None):
def all_keys(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None) -> List[str]:
"""List of worker keys
Args:
connection (Optional[Redis], optional): A Redis Connection. Defaults to None.
queue (Optional[Queue], optional): The Queue. Defaults to None.
Returns:
list_keys (List[str]): A list of worker keys
"""
return [as_text(key)
for key in get_keys(queue=queue, connection=connection)]
@classmethod
def count(cls, connection: t.Optional['Redis'] = None, queue: t.Optional['Queue'] = None):
"""Returns the number of workers by queue or connection"""
def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None):
"""Returns the number of workers by queue or connection
Args:
connection (Optional['Redis'], optional): The Redis Connection. Defaults to None.
queue (Optional['Queue'], optional): The Queue. Defaults to None.
Returns:
int: The number of workers
"""
return len(get_keys(queue=queue, connection=connection))
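A sketch of these class-level worker queries:

```python
from redis import Redis
from rq import Queue, Worker

redis = Redis()
all_workers = Worker.all(connection=redis)
per_queue = Worker.count(queue=Queue('default', connection=redis))
```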
@classmethod
def find_by_key(cls, worker_key: str, connection: t.Optional['Redis'] = None, job_class: t.Type['Job'] = None,
queue_class: t.Type['Queue'] = None, serializer=None):
def find_by_key(cls, worker_key: str, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None,
queue_class: Optional[Type['Queue']] = None, serializer=None):
"""Returns a Worker instance, based on the naming conventions for
naming the internal Redis keys. Can be used to reverse-lookup Workers
by their Redis keys.
@ -168,9 +190,9 @@ class Worker:
return worker
def __init__(self, queues, name: t.Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL,
connection: t.Optional['Redis'] = None, exc_handler=None, exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL, job_class: t.Type['Job'] = None,
def __init__(self, queues, name: Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL,
connection: Optional['Redis'] = None, exc_handler=None, exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL, job_class: Optional[Type['Job']] = None,
queue_class=None, log_job_description: bool = True,
job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL,
disable_default_exception_handler: bool = False,
@ -217,7 +239,7 @@ class Worker:
self.total_working_time: int = 0
self.current_job_working_time: int = 0
self.birth_date = None
self.scheduler: t.Optional[RQScheduler] = None
self.scheduler: Optional[RQScheduler] = None
self.pubsub = None
self.pubsub_thread = None
@ -384,7 +406,7 @@ class Worker:
if death_timestamp is not None:
return utcparse(as_text(death_timestamp))
def set_state(self, state, pipeline: t.Optional['Pipeline'] = None):
def set_state(self, state, pipeline: Optional['Pipeline'] = None):
self._state = state
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'state', state)
@ -410,12 +432,12 @@ class Worker:
state = property(_get_state, _set_state)
def set_current_job_working_time(self, current_job_working_time, pipeline: t.Optional['Pipeline'] = None):
def set_current_job_working_time(self, current_job_working_time, pipeline: Optional['Pipeline'] = None):
self.current_job_working_time = current_job_working_time
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'current_job_working_time', current_job_working_time)
def set_current_job_id(self, job_id: t.Optional[str] = None, pipeline: t.Optional['Pipeline'] = None):
def set_current_job_id(self, job_id: Optional[str] = None, pipeline: Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
if job_id is None:
@ -423,7 +445,7 @@ class Worker:
else:
connection.hset(self.key, 'current_job', job_id)
def get_current_job_id(self, pipeline: t.Optional['Pipeline'] = None):
def get_current_job_id(self, pipeline: Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
return as_text(connection.hget(self.key, 'current_job'))
@ -719,7 +741,7 @@ class Worker:
self.heartbeat()
return result
def heartbeat(self, timeout=None, pipeline: t.Optional['Pipeline'] = None):
def heartbeat(self, timeout=None, pipeline: Optional['Pipeline'] = None):
"""Specifies a new worker timeout, typically by extending the
expiration time of the worker, effectively making this a "heartbeat"
to not expire the worker until the timeout passes.
@ -777,11 +799,11 @@ class Worker:
job_class=self.job_class, serializer=self.serializer)
for queue in queues.split(',')]
def increment_failed_job_count(self, pipeline: t.Optional['Pipeline'] = None):
def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
connection.hincrby(self.key, 'failed_job_count', 1)
def increment_successful_job_count(self, pipeline: t.Optional['Pipeline'] = None):
def increment_successful_job_count(self, pipeline: Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
connection.hincrby(self.key, 'successful_job_count', 1)
@ -962,7 +984,7 @@ class Worker:
self.procline(msg.format(job.func_name, job.origin, time.time()))
def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None,
exc_string=''):
exc_string=''):
"""
Handles the failure of an executing job by:
1. Setting the job status to failed

@ -1,6 +1,6 @@
import typing as t
from typing import Optional, TYPE_CHECKING, Any, Set
if t.TYPE_CHECKING:
if TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
from .worker import Worker
@ -15,13 +15,13 @@ REDIS_WORKER_KEYS = 'rq:workers'
MAX_KEYS = 1000
def register(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None):
def register(worker: 'Worker', pipeline: Optional['Pipeline'] = None):
"""
Store worker key in Redis so we can easily discover active workers.
Args:
worker (Worker): The Worker
pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
pipeline (Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
"""
connection = pipeline if pipeline is not None else worker.connection
connection.sadd(worker.redis_workers_keys, worker.key)
@ -30,12 +30,12 @@ def register(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None):
connection.sadd(redis_key, worker.key)
def unregister(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None):
def unregister(worker: 'Worker', pipeline: Optional['Pipeline'] = None):
"""Remove Worker key from Redis
Args:
worker (Worker): The Worker
pipeline (t.Optional[Pipeline], optional): Redis Pipeline. Defaults to None.
pipeline (Optional[Pipeline], optional): Redis Pipeline. Defaults to None.
"""
if pipeline is None:
connection = worker.connection.pipeline()
@ -51,12 +51,12 @@ def unregister(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None):
connection.execute()
def get_keys(queue: t.Optional['Queue'] = None, connection: t.Optional['Redis'] = None) -> t.Set[t.Any]:
def get_keys(queue: Optional['Queue'] = None, connection: Optional['Redis'] = None) -> Set[Any]:
"""Returns a list of worker keys for a given queue.
Args:
queue (t.Optional['Queue'], optional): The Queue. Defaults to None.
connection (t.Optional['Redis'], optional): The Redis Connection. Defaults to None.
queue (Optional['Queue'], optional): The Queue. Defaults to None.
connection (Optional['Redis'], optional): The Redis Connection. Defaults to None.
Raises:
ValueError: If no Queue or Connection is provided.
