From fdb14df18141b6bedc40d719904f56e64984bc37 Mon Sep 17 00:00:00 2001 From: Paul Spooren Date: Sat, 4 Feb 2023 01:42:51 +0100 Subject: [PATCH] Black style (#1292) * treewide: apply black style This PR applied the black code style, adds it to the CI and README. The changes look big, however no functional changed are applied at all. The line length is set to 120, which is different from black recommendation of 88. I would suggest to stick to the 88 recommendation and adapt the flake8 check. Add the `-S` parameter to `black` to keep single quotes. Signed-off-by: Paul Spooren * README: add black badge Help people to find the used code style. Signed-off-by: Paul Spooren * CI: check codestyle via black Automatically run the CI and check that the code style is still black. Signed-off-by: Paul Spooren --------- Signed-off-by: Paul Spooren --- .github/workflows/lint.yml | 6 +- README.md | 2 + rq/__init__.py | 3 +- rq/cli/cli.py | 270 +++++++++++++++--------- rq/cli/helpers.py | 57 ++--- rq/command.py | 2 +- rq/connections.py | 17 +- rq/contrib/sentry.py | 1 + rq/decorators.py | 49 +++-- rq/defaults.py | 1 - rq/job.py | 259 +++++++++++------------ rq/local.py | 8 +- rq/logutils.py | 13 +- rq/queue.py | 413 ++++++++++++++++++++++++++----------- rq/registry.py | 63 +++--- rq/results.py | 59 ++++-- rq/scheduler.py | 38 ++-- rq/serializers.py | 4 +- rq/timeouts.py | 20 +- rq/utils.py | 41 ++-- rq/worker.py | 337 ++++++++++++++++-------------- 21 files changed, 1006 insertions(+), 657 deletions(-) diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index ea767a7..c99348d 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -25,7 +25,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 + pip install flake8 black - name: Lint with flake8 run: | @@ -33,3 +33,7 @@ jobs: flake8 . --select=E9,F63,F7,F82 --show-source # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --exit-zero --max-complexity=5 + + - name: Lint with black + run: | + black -S -l 120 rq/ diff --git a/README.md b/README.md index 40d314b..5fe80cd 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ RQ requires Redis >= 3.0.0. [![Build status](https://github.com/rq/rq/workflows/Test%20rq/badge.svg)](https://github.com/rq/rq/actions?query=workflow%3A%22Test+rq%22) [![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq) [![Coverage](https://codecov.io/gh/rq/rq/branch/master/graph/badge.svg)](https://codecov.io/gh/rq/rq) +[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) + Full documentation can be found [here][d]. diff --git a/rq/__init__.py b/rq/__init__.py index abca97c..d5db681 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,7 +1,6 @@ # flake8: noqa -from .connections import (Connection, get_current_connection, pop_connection, - push_connection, use_connection) +from .connections import Connection, get_current_connection, pop_connection, push_connection, use_connection from .job import cancel_job, get_current_job, requeue_job, Retry from .queue import Queue from .version import VERSION diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 75c5974..f781010 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -10,22 +10,34 @@ import click from redis.exceptions import ConnectionError from rq import Connection, Retry, __version__ as version -from rq.cli.helpers import (read_config_file, refresh, - setup_loghandlers_from_args, - show_both, show_queues, show_workers, CliConfig, parse_function_args, - parse_schedule) +from rq.cli.helpers import ( + read_config_file, + refresh, + setup_loghandlers_from_args, + show_both, + show_queues, + show_workers, + CliConfig, + parse_function_args, + parse_schedule, +) from rq.contrib.legacy import cleanup_ghosts -from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, - DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS, - DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, - DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT, - DEFAULT_SERIALIZER_CLASS) +from rq.defaults import ( + DEFAULT_CONNECTION_CLASS, + DEFAULT_JOB_CLASS, + DEFAULT_QUEUE_CLASS, + DEFAULT_WORKER_CLASS, + DEFAULT_RESULT_TTL, + DEFAULT_WORKER_TTL, + DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_FORMAT, + DEFAULT_LOGGING_DATE_FORMAT, + DEFAULT_SERIALIZER_CLASS, +) from rq.exceptions import InvalidJobOperationError from rq.registry import FailedJobRegistry, clean_registries from rq.utils import import_attribute, get_call_string, make_colorizer -from rq.suspension import (suspend as connection_suspend, - resume as connection_resume, is_suspended) +from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended from rq.worker_registration import clean_worker_registry from rq.job import JobStatus @@ -39,35 +51,26 @@ click.disable_unicode_literals_warning = True shared_options = [ - click.option('--url', '-u', - envvar='RQ_REDIS_URL', - help='URL describing Redis connection details.'), - click.option('--config', '-c', - envvar='RQ_CONFIG', - help='Module containing RQ settings.'), - click.option('--worker-class', '-w', - envvar='RQ_WORKER_CLASS', - default=DEFAULT_WORKER_CLASS, - help='RQ Worker class to use'), - click.option('--job-class', '-j', - envvar='RQ_JOB_CLASS', - default=DEFAULT_JOB_CLASS, - help='RQ Job class to use'), - click.option('--queue-class', - envvar='RQ_QUEUE_CLASS', - default=DEFAULT_QUEUE_CLASS, - help='RQ Queue class to use'), - click.option('--connection-class', - envvar='RQ_CONNECTION_CLASS', - default=DEFAULT_CONNECTION_CLASS, - help='Redis client class to use'), - click.option('--path', '-P', - default=['.'], - help='Specify the import path.', - multiple=True), - click.option('--serializer', '-S', - default=DEFAULT_SERIALIZER_CLASS, - help='Path to serializer, defaults to rq.serializers.DefaultSerializer') + click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'), + click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'), + click.option( + '--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use' + ), + click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'), + click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'), + click.option( + '--connection-class', + envvar='RQ_CONNECTION_CLASS', + default=DEFAULT_CONNECTION_CLASS, + help='Redis client class to use', + ), + click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True), + click.option( + '--serializer', + '-S', + default=DEFAULT_SERIALIZER_CLASS, + help='Path to serializer, defaults to rq.serializers.DefaultSerializer', + ), ] @@ -100,15 +103,16 @@ def empty(cli_config, all, queues, serializer, **options): """Empty given queues.""" if all: - queues = cli_config.queue_class.all(connection=cli_config.connection, - job_class=cli_config.job_class, - serializer=serializer) + queues = cli_config.queue_class.all( + connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer + ) else: - queues = [cli_config.queue_class(queue, - connection=cli_config.connection, - job_class=cli_config.job_class, - serializer=serializer) - for queue in queues] + queues = [ + cli_config.queue_class( + queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer + ) + for queue in queues + ] if not queues: click.echo('Nothing to do') @@ -127,10 +131,9 @@ def empty(cli_config, all, queues, serializer, **options): def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options): """Requeue failed jobs.""" - failed_job_registry = FailedJobRegistry(queue, - connection=cli_config.connection, - job_class=job_class, - serializer=serializer) + failed_job_registry = FailedJobRegistry( + queue, connection=cli_config.connection, job_class=job_class, serializer=serializer + ) if all: job_ids = failed_job_registry.get_job_ids() @@ -159,8 +162,7 @@ def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options): @click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') @click.argument('queues', nargs=-1) @pass_cli_config -def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, - **options): +def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, **options): """RQ command-line monitor.""" if only_queues: @@ -182,8 +184,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, clean_registries(queue) clean_worker_registry(queue) - refresh(interval, func, qs, raw, by_queue, - cli_config.queue_class, cli_config.worker_class) + refresh(interval, func, qs, raw, by_queue, cli_config.queue_class, cli_config.worker_class) except ConnectionError as e: click.echo(e) sys.exit(1) @@ -200,7 +201,12 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--name', '-n', help='Specify a different name') @click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used') @click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used') -@click.option('--job-monitoring-interval', type=int, default=DEFAULT_JOB_MONITORING_INTERVAL, help='Default job monitoring interval to be used') +@click.option( + '--job-monitoring-interval', + type=int, + default=DEFAULT_JOB_MONITORING_INTERVAL, + help='Default job monitoring interval to be used', +) @click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.') @click.option('--verbose', '-v', is_flag=True, help='Show more output') @click.option('--quiet', '-q', is_flag=True, help='Show less output') @@ -215,11 +221,31 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') @click.argument('queues', nargs=-1) @pass_cli_config -def worker(cli_config, burst, logging_level, name, results_ttl, - worker_ttl, job_monitoring_interval, disable_job_desc_logging, - verbose, quiet, sentry_ca_certs, sentry_debug, sentry_dsn, - exception_handler, pid, disable_default_exception_handler, max_jobs, - with_scheduler, queues, log_format, date_format, serializer, **options): +def worker( + cli_config, + burst, + logging_level, + name, + results_ttl, + worker_ttl, + job_monitoring_interval, + disable_job_desc_logging, + verbose, + quiet, + sentry_ca_certs, + sentry_debug, + sentry_dsn, + exception_handler, + pid, + disable_default_exception_handler, + max_jobs, + with_scheduler, + queues, + log_format, + date_format, + serializer, + **options +): """Starts an RQ worker.""" settings = read_config_file(cli_config.config) if cli_config.config else {} # Worker specific default arguments @@ -245,38 +271,46 @@ def worker(cli_config, burst, logging_level, name, results_ttl, click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red') sys.exit(1) - queues = [cli_config.queue_class(queue, - connection=cli_config.connection, - job_class=cli_config.job_class, - serializer=serializer) - for queue in queues] + queues = [ + cli_config.queue_class( + queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer + ) + for queue in queues + ] worker = cli_config.worker_class( - queues, name=name, connection=cli_config.connection, - default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, + queues, + name=name, + connection=cli_config.connection, + default_worker_ttl=worker_ttl, + default_result_ttl=results_ttl, job_monitoring_interval=job_monitoring_interval, - job_class=cli_config.job_class, queue_class=cli_config.queue_class, + job_class=cli_config.job_class, + queue_class=cli_config.queue_class, exception_handlers=exception_handlers or None, disable_default_exception_handler=disable_default_exception_handler, log_job_description=not disable_job_desc_logging, - serializer=serializer + serializer=serializer, ) # Should we configure Sentry? if sentry_dsn: - sentry_opts = { - "ca_certs": sentry_ca_certs, - "debug": sentry_debug - } + sentry_opts = {"ca_certs": sentry_ca_certs, "debug": sentry_debug} from rq.contrib.sentry import register_sentry + register_sentry(sentry_dsn, **sentry_opts) # if --verbose or --quiet, override --logging_level if verbose or quiet: logging_level = None - worker.work(burst=burst, logging_level=logging_level, - date_format=date_format, log_format=log_format, - max_jobs=max_jobs, with_scheduler=with_scheduler) + worker.work( + burst=burst, + logging_level=logging_level, + date_format=date_format, + log_format=log_format, + max_jobs=max_jobs, + with_scheduler=with_scheduler, + ) except ConnectionError as e: print(e) sys.exit(1) @@ -296,7 +330,9 @@ def suspend(cli_config, duration, **options): if duration: msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will - automatically resume""".format(duration) + automatically resume""".format( + duration + ) click.echo(msg) else: click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed") @@ -312,27 +348,51 @@ def resume(cli_config, **options): @main.command() @click.option('--queue', '-q', help='The name of the queue.', default='default') -@click.option('--timeout', - help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.') +@click.option( + '--timeout', help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.' +) @click.option('--result-ttl', help='Specifies how long successful jobs and their results are kept.') @click.option('--ttl', help='Specifies the maximum queued time of the job before it is discarded.') @click.option('--failure-ttl', help='Specifies how long failed jobs are kept.') @click.option('--description', help='Additional description of the job') -@click.option('--depends-on', help='Specifies another job id that must complete before this job will be queued.', - multiple=True) +@click.option( + '--depends-on', help='Specifies another job id that must complete before this job will be queued.', multiple=True +) @click.option('--job-id', help='The id of this job') @click.option('--at-front', is_flag=True, help='Will place the job at the front of the queue, instead of the end') @click.option('--retry-max', help='Maximum amount of retries', default=0, type=int) @click.option('--retry-interval', help='Interval between retries in seconds', multiple=True, type=int, default=[0]) @click.option('--schedule-in', help='Delay until the function is enqueued (e.g. 10s, 5m, 2d).') -@click.option('--schedule-at', help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without ' - 'timezone (e.g. 2021-05-27T21:45:00).') +@click.option( + '--schedule-at', + help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without ' + 'timezone (e.g. 2021-05-27T21:45:00).', +) @click.option('--quiet', is_flag=True, help='Only logs errors.') @click.argument('function') @click.argument('arguments', nargs=-1) @pass_cli_config -def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, description, depends_on, job_id, at_front, - retry_max, retry_interval, schedule_in, schedule_at, quiet, serializer, function, arguments, **options): +def enqueue( + cli_config, + queue, + timeout, + result_ttl, + ttl, + failure_ttl, + description, + depends_on, + job_id, + at_front, + retry_max, + retry_interval, + schedule_in, + schedule_at, + quiet, + serializer, + function, + arguments, + **options +): """Enqueues a job from the command line""" args, kwargs = parse_function_args(arguments) function_string = get_call_string(function, args, kwargs) @@ -348,11 +408,37 @@ def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, descriptio queue = cli_config.queue_class(queue, serializer=serializer) if schedule is None: - job = queue.enqueue_call(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl, - description, depends_on, job_id, at_front, None, retry) + job = queue.enqueue_call( + function, + args, + kwargs, + timeout, + result_ttl, + ttl, + failure_ttl, + description, + depends_on, + job_id, + at_front, + None, + retry, + ) else: - job = queue.create_job(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl, - description, depends_on, job_id, None, JobStatus.SCHEDULED, retry) + job = queue.create_job( + function, + args, + kwargs, + timeout, + result_ttl, + ttl, + failure_ttl, + description, + depends_on, + job_id, + None, + JobStatus.SCHEDULED, + retry, + ) queue.schedule_job(job, schedule) if not quiet: diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index af812ce..fb20109 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -13,8 +13,7 @@ from shutil import get_terminal_size import click from redis import Redis from redis.sentinel import Sentinel -from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, - DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS) +from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS from rq.logutils import setup_loghandlers from rq.utils import import_attribute, parse_timeout from rq.worker import WorkerStatus @@ -27,16 +26,14 @@ yellow = partial(click.style, fg='yellow') def read_config_file(module): """Reads all UPPERCASE variables defined in the given module file.""" settings = importlib.import_module(module) - return dict([(k, v) - for k, v in settings.__dict__.items() - if k.upper() == k]) + return dict([(k, v) for k, v in settings.__dict__.items() if k.upper() == k]) def get_redis_from_config(settings, connection_class=Redis): """Returns a StrictRedis instance from a dictionary of settings. - To use redis sentinel, you must specify a dictionary in the configuration file. - Example of a dictionary with keys without values: - SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'PASSWORD':,'DB':, 'MASTER_NAME':} + To use redis sentinel, you must specify a dictionary in the configuration file. + Example of a dictionary with keys without values: + SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'PASSWORD':,'DB':, 'MASTER_NAME':} """ if settings.get('REDIS_URL') is not None: return connection_class.from_url(settings['REDIS_URL']) @@ -50,8 +47,7 @@ def get_redis_from_config(settings, connection_class=Redis): ssl = settings['SENTINEL'].get('SSL', False) arguments = {'password': password, 'ssl': ssl} sn = Sentinel( - instances, socket_timeout=socket_timeout, password=password, - db=db, ssl=ssl, sentinel_kwargs=arguments + instances, socket_timeout=socket_timeout, password=password, db=db, ssl=ssl, sentinel_kwargs=arguments ) return sn.master_for(master_name) @@ -168,9 +164,7 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class): for queue in queue_dict: if queue_dict[queue]: queues_str = ", ".join( - sorted( - map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue]) - ) + sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue])) ) else: queues_str = '–' @@ -188,6 +182,7 @@ def show_both(queues, raw, by_queue, queue_class, worker_class): if not raw: click.echo('') import datetime + click.echo('Updated: %s' % datetime.datetime.now()) @@ -233,14 +228,14 @@ def parse_function_arg(argument, arg_pos): if index > 0: if ':' in argument and argument.index(':') + 1 == index: # keyword, json mode = ParsingMode.JSON - keyword = argument[:index - 1] + keyword = argument[: index - 1] elif '%' in argument and argument.index('%') + 1 == index: # keyword, literal_eval mode = ParsingMode.LITERAL_EVAL - keyword = argument[:index - 1] + keyword = argument[: index - 1] else: # keyword, text mode = ParsingMode.PLAIN_TEXT keyword = argument[:index] - value = argument[index + 1:] + value = argument[index + 1 :] else: # no keyword, text mode = ParsingMode.PLAIN_TEXT value = argument @@ -261,9 +256,11 @@ def parse_function_arg(argument, arg_pos): try: value = literal_eval(value) except Exception: - raise click.BadParameter('Unable to eval %s as Python object. See ' - 'https://docs.python.org/3/library/ast.html#ast.literal_eval' - % (keyword or '%s. non keyword argument' % arg_pos)) + raise click.BadParameter( + 'Unable to eval %s as Python object. See ' + 'https://docs.python.org/3/library/ast.html#ast.literal_eval' + % (keyword or '%s. non keyword argument' % arg_pos) + ) return keyword, value @@ -294,9 +291,19 @@ def parse_schedule(schedule_in, schedule_at): class CliConfig: """A helper class to be used with click commands, to handle shared options""" - def __init__(self, url=None, config=None, worker_class=DEFAULT_WORKER_CLASS, - job_class=DEFAULT_JOB_CLASS, queue_class=DEFAULT_QUEUE_CLASS, - connection_class=DEFAULT_CONNECTION_CLASS, path=None, *args, **kwargs): + + def __init__( + self, + url=None, + config=None, + worker_class=DEFAULT_WORKER_CLASS, + job_class=DEFAULT_JOB_CLASS, + queue_class=DEFAULT_QUEUE_CLASS, + connection_class=DEFAULT_CONNECTION_CLASS, + path=None, + *args, + **kwargs + ): self._connection = None self.url = url self.config = config @@ -331,9 +338,7 @@ class CliConfig: self._connection = self.connection_class.from_url(self.url) elif self.config: settings = read_config_file(self.config) if self.config else {} - self._connection = get_redis_from_config(settings, - self.connection_class) + self._connection = get_redis_from_config(settings, self.connection_class) else: - self._connection = get_redis_from_config(os.environ, - self.connection_class) + self._connection = get_redis_from_config(os.environ, self.connection_class) return self._connection diff --git a/rq/command.py b/rq/command.py index 0f8ef6e..4566ec0 100644 --- a/rq/command.py +++ b/rq/command.py @@ -22,7 +22,7 @@ def send_command(connection: 'Redis', worker_name: str, command: str, **kwargs): - `shutdown`: Shuts down a worker - `kill-horse`: Command for the worker to kill the current working horse - `stop-job`: A command for the worker to stop the currently running job - + The command string will be parsed into a dictionary and send to a PubSub Topic. Workers listen to the PubSub, and `handle` the specific command. diff --git a/rq/connections.py b/rq/connections.py index c5ebc20..413ee5a 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -30,8 +30,9 @@ def Connection(connection: Optional['Redis'] = None): # noqa Args: connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None. """ - warnings.warn("The Conneciton context manager is deprecated. Use the `connection` parameter instead.", - DeprecationWarning) + warnings.warn( + "The Conneciton context manager is deprecated. Use the `connection` parameter instead.", DeprecationWarning + ) if connection is None: connection = Redis() push_connection(connection) @@ -39,9 +40,9 @@ def Connection(connection: Optional['Redis'] = None): # noqa yield finally: popped = pop_connection() - assert popped == connection, \ - 'Unexpected Redis connection was popped off the stack. ' \ - 'Check your Redis connection setup.' + assert popped == connection, ( + 'Unexpected Redis connection was popped off the stack. ' 'Check your Redis connection setup.' + ) def push_connection(redis: 'Redis'): @@ -72,8 +73,7 @@ def use_connection(redis: Optional['Redis'] = None): Args: redis (Optional[Redis], optional): A Redis Connection. Defaults to None. """ - assert len(_connection_stack) <= 1, \ - 'You should not mix Connection contexts with use_connection()' + assert len(_connection_stack) <= 1, 'You should not mix Connection contexts with use_connection()' release_local(_connection_stack) if redis is None: @@ -118,5 +118,4 @@ def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis': _connection_stack = LocalStack() -__all__ = ['Connection', 'get_current_connection', 'push_connection', - 'pop_connection', 'use_connection'] +__all__ = ['Connection', 'get_current_connection', 'push_connection', 'pop_connection', 'use_connection'] diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py index edf0fe4..efb55b2 100644 --- a/rq/contrib/sentry.py +++ b/rq/contrib/sentry.py @@ -4,4 +4,5 @@ def register_sentry(sentry_dsn, **opts): """ import sentry_sdk from sentry_sdk.integrations.rq import RqIntegration + sentry_sdk.init(sentry_dsn, integrations=[RqIntegration()], **opts) diff --git a/rq/decorators.py b/rq/decorators.py index 70a61aa..2bf46e8 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -13,12 +13,23 @@ from .utils import backend_class class job: # noqa queue_class = Queue - def __init__(self, queue: Union['Queue', str], connection: Optional['Redis'] = None, timeout: Optional[int] = None, - result_ttl: int = DEFAULT_RESULT_TTL, ttl: Optional[int] = None, - queue_class: Optional['Queue'] = None, depends_on: Optional[List[Any]] = None, at_front: Optional[bool] = None, - meta: Optional[Dict[Any, Any]] = None, description: Optional[str] = None, failure_ttl: Optional[int] = None, - retry: Optional['Retry'] = None, on_failure: Optional[Callable[..., Any]] = None, - on_success: Optional[Callable[..., Any]] = None): + def __init__( + self, + queue: Union['Queue', str], + connection: Optional['Redis'] = None, + timeout: Optional[int] = None, + result_ttl: int = DEFAULT_RESULT_TTL, + ttl: Optional[int] = None, + queue_class: Optional['Queue'] = None, + depends_on: Optional[List[Any]] = None, + at_front: Optional[bool] = None, + meta: Optional[Dict[Any, Any]] = None, + description: Optional[str] = None, + failure_ttl: Optional[int] = None, + retry: Optional['Retry'] = None, + on_failure: Optional[Callable[..., Any]] = None, + on_success: Optional[Callable[..., Any]] = None, + ): """A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string @@ -68,8 +79,7 @@ class job: # noqa @wraps(f) def delay(*args, **kwargs): if isinstance(self.queue, str): - queue = self.queue_class(name=self.queue, - connection=self.connection) + queue = self.queue_class(name=self.queue, connection=self.connection) else: queue = self.queue @@ -83,10 +93,23 @@ class job: # noqa if not at_front: at_front = self.at_front - return queue.enqueue_call(f, args=args, kwargs=kwargs, - timeout=self.timeout, result_ttl=self.result_ttl, - ttl=self.ttl, depends_on=depends_on, job_id=job_id, at_front=at_front, - meta=self.meta, description=self.description, failure_ttl=self.failure_ttl, - retry=self.retry, on_failure=self.on_failure, on_success=self.on_success) + return queue.enqueue_call( + f, + args=args, + kwargs=kwargs, + timeout=self.timeout, + result_ttl=self.result_ttl, + ttl=self.ttl, + depends_on=depends_on, + job_id=job_id, + at_front=at_front, + meta=self.meta, + description=self.description, + failure_ttl=self.failure_ttl, + retry=self.retry, + on_failure=self.on_failure, + on_success=self.on_success, + ) + f.delay = delay return f diff --git a/rq/defaults.py b/rq/defaults.py index ef76678..bd50489 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -88,4 +88,3 @@ DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' Uses Python's default attributes as defined https://docs.python.org/3/library/logging.html#logrecord-attributes """ - diff --git a/rq/job.py b/rq/job.py index 9104747..61b359f 100644 --- a/rq/job.py +++ b/rq/job.py @@ -8,8 +8,7 @@ from collections.abc import Iterable from datetime import datetime, timedelta, timezone from enum import Enum from redis import WatchError -from typing import (TYPE_CHECKING, Any, Callable, Dict, Iterable, List, - Optional, Tuple, Union) +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union from uuid import uuid4 @@ -20,18 +19,27 @@ if TYPE_CHECKING: from redis.client import Pipeline from .connections import resolve_connection -from .exceptions import (DeserializationError, InvalidJobOperation, - NoSuchJobError) +from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError from .local import LocalStack from .serializers import resolve_serializer from .types import FunctionReferenceType, JobDependencyType -from .utils import (as_text, decode_redis_hash, ensure_list, get_call_string, - get_version, import_attribute, parse_timeout, str_to_date, - utcformat, utcnow) +from .utils import ( + as_text, + decode_redis_hash, + ensure_list, + get_call_string, + get_version, + import_attribute, + parse_timeout, + str_to_date, + utcformat, + utcnow, +) class JobStatus(str, Enum): - """The Status of Job within its lifecycle at any given time. """ + """The Status of Job within its lifecycle at any given time.""" + QUEUED = 'queued' FINISHED = 'finished' FAILED = 'failed' @@ -57,13 +65,9 @@ class Dependency: Raises: ValueError: If the `jobs` param has anything different than `str` or `Job` class or the job list is empty - """ + """ dependent_jobs = ensure_list(jobs) - if not all( - isinstance(job, Job) or isinstance(job, str) - for job in dependent_jobs - if job - ): + if not all(isinstance(job, Job) or isinstance(job, str) for job in dependent_jobs if job): raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids") elif len(dependent_jobs) < 1: raise ValueError("jobs: cannot be empty.") @@ -79,8 +83,7 @@ yet been evaluated. """ -def cancel_job(job_id: str, connection: Optional['Redis'] = None, - serializer=None, enqueue_dependents: bool = False): +def cancel_job(job_id: str, connection: Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False): """Cancels the job with the given job ID, preventing execution. Use with caution. This will discard any job info (i.e. it can't be requeued later). @@ -103,13 +106,11 @@ def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['J Returns: job (Optional[Job]): The current Job running - """ + """ if connection: - warnings.warn("connection argument for get_current_job is deprecated.", - DeprecationWarning) + warnings.warn("connection argument for get_current_job is deprecated.", DeprecationWarning) if job_class: - warnings.warn("job_class argument for get_current_job is deprecated.", - DeprecationWarning) + warnings.warn("job_class argument for get_current_job is deprecated.", DeprecationWarning) return _job_stack.top @@ -123,26 +124,38 @@ def requeue_job(job_id: str, connection: 'Redis', serializer=None) -> 'Job': Returns: Job: The requeued Job object. - """ + """ job = Job.fetch(job_id, connection=connection, serializer=serializer) return job.requeue() class Job: """A Job is just a convenient datastructure to pass around job (meta) data.""" + redis_job_namespace_prefix = 'rq:job:' @classmethod - def create(cls, func: FunctionReferenceType, args: Union[List[Any], Optional[Tuple]] = None, - kwargs: Optional[Dict[str, Any]] = None, connection: Optional['Redis'] = None, - result_ttl: Optional[int] = None, ttl: Optional[int] = None, - status: Optional[JobStatus] = None, description: Optional[str] =None, - depends_on: Optional[JobDependencyType] = None, - timeout: Optional[int] = None, id: Optional[str] = None, - origin=None, meta: Optional[Dict[str, Any]] = None, - failure_ttl: Optional[int] = None, serializer=None, *, - on_success: Optional[Callable[..., Any]] = None, - on_failure: Optional[Callable[..., Any]] = None) -> 'Job': + def create( + cls, + func: FunctionReferenceType, + args: Union[List[Any], Optional[Tuple]] = None, + kwargs: Optional[Dict[str, Any]] = None, + connection: Optional['Redis'] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + status: Optional[JobStatus] = None, + description: Optional[str] = None, + depends_on: Optional[JobDependencyType] = None, + timeout: Optional[int] = None, + id: Optional[str] = None, + origin=None, + meta: Optional[Dict[str, Any]] = None, + failure_ttl: Optional[int] = None, + serializer=None, + *, + on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None + ) -> 'Job': """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -185,7 +198,7 @@ class Job: Returns: Job: A job instance. - """ + """ if args is None: args = () if kwargs is None: @@ -247,10 +260,7 @@ class Job: depends_on_list = depends_on.dependencies else: depends_on_list = ensure_list(depends_on) - job._dependency_ids = [ - dep.id if isinstance(dep, Job) else dep - for dep in depends_on_list - ] + job._dependency_ids = [dep.id if isinstance(dep, Job) else dep for dep in depends_on_list] return job @@ -259,8 +269,9 @@ class Job: Returns: position (Optional[int]): The position - """ + """ from .queue import Queue + if self.origin: q = Queue(name=self.origin, connection=self.connection) return q.get_job_position(self._id) @@ -274,7 +285,7 @@ class Job: Returns: status (JobStatus): The Job Status - """ + """ if refresh: self._status = as_text(self.connection.hget(self.key, 'status')) return self._status @@ -285,7 +296,7 @@ class Job: Args: status (JobStatus): The Job Status to be set pipeline (Optional[Pipeline], optional): Optional Redis Pipeline to use. Defaults to None. - """ + """ self._status = status connection: 'Redis' = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'status', self._status) @@ -298,7 +309,7 @@ class Job: Returns: meta (Dict): The dictionary of metadata - """ + """ if refresh: meta = self.connection.hget(self.key, 'meta') self.meta = self.serializer.loads(meta) if meta else {} @@ -401,7 +412,7 @@ class Job: Raises: DeserializationError: Cathes any deserialization error (since serializers are generic) - """ + """ try: self._func_name, self._instance, self._args, self._kwargs = self.serializer.loads(self.data) except Exception as e: @@ -522,7 +533,7 @@ class Job: job_ids (Iterable[str]): A list of job ids. connection (Redis): Redis connection serializer (Callable): A serializer - + Returns: jobs (list[Job]): A list of Jobs instances. """ @@ -580,17 +591,14 @@ class Job: self.enqueue_at_front: Optional[bool] = None from .results import Result + self._cached_result: Optional[Result] = None def __repr__(self): # noqa # pragma: no cover - return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, - self._id, - self.enqueued_at) + return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, self._id, self.enqueued_at) def __str__(self): - return '<{0} {1}: {2}>'.format(self.__class__.__name__, - self.id, - self.description) + return '<{0} {1}: {2}>'.format(self.__class__.__name__, self.id, self.description) def __eq__(self, other): # noqa return isinstance(other, self.__class__) and self.id == other.id @@ -612,7 +620,7 @@ class Job: def set_id(self, value: str) -> None: """Sets a job ID for the given job - + Args: value (str): The value to set as Job ID """ @@ -630,7 +638,7 @@ class Job: ttl (int): The time to live pipeline (Optional[Pipeline], optional): Can receive a Redis' pipeline to use. Defaults to None. xx (bool, optional): Only sets the key if already exists. Defaults to False. - """ + """ self.last_heartbeat = timestamp connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) @@ -690,14 +698,15 @@ class Job: Returns: jobs (list[Job]): A list of Jobs - """ + """ connection = pipeline if pipeline is not None else self.connection if watch and self._dependency_ids: - connection.watch(*[self.key_for(dependency_id) - for dependency_id in self._dependency_ids]) + connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids]) - dependencies_list = self.fetch_many(self._dependency_ids, connection=self.connection, serializer=self.serializer) + dependencies_list = self.fetch_many( + self._dependency_ids, connection=self.connection, serializer=self.serializer + ) jobs = [job for job in dependencies_list if job] return jobs @@ -706,8 +715,7 @@ class Job: """ Get the latest result and returns `exc_info` only if the latest result is a failure. """ - warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.", - DeprecationWarning) + warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.", DeprecationWarning) from .results import Result @@ -730,6 +738,7 @@ class Job: result (Optional[Any]): The job return value. """ from .results import Result + if refresh: self._cached_result = None @@ -741,7 +750,7 @@ class Job: if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL: return self._cached_result.return_value - + return None @property @@ -762,8 +771,7 @@ class Job: seconds by default). """ - warnings.warn("job.result is deprecated, use job.return_value instead.", - DeprecationWarning) + warnings.warn("job.result is deprecated, use job.return_value instead.", DeprecationWarning) from .results import Result @@ -789,6 +797,7 @@ class Job: all_results (List[Result]): A list of 'Result' objects """ from .results import Result + return Result.all(self, serializer=self.serializer) def latest_result(self) -> Optional['Result']: @@ -796,9 +805,10 @@ class Job: Returns: result (Result): The Result object - """ + """ """Returns the latest Result object""" from .results import Result + return Result.fetch_latest(self, serializer=self.serializer) def restore(self, raw_data) -> Any: @@ -849,8 +859,7 @@ class Job: dep_ids = obj.get('dependency_ids') dep_id = obj.get('dependency_id') # for backwards compatibility - self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids - else [dep_id.decode()] if dep_id else []) + self._dependency_ids = json.loads(dep_ids.decode()) if dep_ids else [dep_id.decode()] if dep_id else [] allow_failures = obj.get('allow_dependency_failures') self.allow_dependency_failures = bool(int(allow_failures)) if allow_failures else None self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None @@ -902,7 +911,7 @@ class Job: 'started_at': utcformat(self.started_at) if self.started_at else '', 'ended_at': utcformat(self.ended_at) if self.ended_at else '', 'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '', - 'worker_name': self.worker_name or '' + 'worker_name': self.worker_name or '', } if self.retries_left is not None: @@ -948,8 +957,7 @@ class Job: return obj - def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True, - include_result: bool = True): + def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True, include_result: bool = True): """Dumps the current job instance to its corresponding Redis key. Exclude saving the `meta` dictionary by setting @@ -980,7 +988,7 @@ class Job: def get_redis_server_version(self) -> Tuple[int, int, int]: """Return Redis server version of connection - + Returns: redis_server_version (Tuple[int, int, int]): The Redis version within a Tuple of integers, eg (5, 0, 9) """ @@ -1016,15 +1024,13 @@ class Job: raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id())) from .registry import CanceledJobRegistry from .queue import Queue + pipe = pipeline or self.connection.pipeline() while True: try: q = Queue( - name=self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer + name=self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer ) self.set_status(JobStatus.CANCELED, pipeline=pipe) @@ -1033,16 +1039,10 @@ class Job: if pipeline is None: pipe.watch(self.dependents_key) q.enqueue_dependents(self, pipeline=pipeline, exclude_job_id=self.id) - self._remove_from_registries( - pipeline=pipe, - remove_from_queue=True - ) + self._remove_from_registries(pipeline=pipe, remove_from_queue=True) registry = CanceledJobRegistry( - self.origin, - self.connection, - job_class=self.__class__, - serializer=self.serializer + self.origin, self.connection, job_class=self.__class__, serializer=self.serializer ) registry.add(self, pipeline=pipe) if pipeline is None: @@ -1070,41 +1070,43 @@ class Job: def _remove_from_registries(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True): from .registry import BaseRegistry + if remove_from_queue: from .queue import Queue + q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer) q.remove(self, pipeline=pipeline) registry: BaseRegistry if self.is_finished: from .registry import FinishedJobRegistry - registry = FinishedJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = FinishedJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) elif self.is_deferred: from .registry import DeferredJobRegistry - registry = DeferredJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = DeferredJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) elif self.is_started: from .registry import StartedJobRegistry - registry = StartedJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = StartedJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) elif self.is_scheduled: from .registry import ScheduledJobRegistry - registry = ScheduledJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = ScheduledJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) elif self.is_failed or self.is_stopped: @@ -1112,13 +1114,15 @@ class Job: elif self.is_canceled: from .registry import CanceledJobRegistry - registry = CanceledJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = CanceledJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) - def delete(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True, - delete_dependents: bool = False): + def delete( + self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True, delete_dependents: bool = False + ): """Cancels the job and deletes the job hash from Redis. Jobs depending on this job can optionally be deleted as well. @@ -1184,7 +1188,7 @@ class Job: 'last_heartbeat': utcformat(self.last_heartbeat), 'status': self._status, 'started_at': utcformat(self.started_at), # type: ignore - 'worker_name': worker_name + 'worker_name': worker_name, } if self.get_redis_server_version() >= (4, 0, 0): pipeline.hset(self.key, mapping=mapping) @@ -1199,7 +1203,7 @@ class Job: Returns: result (Any): The function result - """ + """ result = self.func(*self.args, **self.kwargs) if asyncio.iscoroutine(result): loop = asyncio.new_event_loop() @@ -1214,7 +1218,7 @@ class Job: Args: default_ttl (Optional[int]): The default time to live for the job - + Returns: ttl (int): The time to live """ @@ -1227,7 +1231,7 @@ class Job: Args: default_ttl (Optional[int]): The default time to live for the job result - + Returns: ttl (int): The time to live for the result """ @@ -1244,8 +1248,7 @@ class Job: call_repr = get_call_string(self.func_name, self.args, self.kwargs, max_length=75) return call_repr - def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None, - remove_from_queue: bool = True): + def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True): """Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its result depends on the value of ttl: @@ -1271,16 +1274,18 @@ class Job: @property def started_job_registry(self): from .registry import StartedJobRegistry - return StartedJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + return StartedJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) @property def failed_job_registry(self): from .registry import FailedJobRegistry - return FailedJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + return FailedJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) def get_retry_interval(self) -> int: """Returns the desired retry interval. @@ -1332,10 +1337,9 @@ class Job: """ from .registry import DeferredJobRegistry - registry = DeferredJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + registry = DeferredJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.add(self, pipeline=pipeline) connection = pipeline if pipeline is not None else self.connection @@ -1348,11 +1352,14 @@ class Job: @property def dependency_ids(self) -> List[bytes]: dependencies = self.connection.smembers(self.dependencies_key) - return [Job.key_for(_id.decode()) - for _id in dependencies] - - def dependencies_are_met(self, parent_job: Optional['Job'] = None, pipeline: Optional['Pipeline'] = None, - exclude_job_id: Optional[str] = None) -> bool: + return [Job.key_for(_id.decode()) for _id in dependencies] + + def dependencies_are_met( + self, + parent_job: Optional['Job'] = None, + pipeline: Optional['Pipeline'] = None, + exclude_job_id: Optional[str] = None, + ) -> bool: """Returns a boolean indicating if all of this job's dependencies are `FINISHED` If a pipeline is passed, all dependencies are WATCHed. @@ -1373,8 +1380,7 @@ class Job: connection = pipeline if pipeline is not None else self.connection if pipeline is not None: - connection.watch(*[self.key_for(dependency_id) - for dependency_id in self._dependency_ids]) + connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids]) dependencies_ids = {_id.decode() for _id in connection.smembers(self.dependencies_key)} @@ -1407,12 +1413,7 @@ class Job: if self.allow_dependency_failures: allowed_statuses.append(JobStatus.FAILED) - return all( - status.decode() in allowed_statuses - for status - in dependencies_statuses - if status - ) + return all(status.decode() in allowed_statuses for status in dependencies_statuses if status) _job_stack = LocalStack() diff --git a/rq/local.py b/rq/local.py index 458cd83..8e94457 100644 --- a/rq/local.py +++ b/rq/local.py @@ -122,6 +122,7 @@ class LocalStack: def _set__ident_func__(self, value): # noqa object.__setattr__(self._local, '__ident_func__', value) + __ident_func__ = property(_get__ident_func__, _set__ident_func__) del _get__ident_func__, _set__ident_func__ @@ -131,6 +132,7 @@ class LocalStack: if rv is None: raise RuntimeError('object unbound') return rv + return LocalProxy(_lookup) def push(self, obj): @@ -223,10 +225,7 @@ class LocalManager: release_local(local) def __repr__(self): - return '<%s storages: %d>' % ( - self.__class__.__name__, - len(self.locals) - ) + return '<%s storages: %d>' % (self.__class__.__name__, len(self.locals)) class LocalProxy: @@ -264,6 +263,7 @@ class LocalProxy: .. versionchanged:: 0.6.1 The class can be instanciated with a callable as well now. """ + __slots__ = ('__local', '__dict__', '__name__') def __init__(self, local, name=None): diff --git a/rq/logutils.py b/rq/logutils.py index 36a404d..33e0949 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -3,12 +3,15 @@ import sys from typing import Union from rq.utils import ColorizingStreamHandler -from rq.defaults import (DEFAULT_LOGGING_FORMAT, - DEFAULT_LOGGING_DATE_FORMAT) +from rq.defaults import DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT -def setup_loghandlers(level: Union[int, str, None] = None, date_format: str = DEFAULT_LOGGING_DATE_FORMAT, - log_format: str = DEFAULT_LOGGING_FORMAT, name: str = 'rq.worker'): +def setup_loghandlers( + level: Union[int, str, None] = None, + date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT, + name: str = 'rq.worker', +): """Sets up a log handler. Args: @@ -17,7 +20,7 @@ def setup_loghandlers(level: Union[int, str, None] = None, date_format: str = DE date_format (str, optional): The date format to use. Defaults to DEFAULT_LOGGING_DATE_FORMAT ('%H:%M:%S'). log_format (str, optional): The log format to use. Defaults to DEFAULT_LOGGING_FORMAT ('%(asctime)s %(message)s'). name (str, optional): The looger name. Defaults to 'rq.worker'. - """ + """ logger = logging.getLogger(name) if not _has_effective_handler(logger): diff --git a/rq/queue.py b/rq/queue.py index 79125ce..bc7f070 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -30,11 +30,27 @@ blue = make_colorizer('darkblue') logger = logging.getLogger("rq.queue") - -class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout", - "result_ttl", "ttl", "failure_ttl", - "description", "job_id", - "at_front", "meta", "retry", "on_success", "on_failure"])): +class EnqueueData( + namedtuple( + 'EnqueueData', + [ + "func", + "args", + "kwargs", + "timeout", + "result_ttl", + "ttl", + "failure_ttl", + "description", + "job_id", + "at_front", + "meta", + "retry", + "on_success", + "on_failure", + ], + ) +): """Helper type to use when calling enqueue_many NOTE: Does not support `depends_on` yet. """ @@ -50,7 +66,9 @@ class Queue: redis_queues_keys: str = 'rq:queues' @classmethod - def all(cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None) -> List['Queue']: + def all( + cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None + ) -> List['Queue']: """Returns an iterable of all Queues. Args: @@ -64,17 +82,22 @@ class Queue: connection = resolve_connection(connection) def to_queue(queue_key): - return cls.from_queue_key(as_text(queue_key), - connection=connection, - job_class=job_class, serializer=serializer) + return cls.from_queue_key( + as_text(queue_key), connection=connection, job_class=job_class, serializer=serializer + ) all_registerd_queues = connection.smembers(cls.redis_queues_keys) all_queues = [to_queue(rq_key) for rq_key in all_registerd_queues if rq_key] return all_queues @classmethod - def from_queue_key(cls, queue_key: str, connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, serializer: Any = None) -> 'Queue': + def from_queue_key( + cls, + queue_key: str, + connection: Optional['Redis'] = None, + job_class: Optional['Job'] = None, + serializer: Any = None, + ) -> 'Queue': """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Queues by their Redis keys. @@ -94,11 +117,19 @@ class Queue: prefix = cls.redis_queue_namespace_prefix if not queue_key.startswith(prefix): raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) - name = queue_key[len(prefix):] + name = queue_key[len(prefix) :] return cls(name, connection=connection, job_class=job_class, serializer=serializer) - def __init__(self, name: str = 'default', default_timeout: Optional[int] = None, connection: Optional['Redis'] = None, - is_async: bool = True, job_class: Union[str, Type['Job'], None] = None, serializer: Any = None, **kwargs): + def __init__( + self, + name: str = 'default', + default_timeout: Optional[int] = None, + connection: Optional['Redis'] = None, + is_async: bool = True, + job_class: Union[str, Type['Job'], None] = None, + serializer: Any = None, + **kwargs, + ): """Initializes a Queue object. Args: @@ -109,7 +140,7 @@ class Queue: If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True. job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None. serializer (Any, optional): Serializer. Defaults to None. - """ + """ self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix self.name = name @@ -145,7 +176,7 @@ class Queue: def get_redis_server_version(self) -> Tuple[int, int, int]: """Return Redis server version of connection - + Returns: redis_version (Tuple): A tuple with the parsed Redis version (eg: (5,0,0)) """ @@ -184,7 +215,7 @@ class Queue: Returns: script (...): The Lua Script is called. - """ + """ script = """ local prefix = "{0}" local q = KEYS[1] @@ -201,13 +232,17 @@ class Queue: count = count + 1 end return count - """.format(self.job_class.redis_job_namespace_prefix).encode("utf-8") + """.format( + self.job_class.redis_job_namespace_prefix + ).encode( + "utf-8" + ) script = self.connection.register_script(script) return script(keys=[self.key]) def delete(self, delete_jobs: bool = True): """Deletes the queue. - + Args: delete_jobs (bool): If true, removes all the associated messages on the queue first. """ @@ -221,7 +256,7 @@ class Queue: def is_empty(self) -> bool: """Returns whether the current queue is empty. - + Returns: is_empty (bool): Whether the queue is empty """ @@ -242,7 +277,7 @@ class Queue: Returns: job (Optional[Job]): The job if found - """ + """ try: job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer) except NoSuchJobError: @@ -251,7 +286,7 @@ class Queue: if job.origin == self.name: return job - def get_job_position(self, job_or_id: Union['Job', str]) -> Optional[int]: + def get_job_position(self, job_or_id: Union['Job', str]) -> Optional[int]: """Returns the position of a job within the queue Using Redis before 6.0.6 and redis-py before 3.5.4 has a complexity of @@ -293,11 +328,7 @@ class Queue: end = offset + (length - 1) else: end = length - job_ids = [ - as_text(job_id) - for job_id - in self.connection.lrange(self.key, start, end) - ] + job_ids = [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)] self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.") return job_ids @@ -333,18 +364,21 @@ class Queue: def failed_job_registry(self): """Returns this queue's FailedJobRegistry.""" from rq.registry import FailedJobRegistry + return FailedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def started_job_registry(self): """Returns this queue's StartedJobRegistry.""" from rq.registry import StartedJobRegistry + return StartedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def finished_job_registry(self): """Returns this queue's FinishedJobRegistry.""" from rq.registry import FinishedJobRegistry + # TODO: Why was job_class only ommited here before? Was it intentional? return FinishedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @@ -352,18 +386,21 @@ class Queue: def deferred_job_registry(self): """Returns this queue's DeferredJobRegistry.""" from rq.registry import DeferredJobRegistry + return DeferredJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def scheduled_job_registry(self): """Returns this queue's ScheduledJobRegistry.""" from rq.registry import ScheduledJobRegistry + return ScheduledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def canceled_job_registry(self): """Returns this queue's CanceledJobRegistry.""" from rq.registry import CanceledJobRegistry + return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) def remove(self, job_or_id: Union['Job', str], pipeline: Optional['Pipeline'] = None): @@ -387,8 +424,7 @@ class Queue: """Removes all "dead" jobs from the queue by cycling through it, while guaranteeing FIFO semantics. """ - COMPACT_QUEUE = '{0}_compact:{1}'.format( - self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa + COMPACT_QUEUE = '{0}_compact:{1}'.format(self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa self.connection.rename(self.key, COMPACT_QUEUE) while True: @@ -414,12 +450,25 @@ class Queue: result = connection.rpush(self.key, job_id) self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") - def create_job(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType']=None, - job_id: Optional[str] = None, meta: Optional[Dict] = None, status: JobStatus = JobStatus.QUEUED, - retry: Optional['Retry'] = None, *, on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None) -> Job: + def create_job( + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + meta: Optional[Dict] = None, + status: JobStatus = JobStatus.QUEUED, + retry: Optional['Retry'] = None, + *, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, + ) -> Job: """Creates a job based on parameters given Args: @@ -461,12 +510,23 @@ class Queue: raise ValueError('Job ttl must be greater than 0') job = self.job_class.create( - func, args=args, kwargs=kwargs, connection=self.connection, - result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, - status=status, description=description, - depends_on=depends_on, timeout=timeout, id=job_id, - origin=self.name, meta=meta, serializer=self.serializer, on_success=on_success, - on_failure=on_failure + func, + args=args, + kwargs=kwargs, + connection=self.connection, + result_ttl=result_ttl, + ttl=ttl, + failure_ttl=failure_ttl, + status=status, + description=description, + depends_on=depends_on, + timeout=timeout, + id=job_id, + origin=self.name, + meta=meta, + serializer=self.serializer, + on_success=on_success, + on_failure=on_failure, ) if retry: @@ -501,10 +561,7 @@ class Queue: # is called from within this method. pipe.watch(job.dependencies_key) - dependencies = job.fetch_dependencies( - watch=True, - pipeline=pipe - ) + dependencies = job.fetch_dependencies(watch=True, pipeline=pipe) pipe.multi() @@ -535,12 +592,25 @@ class Queue: pipeline.multi() # Ensure pipeline in multi mode before returning to caller return job - def enqueue_call(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, at_front: bool = False, meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, on_success: Optional[Callable[..., Any]] = None, - on_failure: Optional[Callable[..., Any]] = None, pipeline: Optional['Pipeline'] = None) -> Job: + def enqueue_call( + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None, + pipeline: Optional['Pipeline'] = None, + ) -> Job: """Creates a job to represent the delayed function call and enqueues it. It is much like `.enqueue()`, except that it takes the function's args @@ -567,30 +637,49 @@ class Queue: Returns: Job: The enqueued Job - """ + """ job = self.create_job( - func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, - failure_ttl=failure_ttl, description=description, depends_on=depends_on, - job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout, - retry=retry, on_success=on_success, on_failure=on_failure + func, + args=args, + kwargs=kwargs, + result_ttl=result_ttl, + ttl=ttl, + failure_ttl=failure_ttl, + description=description, + depends_on=depends_on, + job_id=job_id, + meta=meta, + status=JobStatus.QUEUED, + timeout=timeout, + retry=retry, + on_success=on_success, + on_failure=on_failure, ) - job = self.setup_dependencies( - job, - pipeline=pipeline - ) + job = self.setup_dependencies(job, pipeline=pipeline) # If we do not depend on an unfinished job, enqueue the job. if job.get_status(refresh=False) != JobStatus.DEFERRED: return self.enqueue_job(job, pipeline=pipeline, at_front=at_front) return job @staticmethod - def prepare_data(func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, description: Optional[str] = None, job_id: Optional[str] = None, - at_front: bool = False, meta: Optional[Dict] = None, retry: Optional['Retry'] = None, - on_success: Optional[Callable] = None, on_failure: Optional[Callable] = None) -> EnqueueData: + def prepare_data( + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, + ) -> EnqueueData: """Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples And can keep this logic within EnqueueData @@ -614,10 +703,20 @@ class Queue: EnqueueData: The EnqueueData """ return EnqueueData( - func, args, kwargs, timeout, - result_ttl, ttl, failure_ttl, - description, job_id, - at_front, meta, retry, on_success, on_failure + func, + args, + kwargs, + timeout, + result_ttl, + ttl, + failure_ttl, + description, + job_id, + at_front, + meta, + retry, + on_success, + on_failure, ) def enqueue_many(self, job_datas: List['EnqueueData'], pipeline: Optional['Pipeline'] = None) -> List[Job]: @@ -635,18 +734,24 @@ class Queue: jobs = [ self.enqueue_job( self.create_job( - job_data.func, args=job_data.args, kwargs=job_data.kwargs, result_ttl=job_data.result_ttl, + job_data.func, + args=job_data.args, + kwargs=job_data.kwargs, + result_ttl=job_data.result_ttl, ttl=job_data.ttl, - failure_ttl=job_data.failure_ttl, description=job_data.description, + failure_ttl=job_data.failure_ttl, + description=job_data.description, depends_on=None, - job_id=job_data.job_id, meta=job_data.meta, status=JobStatus.QUEUED, + job_id=job_data.job_id, + meta=job_data.meta, + status=JobStatus.QUEUED, timeout=job_data.timeout, retry=job_data.retry, on_success=job_data.on_success, - on_failure=job_data.on_failure + on_failure=job_data.on_failure, ), pipeline=pipe, - at_front=job_data.at_front + at_front=job_data.at_front, ) for job_data in job_datas ] @@ -662,7 +767,7 @@ class Queue: Returns: Job: _description_ - """ + """ job.perform() job.set_status(JobStatus.FINISHED) job.save(include_meta=False) @@ -687,8 +792,7 @@ class Queue: kwargs (*kwargs): function kargs """ if not isinstance(f, str) and f.__module__ == '__main__': - raise ValueError('Functions from the __main__ module cannot be processed ' - 'by workers') + raise ValueError('Functions from the __main__ module cannot be processed ' 'by workers') # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30) @@ -711,9 +815,24 @@ class Queue: args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) - return (f, timeout, description, result_ttl, ttl, failure_ttl, - depends_on, job_id, at_front, meta, retry, on_success, on_failure, - pipeline, args, kwargs) + return ( + f, + timeout, + description, + result_ttl, + ttl, + failure_ttl, + depends_on, + job_id, + at_front, + meta, + retry, + on_success, + on_failure, + pipeline, + args, + kwargs, + ) def enqueue(self, f: 'FunctionReferenceType', *args, **kwargs) -> 'Job': """Creates a job to represent the delayed function call and enqueues it. @@ -727,16 +846,42 @@ class Queue: Returns: job (Job): The created Job """ - (f, timeout, description, result_ttl, ttl, failure_ttl, - depends_on, job_id, at_front, meta, retry, on_success, - on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) + ( + f, + timeout, + description, + result_ttl, + ttl, + failure_ttl, + depends_on, + job_id, + at_front, + meta, + retry, + on_success, + on_failure, + pipeline, + args, + kwargs, + ) = Queue.parse_args(f, *args, **kwargs) return self.enqueue_call( - func=f, args=args, kwargs=kwargs, timeout=timeout, - result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, - description=description, depends_on=depends_on, job_id=job_id, - at_front=at_front, meta=meta, retry=retry, on_success=on_success, on_failure=on_failure, - pipeline=pipeline + func=f, + args=args, + kwargs=kwargs, + timeout=timeout, + result_ttl=result_ttl, + ttl=ttl, + failure_ttl=failure_ttl, + description=description, + depends_on=depends_on, + job_id=job_id, + at_front=at_front, + meta=meta, + retry=retry, + on_success=on_success, + on_failure=on_failure, + pipeline=pipeline, ) def enqueue_at(self, datetime: datetime, f, *args, **kwargs): @@ -749,14 +894,41 @@ class Queue: Returns: _type_: _description_ """ - (f, timeout, description, result_ttl, ttl, failure_ttl, - depends_on, job_id, at_front, meta, retry, on_success, on_failure, - pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) - job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, ttl=ttl, - failure_ttl=failure_ttl, description=description, - depends_on=depends_on, job_id=job_id, meta=meta, retry=retry, - on_success=on_success, on_failure=on_failure) + ( + f, + timeout, + description, + result_ttl, + ttl, + failure_ttl, + depends_on, + job_id, + at_front, + meta, + retry, + on_success, + on_failure, + pipeline, + args, + kwargs, + ) = Queue.parse_args(f, *args, **kwargs) + job = self.create_job( + f, + status=JobStatus.SCHEDULED, + args=args, + kwargs=kwargs, + timeout=timeout, + result_ttl=result_ttl, + ttl=ttl, + failure_ttl=failure_ttl, + description=description, + depends_on=depends_on, + job_id=job_id, + meta=meta, + retry=retry, + on_success=on_success, + on_failure=on_failure, + ) if at_front: job.enqueue_at_front = True return self.schedule_job(job, datetime, pipeline=pipeline) @@ -773,6 +945,7 @@ class Queue: _type_: _description_ """ from .registry import ScheduledJobRegistry + registry = ScheduledJobRegistry(queue=self) pipe = pipeline if pipeline is not None else self.connection.pipeline() @@ -795,8 +968,7 @@ class Queue: Returns: job (Job): The enqueued Job """ - return self.enqueue_at(datetime.now(timezone.utc) + time_delta, - func, *args, **kwargs) + return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs) def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: """Enqueues a job for delayed execution. @@ -838,14 +1010,14 @@ class Queue: return job def run_sync(self, job: 'Job') -> 'Job': - """Run a job synchronously, meaning on the same process the method was called. + """Run a job synchronously, meaning on the same process the method was called. Args: job (Job): The job to run Returns: Job: The job instance - """ + """ with self.connection.pipeline() as pipeline: job.prepare_for_execution('sync', pipeline) @@ -861,7 +1033,9 @@ class Queue: return job - def enqueue_dependents(self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None): + def enqueue_dependents( + self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None + ): """Enqueues all jobs in the given job's dependents set and clears it. When called without a pipeline, this method uses WATCH/MULTI/EXEC. @@ -886,20 +1060,19 @@ class Queue: if pipeline is None: pipe.watch(dependents_key) - dependent_job_ids = {as_text(_id) - for _id in pipe.smembers(dependents_key)} + dependent_job_ids = {as_text(_id) for _id in pipe.smembers(dependents_key)} # There's no dependents if not dependent_job_ids: break jobs_to_enqueue = [ - dependent_job for dependent_job - in self.job_class.fetch_many( - dependent_job_ids, - connection=self.connection, - serializer=self.serializer - ) if dependent_job and dependent_job.dependencies_are_met( + dependent_job + for dependent_job in self.job_class.fetch_many( + dependent_job_ids, connection=self.connection, serializer=self.serializer + ) + if dependent_job + and dependent_job.dependencies_are_met( parent_job=job, pipeline=pipe, exclude_job_id=exclude_job_id, @@ -914,10 +1087,9 @@ class Queue: for dependent in jobs_to_enqueue: enqueue_at_front = dependent.enqueue_at_front or False - registry = DeferredJobRegistry(dependent.origin, - self.connection, - job_class=self.job_class, - serializer=self.serializer) + registry = DeferredJobRegistry( + dependent.origin, self.connection, job_class=self.job_class, serializer=self.serializer + ) registry.remove(dependent, pipeline=pipe) if dependent.origin == self.name: @@ -1000,8 +1172,14 @@ class Queue: return None @classmethod - def dequeue_any(cls, queues: List['Queue'], timeout: int, connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, serializer: Any = None) -> Tuple['Job', 'Queue']: + def dequeue_any( + cls, + queues: List['Queue'], + timeout: int, + connection: Optional['Redis'] = None, + job_class: Optional['Job'] = None, + serializer: Any = None, + ) -> Tuple['Job', 'Queue']: """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. @@ -1033,10 +1211,7 @@ class Queue: if result is None: return None queue_key, job_id = map(as_text, result) - queue = cls.from_queue_key(queue_key, - connection=connection, - job_class=job_class, - serializer=serializer) + queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, serializer=serializer) try: job = job_class.fetch(job_id, connection=connection, serializer=serializer) except NoSuchJobError: diff --git a/rq/registry.py b/rq/registry.py index f581776..509bd87 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -23,11 +23,18 @@ class BaseRegistry: Each job is stored as a key in the registry, scored by expiration time (unix timestamp). """ + job_class = Job key_template = 'rq:registry:{0}' - def __init__(self, name: str = 'default', connection: Optional['Redis'] = None, - job_class: Optional[Type['Job']] = None, queue: Optional['Queue'] = None, serializer: Any = None): + def __init__( + self, + name: str = 'default', + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + queue: Optional['Queue'] = None, + serializer: Any = None, + ): if queue: self.name = queue.name self.connection = resolve_connection(queue.connection) @@ -46,8 +53,8 @@ class BaseRegistry: def __eq__(self, other): return ( - self.name == other.name and - self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs + self.name == other.name + and self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs ) def __contains__(self, item: Union[str, 'Job']): @@ -134,8 +141,7 @@ class BaseRegistry: _type_: _description_ """ self.cleanup() - return [as_text(job_id) for job_id in - self.connection.zrange(self.key, start, end)] + return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)] def get_queue(self): """Returns Queue object associated with this registry.""" @@ -175,8 +181,7 @@ class BaseRegistry: raise InvalidJobOperation with self.connection.pipeline() as pipeline: - queue = Queue(job.origin, connection=self.connection, - job_class=self.job_class, serializer=serializer) + queue = Queue(job.origin, connection=self.connection, job_class=self.job_class, serializer=serializer) job.started_at = None job.ended_at = None job._exc_info = '' @@ -195,6 +200,7 @@ class StartedJobRegistry(BaseRegistry): Jobs are added to registry right before they are executed and removed right after completion (success or failure). """ + key_template = 'rq:wip:{0}' def cleanup(self, timestamp: Optional[float] = None): @@ -216,9 +222,7 @@ class StartedJobRegistry(BaseRegistry): with self.connection.pipeline() as pipeline: for job_id in job_ids: try: - job = self.job_class.fetch(job_id, - connection=self.connection, - serializer=self.serializer) + job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer) except NoSuchJobError: continue @@ -246,6 +250,7 @@ class FinishedJobRegistry(BaseRegistry): Registry of jobs that have been completed. Jobs are added to this registry after they have successfully completed for monitoring purposes. """ + key_template = 'rq:finished:{0}' def cleanup(self, timestamp: Optional[float] = None): @@ -263,6 +268,7 @@ class FailedJobRegistry(BaseRegistry): """ Registry of containing failed jobs. """ + key_template = 'rq:failed:{0}' def cleanup(self, timestamp: Optional[float] = None): @@ -275,8 +281,14 @@ class FailedJobRegistry(BaseRegistry): score = timestamp if timestamp is not None else current_timestamp() self.connection.zremrangebyscore(self.key, 0, score) - def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: Optional['Pipeline'] = None, - _save_exc_to_job: bool = False): + def add( + self, + job: 'Job', + ttl=None, + exc_string: str = '', + pipeline: Optional['Pipeline'] = None, + _save_exc_to_job: bool = False, + ): """ Adds a job to a registry with expiry time of now + ttl. `ttl` defaults to DEFAULT_FAILURE_TTL if not specified. @@ -303,6 +315,7 @@ class DeferredJobRegistry(BaseRegistry): """ Registry of deferred jobs (waiting for another job to finish). """ + key_template = 'rq:deferred:{0}' def cleanup(self): @@ -316,6 +329,7 @@ class ScheduledJobRegistry(BaseRegistry): """ Registry of scheduled jobs. """ + key_template = 'rq:scheduled:{0}' def __init__(self, *args, **kwargs): @@ -396,7 +410,7 @@ class ScheduledJobRegistry(BaseRegistry): class CanceledJobRegistry(BaseRegistry): key_template = 'rq:canceled:{0}' - def get_expired_job_ids(self, timestamp: Optional[datetime] = None): + def get_expired_job_ids(self, timestamp: Optional[datetime] = None): raise NotImplementedError def cleanup(self): @@ -412,19 +426,16 @@ def clean_registries(queue: 'Queue'): Args: queue (Queue): The queue to clean """ - registry = FinishedJobRegistry(name=queue.name, - connection=queue.connection, - job_class=queue.job_class, - serializer=queue.serializer) + registry = FinishedJobRegistry( + name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer + ) registry.cleanup() - registry = StartedJobRegistry(name=queue.name, - connection=queue.connection, - job_class=queue.job_class, - serializer=queue.serializer) + registry = StartedJobRegistry( + name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer + ) registry.cleanup() - registry = FailedJobRegistry(name=queue.name, - connection=queue.connection, - job_class=queue.job_class, - serializer=queue.serializer) + registry = FailedJobRegistry( + name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer + ) registry.cleanup() diff --git a/rq/results.py b/rq/results.py index 0df131a..a6dafde 100644 --- a/rq/results.py +++ b/rq/results.py @@ -17,15 +17,22 @@ def get_key(job_id): class Result(object): - class Type(Enum): SUCCESSFUL = 1 FAILED = 2 STOPPED = 3 - def __init__(self, job_id: str, type: Type, connection: Redis, id: Optional[str] = None, - created_at: Optional[datetime] = None, return_value: Optional[Any] = None, - exc_string: Optional[str] = None, serializer=None): + def __init__( + self, + job_id: str, + type: Type, + connection: Redis, + id: Optional[str] = None, + created_at: Optional[datetime] = None, + return_value: Optional[Any] = None, + exc_string: Optional[str] = None, + serializer=None, + ): self.return_value = return_value self.exc_string = exc_string self.type = type @@ -49,16 +56,26 @@ class Result(object): @classmethod def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None): - result = cls(job_id=job.id, type=type, connection=job.connection, - return_value=return_value, - exc_string=exc_string, serializer=job.serializer) + result = cls( + job_id=job.id, + type=type, + connection=job.connection, + return_value=return_value, + exc_string=exc_string, + serializer=job.serializer, + ) result.save(ttl=ttl, pipeline=pipeline) return result @classmethod def create_failure(cls, job, ttl, exc_string, pipeline=None): - result = cls(job_id=job.id, type=cls.Type.FAILED, connection=job.connection, - exc_string=exc_string, serializer=job.serializer) + result = cls( + job_id=job.id, + type=cls.Type.FAILED, + connection=job.connection, + exc_string=exc_string, + serializer=job.serializer, + ) result.save(ttl=ttl, pipeline=pipeline) return result @@ -70,8 +87,7 @@ class Result(object): results = [] for (result_id, payload) in response: results.append( - cls.restore(job.id, result_id.decode(), payload, - connection=job.connection, serializer=serializer) + cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer) ) return results @@ -89,9 +105,7 @@ class Result(object): @classmethod def restore(cls, job_id: str, result_id: str, payload: dict, connection: Redis, serializer=None) -> 'Result': """Create a Result object from given Redis payload""" - created_at = datetime.fromtimestamp( - int(result_id.split('-')[0]) / 1000, tz=timezone.utc - ) + created_at = datetime.fromtimestamp(int(result_id.split('-')[0]) / 1000, tz=timezone.utc) payload = decode_redis_hash(payload) # data, timestamp = payload # result_data = json.loads(data) @@ -106,11 +120,15 @@ class Result(object): if exc_string: exc_string = zlib.decompress(b64decode(exc_string)).decode() - return Result(job_id, Result.Type(int(payload['type'])), connection=connection, - id=result_id, - created_at=created_at, - return_value=return_value, - exc_string=exc_string) + return Result( + job_id, + Result.Type(int(payload['type'])), + connection=connection, + id=result_id, + created_at=created_at, + return_value=return_value, + exc_string=exc_string, + ) @classmethod def fetch(cls, job: Job, serializer=None) -> Optional['Result']: @@ -134,8 +152,7 @@ class Result(object): return None result_id, payload = response[0] - return cls.restore(job.id, result_id.decode(), payload, - connection=job.connection, serializer=serializer) + return cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer) @classmethod def get_key(cls, job_id): diff --git a/rq/scheduler.py b/rq/scheduler.py index de8f26e..da59b0d 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -9,8 +9,7 @@ from multiprocessing import Process from redis import SSLConnection, UnixDomainSocketConnection -from .defaults import (DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, - DEFAULT_SCHEDULER_FALLBACK_PERIOD) +from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, DEFAULT_SCHEDULER_FALLBACK_PERIOD from .job import Job from .logutils import setup_loghandlers from .queue import Queue @@ -35,9 +34,16 @@ class RQScheduler: Status = SchedulerStatus - def __init__(self, queues, connection, interval=1, logging_level=logging.INFO, - date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, serializer=None): + def __init__( + self, + queues, + connection, + interval=1, + logging_level=logging.INFO, + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, + serializer=None, + ): self._queue_names = set(parse_names(queues)) self._acquired_locks = set() self._scheduled_job_registries = [] @@ -59,9 +65,7 @@ class RQScheduler: # the key is necessary. # `path` is not left in the dictionary as that keyword argument is # not expected by `redis.client.Redis` and would raise an exception. - self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop( - 'path' - ) + self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop('path') self.serializer = resolve_serializer(serializer) self._connection = None @@ -158,9 +162,7 @@ class RQScheduler: queue = Queue(registry.name, connection=self.connection, serializer=self.serializer) with self.connection.pipeline() as pipeline: - jobs = Job.fetch_many( - job_ids, connection=self.connection, serializer=self.serializer - ) + jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer) for job in jobs: if job is not None: queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) @@ -181,8 +183,7 @@ class RQScheduler: def heartbeat(self): """Updates the TTL on scheduler keys and the locks""" - self.log.debug("Scheduler sending heartbeat to %s", - ", ".join(self.acquired_locks)) + self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks)) if len(self._queue_names) > 1: with self.connection.pipeline() as pipeline: for name in self._acquired_locks: @@ -194,8 +195,7 @@ class RQScheduler: self.connection.expire(key, self.interval + 60) def stop(self): - self.log.info("Scheduler stopping, releasing locks for %s...", - ','.join(self._queue_names)) + self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names)) self.release_locks() self._status = self.Status.STOPPED @@ -231,15 +231,11 @@ class RQScheduler: def run(scheduler): - scheduler.log.info("Scheduler for %s started with PID %s", - ','.join(scheduler._queue_names), os.getpid()) + scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid()) try: scheduler.work() except: # noqa - scheduler.log.error( - 'Scheduler [PID %s] raised an exception.\n%s', - os.getpid(), traceback.format_exc() - ) + scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc()) raise scheduler.log.info("Scheduler with PID %s has stopped", os.getpid()) diff --git a/rq/serializers.py b/rq/serializers.py index 9e63bc7..b9b7d9c 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -11,7 +11,7 @@ class DefaultSerializer: loads = pickle.loads -class JSONSerializer(): +class JSONSerializer: @staticmethod def dumps(*args, **kwargs): return json.dumps(*args, **kwargs).encode('utf-8') @@ -29,7 +29,7 @@ def resolve_serializer(serializer=None): Args: serializer (Callable): The serializer to resolve. - + Returns: serializer (Callable): An object that implements the SerializerProtocol """ diff --git a/rq/timeouts.py b/rq/timeouts.py index c9f1e44..a1401c5 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -5,6 +5,7 @@ import threading class BaseTimeoutException(Exception): """Base exception for timeouts.""" + pass @@ -12,6 +13,7 @@ class JobTimeoutException(BaseTimeoutException): """Raised when a job takes longer to complete than the allowed maximum timeout value. """ + pass @@ -19,6 +21,7 @@ class HorseMonitorTimeoutException(BaseTimeoutException): """Raised when waiting for a horse exiting takes longer than the maximum timeout value. """ + pass @@ -56,10 +59,8 @@ class BaseDeathPenalty: class UnixSignalDeathPenalty(BaseDeathPenalty): - def handle_death_penalty(self, signum, frame): - raise self._exception('Task exceeded maximum timeout value ' - '({0} seconds)'.format(self._timeout)) + raise self._exception('Task exceeded maximum timeout value ' '({0} seconds)'.format(self._timeout)) def setup_death_penalty(self): """Sets up an alarm signal and a signal handler that raises @@ -85,15 +86,12 @@ class TimerDeathPenalty(BaseDeathPenalty): # Monkey-patch exception with the message ahead of time # since PyThreadState_SetAsyncExc can only take a class def init_with_message(self, *args, **kwargs): # noqa - super(exception, self).__init__( - "Task exceeded maximum timeout value ({0} seconds)".format(timeout) - ) + super(exception, self).__init__("Task exceeded maximum timeout value ({0} seconds)".format(timeout)) self._exception.__init__ = init_with_message def new_timer(self): - """Returns a new timer since timers can only be used once. - """ + """Returns a new timer since timers can only be used once.""" return threading.Timer(self._timeout, self.handle_death_penalty) def handle_death_penalty(self): @@ -111,13 +109,11 @@ class TimerDeathPenalty(BaseDeathPenalty): raise SystemError("PyThreadState_SetAsyncExc failed") def setup_death_penalty(self): - """Starts the timer. - """ + """Starts the timer.""" self._timer = self.new_timer() self._timer.start() def cancel_death_penalty(self): - """Cancels the timer. - """ + """Cancels the timer.""" self._timer.cancel() self._timer = None diff --git a/rq/utils.py b/rq/utils.py index 8993487..1c3fa01 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -41,10 +41,8 @@ class _Colorizer: self.codes["blink"] = esc + "05m" self.codes["overline"] = esc + "06m" - dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", - "purple", "teal", "lightgray"] - light_colors = ["darkgray", "red", "green", "yellow", "blue", - "fuchsia", "turquoise", "white"] + dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"] + light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"] x = 30 for d, l in zip(dark_colors, light_colors): @@ -90,8 +88,10 @@ def make_colorizer(color: str): >>> # You can then use: >>> print("It's either " + green('OK') + ' or ' + red('Oops')) """ + def inner(text): return colorizer.colorize(color, text) + return inner @@ -134,7 +134,7 @@ def compact(lst: List[Any]) -> List[Any]: Returns: object (list): The list without None values - """ + """ return [item for item in lst if item is not None] @@ -149,7 +149,7 @@ def as_text(v: Union[bytes, str]) -> Optional[str]: Returns: value (Optional[str]): Either the decoded string or None - """ + """ if v is None: return None elif isinstance(v, bytes): @@ -169,7 +169,7 @@ def decode_redis_hash(h) -> Dict[str, Any]: Returns: Dict[str, Any]: The decoded Redis data (Dictionary) - """ + """ return dict((as_text(k), h[k]) for k in h) @@ -230,8 +230,7 @@ def utcnow(): def now(): - """Return now in UTC - """ + """Return now in UTC""" return datetime.datetime.now(datetime.timezone.utc) @@ -356,8 +355,7 @@ def str_to_date(date_str: Optional[str]) -> Union[dt.datetime, Any]: def parse_timeout(timeout: Any): - """Transfer all kinds of timeout format to an integer representing seconds - """ + """Transfer all kinds of timeout format to an integer representing seconds""" if not isinstance(timeout, numbers.Integral) and timeout is not None: try: timeout = int(timeout) @@ -367,9 +365,11 @@ def parse_timeout(timeout: Any): try: timeout = int(digit) * unit_second[unit] except (ValueError, KeyError): - raise TimeoutFormatError('Timeout must be an integer or a string representing an integer, or ' - 'a string with format: digits + unit, unit can be "d", "h", "m", "s", ' - 'such as "1h", "23m".') + raise TimeoutFormatError( + 'Timeout must be an integer or a string representing an integer, or ' + 'a string with format: digits + unit, unit can be "d", "h", "m", "s", ' + 'such as "1h", "23m".' + ) return timeout @@ -381,7 +381,7 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]: Args: connection (Redis): The Redis connection. - + Returns: version (Tuple[int, int, int]): A tuple representing the semantic versioning format (eg. (5, 0, 9)) """ @@ -391,7 +391,7 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]: setattr( connection, "__rq_redis_server_version", - tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3]) + tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3]), ) return getattr(connection, "__rq_redis_server_version") except ResponseError: # fakeredis doesn't implement Redis' INFO command @@ -422,7 +422,7 @@ def split_list(a_list: List[Any], segment_size: int): list: The splitted listed """ for i in range(0, len(a_list), segment_size): - yield a_list[i:i + segment_size] + yield a_list[i : i + segment_size] def truncate_long_string(data: str, max_length: Optional[int] = None) -> str: @@ -431,7 +431,7 @@ def truncate_long_string(data: str, max_length: Optional[int] = None) -> str: Args: data (str): The data to truncate max_length (Optional[int], optional): The max length. Defaults to None. - + Returns: truncated (str): The truncated string """ @@ -440,8 +440,9 @@ def truncate_long_string(data: str, max_length: Optional[int] = None) -> str: return (data[:max_length] + '...') if len(data) > max_length else data -def get_call_string(func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], - max_length: Optional[int] = None) -> Optional[str]: +def get_call_string( + func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], max_length: Optional[int] = None +) -> Optional[str]: """ Returns a string representation of the call, formatted as a regular Python function invocation statement. If max_length is not None, truncate diff --git a/rq/worker.py b/rq/worker.py index 35355c7..06a707e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -34,9 +34,15 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command from .utils import as_text from .connections import get_current_connection, push_connection, pop_connection -from .defaults import (CALLBACK_TIMEOUT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL, - DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) +from .defaults import ( + CALLBACK_TIMEOUT, + DEFAULT_MAINTENANCE_TASK_INTERVAL, + DEFAULT_RESULT_TTL, + DEFAULT_WORKER_TTL, + DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_FORMAT, + DEFAULT_LOGGING_DATE_FORMAT, +) from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers @@ -46,8 +52,7 @@ from .results import Result from .scheduler import RQScheduler from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty -from .utils import (backend_class, ensure_list, get_version, - make_colorizer, utcformat, utcnow, utcparse, compact) +from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact from .version import VERSION from .worker_registration import clean_worker_registry, get_keys from .serializers import resolve_serializer @@ -55,9 +60,11 @@ from .serializers import resolve_serializer try: from setproctitle import setproctitle as setprocname except ImportError: + def setprocname(*args, **kwargs): # noqa pass + green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') blue = make_colorizer('darkblue') @@ -69,10 +76,9 @@ class StopRequested(Exception): pass - -_signames = dict((getattr(signal, signame), signame) - for signame in dir(signal) - if signame.startswith('SIG') and '_' not in signame) +_signames = dict( + (getattr(signal, signame), signame) for signame in dir(signal) if signame.startswith('SIG') and '_' not in signame +) def signal_name(signum): @@ -118,7 +124,7 @@ class Worker: job_class: Optional[Type['Job']] = None, queue_class: Optional[Type['Queue']] = None, queue: Optional['Queue'] = None, - serializer=None + serializer=None, ) -> List['Worker']: """Returns an iterable of all Workers. @@ -131,11 +137,12 @@ class Worker: connection = get_current_connection() worker_keys = get_keys(queue=queue, connection=connection) - workers = [cls.find_by_key(as_text(key), - connection=connection, - job_class=job_class, - queue_class=queue_class, serializer=serializer) - for key in worker_keys] + workers = [ + cls.find_by_key( + as_text(key), connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer + ) + for key in worker_keys + ] return compact(workers) @classmethod @@ -148,9 +155,8 @@ class Worker: Returns: list_keys (List[str]): A list of worker keys - """ - return [as_text(key) - for key in get_keys(queue=queue, connection=connection)] + """ + return [as_text(key) for key in get_keys(queue=queue, connection=connection)] @classmethod def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None): @@ -166,8 +172,14 @@ class Worker: return len(get_keys(queue=queue, connection=connection)) @classmethod - def find_by_key(cls, worker_key: str, connection: Optional['Redis'] = None, job_class: Type['Job'] = None, - queue_class: Type['Queue'] = None, serializer=None): + def find_by_key( + cls, + worker_key: str, + connection: Optional['Redis'] = None, + job_class: Type['Job'] = None, + queue_class: Type['Queue'] = None, + serializer=None, + ): """Returns a Worker instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Workers by their Redis keys. @@ -182,22 +194,39 @@ class Worker: connection.srem(cls.redis_workers_keys, worker_key) return None - name = worker_key[len(prefix):] - worker = cls([], name, connection=connection, job_class=job_class, - queue_class=queue_class, prepare_for_work=False, serializer=serializer) + name = worker_key[len(prefix) :] + worker = cls( + [], + name, + connection=connection, + job_class=job_class, + queue_class=queue_class, + prepare_for_work=False, + serializer=serializer, + ) worker.refresh() return worker - def __init__(self, queues, name: Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL, - connection: Optional['Redis'] = None, exc_handler=None, exception_handlers=None, - default_worker_ttl=DEFAULT_WORKER_TTL, job_class: Type['Job'] = None, - queue_class=None, log_job_description: bool = True, - job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, - disable_default_exception_handler: bool = False, - prepare_for_work: bool = True, serializer=None): # noqa - + def __init__( + self, + queues, + name: Optional[str] = None, + default_result_ttl=DEFAULT_RESULT_TTL, + connection: Optional['Redis'] = None, + exc_handler=None, + exception_handlers=None, + default_worker_ttl=DEFAULT_WORKER_TTL, + job_class: Type['Job'] = None, + queue_class=None, + log_job_description: bool = True, + job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, + disable_default_exception_handler: bool = False, + prepare_for_work: bool = True, + serializer=None, + ): # noqa + connection = self._set_connection(connection, default_worker_ttl) self.connection = connection self.redis_server_version = None @@ -208,11 +237,12 @@ class Worker: self.python_version = sys.version self.serializer = resolve_serializer(serializer) - queues = [self.queue_class(name=q, - connection=connection, - job_class=self.job_class, serializer=self.serializer) - if isinstance(q, str) else q - for q in ensure_list(queues)] + queues = [ + self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer) + if isinstance(q, str) + else q + for q in ensure_list(queues) + ] self.name: str = name or uuid4().hex self.queues = queues @@ -250,24 +280,14 @@ class Worker: try: connection.client_setname(self.name) except redis.exceptions.ResponseError: - warnings.warn( - 'CLIENT SETNAME command not supported, setting ip_address to unknown', - Warning - ) + warnings.warn('CLIENT SETNAME command not supported, setting ip_address to unknown', Warning) self.ip_address = 'unknown' else: - client_adresses = [ - client['addr'] - for client in connection.client_list() - if client['name'] == self.name - ] + client_adresses = [client['addr'] for client in connection.client_list() if client['name'] == self.name] if len(client_adresses) > 0: self.ip_address = client_adresses[0] else: - warnings.warn( - 'CLIENT LIST command not supported, setting ip_address to unknown', - Warning - ) + warnings.warn('CLIENT LIST command not supported, setting ip_address to unknown', Warning) self.ip_address = 'unknown' else: self.hostname = None @@ -361,8 +381,7 @@ class Worker: def register_birth(self): """Registers its own birth.""" self.log.debug('Registering birth of worker %s', self.name) - if self.connection.exists(self.key) and \ - not self.connection.hexists(self.key, 'death'): + if self.connection.exists(self.key) and not self.connection.hexists(self.key, 'death'): msg = 'There exists an active worker named {0!r} already' raise ValueError(msg.format(self.name)) key = self.key @@ -436,10 +455,7 @@ class Worker: def _set_state(self, state): """Raise a DeprecationWarning if ``worker.state = X`` is used""" - warnings.warn( - "worker.state is deprecated, use worker.set_state() instead.", - DeprecationWarning - ) + warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning) self.set_state(state) def get_state(self): @@ -447,10 +463,7 @@ class Worker: def _get_state(self): """Raise a DeprecationWarning if ``worker.state == X`` is used""" - warnings.warn( - "worker.state is deprecated, use worker.get_state() instead.", - DeprecationWarning - ) + warnings.warn("worker.state is deprecated, use worker.get_state() instead.", DeprecationWarning) return self.get_state() state = property(_get_state, _set_state) @@ -516,8 +529,7 @@ class Worker: return pid, stat def request_force_stop(self, signum, frame): - """Terminates the application (cold shutdown). - """ + """Terminates the application (cold shutdown).""" self.log.warning('Cold shut down') # Take down the horse with the worker @@ -547,8 +559,7 @@ class Worker: if self.get_state() == WorkerStatus.BUSY: self._stop_requested = True self.set_shutdown_requested_date() - self.log.debug('Stopping after current horse is finished. ' - 'Press Ctrl+C again for a cold shutdown.') + self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.') if self.scheduler: self.stop_scheduler() else: @@ -614,8 +625,15 @@ class Worker: def reorder_queues(self, reference_queue): pass - def work(self, burst: bool = False, logging_level: str = "INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler: bool = False): + def work( + self, + burst: bool = False, + logging_level: str = "INFO", + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, + max_jobs=None, + with_scheduler: bool = False, + ): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all @@ -635,8 +653,13 @@ class Worker: if with_scheduler: self.scheduler = RQScheduler( - self.queues, connection=self.connection, logging_level=logging_level, - date_format=date_format, log_format=log_format, serializer=self.serializer) + self.queues, + connection=self.connection, + logging_level=logging_level, + date_format=date_format, + log_format=log_format, + serializer=self.serializer, + ) self.scheduler.acquire_locks() # If lock is acquired, start scheduler if self.scheduler.acquired_locks: @@ -676,10 +699,7 @@ class Worker: completed_jobs += 1 if max_jobs is not None: if completed_jobs >= max_jobs: - self.log.info( - "Worker %s: finished executing %d jobs, quitting", - self.key, completed_jobs - ) + self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs) break except redis.exceptions.TimeoutError: @@ -694,10 +714,7 @@ class Worker: raise except: # noqa - self.log.error( - 'Worker %s: found an unhandled exception, quitting...', - self.key, exc_info=True - ) + self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True) break finally: if not self.is_horse: @@ -736,19 +753,20 @@ class Worker: self.run_maintenance_tasks() self.log.debug(f"Dequeueing jobs on queues {self._ordered_queues} and timeout {timeout}") - result = self.queue_class.dequeue_any(self._ordered_queues, timeout, - connection=self.connection, - job_class=self.job_class, - serializer=self.serializer) + result = self.queue_class.dequeue_any( + self._ordered_queues, + timeout, + connection=self.connection, + job_class=self.job_class, + serializer=self.serializer, + ) self.log.debug(f"Dequeued job {result[1]} from {result[0]}") if result is not None: job, queue = result job.redis_server_version = self.get_redis_server_version() if self.log_job_description: - self.log.info( - '%s: %s (%s)', green(queue.name), - blue(job.description), job.id) + self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id) else: self.log.info('%s: %s', green(queue.name), job.id) @@ -756,8 +774,9 @@ class Worker: except DequeueTimeout: pass except redis.exceptions.ConnectionError as conn_err: - self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...', - conn_err, connection_wait_time) + self.log.error( + 'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time + ) time.sleep(connection_wait_time) connection_wait_time *= self.exponential_backoff_factor connection_wait_time = min(connection_wait_time, self.max_connection_wait_time) @@ -782,18 +801,44 @@ class Worker: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, timeout) connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) - self.log.debug('Sent heartbeat to prevent worker timeout. ' - 'Next one should arrive within %s seconds.', timeout) + self.log.debug( + 'Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within %s seconds.', timeout + ) def refresh(self): data = self.connection.hmget( - self.key, 'queues', 'state', 'current_job', 'last_heartbeat', - 'birth', 'failed_job_count', 'successful_job_count', 'total_working_time', - 'current_job_working_time', 'hostname', 'ip_address', 'pid', 'version', 'python_version', + self.key, + 'queues', + 'state', + 'current_job', + 'last_heartbeat', + 'birth', + 'failed_job_count', + 'successful_job_count', + 'total_working_time', + 'current_job_working_time', + 'hostname', + 'ip_address', + 'pid', + 'version', + 'python_version', ) - (queues, state, job_id, last_heartbeat, birth, failed_job_count, - successful_job_count, total_working_time, current_job_working_time, - hostname, ip_address, pid, version, python_version) = data + ( + queues, + state, + job_id, + last_heartbeat, + birth, + failed_job_count, + successful_job_count, + total_working_time, + current_job_working_time, + hostname, + ip_address, + pid, + version, + python_version, + ) = data queues = as_text(queues) self.hostname = as_text(hostname) self.ip_address = as_text(ip_address) @@ -820,10 +865,12 @@ class Worker: self.current_job_working_time = float(as_text(current_job_working_time)) if queues: - self.queues = [self.queue_class(queue, - connection=self.connection, - job_class=self.job_class, serializer=self.serializer) - for queue in queues.split(',')] + self.queues = [ + self.queue_class( + queue, connection=self.connection, job_class=self.job_class, serializer=self.serializer + ) + for queue in queues.split(',') + ] def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection @@ -834,12 +881,10 @@ class Worker: connection.hincrby(self.key, 'successful_job_count', 1) def increment_total_working_time(self, job_execution_time, pipeline): - pipeline.hincrbyfloat(self.key, 'total_working_time', - job_execution_time.total_seconds()) + pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds()) def fork_work_horse(self, job: 'Job', queue: 'Queue'): - """Spawns a work horse to perform the actual work and passes it a job. - """ + """Spawns a work horse to perform the actual work and passes it a job.""" child_pid = os.fork() os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_JOB_ID'] = job.id @@ -909,24 +954,20 @@ class Worker: elif self._stopped_job_id == job.id: # Work-horse killed deliberately self.log.warning('Job stopped by user, moving job to FailedJobRegistry') - self.handle_job_failure( - job, queue=queue, - exc_string="Job stopped by user, work-horse terminated." - ) + self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.") elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: if not job.ended_at: job.ended_at = utcnow() # Unhandled failure: move the job to the failed queue - self.log.warning(( - 'Moving job to FailedJobRegistry ' - '(work-horse terminated unexpectedly; waitpid returned {})' - ).format(ret_val)) + self.log.warning( + ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format( + ret_val + ) + ) self.handle_job_failure( - job, queue=queue, - exc_string="Work-horse was terminated unexpectedly " - "(waitpid returned %s)" % ret_val + job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val ) def execute_job(self, job: 'Job', queue: 'Queue'): @@ -1009,8 +1050,7 @@ class Worker: msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) - def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, - exc_string=''): + def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''): """ Handles the failure or an executing job by: 1. Setting the job status to failed @@ -1023,10 +1063,7 @@ class Worker: with self.connection.pipeline() as pipeline: if started_job_registry is None: started_job_registry = StartedJobRegistry( - job.origin, - self.connection, - job_class=self.job_class, - serializer=self.serializer + job.origin, self.connection, job_class=self.job_class, serializer=self.serializer ) # check whether a job was stopped intentionally and set the job @@ -1045,14 +1082,19 @@ class Worker: started_job_registry.remove(job, pipeline=pipeline) if not self.disable_default_exception_handler and not retry: - failed_job_registry = FailedJobRegistry(job.origin, job.connection, - job_class=self.job_class, serializer=job.serializer) + failed_job_registry = FailedJobRegistry( + job.origin, job.connection, job_class=self.job_class, serializer=job.serializer + ) # Exception should be saved in job hash if server # doesn't support Redis streams _save_exc_to_job = not self.supports_redis_streams - failed_job_registry.add(job, ttl=job.failure_ttl, - exc_string=exc_string, pipeline=pipeline, - _save_exc_to_job=_save_exc_to_job) + failed_job_registry.add( + job, + ttl=job.failure_ttl, + exc_string=exc_string, + pipeline=pipeline, + _save_exc_to_job=_save_exc_to_job, + ) if self.supports_redis_streams: Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline) with suppress(redis.exceptions.ConnectionError): @@ -1061,9 +1103,7 @@ class Worker: self.set_current_job_id(None, pipeline=pipeline) self.increment_failed_job_count(pipeline) if job.started_at and job.ended_at: - self.increment_total_working_time( - job.ended_at - job.started_at, pipeline - ) + self.increment_total_working_time(job.ended_at - job.started_at, pipeline) if retry: job.retry(queue, pipeline) @@ -1099,9 +1139,7 @@ class Worker: self.set_current_job_id(None, pipeline=pipeline) self.increment_successful_job_count(pipeline=pipeline) - self.increment_total_working_time( - job.ended_at - job.started_at, pipeline # type: ignore - ) + self.increment_total_working_time(job.ended_at - job.started_at, pipeline) # type: ignore result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: @@ -1111,16 +1149,15 @@ class Worker: # doesn't support Redis streams include_result = not self.supports_redis_streams # Don't clobber user's meta dictionary! - job.save(pipeline=pipeline, include_meta=False, - include_result=include_result) + job.save(pipeline=pipeline, include_meta=False, include_result=include_result) if self.supports_redis_streams: - Result.create(job, Result.Type.SUCCESSFUL, return_value=job._result, - ttl=result_ttl, pipeline=pipeline) + Result.create( + job, Result.Type.SUCCESSFUL, return_value=job._result, ttl=result_ttl, pipeline=pipeline + ) finished_job_registry = queue.finished_job_registry finished_job_registry.add(job, result_ttl, pipeline) - job.cleanup(result_ttl, pipeline=pipeline, - remove_from_queue=False) + job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) self.log.debug('Removing job %s from StartedJobRegistry', job.id) started_job_registry.remove(job, pipeline=pipeline) @@ -1172,9 +1209,7 @@ class Worker: if job.success_callback: self.execute_success_callback(job, rv) - self.handle_job_success(job=job, - queue=queue, - started_job_registry=started_job_registry) + self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) except: # NOQA self.log.debug(f"Job {job.id} raised an exception.") job.ended_at = utcnow() @@ -1185,15 +1220,13 @@ class Worker: try: self.execute_failure_callback(job) except: # noqa - self.log.error( - 'Worker %s: error while executing failure callback', - self.key, exc_info=True - ) + self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True) exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) - self.handle_job_failure(job=job, exc_string=exc_string, queue=queue, - started_job_registry=started_job_registry) + self.handle_job_failure( + job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry + ) self.handle_exception(job, *exc_info) return False @@ -1237,10 +1270,9 @@ class Worker: # the properties below should be safe however extra.update({'queue': job.origin, 'job_id': job.id}) - + # func_name - self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, - extra=extra) + self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra) for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler) @@ -1322,6 +1354,7 @@ class HerokuWorker(Worker): * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn causes the horse to crash `imminent_shutdown_delay` seconds later """ + imminent_shutdown_delay = 6 frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace'] @@ -1335,10 +1368,7 @@ class HerokuWorker(Worker): def handle_warm_shutdown_request(self): """If horse is alive send it SIGRTMIN""" if self.horse_pid != 0: - self.log.info( - 'Worker %s: warm shut down requested, sending horse SIGRTMIN signal', - self.key - ) + self.log.info('Worker %s: warm shut down requested, sending horse SIGRTMIN signal', self.key) self.kill_horse(sig=signal.SIGRTMIN) else: self.log.warning('Warm shut down requested, no horse found') @@ -1348,8 +1378,9 @@ class HerokuWorker(Worker): self.log.warning('Imminent shutdown, raising ShutDownImminentException immediately') self.request_force_stop_sigrtmin(signum, frame) else: - self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds', - self.imminent_shutdown_delay) + self.log.warning( + 'Imminent shutdown, raising ShutDownImminentException in %d seconds', self.imminent_shutdown_delay + ) signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin) signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin) signal.alarm(self.imminent_shutdown_delay) @@ -1367,7 +1398,7 @@ class RoundRobinWorker(Worker): def reorder_queues(self, reference_queue): pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] class RandomWorker(Worker):