* Gitignore Venv + VScode

* Add Typings, Add Test to Makefile

* Fix, More typing, Redis Pipeline specific type

* More types

* Fix 3.7- Typing Compat, Add Tox Envs, Tests Dockerfile

* fix listindex error (#1700)

* More docstrings

* More Types

* Fix Typo on Dependency

* Last Types

Co-authored-by: Burak Yılmaz <46003469+yilmaz-burak@users.noreply.github.com>
main
lowercase00 2 years ago committed by GitHub
parent 48e821ecd0
commit 375ace1747
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.5", "3.6", "3.7", "3.8.3", "3.9", "3.10"]
python-version: ["3.6", "3.7", "3.8.3", "3.9", "3.10"]
redis-version: [3, 4, 5, 6, 7]
redis-py-version: [3.5.0]
@ -49,7 +49,7 @@ jobs:
strategy:
matrix:
python-version: ["3.5", "3.6", "3.7", "3.8.3", "3.9", "3.10"]
python-version: ["3.6", "3.7", "3.8.3", "3.9", "3.10"]
redis-version: [3, 4, 5, 6, 7]
steps:
@ -68,7 +68,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install git+https://github.com/andymccurdy/redis-py
pip install git+https://github.com/redis/redis-py
pip install git+https://github.com/pallets/click
pip install -r dev-requirements.txt
pip install -e .

2
.gitignore vendored

@ -20,3 +20,5 @@ Vagrantfile
Gemfile
Gemfile.lock
_site/
.venv/
.vscode/

@ -4,6 +4,9 @@ all:
clean:
rm -rf build/ dist/
test:
docker build -f tests/Dockerfile . -t rqtest && docker run -it --rm rqtest
release: clean
# Check if latest tag is the current head we're releasing
echo "Latest tag = $$(git tag | sort -nr | head -n1)"

@ -1,6 +1,11 @@
import json
import os
import signal
import typing as t
if t.TYPE_CHECKING:
from redis import Redis
from .worker import Worker
from rq.exceptions import InvalidJobOperation
from rq.job import Job
@ -9,39 +14,74 @@ from rq.job import Job
PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s'
def send_command(connection, worker_name, command, **kwargs):
"""Use connection' pubsub mechanism to send a command"""
def send_command(connection: 'Redis', worker_name: str, command, **kwargs):
"""
Use connection' pubsub mechanism to send a command
Args:
connection (Redis): A Redis Connection
worker_name (str): The Job ID
"""
payload = {'command': command}
if kwargs:
payload.update(kwargs)
connection.publish(PUBSUB_CHANNEL_TEMPLATE % worker_name, json.dumps(payload))
def parse_payload(payload):
"""Returns a dict of command data"""
def parse_payload(payload: t.Dict[t.Any, t.Any]) -> t.Dict[t.Any, t.Any]:
"""
Returns a dict of command data
Args:
payload (dict): Parses the payload dict.
"""
return json.loads(payload.get('data').decode())
def send_shutdown_command(connection, worker_name):
"""Send shutdown command"""
def send_shutdown_command(connection: 'Redis', worker_name: str):
"""
Sends a shutdown command to the pubsub topic.
Args:
connection (Redis): A Redis Connection
worker_name (str): The Job ID
"""
send_command(connection, worker_name, 'shutdown')
def send_kill_horse_command(connection, worker_name):
"""Tell worker to kill it's horse"""
def send_kill_horse_command(connection: 'Redis', worker_name: str):
"""
Tell worker to kill it's horse
Args:
connection (Redis): A Redis Connection
worker_name (str): The Job ID
"""
send_command(connection, worker_name, 'kill-horse')
def send_stop_job_command(connection, job_id, serializer=None):
"""Instruct a worker to stop a job"""
def send_stop_job_command(connection: 'Redis', job_id: str, serializer=None):
"""
Instruct a worker to stop a job
Args:
connection (Redis): A Redis Connection
job_id (str): The Job ID
serializer (): The serializer
"""
job = Job.fetch(job_id, connection=connection, serializer=serializer)
if not job.worker_name:
raise InvalidJobOperation('Job is not currently executing')
send_command(connection, job.worker_name, 'stop-job', job_id=job_id)
def handle_command(worker, payload):
"""Parses payload and routes commands"""
def handle_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]):
"""Parses payload and routes commands
Args:
worker (Worker): The worker to use
payload (t.Dict[t.Any, t.Any]): The Payload
"""
if payload['command'] == 'stop-job':
handle_stop_job_command(worker, payload)
elif payload['command'] == 'shutdown':
@ -50,15 +90,26 @@ def handle_command(worker, payload):
handle_kill_worker_command(worker, payload)
def handle_shutdown_command(worker):
"""Perform shutdown command"""
def handle_shutdown_command(worker: 'Worker'):
"""Perform shutdown command.
Args:
worker (Worker): The worker to use.
"""
worker.log.info('Received shutdown command, sending SIGINT signal.')
pid = os.getpid()
os.kill(pid, signal.SIGINT)
def handle_kill_worker_command(worker, payload):
"""Stops work horse"""
def handle_kill_worker_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]):
"""
Stops work horse
Args:
worker (Worker): The worker to stop
payload (t.Dict[t.Any, t.Any]): The payload.
"""
worker.log.info('Received kill horse command.')
if worker.horse_pid:
worker.log.info('Kiling horse...')
@ -67,8 +118,13 @@ def handle_kill_worker_command(worker, payload):
worker.log.info('Worker is not working, kill horse command ignored')
def handle_stop_job_command(worker, payload):
"""Handles stop job command"""
def handle_stop_job_command(worker: 'Worker', payload: t.Dict[t.Any, t.Any]):
"""Handles stop job command.
Args:
worker (Worker): The worker to use
payload (t.Dict[t.Any, t.Any]): The payload.
"""
job_id = payload.get('job_id')
worker.log.debug('Received command to stop job %s', job_id)
if job_id and worker.get_current_job_id() == job_id:

