From b639ee640602586e850abec8450370b1e9684427 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 8 Apr 2023 18:55:28 +0700 Subject: [PATCH] Cleaned up cli.py (#1876) --- rq/cli/cli.py | 75 +++++++-------------------------- rq/cli/helpers.py | 103 ++++++++++++++++++++++++++++++++++++---------- 2 files changed, 98 insertions(+), 80 deletions(-) diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 27058e8..fbfe2bf 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -2,7 +2,6 @@ RQ command line tool """ -from functools import update_wrapper import os import sys import warnings @@ -18,22 +17,18 @@ from rq.cli.helpers import ( show_both, show_queues, show_workers, - CliConfig, parse_function_args, parse_schedule, + pass_cli_config, ) from rq.contrib.legacy import cleanup_ghosts from rq.defaults import ( - DEFAULT_CONNECTION_CLASS, - DEFAULT_JOB_CLASS, - DEFAULT_QUEUE_CLASS, - DEFAULT_WORKER_CLASS, DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT, - DEFAULT_SERIALIZER_CLASS, DEFAULT_MAINTENANCE_TASK_INTERVAL, + DEFAULT_MAINTENANCE_TASK_INTERVAL, ) from rq.exceptions import InvalidJobOperationError from rq.registry import FailedJobRegistry, clean_registries @@ -45,50 +40,6 @@ from rq.job import JobStatus blue = make_colorizer('darkblue') -# Disable the warning that Click displays (as of Click version 5.0) when users -# use unicode_literals in Python 2. -# See http://click.pocoo.org/dev/python3/#unicode-literals for more details. -click.disable_unicode_literals_warning = True - - -shared_options = [ - click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'), - click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'), - click.option( - '--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use' - ), - click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'), - click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'), - click.option( - '--connection-class', - envvar='RQ_CONNECTION_CLASS', - default=DEFAULT_CONNECTION_CLASS, - help='Redis client class to use', - ), - click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True), - click.option( - '--serializer', - '-S', - default=DEFAULT_SERIALIZER_CLASS, - help='Path to serializer, defaults to rq.serializers.DefaultSerializer', - ), -] - - -def pass_cli_config(func): - # add all the shared options to the command - for option in shared_options: - func = option(func) - - # pass the cli config object into the command - def wrapper(*args, **kwargs): - ctx = click.get_current_context() - cli_config = CliConfig(**kwargs) - return ctx.invoke(func, cli_config, *args[1:], **kwargs) - - return update_wrapper(wrapper, func) - - @click.group() @click.version_option(version) def main(): @@ -105,8 +56,10 @@ def empty(cli_config, all, queues, serializer, **options): if all: queues = cli_config.queue_class.all( - connection=cli_config.connection, job_class=cli_config.job_class, - death_penalty_class=cli_config.death_penalty_class, serializer=serializer + connection=cli_config.connection, + job_class=cli_config.job_class, + death_penalty_class=cli_config.death_penalty_class, + serializer=serializer, ) else: queues = [ @@ -206,7 +159,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, '--maintenance-interval', type=int, default=DEFAULT_MAINTENANCE_TASK_INTERVAL, - help='Maintenance task interval (in seconds) to be used' + help='Maintenance task interval (in seconds) to be used', ) @click.option( '--job-monitoring-interval', @@ -227,7 +180,9 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') -@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues') +@click.option( + '--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues' +) @click.argument('queues', nargs=-1) @pass_cli_config def worker( @@ -274,12 +229,14 @@ def worker( worker_name = cli_config.worker_class.__qualname__ if worker_name in ["RoundRobinWorker", "RandomWorker"]: strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin" - msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy." + msg = f"WARNING: {worker_name} is deprecated. Use `--dequeue-strategy {strategy_alternative}` instead." warnings.warn(msg, DeprecationWarning) click.secho(msg, fg='yellow') if dequeue_strategy not in ("default", "random", "round_robin"): - click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red') + click.secho( + "ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red' + ) sys.exit(1) setup_loghandlers_from_args(verbose, quiet, date_format, log_format) @@ -313,7 +270,7 @@ def worker( exception_handlers=exception_handlers or None, disable_default_exception_handler=disable_default_exception_handler, log_job_description=not disable_job_desc_logging, - serializer=serializer + serializer=serializer, ) # Should we configure Sentry? @@ -335,7 +292,7 @@ def worker( max_jobs=max_jobs, max_idle_time=max_idle_time, with_scheduler=with_scheduler, - dequeue_strategy=dequeue_strategy + dequeue_strategy=dequeue_strategy, ) except ConnectionError as e: print(e) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index 0721198..bea2c37 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -2,7 +2,8 @@ import sys import importlib import time import os -from functools import partial + +from functools import partial, update_wrapper from enum import Enum from datetime import datetime, timezone, timedelta @@ -13,8 +14,15 @@ from shutil import get_terminal_size import click from redis import Redis from redis.sentinel import Sentinel -from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS, \ - DEFAULT_DEATH_PENALTY_CLASS + +from rq.defaults import ( + DEFAULT_CONNECTION_CLASS, + DEFAULT_DEATH_PENALTY_CLASS, + DEFAULT_JOB_CLASS, + DEFAULT_QUEUE_CLASS, + DEFAULT_WORKER_CLASS, + DEFAULT_SERIALIZER_CLASS, +) from rq.logutils import setup_loghandlers from rq.utils import import_attribute, parse_timeout from rq.worker import WorkerStatus @@ -42,7 +50,7 @@ def get_redis_from_config(settings, connection_class=Redis): elif settings.get('SENTINEL') is not None: instances = settings['SENTINEL'].get('INSTANCES', [('localhost', 26379)]) master_name = settings['SENTINEL'].get('MASTER_NAME', 'mymaster') - + connection_kwargs = { 'db': settings['SENTINEL'].get('DB', 0), 'username': settings['SENTINEL'].get('USERNAME', None), @@ -52,10 +60,8 @@ def get_redis_from_config(settings, connection_class=Redis): } connection_kwargs.update(settings['SENTINEL'].get('CONNECTION_KWARGS', {})) sentinel_kwargs = settings['SENTINEL'].get('SENTINEL_KWARGS', {}) - - sn = Sentinel( - instances, sentinel_kwargs=sentinel_kwargs, **connection_kwargs - ) + + sn = Sentinel(instances, sentinel_kwargs=sentinel_kwargs, **connection_kwargs) return sn.master_for(master_name) ssl = settings.get('REDIS_SSL', False) @@ -124,13 +130,22 @@ def show_queues(queues, raw, by_queue, queue_class, worker_class): count = counts[q] if not raw: chart = green('|' + '█' * int(ratio * count)) - line = '%-12s %s %d, %d executing, %d finished, %d failed' \ - % (q.name, chart, count, q.started_job_registry.count, \ - q.finished_job_registry.count, q.failed_job_registry.count) + line = '%-12s %s %d, %d executing, %d finished, %d failed' % ( + q.name, + chart, + count, + q.started_job_registry.count, + q.finished_job_registry.count, + q.failed_job_registry.count, + ) else: - line = 'queue %s %d, %d executing, %d finished, %d failed' \ - % (q.name, count, q.started_job_registry.count, \ - q.finished_job_registry.count, q.failed_job_registry.count) + line = 'queue %s %d, %d executing, %d finished, %d failed' % ( + q.name, + count, + q.started_job_registry.count, + q.finished_job_registry.count, + q.failed_job_registry.count, + ) click.echo(line) num_jobs += count @@ -155,14 +170,22 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class): queue_names = ', '.join(worker.queue_names()) name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid) if not raw: - line = '%s: %s %s. jobs: %d finished, %d failed' \ - % (name, state_symbol(worker.get_state()), queue_names, \ - worker.successful_job_count, worker.failed_job_count) + line = '%s: %s %s. jobs: %d finished, %d failed' % ( + name, + state_symbol(worker.get_state()), + queue_names, + worker.successful_job_count, + worker.failed_job_count, + ) click.echo(line) else: - line = 'worker %s %s %s. jobs: %d finished, %d failed' \ - % (name, worker.get_state(), queue_names,\ - worker.successful_job_count, worker.failed_job_count) + line = 'worker %s %s %s. jobs: %d finished, %d failed' % ( + name, + worker.get_state(), + queue_names, + worker.successful_job_count, + worker.failed_job_count, + ) click.echo(line) else: @@ -318,7 +341,7 @@ class CliConfig: connection_class=DEFAULT_CONNECTION_CLASS, path=None, *args, - **kwargs + **kwargs, ): self._connection = None self.url = url @@ -363,3 +386,41 @@ class CliConfig: else: self._connection = get_redis_from_config(os.environ, self.connection_class) return self._connection + + +shared_options = [ + click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'), + click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'), + click.option( + '--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use' + ), + click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'), + click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'), + click.option( + '--connection-class', + envvar='RQ_CONNECTION_CLASS', + default=DEFAULT_CONNECTION_CLASS, + help='Redis client class to use', + ), + click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True), + click.option( + '--serializer', + '-S', + default=DEFAULT_SERIALIZER_CLASS, + help='Path to serializer, defaults to rq.serializers.DefaultSerializer', + ), +] + + +def pass_cli_config(func): + # add all the shared options to the command + for option in shared_options: + func = option(func) + + # pass the cli config object into the command + def wrapper(*args, **kwargs): + ctx = click.get_current_context() + cli_config = CliConfig(**kwargs) + return ctx.invoke(func, cli_config, *args[1:], **kwargs) + + return update_wrapper(wrapper, func)