From ea063edf0a790630d0800808fe6236b3e9ddcf22 Mon Sep 17 00:00:00 2001 From: Rob Hudson Date: Wed, 17 May 2023 09:19:14 -0700 Subject: [PATCH] Update linting configuration (#1915) * Update linting configuration This removes flake8 in favor of ruff, which also provides isort support, and updates all files to be black, isort, and ruff compliant. This also adds black and ruff checks to the tox and Github linting workflow. * Tweak the code coverage config and calls --- .coveragerc | 7 ++-- .github/workflows/lint.yml | 14 ++++---- .github/workflows/workflow.yml | 4 +-- .pre-commit-config.yaml | 9 +++++ codecov.yml | 1 + docs/contrib/testing.md | 2 +- examples/fib.py | 2 +- examples/run_example.py | 4 +-- pyproject.toml | 20 +++++++++-- rq/__init__.py | 5 ++- rq/cli/__init__.py | 2 +- rq/cli/cli.py | 22 ++++++------ rq/cli/helpers.py | 16 ++++----- rq/command.py | 5 ++- rq/connections.py | 3 +- rq/contrib/legacy.py | 3 +- rq/decorators.py | 3 +- rq/defaults.py | 2 +- rq/job.py | 34 ++++++++++--------- rq/local.py | 17 +++++----- rq/logutils.py | 13 +++---- rq/queue.py | 17 +++++----- rq/registry.py | 25 +++++++------- rq/results.py | 7 ++-- rq/scheduler.py | 2 +- rq/serializers.py | 4 +-- rq/suspension.py | 1 + rq/utils.py | 6 ++-- rq/worker.py | 26 +++++++------- rq/worker_pool.py | 9 ++--- rq/worker_registration.py | 9 ++--- setup.cfg | 6 ---- setup.py | 12 +++---- tests/__init__.py | 4 +-- tests/fixtures.py | 9 ++--- tests/test_callbacks.py | 25 ++++---------- tests/test_cli.py | 18 ++++------ tests/test_commands.py | 6 ++-- tests/test_decorator.py | 41 ++++++++++++---------- tests/test_dependencies.py | 28 ++++------------ tests/test_fixtures.py | 1 - tests/test_helpers.py | 56 +++++++++++++++++-------------- tests/test_job.py | 17 ++++------ tests/test_queue.py | 7 ++-- tests/test_registry.py | 44 ++++++++++-------------- tests/test_results.py | 17 ++++------ tests/test_retry.py | 1 - tests/test_scheduler.py | 5 +-- tests/test_sentry.py | 8 ++--- tests/test_serializers.py | 5 +-- tests/test_timeouts.py | 2 +- tests/test_utils.py | 6 ++-- tests/test_worker.py | 42 +++++++++++------------ tests/test_worker_pool.py | 14 +++----- tests/test_worker_registration.py | 47 +++++++++++--------------- tox.ini | 15 +++++---- 56 files changed, 344 insertions(+), 386 deletions(-) create mode 100644 .pre-commit-config.yaml diff --git a/.coveragerc b/.coveragerc index b78c524..2838512 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,16 +1,13 @@ [run] +source = rq omit = - rq/scripts/* - rq/compat/* rq/contrib/legacy.py - rq/dummy.py rq/local.py rq/tests/* tests/* [report] exclude_lines = + if __name__ == .__main__.: if TYPE_CHECKING: pragma: no cover - if __name__ == .__main__.: - \ No newline at end of file diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index df3f274..7a3dbca 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -25,15 +25,13 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 black + pip install black ruff - - name: Lint with flake8 + - name: Lint with black run: | - # stop the build if there are Python syntax errors or undefined names - flake8 . --select=E9,F63,F7,F82 --show-source - # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide - flake8 . --exit-zero --max-complexity=5 + black --check --skip-string-normalization --line-length 120 rq tests - - name: Lint with black + - name: Lint with ruff run: | - black -S -l 120 rq/ + # stop the build if there are Python syntax errors. + ruff check --show-source rq tests diff --git a/.github/workflows/workflow.yml b/.github/workflows/workflow.yml index 1f77645..67052e8 100644 --- a/.github/workflows/workflow.yml +++ b/.github/workflows/workflow.yml @@ -59,7 +59,7 @@ jobs: - name: Test with pytest run: | - RUN_SLOW_TESTS_TOO=1 pytest --cov=./ --cov-report=xml --durations=5 + RUN_SLOW_TESTS_TOO=1 pytest --cov=rq --cov-config=.coveragerc --cov-report=xml --durations=5 - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 @@ -98,7 +98,7 @@ jobs: - name: Test with pytest run: | - RUN_SLOW_TESTS_TOO=1 pytest --cov=./ --cov-report=xml --durations=5 + RUN_SLOW_TESTS_TOO=1 pytest --cov=rq --cov-config=.coveragerc --cov-report=xml --durations=5 - name: Upload coverage to Codecov uses: codecov/codecov-action@v3 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..d45026b --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,9 @@ +repos: + - repo: https://github.com/psf/black + rev: 23.3.0 + hooks: + - id: black + - repo: https://github.com/charliermarsh/ruff-pre-commit + rev: "v0.0.267" + hooks: + - id: ruff diff --git a/codecov.yml b/codecov.yml index aa84c67..6e566ad 100644 --- a/codecov.yml +++ b/codecov.yml @@ -1,2 +1,3 @@ ignore: + - setup.py - "*/tests/*" diff --git a/docs/contrib/testing.md b/docs/contrib/testing.md index f8da71e..3264e43 100644 --- a/docs/contrib/testing.md +++ b/docs/contrib/testing.md @@ -37,7 +37,7 @@ RUN_SLOW_TESTS_TOO=1 pytest . If you want to analyze the coverage reports, you can use the `--cov` argument to `pytest`. By adding `--cov-report`, you also have some flexibility in terms of the report output format: ```sh -RUN_SLOW_TESTS_TOO=1 pytest --cov=./ --cov-report={{report_format}} --durations=5 +RUN_SLOW_TESTS_TOO=1 pytest --cov=rq --cov-config=.coveragerc --cov-report={{report_format}} --durations=5 ``` Where you replace the `report_format` by the desired format (`term` / `html` / `xml`). diff --git a/examples/fib.py b/examples/fib.py index 2130b3c..4ca4493 100644 --- a/examples/fib.py +++ b/examples/fib.py @@ -2,4 +2,4 @@ def slow_fib(n): if n <= 1: return 1 else: - return slow_fib(n-1) + slow_fib(n-2) + return slow_fib(n - 1) + slow_fib(n - 2) diff --git a/examples/run_example.py b/examples/run_example.py index 93f62bd..43fe163 100644 --- a/examples/run_example.py +++ b/examples/run_example.py @@ -1,10 +1,10 @@ import os import time -from rq import Connection, Queue - from fib import slow_fib +from rq import Connection, Queue + def main(): # Range of Fibonacci numbers to compute diff --git a/pyproject.toml b/pyproject.toml index 4787b13..ebcd7e8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,4 +1,20 @@ [tool.black] line-length = 120 -target-version = ['py36'] -skip-string-normalization = true \ No newline at end of file +target-version = ['py38'] +skip-string-normalization = true + +[tool.ruff] +# Set what ruff should check for. +# See https://beta.ruff.rs/docs/rules/ for a list of rules. +select = [ + "E", # pycodestyle errors + "F", # pyflakes errors + "I", # import sorting + "W", # pycodestyle warnings +] +line-length = 120 # To match black. +target-version = 'py38' + +[tool.ruff.isort] +known-first-party = ["rq"] +section-order = ["future", "standard-library", "third-party", "first-party", "local-folder"] diff --git a/rq/__init__.py b/rq/__init__.py index 0ab7065..b385e76 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,7 +1,6 @@ -# flake8: noqa - +# ruff: noqa: F401 from .connections import Connection, get_current_connection, pop_connection, push_connection -from .job import cancel_job, get_current_job, requeue_job, Retry, Callback +from .job import Callback, Retry, cancel_job, get_current_job, requeue_job from .queue import Queue from .version import VERSION from .worker import SimpleWorker, Worker diff --git a/rq/cli/__init__.py b/rq/cli/__init__.py index 821f9d7..ec850b8 100644 --- a/rq/cli/__init__.py +++ b/rq/cli/__init__.py @@ -1,4 +1,4 @@ -# flake8: noqa +# ruff: noqa: F401 I001 from .cli import main # TODO: the following imports can be removed when we drop the `rqinfo` and diff --git a/rq/cli/cli.py b/rq/cli/cli.py index bccde97..eb18293 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -5,45 +5,47 @@ RQ command line tool import os import sys import warnings - from typing import List, Type import click from redis.exceptions import ConnectionError -from rq import Connection, Retry, __version__ as version +from rq import Connection, Retry +from rq import __version__ as version from rq.cli.helpers import ( + parse_function_args, + parse_schedule, + pass_cli_config, read_config_file, refresh, setup_loghandlers_from_args, show_both, show_queues, show_workers, - parse_function_args, - parse_schedule, - pass_cli_config, ) # from rq.cli.pool import pool from rq.contrib.legacy import cleanup_ghosts from rq.defaults import ( - DEFAULT_RESULT_TTL, - DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT, + DEFAULT_LOGGING_FORMAT, DEFAULT_MAINTENANCE_TASK_INTERVAL, + DEFAULT_RESULT_TTL, + DEFAULT_WORKER_TTL, ) from rq.exceptions import InvalidJobOperationError from rq.job import Job, JobStatus from rq.logutils import blue from rq.registry import FailedJobRegistry, clean_registries from rq.serializers import DefaultSerializer -from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended +from rq.suspension import is_suspended +from rq.suspension import resume as connection_resume +from rq.suspension import suspend as connection_suspend +from rq.utils import get_call_string, import_attribute from rq.worker import Worker from rq.worker_pool import WorkerPool from rq.worker_registration import clean_worker_registry -from rq.utils import import_attribute, get_call_string @click.group() diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index bea2c37..e585ca7 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -1,14 +1,12 @@ -import sys import importlib -import time import os - -from functools import partial, update_wrapper -from enum import Enum - -from datetime import datetime, timezone, timedelta -from json import loads, JSONDecodeError +import sys +import time from ast import literal_eval +from datetime import datetime, timedelta, timezone +from enum import Enum +from functools import partial, update_wrapper +from json import JSONDecodeError, loads from shutil import get_terminal_size import click @@ -20,8 +18,8 @@ from rq.defaults import ( DEFAULT_DEATH_PENALTY_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, - DEFAULT_WORKER_CLASS, DEFAULT_SERIALIZER_CLASS, + DEFAULT_WORKER_CLASS, ) from rq.logutils import setup_loghandlers from rq.utils import import_attribute, parse_timeout diff --git a/rq/command.py b/rq/command.py index 4566ec0..0488d68 100644 --- a/rq/command.py +++ b/rq/command.py @@ -1,17 +1,16 @@ import json import os import signal - -from typing import TYPE_CHECKING, Dict, Any +from typing import TYPE_CHECKING, Any, Dict if TYPE_CHECKING: from redis import Redis + from .worker import Worker from rq.exceptions import InvalidJobOperation from rq.job import Job - PUBSUB_CHANNEL_TEMPLATE = 'rq:pubsub:%s' diff --git a/rq/connections.py b/rq/connections.py index 02b50e3..5d10ea4 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -2,7 +2,8 @@ import warnings from contextlib import contextmanager from typing import Optional, Tuple, Type -from redis import Connection as RedisConnection, Redis +from redis import Connection as RedisConnection +from redis import Redis from .local import LocalStack diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py index 33ecf18..be44b65 100644 --- a/rq/contrib/legacy.py +++ b/rq/contrib/legacy.py @@ -1,7 +1,6 @@ import logging -from rq import get_current_connection -from rq import Worker +from rq import Worker, get_current_connection logger = logging.getLogger(__name__) diff --git a/rq/decorators.py b/rq/decorators.py index 2bf46e8..a24101e 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -1,8 +1,9 @@ from functools import wraps -from typing import TYPE_CHECKING, Callable, Dict, Optional, List, Any, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union if TYPE_CHECKING: from redis import Redis + from .job import Retry from .defaults import DEFAULT_RESULT_TTL diff --git a/rq/defaults.py b/rq/defaults.py index 3744c12..0cea711 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -99,4 +99,4 @@ Defaults to the `UnixSignalDeathPenalty` class within the `rq.timeouts` module UNSERIALIZABLE_RETURN_VALUE_PAYLOAD = 'Unserializable return value' """ The value that we store in the job's _result property or in the Result's return_value in case the return value of the actual job is not serializable -""" \ No newline at end of file +""" diff --git a/rq/job.py b/rq/job.py index b4ee6a4..7e7e964 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,25 +1,26 @@ +import asyncio import inspect import json import logging import warnings import zlib -import asyncio - from datetime import datetime, timedelta, timezone from enum import Enum -from redis import WatchError -from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union, Type +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Type, Union from uuid import uuid4 +from redis import WatchError + from .defaults import CALLBACK_TIMEOUT, UNSERIALIZABLE_RETURN_VALUE_PAYLOAD -from .timeouts import JobTimeoutException, BaseDeathPenalty +from .timeouts import BaseDeathPenalty, JobTimeoutException if TYPE_CHECKING: - from .results import Result - from .queue import Queue from redis import Redis from redis.client import Pipeline + from .queue import Queue + from .results import Result + from .connections import resolve_connection from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError from .local import LocalStack @@ -167,8 +168,8 @@ class Job: func (FunctionReference): The function/method/callable for the Job. This can be a reference to a concrete callable or a string representing the path of function/method to be imported. Effectively this is the only required attribute when creating a new Job. - args (Union[List[Any], Optional[Tuple]], optional): A Tuple / List of positional arguments to pass the callable. - Defaults to None, meaning no args being passed. + args (Union[List[Any], Optional[Tuple]], optional): A Tuple / List of positional arguments to pass the + callable. Defaults to None, meaning no args being passed. kwargs (Optional[Dict], optional): A Dictionary of keyword arguments to pass the callable. Defaults to None, meaning no kwargs being passed. connection (Optional[Redis], optional): The Redis connection to use. Defaults to None. @@ -179,13 +180,16 @@ class Job: status (JobStatus, optional): The Job Status. Defaults to None. description (Optional[str], optional): The Job Description. Defaults to None. depends_on (Union['Dependency', List[Union['Dependency', 'Job']]], optional): What the jobs depends on. - This accepts a variaty of different arguments including a `Dependency`, a list of `Dependency` or a `Job` - list of `Job`. Defaults to None. - timeout (Optional[int], optional): The amount of time in seconds that should be a hardlimit for a job execution. Defaults to None. + This accepts a variaty of different arguments including a `Dependency`, a list of `Dependency` or a + `Job` list of `Job`. Defaults to None. + timeout (Optional[int], optional): The amount of time in seconds that should be a hardlimit for a job + execution. Defaults to None. id (Optional[str], optional): An Optional ID (str) for the Job. Defaults to None. origin (Optional[str], optional): The queue of origin. Defaults to None. - meta (Optional[Dict[str, Any]], optional): Custom metadata about the job, takes a dictioanry. Defaults to None. - failure_ttl (Optional[int], optional): THe time to live in seconds for failed-jobs information. Defaults to None. + meta (Optional[Dict[str, Any]], optional): Custom metadata about the job, takes a dictioanry. + Defaults to None. + failure_ttl (Optional[int], optional): THe time to live in seconds for failed-jobs information. + Defaults to None. serializer (Optional[str], optional): The serializer class path to use. Should be a string with the import path for the serializer to use. eg. `mymodule.myfile.MySerializer` Defaults to None. on_success (Optional[Callable[..., Any]], optional): A callback function, should be a callable to run @@ -1081,8 +1085,8 @@ class Job: """ if self.is_canceled: raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id())) - from .registry import CanceledJobRegistry from .queue import Queue + from .registry import CanceledJobRegistry pipe = pipeline or self.connection.pipeline() diff --git a/rq/local.py b/rq/local.py index e6b070b..2fe22c9 100644 --- a/rq/local.py +++ b/rq/local.py @@ -1,4 +1,4 @@ -# flake8: noqa +# ruff: noqa: E731 """ werkzeug.local ~~~~~~~~~~~~~~ @@ -13,14 +13,14 @@ # current thread ident. try: from greenlet import getcurrent as get_ident -except ImportError: # noqa +except ImportError: try: - from threading import get_ident # noqa - except ImportError: # noqa + from threading import get_ident + except ImportError: try: - from _thread import get_ident # noqa - except ImportError: # noqa - from dummy_thread import get_ident # noqa + from _thread import get_ident + except ImportError: + from dummy_thread import get_ident def release_local(local): @@ -120,7 +120,7 @@ class LocalStack: def _get__ident_func__(self): return self._local.__ident_func__ - def _set__ident_func__(self, value): # noqa + def _set__ident_func__(self, value): object.__setattr__(self._local, '__ident_func__', value) __ident_func__ = property(_get__ident_func__, _set__ident_func__) @@ -348,7 +348,6 @@ class LocalProxy: __invert__ = lambda x: ~(x._get_current_object()) __complex__ = lambda x: complex(x._get_current_object()) __int__ = lambda x: int(x._get_current_object()) - __long__ = lambda x: long(x._get_current_object()) __float__ = lambda x: float(x._get_current_object()) __oct__ = lambda x: oct(x._get_current_object()) __hex__ = lambda x: hex(x._get_current_object()) diff --git a/rq/logutils.py b/rq/logutils.py index b36ece8..9a1c6c5 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -2,7 +2,7 @@ import logging import sys from typing import Union -from rq.defaults import DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT +from rq.defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT class _Colorizer: @@ -24,12 +24,12 @@ class _Colorizer: light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"] x = 30 - for d, l in zip(dark_colors, light_colors): - self.codes[d] = esc + "%im" % x - self.codes[l] = esc + "%i;01m" % x + for dark, light in zip(dark_colors, light_colors): + self.codes[dark] = esc + "%im" % x + self.codes[light] = esc + "%i;01m" % x x += 1 - del d, l, x + del dark, light, x self.codes["darkteal"] = self.codes["turquoise"] self.codes["darkyellow"] = self.codes["brown"] @@ -117,7 +117,8 @@ def setup_loghandlers( level (Union[int, str, None], optional): The log level. Access an integer level (10-50) or a string level ("info", "debug" etc). Defaults to None. date_format (str, optional): The date format to use. Defaults to DEFAULT_LOGGING_DATE_FORMAT ('%H:%M:%S'). - log_format (str, optional): The log format to use. Defaults to DEFAULT_LOGGING_FORMAT ('%(asctime)s %(message)s'). + log_format (str, optional): The log format to use. + Defaults to DEFAULT_LOGGING_FORMAT ('%(asctime)s %(message)s'). name (str, optional): The looger name. Defaults to 'rq.worker'. """ logger = logging.getLogger(name) diff --git a/rq/queue.py b/rq/queue.py index f7a0c87..2d74ddf 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -4,9 +4,9 @@ import traceback import uuid import warnings from collections import namedtuple -from datetime import datetime, timezone, timedelta +from datetime import datetime, timedelta, timezone from functools import total_ordering -from typing import TYPE_CHECKING, Dict, List, Any, Callable, Optional, Tuple, Type, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type, Union from redis import WatchError @@ -15,18 +15,17 @@ from .timeouts import BaseDeathPenalty, UnixSignalDeathPenalty if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline + from .job import Retry -from .utils import as_text from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError from .job import Job, JobStatus from .logutils import blue, green -from .types import FunctionReferenceType, JobDependencyType from .serializers import resolve_serializer -from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow, compact - +from .types import FunctionReferenceType, JobDependencyType +from .utils import as_text, backend_class, compact, get_version, import_attribute, parse_timeout, utcnow logger = logging.getLogger("rq.queue") @@ -158,9 +157,11 @@ class Queue: connection (Optional[Redis], optional): Redis connection. Defaults to None. is_async (bool, optional): Whether jobs should run "async" (using the worker). If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True. - job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None. + job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. + Defaults to None. serializer (Any, optional): Serializer. Defaults to None. - death_penalty_class (Type[BaseDeathPenalty, optional): Job class or a string referencing the Job class path. Defaults to UnixSignalDeathPenalty. + death_penalty_class (Type[BaseDeathPenalty, optional): Job class or a string referencing the Job class path. + Defaults to UnixSignalDeathPenalty. """ self.connection = connection or resolve_connection() prefix = self.redis_queue_namespace_prefix diff --git a/rq/registry.py b/rq/registry.py index acd6bd7..b955d6b 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -1,26 +1,24 @@ import calendar import logging -import traceback - -from rq.serializers import resolve_serializer import time +import traceback from datetime import datetime, timedelta, timezone from typing import TYPE_CHECKING, Any, List, Optional, Type, Union -from .timeouts import UnixSignalDeathPenalty, BaseDeathPenalty +from rq.serializers import resolve_serializer + +from .timeouts import BaseDeathPenalty, UnixSignalDeathPenalty if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline -from .utils import as_text from .connections import resolve_connection from .defaults import DEFAULT_FAILURE_TTL -from .exceptions import InvalidJobOperation, NoSuchJobError, AbandonedJobError +from .exceptions import AbandonedJobError, InvalidJobOperation, NoSuchJobError from .job import Job, JobStatus from .queue import Queue -from .utils import backend_class, current_timestamp - +from .utils import as_text, backend_class, current_timestamp logger = logging.getLogger("rq.registry") @@ -237,8 +235,9 @@ class StartedJobRegistry(BaseRegistry): except NoSuchJobError: continue - job.execute_failure_callback(self.death_penalty_class, AbandonedJobError, AbandonedJobError(), - traceback.extract_stack()) + job.execute_failure_callback( + self.death_penalty_class, AbandonedJobError, AbandonedJobError(), traceback.extract_stack() + ) retry = job.retries_left and job.retries_left > 0 @@ -248,8 +247,10 @@ class StartedJobRegistry(BaseRegistry): else: exc_string = f"due to {AbandonedJobError.__name__}" - logger.warning(f'{self.__class__.__name__} cleanup: Moving job to {FailedJobRegistry.__name__} ' - f'({exc_string})') + logger.warning( + f'{self.__class__.__name__} cleanup: Moving job to {FailedJobRegistry.__name__} ' + f'({exc_string})' + ) job.set_status(JobStatus.FAILED) job._exc_info = f"Moved to {FailedJobRegistry.__name__}, {exc_string}, at {datetime.now()}" job.save(pipeline=pipeline, include_meta=False) diff --git a/rq/results.py b/rq/results.py index fdbb763..27bab15 100644 --- a/rq/results.py +++ b/rq/results.py @@ -1,16 +1,15 @@ -from typing import Any, Optional import zlib - from base64 import b64decode, b64encode from datetime import datetime, timezone from enum import Enum +from typing import Any, Optional + from redis import Redis from .defaults import UNSERIALIZABLE_RETURN_VALUE_PAYLOAD -from .utils import decode_redis_hash from .job import Job from .serializers import resolve_serializer -from .utils import now +from .utils import decode_redis_hash, now def get_key(job_id): diff --git a/rq/scheduler.py b/rq/scheduler.py index a64b400..97d627c 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -8,7 +8,7 @@ from enum import Enum from multiprocessing import Process from typing import List, Set -from redis import ConnectionPool, Redis, SSLConnection, UnixDomainSocketConnection +from redis import ConnectionPool, Redis from .connections import parse_connection from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, DEFAULT_SCHEDULER_FALLBACK_PERIOD diff --git a/rq/serializers.py b/rq/serializers.py index 96de3f5..94eddbf 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -1,6 +1,6 @@ -from functools import partial -import pickle import json +import pickle +from functools import partial from typing import Optional, Type, Union from .utils import import_attribute diff --git a/rq/suspension.py b/rq/suspension.py index 77df9b8..10af5ba 100644 --- a/rq/suspension.py +++ b/rq/suspension.py @@ -2,6 +2,7 @@ from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from redis import Redis + from rq.worker import Worker diff --git a/rq/utils.py b/rq/utils.py index db483ab..5e61983 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -7,16 +7,16 @@ terminal colorizing code, originally by Georg Brandl. import calendar import datetime +import datetime as dt import importlib import logging import numbers -import sys -import datetime as dt from collections.abc import Iterable -from typing import TYPE_CHECKING, Dict, List, Optional, Any, Callable, Tuple, Union +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union if TYPE_CHECKING: from redis import Redis + from .queue import Queue from redis.exceptions import ResponseError diff --git a/rq/worker.py b/rq/worker.py index ade789b..062b1b4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -13,8 +13,8 @@ import warnings from datetime import datetime, timedelta from enum import Enum from random import shuffle -from typing import TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, Union from types import FrameType +from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Type, Union from uuid import uuid4 if TYPE_CHECKING: @@ -35,19 +35,17 @@ from contextlib import suppress import redis.exceptions from . import worker_registration -from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command -from .connections import get_current_connection, push_connection, pop_connection - +from .command import PUBSUB_CHANNEL_TEMPLATE, handle_command, parse_payload +from .connections import get_current_connection, pop_connection, push_connection from .defaults import ( + DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_DATE_FORMAT, + DEFAULT_LOGGING_FORMAT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, - DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, - DEFAULT_LOGGING_DATE_FORMAT, ) -from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException - +from .exceptions import DequeueTimeout, DeserializationError, ShutDownImminentException from .job import Job, JobStatus from .logutils import blue, green, setup_loghandlers, yellow from .queue import Queue @@ -55,20 +53,19 @@ from .registry import StartedJobRegistry, clean_registries from .scheduler import RQScheduler from .serializers import resolve_serializer from .suspension import is_suspended -from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty +from .timeouts import HorseMonitorTimeoutException, JobTimeoutException, UnixSignalDeathPenalty from .utils import ( + as_text, backend_class, + compact, ensure_list, get_version, utcformat, utcnow, utcparse, - compact, - as_text, ) from .version import VERSION - try: from setproctitle import setproctitle as setprocname except ImportError: @@ -373,7 +370,8 @@ class BaseWorker: max_jobs (Optional[int], optional): Max number of jobs. Defaults to None. max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None. with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False. - dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. Defaults to DequeueStrategy.DEFAULT + dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. + Defaults to DequeueStrategy.DEFAULT Returns: worked (bool): Will return True if any job was processed, False otherwise. diff --git a/rq/worker_pool.py b/rq/worker_pool.py index 005c3b9..b161cc8 100644 --- a/rq/worker_pool.py +++ b/rq/worker_pool.py @@ -4,17 +4,14 @@ import logging import os import signal import time - from enum import Enum from multiprocessing import Process -from typing import Dict, List, NamedTuple, Optional, Set, Type, Union +from typing import Dict, List, NamedTuple, Optional, Type, Union from uuid import uuid4 -from redis import Redis -from redis import ConnectionPool -from rq.serializers import DefaultSerializer +from redis import ConnectionPool, Redis -from rq.timeouts import HorseMonitorTimeoutException, UnixSignalDeathPenalty +from rq.serializers import DefaultSerializer from .connections import parse_connection from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT diff --git a/rq/worker_registration.py b/rq/worker_registration.py index fe4dc04..838c63f 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -1,15 +1,16 @@ -from typing import Optional, TYPE_CHECKING, Any, Set +from typing import TYPE_CHECKING, Optional, Set if TYPE_CHECKING: from redis import Redis from redis.client import Pipeline - from .worker import Worker - from .queue import Queue -from .utils import as_text + from .queue import Queue + from .worker import Worker from rq.utils import split_list +from .utils import as_text + WORKERS_BY_QUEUE_KEY = 'rq:workers:%s' REDIS_WORKER_KEYS = 'rq:workers' MAX_KEYS = 1000 diff --git a/setup.cfg b/setup.cfg index 9cc24f6..f873f9b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -4,9 +4,3 @@ requires = redis >= 3.0.0 [wheel] universal = 1 - -[flake8] -max-line-length=120 -ignore=E731 -count=True -statistics=True diff --git a/setup.py b/setup.py index 221e223..ceaf034 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,8 @@ rq is a simple, lightweight, library for creating background jobs, and processing them. """ import os -from setuptools import setup, find_packages + +from setuptools import find_packages, setup def get_version(): @@ -33,11 +34,10 @@ setup( license='BSD', author='Vincent Driessen', author_email='vincent@3rdcloud.com', - description='RQ is a simple, lightweight, library for creating background ' - 'jobs, and processing them.', + description='RQ is a simple, lightweight, library for creating background jobs, and processing them.', long_description=__doc__, packages=find_packages(exclude=['tests', 'tests.*']), - package_data = {"rq": ["py.typed"]}, + package_data={"rq": ["py.typed"]}, include_package_data=True, zip_safe=False, platforms='any', @@ -46,7 +46,6 @@ setup( entry_points={ 'console_scripts': [ 'rq = rq.cli:main', - # NOTE: rqworker/rqinfo are kept for backward-compatibility, # remove eventually (TODO) 'rqinfo = rq.cli:info', @@ -85,6 +84,5 @@ setup( 'Topic :: System :: Distributed Computing', 'Topic :: System :: Systems Administration', 'Topic :: System :: Monitoring', - - ] + ], ) diff --git a/tests/__init__.py b/tests/__init__.py index 36b2bc6..9da4687 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,10 +1,10 @@ import logging import os +import unittest from redis import Redis -from rq import pop_connection, push_connection -import unittest +from rq import pop_connection, push_connection def find_empty_redis_database(ssl=False): diff --git a/tests/fixtures.py b/tests/fixtures.py index 4536c3c..62ea8e1 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -3,16 +3,17 @@ This file contains all jobs that are used in tests. Each of these test fixtures has a slightly different characteristics. """ +import contextlib import os -import time import signal -import sys import subprocess -import contextlib +import sys +import time from multiprocessing import Process from redis import Redis -from rq import Connection, get_current_job, get_current_connection, Queue + +from rq import Connection, Queue, get_current_connection, get_current_job from rq.command import send_kill_horse_command, send_shutdown_command from rq.decorators import job from rq.job import Job diff --git a/tests/test_callbacks.py b/tests/test_callbacks.py index 680ee38..c47ad84 100644 --- a/tests/test_callbacks.py +++ b/tests/test_callbacks.py @@ -1,15 +1,13 @@ from datetime import timedelta -from tests import RQTestCase -from tests.fixtures import div_by_zero, erroneous_callback, save_exception, save_result, say_hello - from rq import Queue, Worker -from rq.job import Job, JobStatus, UNEVALUATED +from rq.job import UNEVALUATED, Job, JobStatus from rq.worker import SimpleWorker +from tests import RQTestCase +from tests.fixtures import div_by_zero, erroneous_callback, save_exception, save_result, say_hello class QueueCallbackTestCase(RQTestCase): - def test_enqueue_with_success_callback(self): """Test enqueue* methods with on_success""" queue = Queue(connection=self.testconn) @@ -54,10 +52,7 @@ class SyncJobCallback(RQTestCase): job = queue.enqueue(say_hello, on_success=save_result) self.assertEqual(job.get_status(), JobStatus.FINISHED) - self.assertEqual( - self.testconn.get('success_callback:%s' % job.id).decode(), - job.result - ) + self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.result) job = queue.enqueue(div_by_zero, on_success=save_result) self.assertEqual(job.get_status(), JobStatus.FAILED) @@ -69,8 +64,7 @@ class SyncJobCallback(RQTestCase): job = queue.enqueue(div_by_zero, on_failure=save_exception) self.assertEqual(job.get_status(), JobStatus.FAILED) - self.assertIn('div_by_zero', - self.testconn.get('failure_callback:%s' % job.id).decode()) + self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode()) job = queue.enqueue(div_by_zero, on_success=save_result) self.assertEqual(job.get_status(), JobStatus.FAILED) @@ -88,10 +82,7 @@ class WorkerCallbackTestCase(RQTestCase): # Callback is executed when job is successfully executed worker.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FINISHED) - self.assertEqual( - self.testconn.get('success_callback:%s' % job.id).decode(), - job.return_value() - ) + self.assertEqual(self.testconn.get('success_callback:%s' % job.id).decode(), job.return_value()) job = queue.enqueue(div_by_zero, on_success=save_result) worker.work(burst=True) @@ -120,8 +111,7 @@ class WorkerCallbackTestCase(RQTestCase): self.assertEqual(job.get_status(), JobStatus.FAILED) job.refresh() print(job.exc_info) - self.assertIn('div_by_zero', - self.testconn.get('failure_callback:%s' % job.id).decode()) + self.assertIn('div_by_zero', self.testconn.get('failure_callback:%s' % job.id).decode()) job = queue.enqueue(div_by_zero, on_success=save_result) worker.work(burst=True) @@ -132,7 +122,6 @@ class WorkerCallbackTestCase(RQTestCase): class JobCallbackTestCase(RQTestCase): - def test_job_creation_with_success_callback(self): """Ensure callbacks are created and persisted properly""" job = Job.create(say_hello) diff --git a/tests/test_cli.py b/tests/test_cli.py index 79ac12d..1767a3e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,26 +1,22 @@ -from datetime import datetime, timezone, timedelta +import json +import os +from datetime import datetime, timedelta, timezone from time import sleep from uuid import uuid4 -import os -import json - -from click import BadParameter +import pytest from click.testing import CliRunner from redis import Redis from rq import Queue from rq.cli import main -from rq.cli.helpers import read_config_file, CliConfig, parse_function_arg, parse_schedule +from rq.cli.helpers import CliConfig, parse_function_arg, parse_schedule, read_config_file from rq.job import Job, JobStatus from rq.registry import FailedJobRegistry, ScheduledJobRegistry +from rq.scheduler import RQScheduler from rq.serializers import JSONSerializer from rq.timeouts import UnixSignalDeathPenalty from rq.worker import Worker, WorkerStatus -from rq.scheduler import RQScheduler - -import pytest - from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -809,7 +805,7 @@ class WorkerPoolCLITestCase(CLITestCase): queue = Queue('bar', connection=self.connection, serializer=JSONSerializer) job_2 = queue.enqueue(say_hello, 'Hello') runner = CliRunner() - result = runner.invoke( + runner.invoke( main, ['worker-pool', 'foo', 'bar', '-u', self.redis_url, '-b', '--serializer', 'rq.serializers.JSONSerializer'], ) diff --git a/tests/test_commands.py b/tests/test_commands.py index f98a0ec..355b72a 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1,17 +1,15 @@ import time - from multiprocessing import Process from redis import Redis -from tests import RQTestCase -from tests.fixtures import long_running_job, _send_kill_horse_command, _send_shutdown_command - from rq import Queue, Worker from rq.command import send_command, send_kill_horse_command, send_shutdown_command, send_stop_job_command from rq.exceptions import InvalidJobOperation, NoSuchJobError from rq.serializers import JSONSerializer from rq.worker import WorkerStatus +from tests import RQTestCase +from tests.fixtures import _send_kill_horse_command, _send_shutdown_command, long_running_job def start_work(queue_name, worker_name, connection_kwargs): diff --git a/tests/test_decorator.py b/tests/test_decorator.py index fb945e5..69ddde1 100644 --- a/tests/test_decorator.py +++ b/tests/test_decorator.py @@ -11,13 +11,11 @@ from tests.fixtures import decorated_job class TestDecorator(RQTestCase): - def setUp(self): super().setUp() def test_decorator_preserves_functionality(self): - """Ensure that a decorated function's functionality is still preserved. - """ + """Ensure that a decorated function's functionality is still preserved.""" self.assertEqual(decorated_job(1, 2), 3) def test_decorator_adds_delay_attr(self): @@ -34,9 +32,11 @@ class TestDecorator(RQTestCase): """Ensure that passing in queue name to the decorator puts the job in the right queue. """ + @job(queue='queue_name') def hello(): return 'Hi' + result = hello.delay() self.assertEqual(result.origin, 'queue_name') @@ -51,12 +51,12 @@ class TestDecorator(RQTestCase): @job('default', result_ttl=10) def hello(): return 'Why hello' + result = hello.delay() self.assertEqual(result.result_ttl, 10) def test_decorator_accepts_ttl_as_argument(self): - """Ensure that passing in ttl to the decorator sets the ttl on the job - """ + """Ensure that passing in ttl to the decorator sets the ttl on the job""" # Ensure default result = decorated_job.delay(1, 2) self.assertEqual(result.ttl, None) @@ -64,12 +64,12 @@ class TestDecorator(RQTestCase): @job('default', ttl=30) def hello(): return 'Hello' + result = hello.delay() self.assertEqual(result.ttl, 30) def test_decorator_accepts_meta_as_argument(self): - """Ensure that passing in meta to the decorator sets the meta on the job - """ + """Ensure that passing in meta to the decorator sets the meta on the job""" # Ensure default result = decorated_job.delay(1, 2) self.assertEqual(result.meta, {}) @@ -82,6 +82,7 @@ class TestDecorator(RQTestCase): @job('default', meta=test_meta) def hello(): return 'Hello' + result = hello.delay() self.assertEqual(result.meta, test_meta) @@ -153,16 +154,19 @@ class TestDecorator(RQTestCase): """Ensure that passing in on_failure function to the decorator sets the correct on_failure function on the job. """ + # Only functions and builtins are supported as callback @job('default', on_failure=Job.fetch) def foo(): return 'Foo' + with self.assertRaises(ValueError): result = foo.delay() @job('default', on_failure=print) def hello(): return 'Hello' + result = hello.delay() result_job = Job.fetch(id=result.id, connection=self.testconn) self.assertEqual(result_job.failure_callback, print) @@ -171,23 +175,26 @@ class TestDecorator(RQTestCase): """Ensure that passing in on_failure function to the decorator sets the correct on_success function on the job. """ + # Only functions and builtins are supported as callback @job('default', on_failure=Job.fetch) def foo(): return 'Foo' + with self.assertRaises(ValueError): result = foo.delay() @job('default', on_success=print) def hello(): return 'Hello' + result = hello.delay() result_job = Job.fetch(id=result.id, connection=self.testconn) self.assertEqual(result_job.success_callback, print) @mock.patch('rq.queue.resolve_connection') def test_decorator_connection_laziness(self, resolve_connection): - """Ensure that job decorator resolve connection in `lazy` way """ + """Ensure that job decorator resolve connection in `lazy` way""" resolve_connection.return_value = Redis() @@ -207,12 +214,11 @@ class TestDecorator(RQTestCase): def test_decorator_custom_queue_class(self): """Ensure that a custom queue class can be passed to the job decorator""" + class CustomQueue(Queue): pass - CustomQueue.enqueue_call = mock.MagicMock( - spec=lambda *args, **kwargs: None, - name='enqueue_call' - ) + + CustomQueue.enqueue_call = mock.MagicMock(spec=lambda *args, **kwargs: None, name='enqueue_call') custom_decorator = job(queue='default', queue_class=CustomQueue) self.assertIs(custom_decorator.queue_class, CustomQueue) @@ -226,12 +232,11 @@ class TestDecorator(RQTestCase): def test_decorate_custom_queue(self): """Ensure that a custom queue instance can be passed to the job decorator""" + class CustomQueue(Queue): pass - CustomQueue.enqueue_call = mock.MagicMock( - spec=lambda *args, **kwargs: None, - name='enqueue_call' - ) + + CustomQueue.enqueue_call = mock.MagicMock(spec=lambda *args, **kwargs: None, name='enqueue_call') queue = CustomQueue() @job(queue=queue) @@ -252,11 +257,12 @@ class TestDecorator(RQTestCase): @job('default', failure_ttl=10) def hello(): return 'Why hello' + result = hello.delay() self.assertEqual(result.failure_ttl, 10) def test_decorator_custom_retry(self): - """ Ensure that passing in retry to the decorator sets the + """Ensure that passing in retry to the decorator sets the retry on the job """ # Ensure default @@ -267,6 +273,7 @@ class TestDecorator(RQTestCase): @job('default', retry=Retry(3, [2])) def hello(): return 'Why hello' + result = hello.delay() self.assertEqual(result.retries_left, 3) self.assertEqual(result.retry_intervals, [2]) diff --git a/tests/test_dependencies.py b/tests/test_dependencies.py index a290a87..13400f7 100644 --- a/tests/test_dependencies.py +++ b/tests/test_dependencies.py @@ -1,12 +1,10 @@ +from rq import Queue, SimpleWorker, Worker +from rq.job import Dependency, Job, JobStatus from tests import RQTestCase from tests.fixtures import check_dependencies_are_met, div_by_zero, say_hello -from rq import Queue, SimpleWorker, Worker -from rq.job import Job, JobStatus, Dependency - class TestDependencies(RQTestCase): - def test_allow_failure_is_persisted(self): """Ensure that job.allow_dependency_failures is properly set when providing Dependency object to depends_on.""" @@ -70,10 +68,8 @@ class TestDependencies(RQTestCase): # When a failing job has multiple dependents, only enqueue those # with allow_failure=True parent_job = q.enqueue(div_by_zero) - job_allow_failure = q.enqueue(say_hello, - depends_on=Dependency(jobs=parent_job, allow_failure=True)) - job = q.enqueue(say_hello, - depends_on=Dependency(jobs=parent_job, allow_failure=False)) + job_allow_failure = q.enqueue(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=True)) + job = q.enqueue(say_hello, depends_on=Dependency(jobs=parent_job, allow_failure=False)) w.work(burst=True, max_jobs=1) self.assertEqual(parent_job.get_status(), JobStatus.FAILED) self.assertEqual(job_allow_failure.get_status(), JobStatus.QUEUED) @@ -101,22 +97,12 @@ class TestDependencies(RQTestCase): # Test dependant is enqueued at front q.empty() parent_job = q.enqueue(say_hello) - q.enqueue( - say_hello, - job_id='fake_job_id_1', - depends_on=Dependency(jobs=[parent_job]) - ) - q.enqueue( - say_hello, - job_id='fake_job_id_2', - depends_on=Dependency(jobs=[parent_job],enqueue_at_front=True) - ) - #q.enqueue(say_hello) # This is a filler job that will act as a separator for jobs, one will be enqueued at front while the other one at the end of the queue + q.enqueue(say_hello, job_id='fake_job_id_1', depends_on=Dependency(jobs=[parent_job])) + q.enqueue(say_hello, job_id='fake_job_id_2', depends_on=Dependency(jobs=[parent_job], enqueue_at_front=True)) w.work(burst=True, max_jobs=1) self.assertEqual(q.job_ids, ["fake_job_id_2", "fake_job_id_1"]) - def test_dependency_list_in_depends_on(self): """Enqueue with Dependency list in depends_on""" q = Queue(connection=self.testconn) @@ -129,7 +115,6 @@ class TestDependencies(RQTestCase): w.work(burst=True) self.assertEqual(job.get_status(), JobStatus.FINISHED) - def test_enqueue_job_dependency(self): """Enqueue via Queue.enqueue_job() with depencency""" q = Queue(connection=self.testconn) @@ -147,7 +132,6 @@ class TestDependencies(RQTestCase): self.assertEqual(parent_job.get_status(), JobStatus.FINISHED) self.assertEqual(job.get_status(), JobStatus.FINISHED) - def test_dependencies_are_met_if_parent_is_canceled(self): """When parent job is canceled, it should be treated as failed""" queue = Queue(connection=self.testconn) diff --git a/tests/test_fixtures.py b/tests/test_fixtures.py index 383ba15..1517b80 100644 --- a/tests/test_fixtures.py +++ b/tests/test_fixtures.py @@ -1,5 +1,4 @@ from rq import Queue - from tests import RQTestCase, fixtures diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 5a84f71..c351b77 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -1,16 +1,14 @@ -from rq.cli.helpers import get_redis_from_config +from unittest import mock +from rq.cli.helpers import get_redis_from_config from tests import RQTestCase -from unittest import mock -class TestHelpers(RQTestCase): +class TestHelpers(RQTestCase): @mock.patch('rq.cli.helpers.Sentinel') def test_get_redis_from_config(self, sentinel_class_mock): """Ensure Redis connection params are properly parsed""" - settings = { - 'REDIS_URL': 'redis://localhost:1/1' - } + settings = {'REDIS_URL': 'redis://localhost:1/1'} # Ensure REDIS_URL is read redis = get_redis_from_config(settings) @@ -23,7 +21,7 @@ class TestHelpers(RQTestCase): 'REDIS_HOST': 'foo', 'REDIS_DB': 2, 'REDIS_PORT': 2, - 'REDIS_PASSWORD': 'bar' + 'REDIS_PASSWORD': 'bar', } # Ensure REDIS_URL is preferred @@ -42,23 +40,29 @@ class TestHelpers(RQTestCase): self.assertEqual(connection_kwargs['password'], 'bar') # Add Sentinel to the settings - settings.update({ - 'SENTINEL': { - 'INSTANCES':[('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)], - 'MASTER_NAME': 'master', - 'DB': 2, - 'USERNAME': 'redis-user', - 'PASSWORD': 'redis-secret', - 'SOCKET_TIMEOUT': None, - 'CONNECTION_KWARGS': { - 'ssl_ca_path': None, - }, - 'SENTINEL_KWARGS': { - 'username': 'sentinel-user', - 'password': 'sentinel-secret', + settings.update( + { + 'SENTINEL': { + 'INSTANCES': [ + ('remote.host1.org', 26379), + ('remote.host2.org', 26379), + ('remote.host3.org', 26379), + ], + 'MASTER_NAME': 'master', + 'DB': 2, + 'USERNAME': 'redis-user', + 'PASSWORD': 'redis-secret', + 'SOCKET_TIMEOUT': None, + 'CONNECTION_KWARGS': { + 'ssl_ca_path': None, + }, + 'SENTINEL_KWARGS': { + 'username': 'sentinel-user', + 'password': 'sentinel-secret', + }, }, - }, - }) + } + ) # Ensure SENTINEL is preferred against REDIS_* parameters redis = get_redis_from_config(settings) @@ -66,7 +70,7 @@ class TestHelpers(RQTestCase): sentinel_init_sentinel_kwargs = sentinel_class_mock.call_args[1] self.assertEqual( sentinel_init_sentinels_args, - ([('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],) + ([('remote.host1.org', 26379), ('remote.host2.org', 26379), ('remote.host3.org', 26379)],), ) self.assertDictEqual( sentinel_init_sentinel_kwargs, @@ -80,6 +84,6 @@ class TestHelpers(RQTestCase): 'sentinel_kwargs': { 'username': 'sentinel-user', 'password': 'sentinel-secret', - } - } + }, + }, ) diff --git a/tests/test_job.py b/tests/test_job.py index 318c41b..444080f 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -1,32 +1,29 @@ import json - -from rq.defaults import CALLBACK_TIMEOUT -from rq.serializers import JSONSerializer -import time import queue +import time import zlib from datetime import datetime, timedelta +from pickle import dumps, loads from redis import WatchError -from rq.utils import as_text +from rq.defaults import CALLBACK_TIMEOUT from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError -from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job, Callback +from rq.job import Callback, Dependency, Job, JobStatus, cancel_job, get_current_job from rq.queue import Queue from rq.registry import ( CanceledJobRegistry, DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, - StartedJobRegistry, ScheduledJobRegistry, + StartedJobRegistry, ) -from rq.utils import utcformat, utcnow +from rq.serializers import JSONSerializer +from rq.utils import as_text, utcformat, utcnow from rq.worker import Worker from tests import RQTestCase, fixtures -from pickle import loads, dumps - class TestJob(RQTestCase): def test_unicode(self): diff --git a/tests/test_queue.py b/tests/test_queue.py index d352736..e91ae54 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -1,9 +1,8 @@ import json from datetime import datetime, timedelta, timezone -from rq.serializers import JSONSerializer from unittest.mock import patch -from rq import Retry, Queue +from rq import Queue, Retry from rq.job import Job, JobStatus from rq.registry import ( CanceledJobRegistry, @@ -13,10 +12,10 @@ from rq.registry import ( ScheduledJobRegistry, StartedJobRegistry, ) +from rq.serializers import JSONSerializer from rq.worker import Worker - from tests import RQTestCase -from tests.fixtures import CustomJob, echo, say_hello +from tests.fixtures import echo, say_hello class MultipleDependencyJob(Job): diff --git a/tests/test_registry.py b/tests/test_registry.py index 57584b5..5dd0be6 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,20 +1,22 @@ from datetime import datetime, timedelta from unittest import mock -from unittest.mock import PropertyMock, ANY +from unittest.mock import ANY -from rq.serializers import JSONSerializer - -from rq.utils import as_text from rq.defaults import DEFAULT_FAILURE_TTL -from rq.exceptions import InvalidJobOperation, AbandonedJobError +from rq.exceptions import AbandonedJobError, InvalidJobOperation from rq.job import Job, JobStatus, requeue_job from rq.queue import Queue -from rq.utils import current_timestamp +from rq.registry import ( + CanceledJobRegistry, + DeferredJobRegistry, + FailedJobRegistry, + FinishedJobRegistry, + StartedJobRegistry, + clean_registries, +) +from rq.serializers import JSONSerializer +from rq.utils import as_text, current_timestamp from rq.worker import Worker -from rq.registry import (CanceledJobRegistry, clean_registries, DeferredJobRegistry, - FailedJobRegistry, FinishedJobRegistry, - StartedJobRegistry) - from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -24,7 +26,6 @@ class CustomJob(Job): class TestRegistry(RQTestCase): - def setUp(self): super().setUp() self.registry = StartedJobRegistry(connection=self.testconn) @@ -83,8 +84,7 @@ class TestRegistry(RQTestCase): # Test that job is added with the right score self.registry.add(job, 1000) - self.assertLess(self.testconn.zscore(self.registry.key, job.id), - timestamp + 1002) + self.assertLess(self.testconn.zscore(self.registry.key, job.id), timestamp + 1002) # Ensure that a timeout of -1 results in a score of inf self.registry.add(job, -1) @@ -144,8 +144,7 @@ class TestRegistry(RQTestCase): self.testconn.zadd(self.registry.key, {'baz': timestamp + 30}) self.assertEqual(self.registry.get_expired_job_ids(), ['foo']) - self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), - ['foo', 'bar']) + self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20), ['foo', 'bar']) # CanceledJobRegistry does not implement get_expired_job_ids() registry = CanceledJobRegistry(connection=self.testconn) @@ -268,12 +267,10 @@ class TestRegistry(RQTestCase): self.assertEqual(registry.get_queue(), Queue(connection=self.testconn)) registry = StartedJobRegistry('foo', connection=self.testconn, serializer=JSONSerializer) - self.assertEqual(registry.get_queue(), - Queue('foo', connection=self.testconn, serializer=JSONSerializer)) + self.assertEqual(registry.get_queue(), Queue('foo', connection=self.testconn, serializer=JSONSerializer)) class TestFinishedJobRegistry(RQTestCase): - def setUp(self): super().setUp() self.registry = FinishedJobRegistry(connection=self.testconn) @@ -321,7 +318,6 @@ class TestFinishedJobRegistry(RQTestCase): class TestDeferredRegistry(RQTestCase): - def setUp(self): super().setUp() self.registry = DeferredJobRegistry(connection=self.testconn) @@ -333,8 +329,7 @@ class TestDeferredRegistry(RQTestCase): """Adding a job to DeferredJobsRegistry.""" job = Job() self.registry.add(job) - job_ids = [as_text(job_id) for job_id in - self.testconn.zrange(self.registry.key, 0, -1)] + job_ids = [as_text(job_id) for job_id in self.testconn.zrange(self.registry.key, 0, -1)] self.assertEqual(job_ids, [job.id]) def test_register_dependency(self): @@ -352,7 +347,6 @@ class TestDeferredRegistry(RQTestCase): class TestFailedJobRegistry(RQTestCase): - def test_default_failure_ttl(self): """Job TTL defaults to DEFAULT_FAILURE_TTL""" queue = Queue(connection=self.testconn) @@ -511,11 +505,9 @@ class TestFailedJobRegistry(RQTestCase): w.handle_job_failure(job, q) # job is added to FailedJobRegistry with default failure ttl self.assertIn(job.id, registry.get_job_ids()) - self.assertLess(self.testconn.zscore(registry.key, job.id), - timestamp + DEFAULT_FAILURE_TTL + 5) + self.assertLess(self.testconn.zscore(registry.key, job.id), timestamp + DEFAULT_FAILURE_TTL + 5) # job is added to FailedJobRegistry with specified ttl job = q.enqueue(div_by_zero, failure_ttl=5) w.handle_job_failure(job, q) - self.assertLess(self.testconn.zscore(registry.key, job.id), - timestamp + 7) + self.assertLess(self.testconn.zscore(registry.key, job.id), timestamp + 7) diff --git a/tests/test_results.py b/tests/test_results.py index 4286cec..e27e872 100644 --- a/tests/test_results.py +++ b/tests/test_results.py @@ -1,13 +1,10 @@ -import unittest import tempfile - +import unittest from datetime import timedelta -from unittest.mock import patch, PropertyMock +from unittest.mock import PropertyMock, patch from redis import Redis -from tests import RQTestCase - from rq.defaults import UNSERIALIZABLE_RETURN_VALUE_PAYLOAD from rq.job import Job from rq.queue import Queue @@ -15,13 +12,13 @@ from rq.registry import StartedJobRegistry from rq.results import Result, get_key from rq.utils import get_version, utcnow from rq.worker import Worker +from tests import RQTestCase -from .fixtures import say_hello, div_by_zero +from .fixtures import div_by_zero, say_hello @unittest.skipIf(get_version(Redis()) < (5, 0, 0), 'Skip if Redis server < 5.0') class TestScheduledJobRegistry(RQTestCase): - def test_save_and_get_result(self): """Ensure data is saved properly""" queue = Queue(connection=self.connection) @@ -159,8 +156,7 @@ class TestScheduledJobRegistry(RQTestCase): registry = StartedJobRegistry(connection=self.connection) job.started_at = utcnow() job.ended_at = job.started_at + timedelta(seconds=0.75) - worker.handle_job_failure(job, exc_string='Error', queue=queue, - started_job_registry=registry) + worker.handle_job_failure(job, exc_string='Error', queue=queue, started_job_registry=registry) job = Job.fetch(job.id, connection=self.connection) payload = self.connection.hgetall(job.key) @@ -181,8 +177,7 @@ class TestScheduledJobRegistry(RQTestCase): # If `save_result_to_job` = True, result will be saved to job # hash, simulating older versions of RQ - worker.handle_job_failure(job, exc_string='Error', queue=queue, - started_job_registry=registry) + worker.handle_job_failure(job, exc_string='Error', queue=queue, started_job_registry=registry) payload = self.connection.hgetall(job.key) self.assertTrue(b'exc_info' in payload.keys()) # Delete all new result objects so we only have result stored in job hash, diff --git a/tests/test_retry.py b/tests/test_retry.py index e8fddeb..ed2d477 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -9,7 +9,6 @@ from tests.fixtures import div_by_zero, say_hello class TestRetry(RQTestCase): - def test_persistence_of_retry_data(self): """Retry related data is stored and restored properly""" job = Job.create(func=fixtures.some_calculation) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 96cde1c..8aa722a 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1,10 +1,10 @@ import os -import redis - from datetime import datetime, timedelta, timezone from multiprocessing import Process from unittest import mock +import redis + from rq import Queue from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL from rq.exceptions import NoSuchJobError @@ -15,6 +15,7 @@ from rq.serializers import JSONSerializer from rq.utils import current_timestamp from rq.worker import Worker from tests import RQTestCase, find_empty_redis_database, ssl_test + from .fixtures import kill_worker, say_hello diff --git a/tests/test_sentry.py b/tests/test_sentry.py index f52f7db..4ae9722 100644 --- a/tests/test_sentry.py +++ b/tests/test_sentry.py @@ -1,15 +1,15 @@ +from unittest import mock + +from click.testing import CliRunner + from rq import Queue from rq.cli import main from rq.cli.helpers import read_config_file from rq.contrib.sentry import register_sentry from rq.worker import SimpleWorker - from tests import RQTestCase from tests.fixtures import div_by_zero -from unittest import mock -from click.testing import CliRunner - class FakeSentry: servers = [] diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 0c50fa7..6ef7ed8 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -18,10 +18,7 @@ class TestSerializers(unittest.TestCase): test_data = {'test': 'data'} serialized_data = serializer.dumps(test_data) self.assertEqual(serializer.loads(serialized_data), test_data) - self.assertEqual( - next(pickletools.genops(serialized_data))[1], - pickle.HIGHEST_PROTOCOL - ) + self.assertEqual(next(pickletools.genops(serialized_data))[1], pickle.HIGHEST_PROTOCOL) # Test using json serializer serializer = resolve_serializer(json) diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 1f392a3..42cd207 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -1,8 +1,8 @@ import time from rq import Queue, SimpleWorker -from rq.timeouts import TimerDeathPenalty from rq.registry import FailedJobRegistry, FinishedJobRegistry +from rq.timeouts import TimerDeathPenalty from tests import RQTestCase diff --git a/tests/test_utils.py b/tests/test_utils.py index b71e67e..2dbb613 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,10 +1,9 @@ -import re import datetime +import re from unittest.mock import Mock from redis import Redis -from tests import RQTestCase, fixtures from rq.exceptions import TimeoutFormatError from rq.utils import ( backend_class, @@ -16,11 +15,12 @@ from rq.utils import ( import_attribute, is_nonstring_iterable, parse_timeout, - utcparse, split_list, truncate_long_string, + utcparse, ) from rq.worker import SimpleWorker +from tests import RQTestCase, fixtures class TestUtils(RQTestCase): diff --git a/tests/test_worker.py b/tests/test_worker.py index 5410180..285ae42 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,57 +1,52 @@ import json import os -import psutil import shutil import signal import subprocess import sys import time import zlib - from datetime import datetime, timedelta from multiprocessing import Process from time import sleep +from unittest import mock, skipIf +from unittest.mock import Mock -from unittest import skipIf - -import redis.exceptions +import psutil import pytest -from unittest import mock -from unittest.mock import Mock +import redis.exceptions +from rq import Queue, SimpleWorker, Worker, get_current_connection from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL +from rq.job import Job, JobStatus, Retry +from rq.registry import FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry +from rq.results import Result +from rq.serializers import JSONSerializer +from rq.suspension import resume, suspend +from rq.utils import as_text, utcnow +from rq.version import VERSION +from rq.worker import HerokuWorker, RandomWorker, RoundRobinWorker, WorkerStatus from tests import RQTestCase, slow from tests.fixtures import ( + CustomJob, access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, - CustomJob, div_by_zero, do_nothing, kill_worker, + launch_process_within_worker_and_store_pid, long_running_job, modify_self, modify_self_and_error, + raise_exc_mock, run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, - raise_exc_mock, - launch_process_within_worker_and_store_pid, ) -from rq import Queue, SimpleWorker, Worker, get_current_connection -from rq.utils import as_text -from rq.job import Job, JobStatus, Retry -from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry -from rq.results import Result -from rq.suspension import resume, suspend -from rq.utils import utcnow -from rq.version import VERSION -from rq.worker import HerokuWorker, WorkerStatus, RoundRobinWorker, RandomWorker -from rq.serializers import JSONSerializer - class CustomQueue(Queue): pass @@ -656,7 +651,10 @@ class TestWorker(RQTestCase): self.assertIsNone(w.dequeue_job_and_maintain_ttl(None)) def test_worker_ttl_param_resolves_timeout(self): - """Ensures the worker_ttl param is being considered in the dequeue_timeout and connection_timeout params, takes into account 15 seconds gap (hard coded)""" + """ + Ensures the worker_ttl param is being considered in the dequeue_timeout and + connection_timeout params, takes into account 15 seconds gap (hard coded) + """ q = Queue() w = Worker([q]) self.assertEqual(w.dequeue_timeout, 405) diff --git a/tests/test_worker_pool.py b/tests/test_worker_pool.py index ab2e677..219b4a8 100644 --- a/tests/test_worker_pool.py +++ b/tests/test_worker_pool.py @@ -1,18 +1,16 @@ import os import signal - from multiprocessing import Process from time import sleep -from rq.job import JobStatus - -from tests import TestCase -from tests.fixtures import CustomJob, _send_shutdown_command, long_running_job, say_hello from rq.connections import parse_connection +from rq.job import JobStatus from rq.queue import Queue from rq.serializers import JSONSerializer from rq.worker import SimpleWorker -from rq.worker_pool import run_worker, WorkerPool +from rq.worker_pool import WorkerPool, run_worker +from tests import TestCase +from tests.fixtures import CustomJob, _send_shutdown_command, long_running_job, say_hello def wait_and_send_shutdown_signal(pid, time_to_wait=0.0): @@ -111,9 +109,7 @@ class TestWorkerPool(TestCase): queue.enqueue(say_hello) connection_class, pool_class, pool_kwargs = parse_connection(self.connection) - run_worker( - 'test-worker', ['foo'], connection_class, pool_class, pool_kwargs - ) + run_worker('test-worker', ['foo'], connection_class, pool_class, pool_kwargs) # Worker should have processed the job self.assertEqual(len(queue), 0) diff --git a/tests/test_worker_registration.py b/tests/test_worker_registration.py index 30a3c82..26ee617 100644 --- a/tests/test_worker_registration.py +++ b/tests/test_worker_registration.py @@ -1,15 +1,19 @@ -from rq.utils import ceildiv -from tests import RQTestCase from unittest.mock import patch from rq import Queue, Worker -from rq.worker_registration import (clean_worker_registry, get_keys, register, - unregister, REDIS_WORKER_KEYS, - WORKERS_BY_QUEUE_KEY) +from rq.utils import ceildiv +from rq.worker_registration import ( + REDIS_WORKER_KEYS, + WORKERS_BY_QUEUE_KEY, + clean_worker_registry, + get_keys, + register, + unregister, +) +from tests import RQTestCase class TestWorkerRegistry(RQTestCase): - def test_worker_registration(self): """Ensure worker.key is correctly set in Redis.""" foo_queue = Queue(name='foo') @@ -21,23 +25,15 @@ class TestWorkerRegistry(RQTestCase): self.assertTrue(redis.sismember(worker.redis_workers_keys, worker.key)) self.assertEqual(Worker.count(connection=redis), 1) - self.assertTrue( - redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key) - ) + self.assertTrue(redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key)) self.assertEqual(Worker.count(queue=foo_queue), 1) - self.assertTrue( - redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key) - ) + self.assertTrue(redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key)) self.assertEqual(Worker.count(queue=bar_queue), 1) unregister(worker) self.assertFalse(redis.sismember(worker.redis_workers_keys, worker.key)) - self.assertFalse( - redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key) - ) - self.assertFalse( - redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key) - ) + self.assertFalse(redis.sismember(WORKERS_BY_QUEUE_KEY % foo_queue.name, worker.key)) + self.assertFalse(redis.sismember(WORKERS_BY_QUEUE_KEY % bar_queue.name, worker.key)) def test_get_keys_by_queue(self): """get_keys_by_queue only returns active workers for that queue""" @@ -56,17 +52,11 @@ class TestWorkerRegistry(RQTestCase): register(worker3) # get_keys(queue) will return worker keys for that queue - self.assertEqual( - set([worker1.key, worker2.key]), - get_keys(foo_queue) - ) + self.assertEqual(set([worker1.key, worker2.key]), get_keys(foo_queue)) self.assertEqual(set([worker1.key]), get_keys(bar_queue)) # get_keys(connection=connection) will return all worker keys - self.assertEqual( - set([worker1.key, worker2.key, worker3.key]), - get_keys(connection=worker1.connection) - ) + self.assertEqual(set([worker1.key, worker2.key, worker3.key]), get_keys(connection=worker1.connection)) # Calling get_keys without arguments raises an exception self.assertRaises(ValueError, get_keys) @@ -105,8 +95,9 @@ class TestWorkerRegistry(RQTestCase): worker = Worker([queue]) register(worker) - with patch('rq.worker_registration.MAX_KEYS', MAX_KEYS), \ - patch.object(queue.connection, 'pipeline', wraps=queue.connection.pipeline) as pipeline_mock: + with patch('rq.worker_registration.MAX_KEYS', MAX_KEYS), patch.object( + queue.connection, 'pipeline', wraps=queue.connection.pipeline + ) as pipeline_mock: # clean_worker_registry creates a pipeline with a context manager. Configure the mock using the context # manager entry method __enter__ pipeline_mock.return_value.__enter__.return_value.srem.return_value = None diff --git a/tox.ini b/tox.ini index b2644f3..5adb901 100644 --- a/tox.ini +++ b/tox.ini @@ -1,8 +1,8 @@ [tox] -envlist=py36,py37,py38,py39,py310,flake8 +envlist=lint,py36,py37,py38,py39,py310 [testenv] -commands=pytest --cov rq --durations=5 {posargs} +commands=pytest --cov rq --cov-config=.coveragerc --durations=5 {posargs} deps= pytest pytest-cov @@ -13,13 +13,14 @@ passenv= RUN_SSL_TESTS RUN_SLOW_TESTS_TOO -[testenv:flake8] -basepython = python3.6 +[testenv:lint] +basepython = python3.10 deps = - flake8 + black + ruff commands = - flake8 rq tests - + black --check rq tests + ruff check rq tests [testenv:py36] skipdist = True