@ -1,5 +1,5 @@
from contextlib import contextmanager
import typing as t
from redis import Redis
from .local import LocalStack, release_local
@ -10,7 +10,7 @@ class NoRedisConnectionException(Exception):
@contextmanager
def Connection(connection=None): # noqa
def Connection(connection: t.Optional['Redis'] = None): # noqa
if connection is None:
connection = Redis()
push_connection(connection)
@ -23,19 +23,30 @@ def Connection(connection=None): # noqa
'Check your Redis connection setup.'
def push_connection(redis):
"""Pushes the given connection on the stack."""
def push_connection(redis: 'Redis'):
"""
Pushes the given connection on the stack.
Args:
redis (Redis): A Redis connection
"""
_connection_stack.push(redis)
def pop_connection():
"""Pops the topmost connection from the stack."""
"""
Pops the topmost connection from the stack.
"""
return _connection_stack.pop()
def use_connection(redis=None):
"""Clears the stack and uses the given connection. Protects against mixed
def use_connection(redis: t.Optional['Redis'] = None):
"""
Clears the stack and uses the given connection. Protects against mixed
use of use_connection() and stacked connection contexts.
Args:
redis (t.Optional[Redis], optional): A Redis Connection. Defaults to None.
"""
assert len(_connection_stack) <= 1, \
'You should not mix Connection contexts with use_connection()'
@ -47,16 +58,28 @@ def use_connection(redis=None):
def get_current_connection():
"""Returns the current Redis connection (i.e. the topmost on the
"""
Returns the current Redis connection (i.e. the topmost on the
connection stack).
"""
return _connection_stack.top
def resolve_connection(connection=None):
"""Convenience function to resolve the given or the current connection.
def resolve_connection(connection: t.Optional['Redis'] = None) -> 'Redis':
"""
Convenience function to resolve the given or the current connection.
Raises an exception if it cannot resolve a connection now.
Args:
connection (t.Optional[Redis], optional): A Redis connection. Defaults to None.
Raises:
NoRedisConnectionException: If connection couldn't be resolved.
Returns:
Redis: A Redis Connection
"""
if connection is not None:
return connection

@ -1,4 +1,9 @@
from functools import wraps
import typing as t
if t.TYPE_CHECKING:
from redis import Redis
from .job import Retry
from rq.compat import string_types
@ -10,21 +15,25 @@ from .utils import backend_class
class job: # noqa
queue_class = Queue
def __init__(self, queue, connection=None, timeout=None,
def __init__(self, queue: 'Queue', connection: t.Optional['Redis'] = None, timeout=None,
result_ttl=DEFAULT_RESULT_TTL, ttl=None,
queue_class=None, depends_on=None, at_front=None, meta=None,
description=None, failure_ttl=None, retry=None, on_failure=None,
queue_class=None, depends_on: t.Optional[t.List[t.Any]] = None, at_front: t.Optional[bool] = None,
meta=None, description=None, failure_ttl=None, retry: t.Optional['Retry'] = None, on_failure=None,
on_success=None):
"""A decorator that adds a ``delay`` method to the decorated function,
"""
A decorator that adds a ``delay`` method to the decorated function,
which in turn creates a RQ job when called. Accepts a required
``queue`` argument that can be either a ``Queue`` instance or a string
denoting the queue name. For example:
denoting the queue name. For example::
@job(queue='default')
def simple_add(x, y):
return x + y
..codeblock:python::
simple_add.delay(1, 2) # Puts simple_add function into queue
>>> @job(queue='default')
>>> def simple_add(x, y):
>>> return x + y
>>> ...
>>> # Puts `simple_add` function into queue
>>> simple_add.delay(1, 2)
"""
self.queue = queue
self.queue_class = backend_class(self, 'queue_class', override=queue_class)

@ -10,7 +10,7 @@ def do_nothing():
pass
def sleep(secs):
def sleep(secs: int):
time.sleep(secs)
@ -23,7 +23,7 @@ def div_by_zero():
1 / 0
def fib(n):
def fib(n: int):
if n <= 1:
return 1
else:

@ -3,16 +3,21 @@ import json
import pickle
import warnings
import zlib
import typing as t
import asyncio
from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from enum import Enum
from functools import partial
from uuid import uuid4
from redis import WatchError
if t.TYPE_CHECKING:
from rq.queue import Queue
from redis import Redis
from redis.client import Pipeline
from rq.compat import as_text, decode_redis_hash, string_types
from .connections import resolve_connection
from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
@ -39,18 +44,18 @@ class JobStatus(str, Enum):
class Dependency:
def __init__(self, jobs, allow_failure: bool = False, enqueue_at_front: bool = False):
jobs = ensure_list(jobs)
def __init__(self, jobs: t.List[t.Union['Job', str]], allow_failure: bool = False, enqueue_at_front: bool = False):
dependent_jobs = ensure_list(jobs)
if not all(
isinstance(job, Job) or isinstance(job, str)
for job in jobs
for job in dependent_jobs
if job
):
raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids")
elif len(jobs) < 1:
elif len(dependent_jobs) < 1:
raise ValueError("jobs: cannot be empty.")
self.dependencies = jobs
self.dependencies = dependent_jobs
self.allow_failure = allow_failure
self.enqueue_at_front = enqueue_at_front
@ -60,14 +65,14 @@ class Dependency:
UNEVALUATED = object()
def cancel_job(job_id, connection=None, serializer=None, enqueue_dependents=False):
def cancel_job(job_id: str, connection: t.Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False):
"""Cancels the job with the given job ID, preventing execution. Discards
any job info (i.e. it can't be requeued later).
"""
Job.fetch(job_id, connection=connection, serializer=serializer).cancel(enqueue_dependents=enqueue_dependents)
def get_current_job(connection=None, job_class=None):
def get_current_job(connection: t.Optional['Redis'] = None, job_class: t.Optional['Job'] = None):
"""Returns the Job instance that is currently being executed. If this
function is invoked from outside a job context, None is returned.
"""
@ -77,7 +82,7 @@ def get_current_job(connection=None, job_class=None):
return _job_stack.top
def requeue_job(job_id, connection, serializer=None):
def requeue_job(job_id: str, connection: 'Redis', serializer=None):
job = Job.fetch(job_id, connection=connection, serializer=serializer)
return job.requeue()
@ -89,10 +94,10 @@ class Job:
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
def create(cls, func: t.Callable[..., t.Any], args=None, kwargs=None, connection: t.Optional['Redis'] = None,
result_ttl=None, ttl=None, status=None, description=None,
depends_on=None, timeout=None, id=None, origin=None, meta=None,
failure_ttl=None, serializer=None, *, on_success=None, on_failure=None):
failure_ttl=None, serializer=None, *, on_success=None, on_failure=None) -> 'Job':
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
"""
@ -171,18 +176,18 @@ class Job:
return q.get_job_position(self._id)
return None
def get_status(self, refresh=True):
def get_status(self, refresh: bool = True) -> str:
if refresh:
self._status = as_text(self.connection.hget(self.key, 'status'))
return self._status
def set_status(self, status, pipeline=None):
def set_status(self, status: str, pipeline: t.Optional['Pipeline'] = None):
self._status = status
connection = pipeline if pipeline is not None else self.connection
connection: 'Redis' = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'status', self._status)
def get_meta(self, refresh=True):
def get_meta(self, refresh: bool = True):
if refresh:
meta = self.connection.hget(self.key, 'meta')
self.meta = self.serializer.loads(meta) if meta else {}
@ -190,35 +195,35 @@ class Job:
return self.meta
@property
def is_finished(self):
def is_finished(self) -> bool:
return self.get_status() == JobStatus.FINISHED
@property
def is_queued(self):
def is_queued(self) -> bool:
return self.get_status() == JobStatus.QUEUED
@property
def is_failed(self):
def is_failed(self) -> bool:
return self.get_status() == JobStatus.FAILED
@property
def is_started(self):
def is_started(self) -> bool:
return self.get_status() == JobStatus.STARTED
@property
def is_deferred(self):
def is_deferred(self) -> bool:
return self.get_status() == JobStatus.DEFERRED
@property
def is_canceled(self):
def is_canceled(self) -> bool:
return self.get_status() == JobStatus.CANCELED
@property
def is_scheduled(self):
def is_scheduled(self) -> bool:
return self.get_status() == JobStatus.SCHEDULED
@property
def is_stopped(self):
def is_stopped(self) -> bool:
return self.get_status() == JobStatus.STOPPED
@property
@ -230,7 +235,7 @@ class Job:
return self._dependency_ids[0]
@property
def dependency(self):
def dependency(self) -> t.Optional['Job']:
"""Returns a job's first dependency. To avoid repeated Redis fetches, we cache
job.dependency as job._dependency.
"""
@ -243,7 +248,7 @@ class Job:
return job
@property
def dependent_ids(self):
def dependent_ids(self) -> t.List[str]:
"""Returns a list of ids of jobs whose execution depends on this
job's successful execution."""
return list(map(as_text, self.connection.smembers(self.dependents_key)))
@ -358,13 +363,13 @@ class Job:
self._data = UNEVALUATED
@classmethod
def exists(cls, job_id, connection=None):
def exists(cls, job_id: str, connection: t.Optional['Redis'] = None) -> int:
"""Returns whether a job hash exists for the given job ID."""
conn = resolve_connection(connection)
return conn.exists(cls.key_for(job_id))
@classmethod
def fetch(cls, id, connection=None, serializer=None):
def fetch(cls, id: str, connection: t.Optional['Redis'] = None, serializer=None) -> 'Job':
"""Fetches a persisted job from its corresponding Redis key and
instantiates it.
"""
@ -373,7 +378,7 @@ class Job:
return job
@classmethod
def fetch_many(cls, job_ids, connection, serializer=None):
def fetch_many(cls, job_ids: t.List[str], connection: 'Redis', serializer=None):
"""
Bulk version of Job.fetch
@ -385,7 +390,7 @@ class Job:
pipeline.hgetall(cls.key_for(job_id))
results = pipeline.execute()
jobs = []
jobs: t.List[t.Optional['Job']] = []
for i, job_id in enumerate(job_ids):
if results[i]:
job = cls(job_id, connection=connection, serializer=serializer)
@ -396,7 +401,7 @@ class Job:
return jobs
def __init__(self, id=None, connection=None, serializer=None):
def __init__(self, id: str = None, connection: t.Optional['Redis'] = None, serializer=None):
self.connection = resolve_connection(connection)
self._id = id
self.created_at = utcnow()
@ -411,27 +416,26 @@ class Job:
self._failure_callback = UNEVALUATED
self.description = None
self.origin = None
self.enqueued_at = None
self.started_at = None
self.ended_at = None
self.enqueued_at: t.Optional[datetime] = None
self.started_at: t.Optional[datetime] = None
self.ended_at: t.Optional[datetime] = None
self._result = None
self.exc_info = None
self.timeout = None
self.result_ttl = None
self.failure_ttl = None
self.ttl = None
self.worker_name = None
self.result_ttl: t.Optional[int] = None
self.failure_ttl: t.Optional[int] = None
self.ttl: t.Optional[int] = None
self.worker_name: t.Optional[str] = None
self._status = None
self._dependency_ids = []
self._dependency_ids: t.List[str] = []
self.meta = {}
self.serializer = resolve_serializer(serializer)
self.retries_left = None
# retry_intervals is a list of int e.g [60, 120, 240]
self.retry_intervals = None
self.retry_intervals: t.Optional[t.List[int]] = None
self.redis_server_version = None
self.last_heartbeat = None
self.allow_dependency_failures = None
self.enqueue_at_front = None
self.last_heartbeat: t.Optional[datetime] = None
self.allow_dependency_failures: t.Optional[bool] = None
self.enqueue_at_front: t.Optional[bool] = None
def __repr__(self): # noqa # pragma: no cover
return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@ -443,7 +447,6 @@ class Job:
self.id,
self.description)
# Job equality
def __eq__(self, other): # noqa
return isinstance(other, self.__class__) and self.id == other.id
@ -459,13 +462,13 @@ class Job:
self._id = str(uuid4())
return self._id
def set_id(self, value):
def set_id(self, value: str):
"""Sets a job ID for the given job."""
if not isinstance(value, string_types):
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value
def heartbeat(self, timestamp, ttl, pipeline=None, xx=False):
def heartbeat(self, timestamp: datetime, ttl: int, pipeline: t.Optional['Pipeline'] = None, xx: bool = False):
self.last_heartbeat = timestamp
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
@ -474,12 +477,12 @@ class Job:
id = property(get_id, set_id)
@classmethod
def key_for(cls, job_id):
def key_for(cls, job_id: str):
"""The Redis key that is used to store job hash under."""
return (cls.redis_job_namespace_prefix + job_id).encode('utf-8')
@classmethod
def dependents_key_for(cls, job_id):
def dependents_key_for(cls, job_id: str):
"""The Redis key that is used to store job dependents hash under."""
return '{0}{1}:dependents'.format(cls.redis_job_namespace_prefix, job_id)
@ -497,7 +500,7 @@ class Job:
def dependencies_key(self):
return '{0}:{1}:dependencies'.format(self.redis_job_namespace_prefix, self.id)
def fetch_dependencies(self, watch=False, pipeline=None):
def fetch_dependencies(self, watch: bool = False, pipeline: t.Optional['Pipeline'] = None):
"""
Fetch all of a job's dependencies. If a pipeline is supplied, and
watch is true, then set WATCH on all the keys of all dependencies.
@ -617,7 +620,7 @@ class Job:
raise NoSuchJobError('No such job: {0}'.format(self.key))
self.restore(data)
def to_dict(self, include_meta=True):
def to_dict(self, include_meta: bool = True) -> dict:
"""
Returns a serialization of the current job instance
@ -678,7 +681,7 @@ class Job:
return obj
def save(self, pipeline=None, include_meta=True):
def save(self, pipeline: t.Optional['Pipeline'] = None, include_meta: bool = True):
"""
Dumps the current job instance to its corresponding Redis key.
@ -710,7 +713,7 @@ class Job:
meta = self.serializer.dumps(self.meta)
self.connection.hset(self.key, 'meta', meta)
def cancel(self, pipeline=None, enqueue_dependents=False):
def cancel(self, pipeline: t.Optional['Pipeline'] = None, enqueue_dependents: bool = False):
"""Cancels the given job, which will prevent the job from ever being
ran (or inspected).
@ -766,11 +769,11 @@ class Job:
# handle it
raise
def requeue(self, at_front=False):
def requeue(self, at_front: bool = False):
"""Requeues job."""
return self.failed_job_registry.requeue(self, at_front=at_front)
def _remove_from_registries(self, pipeline=None, remove_from_queue=True):
def _remove_from_registries(self, pipeline: t.Optional['Pipeline'] = None, remove_from_queue: bool = True):
if remove_from_queue:
from .queue import Queue
q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer)
@ -818,7 +821,7 @@ class Job:
serializer=self.serializer)
registry.remove(self, pipeline=pipeline)
def delete(self, pipeline=None, remove_from_queue=True,
def delete(self, pipeline: t.Optional['Pipeline'] = None, remove_from_queue: bool = True,
delete_dependents=False):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well."""
@ -832,7 +835,7 @@ class Job:
connection.delete(self.key, self.dependents_key, self.dependencies_key)
def delete_dependents(self, pipeline=None):
def delete_dependents(self, pipeline: t.Optional['Pipeline'] = None):
"""Delete jobs depending on this job."""
connection = pipeline if pipeline is not None else self.connection
for dependent_id in self.dependent_ids:
@ -856,7 +859,7 @@ class Job:
assert self is _job_stack.pop()
return self._result
def prepare_for_execution(self, worker_name, pipeline):
def prepare_for_execution(self, worker_name: str, pipeline: 'Pipeline'):
"""Set job metadata before execution begins"""
self.worker_name = worker_name
self.last_heartbeat = utcnow()
@ -881,14 +884,14 @@ class Job:
return coro_result
return result
def get_ttl(self, default_ttl=None):
def get_ttl(self, default_ttl: t.Optional[int] = None):
"""Returns ttl for a job that determines how long a job will be
persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
"""
return default_ttl if self.ttl is None else self.ttl
def get_result_ttl(self, default_ttl=None):
def get_result_ttl(self, default_ttl: t.Optional[int] = None):
"""Returns ttl for a job that determines how long a jobs result will
be persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
@ -902,7 +905,8 @@ class Job:
"""
return get_call_string(self.func_name, self.args, self.kwargs, max_length=75)
def cleanup(self, ttl=None, pipeline=None, remove_from_queue=True):
def cleanup(self, ttl: t.Optional[int] = None, pipeline: t.Optional['Pipeline'] = None,
remove_from_queue: bool = True):
"""Prepare job for eventual deletion (if needed). This method is usually
called after successful execution. How long we persist the job and its
result depends on the value of ttl:
@ -946,7 +950,7 @@ class Job:
index = max(number_of_intervals - self.retries_left, 0)
return self.retry_intervals[index]
def retry(self, queue, pipeline):
def retry(self, queue: 'Queue', pipeline: 'Pipeline'):
"""Requeue or schedule this job for execution"""
retry_interval = self.get_retry_interval()
self.retries_left = self.retries_left - 1
@ -957,7 +961,7 @@ class Job:
else:
queue.enqueue_job(self, pipeline=pipeline)
def register_dependency(self, pipeline=None):
def register_dependency(self, pipeline: t.Optional['Pipeline'] = None):
"""Jobs may have dependencies. Jobs are enqueued only if the jobs they
depend on are successfully performed. We record this relation as
a reverse dependency (a Redis set), with a key that looks something
@ -989,7 +993,8 @@ class Job:
return [Job.key_for(_id.decode())
for _id in dependencies]
def dependencies_are_met(self, parent_job=None, pipeline=None, exclude_job_id=None):
def dependencies_are_met(self, parent_job: t.Optional['Job'] = None,
pipeline: t.Optional['Pipeline'] = None, exclude_job_id: str = None):
"""Returns a boolean indicating if all of this job's dependencies are _FINISHED_
If a pipeline is passed, all dependencies are WATCHed.
@ -1009,7 +1014,7 @@ class Job:
for _id in connection.smembers(self.dependencies_key)}
if exclude_job_id:
dependencies_ids.discard(exclude_job_id)
dependencies_ids.discard(exclude_job_id)
if parent_job.id == exclude_job_id:
parent_job = None
@ -1050,7 +1055,7 @@ _job_stack = LocalStack()
class Retry:
def __init__(self, max, interval=0):
def __init__(self, max, interval: int = 0):
"""`interval` can be a positive number or a list of ints"""
super().__init__()
if max < 1:

@ -1,12 +1,16 @@
import uuid
import sys
import warnings
import typing as t
from collections import namedtuple
from datetime import datetime, timezone
from redis import WatchError
if t.TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
from .compat import as_text, string_types, total_ordering
from .connections import resolve_connection
from .defaults import DEFAULT_RESULT_TTL
@ -33,13 +37,13 @@ class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout"
@total_ordering
class Queue:
job_class = Job
DEFAULT_TIMEOUT = 180 # Default timeout seconds.
redis_queue_namespace_prefix = 'rq:queue:'
redis_queues_keys = 'rq:queues'
job_class: t.Type['Job'] = Job
DEFAULT_TIMEOUT: int = 180 # Default timeout seconds.
redis_queue_namespace_prefix: str = 'rq:queue:'
redis_queues_keys: str = 'rq:queues'
@classmethod
def all(cls, connection=None, job_class=None, serializer=None):
def all(cls, connection: t.Optional['Redis'] = None, job_class: t.Optional[t.Type['Job']] = None, serializer=None):
"""Returns an iterable of all Queues.
"""
connection = resolve_connection(connection)
@ -54,7 +58,8 @@ class Queue:
if rq_key]
@classmethod
def from_queue_key(cls, queue_key, connection=None, job_class=None, serializer=None):
def from_queue_key(cls, queue_key, connection: t.Optional['Redis'] = None,
job_class: t.Optional[t.Type['Job']] = None, serializer=None):
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
Redis keys.
@ -65,7 +70,7 @@ class Queue:
name = queue_key[len(prefix):]
return cls(name, connection=connection, job_class=job_class, serializer=serializer)
def __init__(self, name='default', default_timeout=None, connection=None,
def __init__(self, name='default', default_timeout=None, connection: t.Optional['Redis'] = None,
is_async=True, job_class=None, serializer=None, **kwargs):
self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix
@ -143,7 +148,7 @@ class Queue:
script = self.connection.register_script(script)
return script(keys=[self.key])
def delete(self, delete_jobs=True):
def delete(self, delete_jobs: bool = True):
"""Deletes the queue. If delete_jobs is true it removes all the associated messages on the queue first."""
if delete_jobs:
self.empty()
@ -162,7 +167,7 @@ class Queue:
"""Returns whether the current queue is async."""
return bool(self._is_async)
def fetch_job(self, job_id):
def fetch_job(self, job_id: str):
try:
job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer)
except NoSuchJobError:
@ -171,7 +176,7 @@ class Queue:
if job.origin == self.name:
return job
def get_job_position(self, job_or_id):
def get_job_position(self, job_or_id: t.Union[Job, str]):
"""Returns the position of a job within the queue
Using Redis before 6.0.6 and redis-py before 3.5.4 has a complexity of
@ -192,7 +197,7 @@ class Queue:
return self.job_ids.index(job_id)
return None
def get_job_ids(self, offset=0, length=-1):
def get_job_ids(self, offset: int = 0, length: int = -1):
"""Returns a slice of job IDs in the queue."""
start = offset
if length >= 0:
@ -202,23 +207,23 @@ class Queue:
return [as_text(job_id) for job_id in
self.connection.lrange(self.key, start, end)]
def get_jobs(self, offset=0, length=-1):
def get_jobs(self, offset: int = 0, length: int = -1):
"""Returns a slice of jobs in the queue."""
job_ids = self.get_job_ids(offset, length)
return compact([self.fetch_job(job_id) for job_id in job_ids])
@property
def job_ids(self):
def job_ids(self) -> t.List[str]:
"""Returns a list of all job IDS in the queue."""
return self.get_job_ids()
@property
def jobs(self):
def jobs(self) -> t.List['Job']:
"""Returns a list of all (valid) jobs in the queue."""
return self.get_jobs()
@property
def count(self):
def count(self) -> int:
"""Returns a count of all messages in the queue."""
return self.connection.llen(self.key)
@ -259,7 +264,7 @@ class Queue:
from rq.registry import CanceledJobRegistry
return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
def remove(self, job_or_id, pipeline=None):
def remove(self, job_or_id: t.Union['Job', str], pipeline: t.Optional['Pipeline'] = None):
"""Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
@ -283,7 +288,7 @@ class Queue:
if self.job_class.exists(job_id, self.connection):
self.connection.rpush(self.key, job_id)
def push_job_id(self, job_id, pipeline=None, at_front=False):
def push_job_id(self, job_id: str, pipeline: t.Optional['Pipeline'] = None, at_front=False):
"""Pushes a job ID on the corresponding Redis queue.
'at_front' allows you to push the job onto the front instead of the back of the queue"""
connection = pipeline if pipeline is not None else self.connection
@ -292,11 +297,11 @@ class Queue:
else:
connection.rpush(self.key, job_id)
def create_job(self, func, args=None, kwargs=None, timeout=None,
def create_job(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, depends_on=None, job_id=None,
meta=None, status=JobStatus.QUEUED, retry=None, *,
on_success=None, on_failure=None):
on_success=None, on_failure=None) -> Job:
"""Creates a job based on parameters given."""
timeout = parse_timeout(timeout)
@ -327,11 +332,7 @@ class Queue:
return job
def setup_dependencies(
self,
job,
pipeline=None
):
def setup_dependencies(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None):
# If a _dependent_ job depends on any unfinished job, register all the
# _dependent_ job's dependencies instead of enqueueing it.
#
@ -383,10 +384,10 @@ class Queue:
pipeline.multi() # Ensure pipeline in multi mode before returning to caller
return job
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
def enqueue_call(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None, description=None,
depends_on=None, job_id=None, at_front=False, meta=None,
retry=None, on_success=None, on_failure=None, pipeline=None):
depends_on=None, job_id: str = None, at_front: bool = False, meta=None,
retry=None, on_success=None, on_failure=None, pipeline=None) -> Job:
"""Creates a job to represent the delayed function call and enqueues it.
It is much like `.enqueue()`, except that it takes the function's args
@ -414,7 +415,7 @@ class Queue:
def prepare_data(func, args=None, kwargs=None, timeout=None,
result_ttl=None, ttl=None, failure_ttl=None,
description=None, job_id=None,
at_front=False, meta=None, retry=None, on_success=None, on_failure=None):
at_front=False, meta=None, retry=None, on_success=None, on_failure=None) -> EnqueueData:
# Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples
# And can keep this logic within EnqueueData
return EnqueueData(
@ -424,11 +425,7 @@ class Queue:
at_front, meta, retry, on_success, on_failure
)
def enqueue_many(
self,
job_datas,
pipeline=None
):
def enqueue_many(self, job_datas, pipeline: t.Optional['Pipeline'] = None) -> t.List[Job]:
"""
Creates multiple jobs (created via `Queue.prepare_data` calls)
to represent the delayed function calls and enqueues them.
@ -456,7 +453,7 @@ class Queue:
pipe.execute()
return jobs
def run_job(self, job):
def run_job(self, job: 'Job') -> Job:
job.perform()
job.set_status(JobStatus.FINISHED)
job.save(include_meta=False)
@ -464,7 +461,7 @@ class Queue:
return job
@classmethod
def parse_args(cls, f, *args, **kwargs):
def parse_args(cls, f: t.Union[t.Callable[..., t.Any], str], *args, **kwargs):
"""
Parses arguments passed to `queue.enqueue()` and `queue.enqueue_at()`
@ -519,7 +516,7 @@ class Queue:
pipeline=pipeline
)
def enqueue_at(self, datetime, f, *args, **kwargs):
def enqueue_at(self, datetime: datetime, f, *args, **kwargs):
"""Schedules a job to be enqueued at specified time"""
(f, timeout, description, result_ttl, ttl, failure_ttl,
@ -533,7 +530,7 @@ class Queue:
return self.schedule_job(job, datetime, pipeline=pipeline)
def schedule_job(self, job, datetime, pipeline=None):
def schedule_job(self, job: 'Job', datetime: datetime, pipeline: t.Optional['Pipeline'] = None):
"""Puts job on ScheduledJobRegistry"""
from .registry import ScheduledJobRegistry
registry = ScheduledJobRegistry(queue=self)
@ -553,7 +550,7 @@ class Queue:
return self.enqueue_at(datetime.now(timezone.utc) + time_delta,
func, *args, **kwargs)
def enqueue_job(self, job, pipeline=None, at_front=False):
def enqueue_job(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, at_front: bool = False) -> Job:
"""Enqueues a job for delayed execution.
If Queue is instantiated with is_async=False, job is executed immediately.
@ -583,7 +580,7 @@ class Queue:
return job
def run_sync(self, job):
def run_sync(self, job: 'Job') -> 'Job':
with self.connection.pipeline() as pipeline:
job.prepare_for_execution('sync', pipeline)
@ -599,7 +596,7 @@ class Queue:
return job
def enqueue_dependents(self, job, pipeline=None, exclude_job_id=None):
def enqueue_dependents(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, exclude_job_id=None):
"""Enqueues all jobs in the given job's dependents set and clears it.
When called without a pipeline, this method uses WATCH/MULTI/EXEC.
@ -683,7 +680,7 @@ class Queue:
return as_text(self.connection.lpop(self.key))
@classmethod
def lpop(cls, queue_keys, timeout, connection=None):
def lpop(cls, queue_keys, timeout, connection: t.Optional['Redis'] = None):
"""Helper method. Intermediate method to abstract away from some
Redis API details, where LPOP accepts only a single key, whereas BLPOP
accepts multiple. So if we want the non-blocking LPOP, we need to
@ -713,7 +710,8 @@ class Queue:
return None
@classmethod
def dequeue_any(cls, queues, timeout, connection=None, job_class=None, serializer=None):
def dequeue_any(cls, queues, timeout, connection: t.Optional['Redis'] = None,
job_class: t.Optional[t.Type['Job']] = None, serializer=None):
"""Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important.

@ -1,8 +1,13 @@
import typing as t
import calendar
from rq.serializers import resolve_serializer
import time
from datetime import datetime, timedelta, timezone
if t.TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
from .compat import as_text
from .connections import resolve_connection
from .defaults import DEFAULT_FAILURE_TTL
@ -21,8 +26,8 @@ class BaseRegistry:
job_class = Job
key_template = 'rq:registry:{0}'
def __init__(self, name='default', connection=None, job_class=None,
queue=None, serializer=None):
def __init__(self, name='default', connection: t.Optional['Redis'] = None,
job_class: t.Optional[t.Type['Job']] = None, queue=None, serializer=None):
if queue:
self.name = queue.name
self.connection = resolve_connection(queue.connection)
@ -45,10 +50,13 @@ class BaseRegistry:
self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs
)
def __contains__(self, item):
def __contains__(self, item: t.Union[str, 'Job']):
"""
Returns a boolean indicating registry contains the given
job instance or job id.
Args:
item (Union[str, Job]): A Job ID or a Job.
"""
job_id = item
if isinstance(item, self.job_class):
@ -61,8 +69,15 @@ class BaseRegistry:
self.cleanup()
return self.connection.zcard(self.key)
def add(self, job, ttl=0, pipeline=None, xx=False):
"""Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf"""
def add(self, job: 'Job', ttl=0, pipeline: t.Optional['Pipeline'] = None, xx: bool = False):
"""Adds a job to a registry with expiry time of now + ttl, unless it's -1 which is set to +inf
Args:
job (Job): The Job to add
ttl (int, optional): The time to live. Defaults to 0.
pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
xx (bool, optional): .... Defaults to False.
"""
score = ttl if ttl < 0 else current_timestamp() + ttl
if score == -1:
score = '+inf'
@ -71,8 +86,14 @@ class BaseRegistry:
return self.connection.zadd(self.key, {job.id: score}, xx=xx)
def remove(self, job, pipeline=None, delete_job=False):
"""Removes job from registry and deletes it if `delete_job == True`"""
def remove(self, job: 'Job', pipeline: t.Optional['Pipeline'] = None, delete_job: bool = False):
"""Removes job from registry and deletes it if `delete_job == True`
Args:
job (Job): The Job to remove from the registry
pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
delete_job (bool, optional): If should delete the job.. Defaults to False.
"""
connection = pipeline if pipeline is not None else self.connection
job_id = job.id if isinstance(job, self.job_class) else job
result = connection.zrem(self.key, job_id)
@ -84,7 +105,7 @@ class BaseRegistry:
job_instance.delete()
return result
def get_expired_job_ids(self, timestamp=None):
def get_expired_job_ids(self, timestamp: t.Optional[datetime] = None):
"""Returns job ids whose score are less than current timestamp.
Returns ids for jobs with an expiry time earlier than timestamp,
@ -95,7 +116,7 @@ class BaseRegistry:
return [as_text(job_id) for job_id in
self.connection.zrangebyscore(self.key, 0, score)]
def get_job_ids(self, start=0, end=-1):
def get_job_ids(self, start: int = 0, end: int = -1):
"""Returns list of all job ids."""
self.cleanup()
return [as_text(job_id) for job_id in
@ -105,13 +126,28 @@ class BaseRegistry:
"""Returns Queue object associated with this registry."""
return Queue(self.name, connection=self.connection, serializer=self.serializer)
def get_expiration_time(self, job):
"""Returns job's expiration time."""
def get_expiration_time(self, job: 'Job') -> datetime:
"""Returns job's expiration time.
Args:
job (Job): The Job to get the expiration
"""
score = self.connection.zscore(self.key, job.id)
return datetime.utcfromtimestamp(score)
def requeue(self, job_or_id, at_front=False):
"""Requeues the job with the given job ID."""
def requeue(self, job_or_id: t.Union['Job', str], at_front: bool = False) -> 'Job':
"""Requeues the job with the given job ID.
Args:
job_or_id (t.Union[&#39;Job&#39;, str]): The Job or the Job ID
at_front (bool, optional): If the Job should be put at the front of the queue. Defaults to False.
Raises:
InvalidJobOperation: If nothing is returned from the `ZREM` operation.
Returns:
Job: The Requeued Job.
"""
if isinstance(job_or_id, self.job_class):
job = job_or_id
serializer = job.serializer
@ -146,12 +182,15 @@ class StartedJobRegistry(BaseRegistry):
"""
key_template = 'rq:wip:{0}'
def cleanup(self, timestamp=None):
def cleanup(self, timestamp: t.Optional[datetime] = None):
"""Remove expired jobs from registry and add them to FailedJobRegistry.
Removes jobs with an expiry time earlier than timestamp, specified as
seconds since the Unix epoch. timestamp defaults to call time if
unspecified. Removed jobs are added to the global failed job queue.
Args:
timestamp (datetime): The datetime to use as the limit.
"""
score = timestamp if timestamp is not None else current_timestamp()
job_ids = self.get_expired_job_ids(score)
@ -194,7 +233,7 @@ class FinishedJobRegistry(BaseRegistry):
"""
key_template = 'rq:finished:{0}'
def cleanup(self, timestamp=None):
def cleanup(self, timestamp: t.Optional[datetime] = None):
"""Remove expired jobs from registry.
Removes jobs with an expiry time earlier than timestamp, specified as
@ -211,7 +250,7 @@ class FailedJobRegistry(BaseRegistry):
"""
key_template = 'rq:failed:{0}'
def cleanup(self, timestamp=None):
def cleanup(self, timestamp: t.Optional[datetime] = None):
"""Remove expired jobs from registry.
Removes jobs with an expiry time earlier than timestamp, specified as
@ -221,7 +260,7 @@ class FailedJobRegistry(BaseRegistry):
score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score)
def add(self, job, ttl=None, exc_string='', pipeline=None):
def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: t.Optional['Pipeline'] = None):
"""
Adds a job to a registry with expiry time of now + ttl.
`ttl` defaults to DEFAULT_FAILURE_TTL if not specified.
@ -270,7 +309,7 @@ class ScheduledJobRegistry(BaseRegistry):
# make sense in this context
self.get_jobs_to_enqueue = self.get_expired_job_ids
def schedule(self, job, scheduled_datetime, pipeline=None):
def schedule(self, job: 'Job', scheduled_datetime, pipeline: t.Optional['Pipeline'] = None):
"""
Adds job to registry, scored by its execution time (in UTC).
If datetime has no tzinfo, it will assume localtimezone.
@ -295,19 +334,19 @@ class ScheduledJobRegistry(BaseRegistry):
implemented in BaseRegistry."""
pass
def remove_jobs(self, timestamp=None, pipeline=None):
def remove_jobs(self, timestamp: t.Optional[datetime] = None, pipeline: t.Optional['Pipeline'] = None):
"""Remove jobs whose timestamp is in the past from registry."""
connection = pipeline if pipeline is not None else self.connection
score = timestamp if timestamp is not None else current_timestamp()
return connection.zremrangebyscore(self.key, 0, score)
def get_jobs_to_schedule(self, timestamp=None, chunk_size=1000):
def get_jobs_to_schedule(self, timestamp: t.Optional[datetime] = None, chunk_size: int = 1000):
"""Remove jobs whose timestamp is in the past from registry."""
score = timestamp if timestamp is not None else current_timestamp()
return [as_text(job_id) for job_id in
self.connection.zrangebyscore(self.key, 0, score, start=0, num=chunk_size)]
def get_scheduled_time(self, job_or_id):
def get_scheduled_time(self, job_or_id: t.Union['Job', str]):
"""Returns datetime (UTC) at which job is scheduled to be enqueued"""
if isinstance(job_or_id, self.job_class):
job_id = job_or_id.id
@ -324,7 +363,7 @@ class ScheduledJobRegistry(BaseRegistry):
class CanceledJobRegistry(BaseRegistry):
key_template = 'rq:canceled:{0}'
def get_expired_job_ids(self, timestamp=None):
def get_expired_job_ids(self, timestamp: t.Optional[datetime] = None):
raise NotImplementedError
def cleanup(self):

@ -122,7 +122,7 @@ class RQScheduler:
return successful_locks
def prepare_registries(self, queue_names=None):
def prepare_registries(self, queue_names: str = None):
"""Prepare scheduled job registries for use"""
self._scheduled_job_registries = []
if not queue_names:
@ -133,7 +133,7 @@ class RQScheduler:
)
@classmethod
def get_locking_key(cls, name):
def get_locking_key(cls, name: str):
"""Returns scheduler key for a given queue name"""
return SCHEDULER_LOCKING_KEY_TEMPLATE % name

@ -21,7 +21,7 @@ class JSONSerializer():
return json.loads(s.decode('utf-8'), *args, **kwargs)
def resolve_serializer(serializer):
def resolve_serializer(serializer: str):
"""This function checks the user defined serializer for ('dumps', 'loads') methods
It returns a default pickle serializer if not found else it returns a MySerializer
The returned serializer objects implement ('dumps', 'loads') methods

@ -1,7 +1,20 @@
import typing as t
if t.TYPE_CHECKING:
from redis import Redis
from rq.worker import Worker
WORKERS_SUSPENDED = 'rq:suspended'
def is_suspended(connection, worker=None):
def is_suspended(connection: 'Redis', worker: t.Optional['Worker'] = None):
"""Checks whether a Worker is suspendeed on a given connection
Args:
connection (Redis): The Redis Connection
worker (t.Optional[Worker], optional): The Worker. Defaults to None.
"""
with connection.pipeline() as pipeline:
if worker is not None:
worker.heartbeat(pipeline=pipeline)
@ -11,14 +24,25 @@ def is_suspended(connection, worker=None):
return pipeline.execute()[-1]
def suspend(connection, ttl=None):
"""ttl = time to live in seconds. Default is no expiration
Note: If you pass in 0 it will invalidate right away
def suspend(connection: 'Redis', ttl: int = None):
"""
Suspends.
TTL of 0 will invalidate right away.
Args:
connection (Redis): The Redis connection to use..
ttl (int): time to live in seconds. Defaults to `None`
"""
connection.set(WORKERS_SUSPENDED, 1)
if ttl is not None:
connection.expire(WORKERS_SUSPENDED, ttl)
def resume(connection):
def resume(connection: 'Redis'):
"""
Resumes.
Args:
connection (Redis): The Redis connection to use..
"""
return connection.delete(WORKERS_SUSPENDED)

@ -11,9 +11,13 @@ import importlib
import logging
import numbers
import sys
import datetime as dt
import typing as t
from collections.abc import Iterable
if t.TYPE_CHECKING:
from redis import Redis
from redis.exceptions import ResponseError
from .compat import as_text, string_types
@ -73,17 +77,18 @@ class _Colorizer:
colorizer = _Colorizer()
def make_colorizer(color):
def make_colorizer(color: str):
"""Creates a function that colorizes text with the given color.
For example:
green = make_colorizer('darkgreen')
red = make_colorizer('red')
For example::
Then, you can use:
..codeblock::python
print "It's either " + green('OK') + ' or ' + red('Oops')
>>> green = make_colorizer('darkgreen')
>>> red = make_colorizer('red')
>>>
>>> # You can then use:
>>> print("It's either " + green('OK') + ' or ' + red('Oops'))
"""
def inner(text):
return colorizer.colorize(color, text)
@ -121,19 +126,31 @@ class ColorizingStreamHandler(logging.StreamHandler):
return message
def import_attribute(name):
"""Return an attribute from a dotted path name (e.g. "path.to.func")."""
def import_attribute(name: str):
"""Returns an attribute from a dotted path name. Example: `path.to.func`.
When the attribute we look for is a staticmethod, module name in its
dotted path is not the last-before-end word
E.g.: package_a.package_b.module_a.ClassA.my_static_method
Thus we remove the bits from the end of the name until we can import it
Sometimes the failure during importing is due to a genuine coding error in the imported module
In this case, the exception is logged as a warning for ease of debugging.
The above logic will apply anyways regardless of the cause of the import error.
Args:
name (str): The name (reference) to the path.
Raises:
ValueError: If no module is found or invalid attribute name.
Returns:
t.Any: An attribute (normally a Callable)
"""
name_bits = name.split('.')
module_name_bits, attribute_bits = name_bits[:-1], [name_bits[-1]]
module = None
# When the attribute we look for is a staticmethod, module name in its
# dotted path is not the last-before-end word
# E.g.: package_a.package_b.module_a.ClassA.my_static_method
# Thus we remove the bits from the end of the name until we can import it
#
# Sometimes the failure during importing is due to a genuine coding error in the imported module
# In this case, the exception is logged as a warning for ease of debugging.
# The above logic will apply anyways regardless of the cause of the import error.
while len(module_name_bits):
try:
module_name = '.'.join(module_name_bits)
@ -168,11 +185,11 @@ def utcnow():
_TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
def utcformat(dt):
def utcformat(dt: dt.datetime):
return dt.strftime(as_text(_TIMESTAMP_FORMAT))
def utcparse(string):
def utcparse(string: str):
try:
return datetime.datetime.strptime(string, _TIMESTAMP_FORMAT)
except ValueError:
@ -180,7 +197,7 @@ def utcparse(string):
return datetime.datetime.strptime(string, '%Y-%m-%dT%H:%M:%SZ')
def first(iterable, default=None, key=None):
def first(iterable: t.Iterable, default=None, key=None):
"""
Return first element of `iterable` that evaluates true, else return None
(or an optional default value).
@ -219,12 +236,12 @@ def first(iterable, default=None, key=None):
return default
def is_nonstring_iterable(obj):
def is_nonstring_iterable(obj: t.Any) -> bool:
"""Returns whether the obj is an iterable, but not a string"""
return isinstance(obj, Iterable) and not isinstance(obj, string_types)
def ensure_list(obj):
def ensure_list(obj: t.Any) -> t.List:
"""
When passed an iterable of objects, does nothing, otherwise, it returns
a list with just that object in it.
@ -232,7 +249,7 @@ def ensure_list(obj):
return obj if is_nonstring_iterable(obj) else [obj]
def current_timestamp():
def current_timestamp() -> int:
"""Returns current UTC timestamp"""
return calendar.timegm(datetime.datetime.utcnow().utctimetuple())
@ -247,14 +264,14 @@ def backend_class(holder, default_name, override=None):
return override
def str_to_date(date_str):
def str_to_date(date_str: t.Optional[str]) -> t.Union[dt.datetime, t.Any]:
if not date_str:
return
else:
return utcparse(date_str.decode())
def parse_timeout(timeout):
def parse_timeout(timeout: t.Any):
"""Transfer all kinds of timeout format to an integer representing seconds"""
if not isinstance(timeout, numbers.Integral) and timeout is not None:
try:
@ -272,10 +289,13 @@ def parse_timeout(timeout):
return timeout
def get_version(connection):
def get_version(connection: 'Redis'):
"""
Returns tuple of Redis server version.
This function also correctly handles 4 digit redis server versions.
Args:
connection (Redis): The Redis connection.
"""
try:
return tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3])
@ -288,33 +308,55 @@ def ceildiv(a, b):
return -(-a // b)
def split_list(a_list, segment_size):
"""
Splits a list into multiple smaller lists having size `segment_size`
def split_list(a_list: t.List[t.Any], segment_size: int):
"""Splits a list into multiple smaller lists having size `segment_size`
Args:
a_list (t.List[t.Any]): A list to split
segment_size (int): The segment size to split into
Yields:
list: The splitted listed
"""
for i in range(0, len(a_list), segment_size):
yield a_list[i:i + segment_size]
def truncate_long_string(data, max_length=None):
"""Truncate arguments with representation longer than max_length"""
def truncate_long_string(data: str, max_length: t.Optional[int] = None) -> str:
"""Truncate arguments with representation longer than max_length
Args:
data (str): The data to truncate
max_length (t.Optional[int], optional): The max length. Defaults to None.
"""
if max_length is None:
return data
return (data[:max_length] + '...') if len(data) > max_length else data
def get_call_string(func_name, args, kwargs, max_length=None):
"""Returns a string representation of the call, formatted as a regular
def get_call_string(func_name: t.Optional[str], args: t.Any, kwargs: t.Dict[t.Any, t.Any],
max_length: t.Optional[int] = None) -> t.Optional[str]:
"""
Returns a string representation of the call, formatted as a regular
Python function invocation statement. If max_length is not None, truncate
arguments with representation longer than max_length.
Args:
func_name (str): The funtion name
args (t.Any): The function arguments
kwargs (t.Dict[t.Any, t.Any]): The function kwargs
max_length (int, optional): The max length. Defaults to None.
Returns:
str: A String representation of the function call.
"""
if func_name is None:
return None
arg_list = [as_text(truncate_long_string(repr(arg), max_length)) for arg in args]
kwargs = ['{0}={1}'.format(k, as_text(truncate_long_string(repr(v), max_length))) for k, v in kwargs.items()]
arg_list += sorted(kwargs)
list_kwargs = ['{0}={1}'.format(k, as_text(truncate_long_string(repr(v), max_length))) for k, v in kwargs.items()]
arg_list += sorted(list_kwargs)
args = ', '.join(arg_list)
return '{0}({1})'.format(func_name, args)

@ -8,6 +8,11 @@ import sys
import time
import traceback
import warnings
import typing as t
if t.TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
from datetime import timedelta
from enum import Enum
@ -60,8 +65,8 @@ class StopRequested(Exception):
pass
def compact(l):
return [x for x in l if x is not None]
def compact(a_list):
return [x for x in a_list if x is not None]
_signames = dict((getattr(signal, signame), signame)
@ -106,7 +111,14 @@ class Worker:
max_connection_wait_time = 60.0
@classmethod
def all(cls, connection=None, job_class=None, queue_class=None, queue=None, serializer=None):
def all(
cls,
connection: t.Optional['Redis'] = None,
job_class: t.Type['Job'] = None,
queue_class: t.Optional[t.Type['Queue']] = None,
queue: t.Optional['Queue'] = None,
serializer=None
) -> t.List['Worker']:
"""Returns an iterable of all Workers.
"""
if queue:
@ -123,18 +135,18 @@ class Worker:
return compact(workers)
@classmethod
def all_keys(cls, connection=None, queue=None):
def all_keys(cls, connection: t.Optional['Redis'] = None, queue: t.Optional['Queue'] = None):
return [as_text(key)
for key in get_keys(queue=queue, connection=connection)]
@classmethod
def count(cls, connection=None, queue=None):
def count(cls, connection: t.Optional['Redis'] = None, queue: t.Optional['Queue'] = None):
"""Returns the number of workers by queue or connection"""
return len(get_keys(queue=queue, connection=connection))
@classmethod
def find_by_key(cls, worker_key, connection=None, job_class=None,
queue_class=None, serializer=None):
def find_by_key(cls, worker_key: str, connection: t.Optional['Redis'] = None, job_class: t.Type['Job'] = None,
queue_class: t.Type['Queue'] = None, serializer=None):
"""Returns a Worker instance, based on the naming conventions for
naming the internal Redis keys. Can be used to reverse-lookup Workers
by their Redis keys.
@ -157,13 +169,13 @@ class Worker:
return worker
def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL,
connection=None, exc_handler=None, exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL, job_class=None,
queue_class=None, log_job_description=True,
def __init__(self, queues, name: t.Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL,
connection: t.Optional['Redis'] = None, exc_handler=None, exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL, job_class: t.Type['Job'] = None,
queue_class=None, log_job_description: bool = True,
job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL,
disable_default_exception_handler=False,
prepare_for_work=True, serializer=None): # noqa
disable_default_exception_handler: bool = False,
prepare_for_work: bool = True, serializer=None): # noqa
if connection is None:
connection = get_current_connection()
self.connection = connection
@ -182,7 +194,7 @@ class Worker:
if isinstance(q, string_types) else q
for q in ensure_list(queues)]
self.name = name or uuid4().hex
self.name: str = name or uuid4().hex
self.queues = queues
self.validate_queues()
self._ordered_queues = self.queues[:]
@ -192,21 +204,21 @@ class Worker:
self.default_worker_ttl = default_worker_ttl
self.job_monitoring_interval = job_monitoring_interval
self._state = 'starting'
self._is_horse = False
self._horse_pid = 0
self._stop_requested = False
self._state: str = 'starting'
self._is_horse: bool = False
self._horse_pid: int = 0
self._stop_requested: bool = False
self._stopped_job_id = None
self.log = logger
self.log_job_description = log_job_description
self.last_cleaned_at = None
self.successful_job_count = 0
self.failed_job_count = 0
self.total_working_time = 0
self.current_job_working_time = 0
self.successful_job_count: int = 0
self.failed_job_count: int = 0
self.total_working_time: int = 0
self.current_job_working_time: int = 0
self.birth_date = None
self.scheduler = None
self.scheduler: t.Optional[RQScheduler] = None
self.pubsub = None
self.pubsub_thread = None
@ -368,7 +380,7 @@ class Worker:
if death_timestamp is not None:
return utcparse(as_text(death_timestamp))
def set_state(self, state, pipeline=None):
def set_state(self, state, pipeline: t.Optional['Pipeline'] = None):
self._state = state
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'state', state)
@ -394,12 +406,12 @@ class Worker:
state = property(_get_state, _set_state)
def set_current_job_working_time(self, current_job_working_time, pipeline=None):
def set_current_job_working_time(self, current_job_working_time, pipeline: t.Optional['Pipeline'] = None):
self.current_job_working_time = current_job_working_time
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'current_job_working_time', current_job_working_time)
def set_current_job_id(self, job_id, pipeline=None):
def set_current_job_id(self, job_id: t.Optional[str] = None, pipeline: t.Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
if job_id is None:
@ -407,7 +419,7 @@ class Worker:
else:
connection.hset(self.key, 'current_job', job_id)
def get_current_job_id(self, pipeline=None):
def get_current_job_id(self, pipeline: t.Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
return as_text(connection.hget(self.key, 'current_job'))
@ -553,8 +565,8 @@ class Worker:
def reorder_queues(self, reference_queue):
pass
def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False):
def work(self, burst: bool = False, logging_level: str = "INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler: bool = False):
"""Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
@ -701,7 +713,7 @@ class Worker:
self.heartbeat()
return result
def heartbeat(self, timeout=None, pipeline=None):
def heartbeat(self, timeout=None, pipeline: t.Optional['Pipeline'] = None):
"""Specifies a new worker timeout, typically by extending the
expiration time of the worker, effectively making this a "heartbeat"
to not expire the worker until the timeout passes.
@ -759,11 +771,11 @@ class Worker:
job_class=self.job_class, serializer=self.serializer)
for queue in queues.split(',')]
def increment_failed_job_count(self, pipeline=None):
def increment_failed_job_count(self, pipeline: t.Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
connection.hincrby(self.key, 'failed_job_count', 1)
def increment_successful_job_count(self, pipeline=None):
def increment_successful_job_count(self, pipeline: t.Optional['Pipeline'] = None):
connection = pipeline if pipeline is not None else self.connection
connection.hincrby(self.key, 'successful_job_count', 1)
@ -771,7 +783,7 @@ class Worker:
pipeline.hincrbyfloat(self.key, 'total_working_time',
job_execution_time.total_seconds())
def fork_work_horse(self, job, queue):
def fork_work_horse(self, job: 'Job', queue: 'Queue'):
"""Spawns a work horse to perform the actual work and passes it a job.
"""
child_pid = os.fork()
@ -785,14 +797,14 @@ class Worker:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
def get_heartbeat_ttl(self, job):
def get_heartbeat_ttl(self, job: 'Job'):
if job.timeout and job.timeout > 0:
remaining_execution_time = job.timeout - self.current_job_working_time
return min(remaining_execution_time, self.job_monitoring_interval) + 60
else:
return self.job_monitoring_interval + 60
def monitor_work_horse(self, job, queue):
def monitor_work_horse(self, job: 'Job', queue: 'Queue'):
"""The worker will monitor the work horse and make sure that it
either executes successfully or the status of the job is set to
failed
@ -863,7 +875,7 @@ class Worker:
"(waitpid returned %s)" % ret_val
)
def execute_job(self, job, queue):
def execute_job(self, job: 'Job', queue: 'Queue'):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
@ -874,7 +886,7 @@ class Worker:
self.monitor_work_horse(job, queue)
self.set_state(WorkerStatus.IDLE)
def maintain_heartbeats(self, job):
def maintain_heartbeats(self, job: 'Job'):
"""Updates worker and job's last heartbeat field. If job was
enqueued with `result_ttl=0`, a race condition could happen where this heartbeat
arrives after job has been deleted, leaving a job key that contains only
@ -895,7 +907,7 @@ class Worker:
if results[2] == 1:
self.connection.delete(job.key)
def main_work_horse(self, job, queue):
def main_work_horse(self, job: 'Job', queue: 'Queue'):
"""This is the entry point of the newly spawned work horse."""
# After fork()'ing, always assure we are generating random sequences
# that are different from the worker.
@ -923,7 +935,7 @@ class Worker:
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
def prepare_job_execution(self, job):
def prepare_job_execution(self, job: 'Job'):
"""Performs misc bookkeeping like updating states prior to
job execution.
"""
@ -942,7 +954,7 @@ class Worker:
msg = 'Processing {0} from {1} since {2}'
self.procline(msg.format(job.func_name, job.origin, time.time()))
def handle_job_failure(self, job, queue, started_job_registry=None,
def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None,
exc_string=''):
"""Handles the failure or an executing job by:
1. Setting the job status to failed
@ -1006,7 +1018,7 @@ class Worker:
# even if Redis is down
pass
def handle_job_success(self, job, queue, started_job_registry):
def handle_job_success(self, job: 'Job', queue: 'Queue', started_job_registry):
self.log.debug('Handling successful execution of job %s', job.id)
with self.connection.pipeline() as pipeline:
@ -1046,7 +1058,7 @@ class Worker:
except redis.exceptions.WatchError:
continue
def execute_success_callback(self, job, result):
def execute_success_callback(self, job: 'Job', result):
"""Executes success_callback with timeout"""
job.heartbeat(utcnow(), CALLBACK_TIMEOUT)
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
@ -1058,7 +1070,7 @@ class Worker:
with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id):
job.failure_callback(job, self.connection, *sys.exc_info())
def perform_job(self, job, queue):
def perform_job(self, job: 'Job', queue: 'Queue'):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
@ -1126,7 +1138,7 @@ class Worker:
return True
def handle_exception(self, job, *exc_info):
def handle_exception(self, job: 'Job', *exc_info):
"""Walks the exception handler stack to delegate exception handling."""
exc_string = ''.join(traceback.format_exception(*exc_info))
@ -1206,13 +1218,13 @@ class Worker:
class SimpleWorker(Worker):
def execute_job(self, job, queue):
def execute_job(self, job: 'Job', queue: 'Queue'):
"""Execute job in same thread/process, do not fork()"""
self.set_state(WorkerStatus.BUSY)
self.perform_job(job, queue)
self.set_state(WorkerStatus.IDLE)
def get_heartbeat_ttl(self, job):
def get_heartbeat_ttl(self, job: 'Job'):
# "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59.
# # We should just stick to DEFAULT_WORKER_TTL.
if job.timeout == -1:

@ -1,3 +1,11 @@
import typing as t
if t.TYPE_CHECKING:
from redis import Redis
from redis.client import Pipeline
from .worker import Worker
from .queue import Queue
from .compat import as_text
from rq.utils import split_list
@ -7,8 +15,14 @@ REDIS_WORKER_KEYS = 'rq:workers'
MAX_KEYS = 1000
def register(worker, pipeline=None):
"""Store worker key in Redis so we can easily discover active workers."""
def register(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None):
"""
Store worker key in Redis so we can easily discover active workers.
Args:
worker (Worker): The Worker
pipeline (t.Optional[Pipeline], optional): The Redis Pipeline. Defaults to None.
"""
connection = pipeline if pipeline is not None else worker.connection
connection.sadd(worker.redis_workers_keys, worker.key)
for name in worker.queue_names():
@ -16,8 +30,13 @@ def register(worker, pipeline=None):
connection.sadd(redis_key, worker.key)
def unregister(worker, pipeline=None):
"""Remove worker key from Redis."""
def unregister(worker: 'Worker', pipeline: t.Optional['Pipeline'] = None):
"""Remove Worker key from Redis
Args:
worker (Worker): The Worker
pipeline (t.Optional[Pipeline], optional): Redis Pipeline. Defaults to None.
"""
if pipeline is None:
connection = worker.connection.pipeline()
else:
@ -32,23 +51,38 @@ def unregister(worker, pipeline=None):
connection.execute()
def get_keys(queue=None, connection=None):
"""Returnes a list of worker keys for a queue"""
def get_keys(queue: t.Optional['Queue'] = None, connection: t.Optional['Redis'] = None) -> t.Set[t.Any]:
"""Returns a list of worker keys for a given queue.
Args:
queue (t.Optional[&#39;Queue&#39;], optional): The Queue. Defaults to None.
connection (t.Optional[&#39;Redis&#39;], optional): The Redis Connection. Defaults to None.
Raises:
ValueError: If no Queue or Connection is provided.
Returns:
set: A Set with keys.
"""
if queue is None and connection is None:
raise ValueError('"queue" or "connection" argument is required')
raise ValueError('"Queue" or "connection" argument is required')
if queue:
redis = queue.connection
redis_key = WORKERS_BY_QUEUE_KEY % queue.name
else:
redis = connection
redis = connection # type: ignore
redis_key = REDIS_WORKER_KEYS
return {as_text(key) for key in redis.smembers(redis_key)}
def clean_worker_registry(queue):
"""Delete invalid worker keys in registry"""
def clean_worker_registry(queue: 'Queue'):
"""Delete invalid worker keys in registry.
Args:
queue (Queue): The Queue
"""
keys = list(get_keys(queue))
with queue.connection.pipeline() as pipeline:

@ -1,3 +0,0 @@
#!/bin/bash
docker build -f tests/Dockerfile . -t rqtest && docker run -it --rm rqtest

@ -1,19 +1,51 @@
FROM ubuntu:latest
FROM ubuntu:20.04
RUN apt-get update \
&& apt-get install -y \
ARG DEBIAN_FRONTEND=noninteractive
ENV LANG C.UTF-8
ENV LC_ALL C.UTF-8
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y \
build-essential \
zlib1g-dev \
libncurses5-dev \
libgdbm-dev \
libnss3-dev \
libssl-dev \
libreadline-dev \
libffi-dev wget \
software-properties-common && \
add-apt-repository ppa:deadsnakes/ppa && \
apt-get update
RUN apt-get install -y \
redis-server \
python3-pip \
stunnel
stunnel \
python3.6 \
python3.7 \
python3.8 \
python3.9 \
python3.10 \
python3.6-distutils \
python3.7-distutils
RUN apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY tests/ssl_config/private.pem tests/ssl_config/stunnel.conf /etc/stunnel/
COPY . /tmp/rq
WORKDIR /tmp/rq
RUN pip3 install -r /tmp/rq/requirements.txt -r /tmp/rq/dev-requirements.txt \
&& python3 /tmp/rq/setup.py build \
&& python3 /tmp/rq/setup.py install
RUN set -e && \
python3 -m pip install --upgrade pip && \
python3 -m pip install --no-cache-dir tox && \
pip3 install -r /tmp/rq/requirements.txt -r /tmp/rq/dev-requirements.txt && \
python3 /tmp/rq/setup.py build && \
python3 /tmp/rq/setup.py install
CMD stunnel \
& redis-server \
& RUN_SLOW_TESTS_TOO=1 RUN_SSL_TESTS=1 pytest /tmp/rq/tests/ --durations=5 -v --log-cli-level 10
& RUN_SLOW_TESTS_TOO=1 RUN_SSL_TESTS=1 tox

@ -160,4 +160,4 @@ class TestDependencies(RQTestCase):
job_c.dependencies_are_met()
w = Worker([queue])
w.work(burst=True)
assert job_c.result
assert job_c.result

@ -28,7 +28,7 @@ from tests.fixtures import (
)
from rq import Queue, SimpleWorker, Worker, get_current_connection
from rq.compat import as_text, PY2
from rq.compat import as_text
from rq.job import Job, JobStatus, Retry
from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry
from rq.suspension import resume, suspend
@ -68,22 +68,6 @@ class TestWorker(RQTestCase):
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
# Also accept byte strings in Python 2
if PY2:
# With single byte string argument
w = Worker(b'foo')
self.assertEqual(w.queues[0].name, 'foo')
# With list of byte strings
w = Worker([b'foo', b'bar'])
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
# With iterable of byte strings
w = Worker(iter([b'foo', b'bar']))
self.assertEqual(w.queues[0].name, 'foo')
self.assertEqual(w.queues[1].name, 'bar')
# With single Queue
w = Worker(Queue('foo'))
self.assertEqual(w.queues[0].name, 'foo')

@ -1,11 +1,14 @@
[tox]
envlist=py35,py36,py37,py38,py39,pypy,flake8
envlist=py36,py37,py38,py39,py310,flake8
[testenv]
commands=pytest --cov rq --durations=5 {posargs}
deps=
pytest
pytest-cov
sentry-sdk
codecov
psutil
[testenv:flake8]
basepython = python3.6
@ -13,3 +16,29 @@ deps =
flake8
commands =
flake8 rq tests
[testenv:py36]
skipdist = True
basepython = python3.6
deps = {[testenv]deps}
[testenv:py37]
skipdist = True
basepython = python3.7
deps = {[testenv]deps}
[testenv:py38]
skipdist = True
basepython = python3.8
deps = {[testenv]deps}
[testenv:py39]
skipdist = True
basepython = python3.9
deps = {[testenv]deps}
[testenv:py310]
skipdist = True
basepython = python3.10
deps = {[testenv]deps}

Loading…
Cancel
Save