diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index ea767a7..c99348d 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -25,7 +25,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 + pip install flake8 black - name: Lint with flake8 run: | @@ -33,3 +33,7 @@ jobs: flake8 . --select=E9,F63,F7,F82 --show-source # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide flake8 . --exit-zero --max-complexity=5 + + - name: Lint with black + run: | + black -S -l 120 rq/ diff --git a/README.md b/README.md index 40d314b..5fe80cd 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,8 @@ RQ requires Redis >= 3.0.0. [![Build status](https://github.com/rq/rq/workflows/Test%20rq/badge.svg)](https://github.com/rq/rq/actions?query=workflow%3A%22Test+rq%22) [![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq) [![Coverage](https://codecov.io/gh/rq/rq/branch/master/graph/badge.svg)](https://codecov.io/gh/rq/rq) +[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) + Full documentation can be found [here][d]. diff --git a/rq/__init__.py b/rq/__init__.py index abca97c..d5db681 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,7 +1,6 @@ # flake8: noqa -from .connections import (Connection, get_current_connection, pop_connection, - push_connection, use_connection) +from .connections import Connection, get_current_connection, pop_connection, push_connection, use_connection from .job import cancel_job, get_current_job, requeue_job, Retry from .queue import Queue from .version import VERSION diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 75c5974..f781010 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -10,22 +10,34 @@ import click from redis.exceptions import ConnectionError from rq import Connection, Retry, __version__ as version -from rq.cli.helpers import (read_config_file, refresh, - setup_loghandlers_from_args, - show_both, show_queues, show_workers, CliConfig, parse_function_args, - parse_schedule) +from rq.cli.helpers import ( + read_config_file, + refresh, + setup_loghandlers_from_args, + show_both, + show_queues, + show_workers, + CliConfig, + parse_function_args, + parse_schedule, +) from rq.contrib.legacy import cleanup_ghosts -from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, - DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS, - DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL, - DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT, - DEFAULT_SERIALIZER_CLASS) +from rq.defaults import ( + DEFAULT_CONNECTION_CLASS, + DEFAULT_JOB_CLASS, + DEFAULT_QUEUE_CLASS, + DEFAULT_WORKER_CLASS, + DEFAULT_RESULT_TTL, + DEFAULT_WORKER_TTL, + DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_FORMAT, + DEFAULT_LOGGING_DATE_FORMAT, + DEFAULT_SERIALIZER_CLASS, +) from rq.exceptions import InvalidJobOperationError from rq.registry import FailedJobRegistry, clean_registries from rq.utils import import_attribute, get_call_string, make_colorizer -from rq.suspension import (suspend as connection_suspend, - resume as connection_resume, is_suspended) +from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended from rq.worker_registration import clean_worker_registry from rq.job import JobStatus @@ -39,35 +51,26 @@ click.disable_unicode_literals_warning = True shared_options = [ - click.option('--url', '-u', - envvar='RQ_REDIS_URL', - help='URL describing Redis connection details.'), - click.option('--config', '-c', - envvar='RQ_CONFIG', - help='Module containing RQ settings.'), - click.option('--worker-class', '-w', - envvar='RQ_WORKER_CLASS', - default=DEFAULT_WORKER_CLASS, - help='RQ Worker class to use'), - click.option('--job-class', '-j', - envvar='RQ_JOB_CLASS', - default=DEFAULT_JOB_CLASS, - help='RQ Job class to use'), - click.option('--queue-class', - envvar='RQ_QUEUE_CLASS', - default=DEFAULT_QUEUE_CLASS, - help='RQ Queue class to use'), - click.option('--connection-class', - envvar='RQ_CONNECTION_CLASS', - default=DEFAULT_CONNECTION_CLASS, - help='Redis client class to use'), - click.option('--path', '-P', - default=['.'], - help='Specify the import path.', - multiple=True), - click.option('--serializer', '-S', - default=DEFAULT_SERIALIZER_CLASS, - help='Path to serializer, defaults to rq.serializers.DefaultSerializer') + click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'), + click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'), + click.option( + '--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use' + ), + click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'), + click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'), + click.option( + '--connection-class', + envvar='RQ_CONNECTION_CLASS', + default=DEFAULT_CONNECTION_CLASS, + help='Redis client class to use', + ), + click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True), + click.option( + '--serializer', + '-S', + default=DEFAULT_SERIALIZER_CLASS, + help='Path to serializer, defaults to rq.serializers.DefaultSerializer', + ), ] @@ -100,15 +103,16 @@ def empty(cli_config, all, queues, serializer, **options): """Empty given queues.""" if all: - queues = cli_config.queue_class.all(connection=cli_config.connection, - job_class=cli_config.job_class, - serializer=serializer) + queues = cli_config.queue_class.all( + connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer + ) else: - queues = [cli_config.queue_class(queue, - connection=cli_config.connection, - job_class=cli_config.job_class, - serializer=serializer) - for queue in queues] + queues = [ + cli_config.queue_class( + queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer + ) + for queue in queues + ] if not queues: click.echo('Nothing to do') @@ -127,10 +131,9 @@ def empty(cli_config, all, queues, serializer, **options): def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options): """Requeue failed jobs.""" - failed_job_registry = FailedJobRegistry(queue, - connection=cli_config.connection, - job_class=job_class, - serializer=serializer) + failed_job_registry = FailedJobRegistry( + queue, connection=cli_config.connection, job_class=job_class, serializer=serializer + ) if all: job_ids = failed_job_registry.get_job_ids() @@ -159,8 +162,7 @@ def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options): @click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') @click.argument('queues', nargs=-1) @pass_cli_config -def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, - **options): +def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, **options): """RQ command-line monitor.""" if only_queues: @@ -182,8 +184,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, clean_registries(queue) clean_worker_registry(queue) - refresh(interval, func, qs, raw, by_queue, - cli_config.queue_class, cli_config.worker_class) + refresh(interval, func, qs, raw, by_queue, cli_config.queue_class, cli_config.worker_class) except ConnectionError as e: click.echo(e) sys.exit(1) @@ -200,7 +201,12 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--name', '-n', help='Specify a different name') @click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used') @click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used') -@click.option('--job-monitoring-interval', type=int, default=DEFAULT_JOB_MONITORING_INTERVAL, help='Default job monitoring interval to be used') +@click.option( + '--job-monitoring-interval', + type=int, + default=DEFAULT_JOB_MONITORING_INTERVAL, + help='Default job monitoring interval to be used', +) @click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.') @click.option('--verbose', '-v', is_flag=True, help='Show more output') @click.option('--quiet', '-q', is_flag=True, help='Show less output') @@ -215,11 +221,31 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') @click.argument('queues', nargs=-1) @pass_cli_config -def worker(cli_config, burst, logging_level, name, results_ttl, - worker_ttl, job_monitoring_interval, disable_job_desc_logging, - verbose, quiet, sentry_ca_certs, sentry_debug, sentry_dsn, - exception_handler, pid, disable_default_exception_handler, max_jobs, - with_scheduler, queues, log_format, date_format, serializer, **options): +def worker( + cli_config, + burst, + logging_level, + name, + results_ttl, + worker_ttl, + job_monitoring_interval, + disable_job_desc_logging, + verbose, + quiet, + sentry_ca_certs, + sentry_debug, + sentry_dsn, + exception_handler, + pid, + disable_default_exception_handler, + max_jobs, + with_scheduler, + queues, + log_format, + date_format, + serializer, + **options +): """Starts an RQ worker.""" settings = read_config_file(cli_config.config) if cli_config.config else {} # Worker specific default arguments @@ -245,38 +271,46 @@ def worker(cli_config, burst, logging_level, name, results_ttl, click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red') sys.exit(1) - queues = [cli_config.queue_class(queue, - connection=cli_config.connection, - job_class=cli_config.job_class, - serializer=serializer) - for queue in queues] + queues = [ + cli_config.queue_class( + queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer + ) + for queue in queues + ] worker = cli_config.worker_class( - queues, name=name, connection=cli_config.connection, - default_worker_ttl=worker_ttl, default_result_ttl=results_ttl, + queues, + name=name, + connection=cli_config.connection, + default_worker_ttl=worker_ttl, + default_result_ttl=results_ttl, job_monitoring_interval=job_monitoring_interval, - job_class=cli_config.job_class, queue_class=cli_config.queue_class, + job_class=cli_config.job_class, + queue_class=cli_config.queue_class, exception_handlers=exception_handlers or None, disable_default_exception_handler=disable_default_exception_handler, log_job_description=not disable_job_desc_logging, - serializer=serializer + serializer=serializer, ) # Should we configure Sentry? if sentry_dsn: - sentry_opts = { - "ca_certs": sentry_ca_certs, - "debug": sentry_debug - } + sentry_opts = {"ca_certs": sentry_ca_certs, "debug": sentry_debug} from rq.contrib.sentry import register_sentry + register_sentry(sentry_dsn, **sentry_opts) # if --verbose or --quiet, override --logging_level if verbose or quiet: logging_level = None - worker.work(burst=burst, logging_level=logging_level, - date_format=date_format, log_format=log_format, - max_jobs=max_jobs, with_scheduler=with_scheduler) + worker.work( + burst=burst, + logging_level=logging_level, + date_format=date_format, + log_format=log_format, + max_jobs=max_jobs, + with_scheduler=with_scheduler, + ) except ConnectionError as e: print(e) sys.exit(1) @@ -296,7 +330,9 @@ def suspend(cli_config, duration, **options): if duration: msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will - automatically resume""".format(duration) + automatically resume""".format( + duration + ) click.echo(msg) else: click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed") @@ -312,27 +348,51 @@ def resume(cli_config, **options): @main.command() @click.option('--queue', '-q', help='The name of the queue.', default='default') -@click.option('--timeout', - help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.') +@click.option( + '--timeout', help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.' +) @click.option('--result-ttl', help='Specifies how long successful jobs and their results are kept.') @click.option('--ttl', help='Specifies the maximum queued time of the job before it is discarded.') @click.option('--failure-ttl', help='Specifies how long failed jobs are kept.') @click.option('--description', help='Additional description of the job') -@click.option('--depends-on', help='Specifies another job id that must complete before this job will be queued.', - multiple=True) +@click.option( + '--depends-on', help='Specifies another job id that must complete before this job will be queued.', multiple=True +) @click.option('--job-id', help='The id of this job') @click.option('--at-front', is_flag=True, help='Will place the job at the front of the queue, instead of the end') @click.option('--retry-max', help='Maximum amount of retries', default=0, type=int) @click.option('--retry-interval', help='Interval between retries in seconds', multiple=True, type=int, default=[0]) @click.option('--schedule-in', help='Delay until the function is enqueued (e.g. 10s, 5m, 2d).') -@click.option('--schedule-at', help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without ' - 'timezone (e.g. 2021-05-27T21:45:00).') +@click.option( + '--schedule-at', + help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without ' + 'timezone (e.g. 2021-05-27T21:45:00).', +) @click.option('--quiet', is_flag=True, help='Only logs errors.') @click.argument('function') @click.argument('arguments', nargs=-1) @pass_cli_config -def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, description, depends_on, job_id, at_front, - retry_max, retry_interval, schedule_in, schedule_at, quiet, serializer, function, arguments, **options): +def enqueue( + cli_config, + queue, + timeout, + result_ttl, + ttl, + failure_ttl, + description, + depends_on, + job_id, + at_front, + retry_max, + retry_interval, + schedule_in, + schedule_at, + quiet, + serializer, + function, + arguments, + **options +): """Enqueues a job from the command line""" args, kwargs = parse_function_args(arguments) function_string = get_call_string(function, args, kwargs) @@ -348,11 +408,37 @@ def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, descriptio queue = cli_config.queue_class(queue, serializer=serializer) if schedule is None: - job = queue.enqueue_call(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl, - description, depends_on, job_id, at_front, None, retry) + job = queue.enqueue_call( + function, + args, + kwargs, + timeout, + result_ttl, + ttl, + failure_ttl, + description, + depends_on, + job_id, + at_front, + None, + retry, + ) else: - job = queue.create_job(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl, - description, depends_on, job_id, None, JobStatus.SCHEDULED, retry) + job = queue.create_job( + function, + args, + kwargs, + timeout, + result_ttl, + ttl, + failure_ttl, + description, + depends_on, + job_id, + None, + JobStatus.SCHEDULED, + retry, + ) queue.schedule_job(job, schedule) if not quiet: diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index af812ce..fb20109 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -13,8 +13,7 @@ from shutil import get_terminal_size import click from redis import Redis from redis.sentinel import Sentinel -from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, - DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS) +from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS from rq.logutils import setup_loghandlers from rq.utils import import_attribute, parse_timeout from rq.worker import WorkerStatus @@ -27,16 +26,14 @@ yellow = partial(click.style, fg='yellow') def read_config_file(module): """Reads all UPPERCASE variables defined in the given module file.""" settings = importlib.import_module(module) - return dict([(k, v) - for k, v in settings.__dict__.items() - if k.upper() == k]) + return dict([(k, v) for k, v in settings.__dict__.items() if k.upper() == k]) def get_redis_from_config(settings, connection_class=Redis): """Returns a StrictRedis instance from a dictionary of settings. - To use redis sentinel, you must specify a dictionary in the configuration file. - Example of a dictionary with keys without values: - SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'PASSWORD':,'DB':, 'MASTER_NAME':} + To use redis sentinel, you must specify a dictionary in the configuration file. + Example of a dictionary with keys without values: + SENTINEL = {'INSTANCES':, 'SOCKET_TIMEOUT':, 'PASSWORD':,'DB':, 'MASTER_NAME':} """ if settings.get('REDIS_URL') is not None: return connection_class.from_url(settings['REDIS_URL']) @@ -50,8 +47,7 @@ def get_redis_from_config(settings, connection_class=Redis): ssl = settings['SENTINEL'].get('SSL', False) arguments = {'password': password, 'ssl': ssl} sn = Sentinel( - instances, socket_timeout=socket_timeout, password=password, - db=db, ssl=ssl, sentinel_kwargs=arguments + instances, socket_timeout=socket_timeout, password=password, db=db, ssl=ssl, sentinel_kwargs=arguments ) return sn.master_for(master_name) @@ -168,9 +164,7 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class): for queue in queue_dict: if queue_dict[queue]: queues_str = ", ".join( - sorted( - map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue]) - ) + sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue])) ) else: queues_str = '–' @@ -188,6 +182,7 @@ def show_both(queues, raw, by_queue, queue_class, worker_class): if not raw: click.echo('') import datetime + click.echo('Updated: %s' % datetime.datetime.now()) @@ -233,14 +228,14 @@ def parse_function_arg(argument, arg_pos): if index > 0: if ':' in argument and argument.index(':') + 1 == index: # keyword, json mode = ParsingMode.JSON - keyword = argument[:index - 1] + keyword = argument[: index - 1] elif '%' in argument and argument.index('%') + 1 == index: # keyword, literal_eval mode = ParsingMode.LITERAL_EVAL - keyword = argument[:index - 1] + keyword = argument[: index - 1] else: # keyword, text mode = ParsingMode.PLAIN_TEXT keyword = argument[:index] - value = argument[index + 1:] + value = argument[index + 1 :] else: # no keyword, text mode = ParsingMode.PLAIN_TEXT value = argument @@ -261,9 +256,11 @@ def parse_function_arg(argument, arg_pos): try: value = literal_eval(value) except Exception: - raise click.BadParameter('Unable to eval %s as Python object. See ' - 'https://docs.python.org/3/library/ast.html#ast.literal_eval' - % (keyword or '%s. non keyword argument' % arg_pos)) + raise click.BadParameter( + 'Unable to eval %s as Python object. See ' + 'https://docs.python.org/3/library/ast.html#ast.literal_eval' + % (keyword or '%s. non keyword argument' % arg_pos) + ) return keyword, value @@ -294,9 +291,19 @@ def parse_schedule(schedule_in, schedule_at): class CliConfig: """A helper class to be used with click commands, to handle shared options""" - def __init__(self, url=None, config=None, worker_class=DEFAULT_WORKER_CLASS, - job_class=DEFAULT_JOB_CLASS, queue_class=DEFAULT_QUEUE_CLASS, - connection_class=DEFAULT_CONNECTION_CLASS, path=None, *args, **kwargs): + + def __init__( + self, + url=None, + config=None, + worker_class=DEFAULT_WORKER_CLASS, + job_class=DEFAULT_JOB_CLASS, + queue_class=DEFAULT_QUEUE_CLASS, + connection_class=DEFAULT_CONNECTION_CLASS, + path=None, + *args, + **kwargs + ): self._connection = None self.url = url self.config = config @@ -331,9 +338,7 @@ class CliConfig: self._connection = self.connection_class.from_url(self.url) elif self.config: settings = read_config_file(self.config) if self.config else {} - self._connection = get_redis_from_config(settings, - self.connection_class) + self._connection = get_redis_from_config(settings, self.connection_class) else: - self._connection = get_redis_from_config(os.environ, - self.connection_class) + self._connection = get_redis_from_config(os.environ, self.connection_class) return self._connection diff --git a/rq/command.py b/rq/command.py index 0f8ef6e..4566ec0 100644 --- a/rq/command.py +++ b/rq/command.py @@ -22,7 +22,7 @@ def send_command(connection: 'Redis', worker_name: str, command: str, **kwargs): - `shutdown`: Shuts down a worker - `kill-horse`: Command for the worker to kill the current working horse - `stop-job`: A command for the worker to stop the currently running job - + The command string will be parsed into a dictionary and send to a PubSub Topic. Workers listen to the PubSub, and `handle` the specific command. diff --git a/rq/connections.py b/rq/connections.py index c5ebc20..413ee5a 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -30,8 +30,9 @@ def Connection(connection: Optional['Redis'] = None): # noqa Args: connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None. """ - warnings.warn("The Conneciton context manager is deprecated. Use the `connection` parameter instead.", - DeprecationWarning) + warnings.warn( + "The Conneciton context manager is deprecated. Use the `connection` parameter instead.", DeprecationWarning + ) if connection is None: connection = Redis() push_connection(connection) @@ -39,9 +40,9 @@ def Connection(connection: Optional['Redis'] = None): # noqa yield finally: popped = pop_connection() - assert popped == connection, \ - 'Unexpected Redis connection was popped off the stack. ' \ - 'Check your Redis connection setup.' + assert popped == connection, ( + 'Unexpected Redis connection was popped off the stack. ' 'Check your Redis connection setup.' + ) def push_connection(redis: 'Redis'): @@ -72,8 +73,7 @@ def use_connection(redis: Optional['Redis'] = None): Args: redis (Optional[Redis], optional): A Redis Connection. Defaults to None. """ - assert len(_connection_stack) <= 1, \ - 'You should not mix Connection contexts with use_connection()' + assert len(_connection_stack) <= 1, 'You should not mix Connection contexts with use_connection()' release_local(_connection_stack) if redis is None: @@ -118,5 +118,4 @@ def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis': _connection_stack = LocalStack() -__all__ = ['Connection', 'get_current_connection', 'push_connection', - 'pop_connection', 'use_connection'] +__all__ = ['Connection', 'get_current_connection', 'push_connection', 'pop_connection', 'use_connection'] diff --git a/rq/contrib/sentry.py b/rq/contrib/sentry.py index edf0fe4..efb55b2 100644 --- a/rq/contrib/sentry.py +++ b/rq/contrib/sentry.py @@ -4,4 +4,5 @@ def register_sentry(sentry_dsn, **opts): """ import sentry_sdk from sentry_sdk.integrations.rq import RqIntegration + sentry_sdk.init(sentry_dsn, integrations=[RqIntegration()], **opts) diff --git a/rq/decorators.py b/rq/decorators.py index 70a61aa..2bf46e8 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -13,12 +13,23 @@ from .utils import backend_class class job: # noqa queue_class = Queue - def __init__(self, queue: Union['Queue', str], connection: Optional['Redis'] = None, timeout: Optional[int] = None, - result_ttl: int = DEFAULT_RESULT_TTL, ttl: Optional[int] = None, - queue_class: Optional['Queue'] = None, depends_on: Optional[List[Any]] = None, at_front: Optional[bool] = None, - meta: Optional[Dict[Any, Any]] = None, description: Optional[str] = None, failure_ttl: Optional[int] = None, - retry: Optional['Retry'] = None, on_failure: Optional[Callable[..., Any]] = None, - on_success: Optional[Callable[..., Any]] = None): + def __init__( + self, + queue: Union['Queue', str], + connection: Optional['Redis'] = None, + timeout: Optional[int] = None, + result_ttl: int = DEFAULT_RESULT_TTL, + ttl: Optional[int] = None, + queue_class: Optional['Queue'] = None, + depends_on: Optional[List[Any]] = None, + at_front: Optional[bool] = None, + meta: Optional[Dict[Any, Any]] = None, + description: Optional[str] = None, + failure_ttl: Optional[int] = None, + retry: Optional['Retry'] = None, + on_failure: Optional[Callable[..., Any]] = None, + on_success: Optional[Callable[..., Any]] = None, + ): """A decorator that adds a ``delay`` method to the decorated function, which in turn creates a RQ job when called. Accepts a required ``queue`` argument that can be either a ``Queue`` instance or a string @@ -68,8 +79,7 @@ class job: # noqa @wraps(f) def delay(*args, **kwargs): if isinstance(self.queue, str): - queue = self.queue_class(name=self.queue, - connection=self.connection) + queue = self.queue_class(name=self.queue, connection=self.connection) else: queue = self.queue @@ -83,10 +93,23 @@ class job: # noqa if not at_front: at_front = self.at_front - return queue.enqueue_call(f, args=args, kwargs=kwargs, - timeout=self.timeout, result_ttl=self.result_ttl, - ttl=self.ttl, depends_on=depends_on, job_id=job_id, at_front=at_front, - meta=self.meta, description=self.description, failure_ttl=self.failure_ttl, - retry=self.retry, on_failure=self.on_failure, on_success=self.on_success) + return queue.enqueue_call( + f, + args=args, + kwargs=kwargs, + timeout=self.timeout, + result_ttl=self.result_ttl, + ttl=self.ttl, + depends_on=depends_on, + job_id=job_id, + at_front=at_front, + meta=self.meta, + description=self.description, + failure_ttl=self.failure_ttl, + retry=self.retry, + on_failure=self.on_failure, + on_success=self.on_success, + ) + f.delay = delay return f diff --git a/rq/defaults.py b/rq/defaults.py index ef76678..bd50489 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -88,4 +88,3 @@ DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' Uses Python's default attributes as defined https://docs.python.org/3/library/logging.html#logrecord-attributes """ - diff --git a/rq/job.py b/rq/job.py index 9104747..61b359f 100644 --- a/rq/job.py +++ b/rq/job.py @@ -8,8 +8,7 @@ from collections.abc import Iterable from datetime import datetime, timedelta, timezone from enum import Enum from redis import WatchError -from typing import (TYPE_CHECKING, Any, Callable, Dict, Iterable, List, - Optional, Tuple, Union) +from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union from uuid import uuid4 @@ -20,18 +19,27 @@ if TYPE_CHECKING: from redis.client import Pipeline from .connections import resolve_connection -from .exceptions import (DeserializationError, InvalidJobOperation, - NoSuchJobError) +from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError from .local import LocalStack from .serializers import resolve_serializer from .types import FunctionReferenceType, JobDependencyType -from .utils import (as_text, decode_redis_hash, ensure_list, get_call_string, - get_version, import_attribute, parse_timeout, str_to_date, - utcformat, utcnow) +from .utils import ( + as_text, + decode_redis_hash, + ensure_list, + get_call_string, + get_version, + import_attribute, + parse_timeout, + str_to_date, + utcformat, + utcnow, +) class JobStatus(str, Enum): - """The Status of Job within its lifecycle at any given time. """ + """The Status of Job within its lifecycle at any given time.""" + QUEUED = 'queued' FINISHED = 'finished' FAILED = 'failed' @@ -57,13 +65,9 @@ class Dependency: Raises: ValueError: If the `jobs` param has anything different than `str` or `Job` class or the job list is empty - """ + """ dependent_jobs = ensure_list(jobs) - if not all( - isinstance(job, Job) or isinstance(job, str) - for job in dependent_jobs - if job - ): + if not all(isinstance(job, Job) or isinstance(job, str) for job in dependent_jobs if job): raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids") elif len(dependent_jobs) < 1: raise ValueError("jobs: cannot be empty.") @@ -79,8 +83,7 @@ yet been evaluated. """ -def cancel_job(job_id: str, connection: Optional['Redis'] = None, - serializer=None, enqueue_dependents: bool = False): +def cancel_job(job_id: str, connection: Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False): """Cancels the job with the given job ID, preventing execution. Use with caution. This will discard any job info (i.e. it can't be requeued later). @@ -103,13 +106,11 @@ def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['J Returns: job (Optional[Job]): The current Job running - """ + """ if connection: - warnings.warn("connection argument for get_current_job is deprecated.", - DeprecationWarning) + warnings.warn("connection argument for get_current_job is deprecated.", DeprecationWarning) if job_class: - warnings.warn("job_class argument for get_current_job is deprecated.", - DeprecationWarning) + warnings.warn("job_class argument for get_current_job is deprecated.", DeprecationWarning) return _job_stack.top @@ -123,26 +124,38 @@ def requeue_job(job_id: str, connection: 'Redis', serializer=None) -> 'Job': Returns: Job: The requeued Job object. - """ + """ job = Job.fetch(job_id, connection=connection, serializer=serializer) return job.requeue() class Job: """A Job is just a convenient datastructure to pass around job (meta) data.""" + redis_job_namespace_prefix = 'rq:job:' @classmethod - def create(cls, func: FunctionReferenceType, args: Union[List[Any], Optional[Tuple]] = None, - kwargs: Optional[Dict[str, Any]] = None, connection: Optional['Redis'] = None, - result_ttl: Optional[int] = None, ttl: Optional[int] = None, - status: Optional[JobStatus] = None, description: Optional[str] =None, - depends_on: Optional[JobDependencyType] = None, - timeout: Optional[int] = None, id: Optional[str] = None, - origin=None, meta: Optional[Dict[str, Any]] = None, - failure_ttl: Optional[int] = None, serializer=None, *, - on_success: Optional[Callable[..., Any]] = None, - on_failure: Optional[Callable[..., Any]] = None) -> 'Job': + def create( + cls, + func: FunctionReferenceType, + args: Union[List[Any], Optional[Tuple]] = None, + kwargs: Optional[Dict[str, Any]] = None, + connection: Optional['Redis'] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + status: Optional[JobStatus] = None, + description: Optional[str] = None, + depends_on: Optional[JobDependencyType] = None, + timeout: Optional[int] = None, + id: Optional[str] = None, + origin=None, + meta: Optional[Dict[str, Any]] = None, + failure_ttl: Optional[int] = None, + serializer=None, + *, + on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None + ) -> 'Job': """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -185,7 +198,7 @@ class Job: Returns: Job: A job instance. - """ + """ if args is None: args = () if kwargs is None: @@ -247,10 +260,7 @@ class Job: depends_on_list = depends_on.dependencies else: depends_on_list = ensure_list(depends_on) - job._dependency_ids = [ - dep.id if isinstance(dep, Job) else dep - for dep in depends_on_list - ] + job._dependency_ids = [dep.id if isinstance(dep, Job) else dep for dep in depends_on_list] return job @@ -259,8 +269,9 @@ class Job: Returns: position (Optional[int]): The position - """ + """ from .queue import Queue + if self.origin: q = Queue(name=self.origin, connection=self.connection) return q.get_job_position(self._id) @@ -274,7 +285,7 @@ class Job: Returns: status (JobStatus): The Job Status - """ + """ if refresh: self._status = as_text(self.connection.hget(self.key, 'status')) return self._status @@ -285,7 +296,7 @@ class Job: Args: status (JobStatus): The Job Status to be set pipeline (Optional[Pipeline], optional): Optional Redis Pipeline to use. Defaults to None. - """ + """ self._status = status connection: 'Redis' = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'status', self._status) @@ -298,7 +309,7 @@ class Job: Returns: meta (Dict): The dictionary of metadata - """ + """ if refresh: meta = self.connection.hget(self.key, 'meta') self.meta = self.serializer.loads(meta) if meta else {} @@ -401,7 +412,7 @@ class Job: Raises: DeserializationError: Cathes any deserialization error (since serializers are generic) - """ + """ try: self._func_name, self._instance, self._args, self._kwargs = self.serializer.loads(self.data) except Exception as e: @@ -522,7 +533,7 @@ class Job: job_ids (Iterable[str]): A list of job ids. connection (Redis): Redis connection serializer (Callable): A serializer - + Returns: jobs (list[Job]): A list of Jobs instances. """ @@ -580,17 +591,14 @@ class Job: self.enqueue_at_front: Optional[bool] = None from .results import Result + self._cached_result: Optional[Result] = None def __repr__(self): # noqa # pragma: no cover - return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, - self._id, - self.enqueued_at) + return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, self._id, self.enqueued_at) def __str__(self): - return '<{0} {1}: {2}>'.format(self.__class__.__name__, - self.id, - self.description) + return '<{0} {1}: {2}>'.format(self.__class__.__name__, self.id, self.description) def __eq__(self, other): # noqa return isinstance(other, self.__class__) and self.id == other.id @@ -612,7 +620,7 @@ class Job: def set_id(self, value: str) -> None: """Sets a job ID for the given job - + Args: value (str): The value to set as Job ID """ @@ -630,7 +638,7 @@ class Job: ttl (int): The time to live pipeline (Optional[Pipeline], optional): Can receive a Redis' pipeline to use. Defaults to None. xx (bool, optional): Only sets the key if already exists. Defaults to False. - """ + """ self.last_heartbeat = timestamp connection = pipeline if pipeline is not None else self.connection connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) @@ -690,14 +698,15 @@ class Job: Returns: jobs (list[Job]): A list of Jobs - """ + """ connection = pipeline if pipeline is not None else self.connection if watch and self._dependency_ids: - connection.watch(*[self.key_for(dependency_id) - for dependency_id in self._dependency_ids]) + connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids]) - dependencies_list = self.fetch_many(self._dependency_ids, connection=self.connection, serializer=self.serializer) + dependencies_list = self.fetch_many( + self._dependency_ids, connection=self.connection, serializer=self.serializer + ) jobs = [job for job in dependencies_list if job] return jobs @@ -706,8 +715,7 @@ class Job: """ Get the latest result and returns `exc_info` only if the latest result is a failure. """ - warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.", - DeprecationWarning) + warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.", DeprecationWarning) from .results import Result @@ -730,6 +738,7 @@ class Job: result (Optional[Any]): The job return value. """ from .results import Result + if refresh: self._cached_result = None @@ -741,7 +750,7 @@ class Job: if self._cached_result and self._cached_result.type == Result.Type.SUCCESSFUL: return self._cached_result.return_value - + return None @property @@ -762,8 +771,7 @@ class Job: seconds by default). """ - warnings.warn("job.result is deprecated, use job.return_value instead.", - DeprecationWarning) + warnings.warn("job.result is deprecated, use job.return_value instead.", DeprecationWarning) from .results import Result @@ -789,6 +797,7 @@ class Job: all_results (List[Result]): A list of 'Result' objects """ from .results import Result + return Result.all(self, serializer=self.serializer) def latest_result(self) -> Optional['Result']: @@ -796,9 +805,10 @@ class Job: Returns: result (Result): The Result object - """ + """ """Returns the latest Result object""" from .results import Result + return Result.fetch_latest(self, serializer=self.serializer) def restore(self, raw_data) -> Any: @@ -849,8 +859,7 @@ class Job: dep_ids = obj.get('dependency_ids') dep_id = obj.get('dependency_id') # for backwards compatibility - self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids - else [dep_id.decode()] if dep_id else []) + self._dependency_ids = json.loads(dep_ids.decode()) if dep_ids else [dep_id.decode()] if dep_id else [] allow_failures = obj.get('allow_dependency_failures') self.allow_dependency_failures = bool(int(allow_failures)) if allow_failures else None self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None @@ -902,7 +911,7 @@ class Job: 'started_at': utcformat(self.started_at) if self.started_at else '', 'ended_at': utcformat(self.ended_at) if self.ended_at else '', 'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '', - 'worker_name': self.worker_name or '' + 'worker_name': self.worker_name or '', } if self.retries_left is not None: @@ -948,8 +957,7 @@ class Job: return obj - def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True, - include_result: bool = True): + def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True, include_result: bool = True): """Dumps the current job instance to its corresponding Redis key. Exclude saving the `meta` dictionary by setting @@ -980,7 +988,7 @@ class Job: def get_redis_server_version(self) -> Tuple[int, int, int]: """Return Redis server version of connection - + Returns: redis_server_version (Tuple[int, int, int]): The Redis version within a Tuple of integers, eg (5, 0, 9) """ @@ -1016,15 +1024,13 @@ class Job: raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id())) from .registry import CanceledJobRegistry from .queue import Queue + pipe = pipeline or self.connection.pipeline() while True: try: q = Queue( - name=self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer + name=self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer ) self.set_status(JobStatus.CANCELED, pipeline=pipe) @@ -1033,16 +1039,10 @@ class Job: if pipeline is None: pipe.watch(self.dependents_key) q.enqueue_dependents(self, pipeline=pipeline, exclude_job_id=self.id) - self._remove_from_registries( - pipeline=pipe, - remove_from_queue=True - ) + self._remove_from_registries(pipeline=pipe, remove_from_queue=True) registry = CanceledJobRegistry( - self.origin, - self.connection, - job_class=self.__class__, - serializer=self.serializer + self.origin, self.connection, job_class=self.__class__, serializer=self.serializer ) registry.add(self, pipeline=pipe) if pipeline is None: @@ -1070,41 +1070,43 @@ class Job: def _remove_from_registries(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True): from .registry import BaseRegistry + if remove_from_queue: from .queue import Queue + q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer) q.remove(self, pipeline=pipeline) registry: BaseRegistry if self.is_finished: from .registry import FinishedJobRegistry - registry = FinishedJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = FinishedJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) elif self.is_deferred: from .registry import DeferredJobRegistry - registry = DeferredJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = DeferredJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) elif self.is_started: from .registry import StartedJobRegistry - registry = StartedJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = StartedJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) elif self.is_scheduled: from .registry import ScheduledJobRegistry - registry = ScheduledJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = ScheduledJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) elif self.is_failed or self.is_stopped: @@ -1112,13 +1114,15 @@ class Job: elif self.is_canceled: from .registry import CanceledJobRegistry - registry = CanceledJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + registry = CanceledJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.remove(self, pipeline=pipeline) - def delete(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True, - delete_dependents: bool = False): + def delete( + self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True, delete_dependents: bool = False + ): """Cancels the job and deletes the job hash from Redis. Jobs depending on this job can optionally be deleted as well. @@ -1184,7 +1188,7 @@ class Job: 'last_heartbeat': utcformat(self.last_heartbeat), 'status': self._status, 'started_at': utcformat(self.started_at), # type: ignore - 'worker_name': worker_name + 'worker_name': worker_name, } if self.get_redis_server_version() >= (4, 0, 0): pipeline.hset(self.key, mapping=mapping) @@ -1199,7 +1203,7 @@ class Job: Returns: result (Any): The function result - """ + """ result = self.func(*self.args, **self.kwargs) if asyncio.iscoroutine(result): loop = asyncio.new_event_loop() @@ -1214,7 +1218,7 @@ class Job: Args: default_ttl (Optional[int]): The default time to live for the job - + Returns: ttl (int): The time to live """ @@ -1227,7 +1231,7 @@ class Job: Args: default_ttl (Optional[int]): The default time to live for the job result - + Returns: ttl (int): The time to live for the result """ @@ -1244,8 +1248,7 @@ class Job: call_repr = get_call_string(self.func_name, self.args, self.kwargs, max_length=75) return call_repr - def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None, - remove_from_queue: bool = True): + def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True): """Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its result depends on the value of ttl: @@ -1271,16 +1274,18 @@ class Job: @property def started_job_registry(self): from .registry import StartedJobRegistry - return StartedJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + return StartedJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) @property def failed_job_registry(self): from .registry import FailedJobRegistry - return FailedJobRegistry(self.origin, connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + + return FailedJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) def get_retry_interval(self) -> int: """Returns the desired retry interval. @@ -1332,10 +1337,9 @@ class Job: """ from .registry import DeferredJobRegistry - registry = DeferredJobRegistry(self.origin, - connection=self.connection, - job_class=self.__class__, - serializer=self.serializer) + registry = DeferredJobRegistry( + self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer + ) registry.add(self, pipeline=pipeline) connection = pipeline if pipeline is not None else self.connection @@ -1348,11 +1352,14 @@ class Job: @property def dependency_ids(self) -> List[bytes]: dependencies = self.connection.smembers(self.dependencies_key) - return [Job.key_for(_id.decode()) - for _id in dependencies] - - def dependencies_are_met(self, parent_job: Optional['Job'] = None, pipeline: Optional['Pipeline'] = None, - exclude_job_id: Optional[str] = None) -> bool: + return [Job.key_for(_id.decode()) for _id in dependencies] + + def dependencies_are_met( + self, + parent_job: Optional['Job'] = None, + pipeline: Optional['Pipeline'] = None, + exclude_job_id: Optional[str] = None, + ) -> bool: """Returns a boolean indicating if all of this job's dependencies are `FINISHED` If a pipeline is passed, all dependencies are WATCHed. @@ -1373,8 +1380,7 @@ class Job: connection = pipeline if pipeline is not None else self.connection if pipeline is not None: - connection.watch(*[self.key_for(dependency_id) - for dependency_id in self._dependency_ids]) + connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids]) dependencies_ids = {_id.decode() for _id in connection.smembers(self.dependencies_key)} @@ -1407,12 +1413,7 @@ class Job: if self.allow_dependency_failures: allowed_statuses.append(JobStatus.FAILED) - return all( - status.decode() in allowed_statuses - for status - in dependencies_statuses - if status - ) + return all(status.decode() in allowed_statuses for status in dependencies_statuses if status) _job_stack = LocalStack() diff --git a/rq/local.py b/rq/local.py index 458cd83..8e94457 100644 --- a/rq/local.py +++ b/rq/local.py @@ -122,6 +122,7 @@ class LocalStack: def _set__ident_func__(self, value): # noqa object.__setattr__(self._local, '__ident_func__', value) + __ident_func__ = property(_get__ident_func__, _set__ident_func__) del _get__ident_func__, _set__ident_func__ @@ -131,6 +132,7 @@ class LocalStack: if rv is None: raise RuntimeError('object unbound') return rv + return LocalProxy(_lookup) def push(self, obj): @@ -223,10 +225,7 @@ class LocalManager: release_local(local) def __repr__(self): - return '<%s storages: %d>' % ( - self.__class__.__name__, - len(self.locals) - ) + return '<%s storages: %d>' % (self.__class__.__name__, len(self.locals)) class LocalProxy: @@ -264,6 +263,7 @@ class LocalProxy: .. versionchanged:: 0.6.1 The class can be instanciated with a callable as well now. """ + __slots__ = ('__local', '__dict__', '__name__') def __init__(self, local, name=None): diff --git a/rq/logutils.py b/rq/logutils.py index 36a404d..33e0949 100644 --- a/rq/logutils.py +++ b/rq/logutils.py @@ -3,12 +3,15 @@ import sys from typing import Union from rq.utils import ColorizingStreamHandler -from rq.defaults import (DEFAULT_LOGGING_FORMAT, - DEFAULT_LOGGING_DATE_FORMAT) +from rq.defaults import DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT -def setup_loghandlers(level: Union[int, str, None] = None, date_format: str = DEFAULT_LOGGING_DATE_FORMAT, - log_format: str = DEFAULT_LOGGING_FORMAT, name: str = 'rq.worker'): +def setup_loghandlers( + level: Union[int, str, None] = None, + date_format: str = DEFAULT_LOGGING_DATE_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT, + name: str = 'rq.worker', +): """Sets up a log handler. Args: @@ -17,7 +20,7 @@ def setup_loghandlers(level: Union[int, str, None] = None, date_format: str = DE date_format (str, optional): The date format to use. Defaults to DEFAULT_LOGGING_DATE_FORMAT ('%H:%M:%S'). log_format (str, optional): The log format to use. Defaults to DEFAULT_LOGGING_FORMAT ('%(asctime)s %(message)s'). name (str, optional): The looger name. Defaults to 'rq.worker'. - """ + """ logger = logging.getLogger(name) if not _has_effective_handler(logger): diff --git a/rq/queue.py b/rq/queue.py index 79125ce..bc7f070 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -30,11 +30,27 @@ blue = make_colorizer('darkblue') logger = logging.getLogger("rq.queue") - -class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout", - "result_ttl", "ttl", "failure_ttl", - "description", "job_id", - "at_front", "meta", "retry", "on_success", "on_failure"])): +class EnqueueData( + namedtuple( + 'EnqueueData', + [ + "func", + "args", + "kwargs", + "timeout", + "result_ttl", + "ttl", + "failure_ttl", + "description", + "job_id", + "at_front", + "meta", + "retry", + "on_success", + "on_failure", + ], + ) +): """Helper type to use when calling enqueue_many NOTE: Does not support `depends_on` yet. """ @@ -50,7 +66,9 @@ class Queue: redis_queues_keys: str = 'rq:queues' @classmethod - def all(cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None) -> List['Queue']: + def all( + cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None + ) -> List['Queue']: """Returns an iterable of all Queues. Args: @@ -64,17 +82,22 @@ class Queue: connection = resolve_connection(connection) def to_queue(queue_key): - return cls.from_queue_key(as_text(queue_key), - connection=connection, - job_class=job_class, serializer=serializer) + return cls.from_queue_key( + as_text(queue_key), connection=connection, job_class=job_class, serializer=serializer + ) all_registerd_queues = connection.smembers(cls.redis_queues_keys) all_queues = [to_queue(rq_key) for rq_key in all_registerd_queues if rq_key] return all_queues @classmethod - def from_queue_key(cls, queue_key: str, connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, serializer: Any = None) -> 'Queue': + def from_queue_key( + cls, + queue_key: str, + connection: Optional['Redis'] = None, + job_class: Optional['Job'] = None, + serializer: Any = None, + ) -> 'Queue': """Returns a Queue instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Queues by their Redis keys. @@ -94,11 +117,19 @@ class Queue: prefix = cls.redis_queue_namespace_prefix if not queue_key.startswith(prefix): raise ValueError('Not a valid RQ queue key: {0}'.format(queue_key)) - name = queue_key[len(prefix):] + name = queue_key[len(prefix) :] return cls(name, connection=connection, job_class=job_class, serializer=serializer) - def __init__(self, name: str = 'default', default_timeout: Optional[int] = None, connection: Optional['Redis'] = None, - is_async: bool = True, job_class: Union[str, Type['Job'], None] = None, serializer: Any = None, **kwargs): + def __init__( + self, + name: str = 'default', + default_timeout: Optional[int] = None, + connection: Optional['Redis'] = None, + is_async: bool = True, + job_class: Union[str, Type['Job'], None] = None, + serializer: Any = None, + **kwargs, + ): """Initializes a Queue object. Args: @@ -109,7 +140,7 @@ class Queue: If `is_async` is false, jobs will run on the same process from where it was called. Defaults to True. job_class (Union[str, 'Job', optional): Job class or a string referencing the Job class path. Defaults to None. serializer (Any, optional): Serializer. Defaults to None. - """ + """ self.connection = resolve_connection(connection) prefix = self.redis_queue_namespace_prefix self.name = name @@ -145,7 +176,7 @@ class Queue: def get_redis_server_version(self) -> Tuple[int, int, int]: """Return Redis server version of connection - + Returns: redis_version (Tuple): A tuple with the parsed Redis version (eg: (5,0,0)) """ @@ -184,7 +215,7 @@ class Queue: Returns: script (...): The Lua Script is called. - """ + """ script = """ local prefix = "{0}" local q = KEYS[1] @@ -201,13 +232,17 @@ class Queue: count = count + 1 end return count - """.format(self.job_class.redis_job_namespace_prefix).encode("utf-8") + """.format( + self.job_class.redis_job_namespace_prefix + ).encode( + "utf-8" + ) script = self.connection.register_script(script) return script(keys=[self.key]) def delete(self, delete_jobs: bool = True): """Deletes the queue. - + Args: delete_jobs (bool): If true, removes all the associated messages on the queue first. """ @@ -221,7 +256,7 @@ class Queue: def is_empty(self) -> bool: """Returns whether the current queue is empty. - + Returns: is_empty (bool): Whether the queue is empty """ @@ -242,7 +277,7 @@ class Queue: Returns: job (Optional[Job]): The job if found - """ + """ try: job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer) except NoSuchJobError: @@ -251,7 +286,7 @@ class Queue: if job.origin == self.name: return job - def get_job_position(self, job_or_id: Union['Job', str]) -> Optional[int]: + def get_job_position(self, job_or_id: Union['Job', str]) -> Optional[int]: """Returns the position of a job within the queue Using Redis before 6.0.6 and redis-py before 3.5.4 has a complexity of @@ -293,11 +328,7 @@ class Queue: end = offset + (length - 1) else: end = length - job_ids = [ - as_text(job_id) - for job_id - in self.connection.lrange(self.key, start, end) - ] + job_ids = [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)] self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.") return job_ids @@ -333,18 +364,21 @@ class Queue: def failed_job_registry(self): """Returns this queue's FailedJobRegistry.""" from rq.registry import FailedJobRegistry + return FailedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def started_job_registry(self): """Returns this queue's StartedJobRegistry.""" from rq.registry import StartedJobRegistry + return StartedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def finished_job_registry(self): """Returns this queue's FinishedJobRegistry.""" from rq.registry import FinishedJobRegistry + # TODO: Why was job_class only ommited here before? Was it intentional? return FinishedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @@ -352,18 +386,21 @@ class Queue: def deferred_job_registry(self): """Returns this queue's DeferredJobRegistry.""" from rq.registry import DeferredJobRegistry + return DeferredJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def scheduled_job_registry(self): """Returns this queue's ScheduledJobRegistry.""" from rq.registry import ScheduledJobRegistry + return ScheduledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) @property def canceled_job_registry(self): """Returns this queue's CanceledJobRegistry.""" from rq.registry import CanceledJobRegistry + return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) def remove(self, job_or_id: Union['Job', str], pipeline: Optional['Pipeline'] = None): @@ -387,8 +424,7 @@ class Queue: """Removes all "dead" jobs from the queue by cycling through it, while guaranteeing FIFO semantics. """ - COMPACT_QUEUE = '{0}_compact:{1}'.format( - self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa + COMPACT_QUEUE = '{0}_compact:{1}'.format(self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa self.connection.rename(self.key, COMPACT_QUEUE) while True: @@ -414,12 +450,25 @@ class Queue: result = connection.rpush(self.key, job_id) self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") - def create_job(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType']=None, - job_id: Optional[str] = None, meta: Optional[Dict] = None, status: JobStatus = JobStatus.QUEUED, - retry: Optional['Retry'] = None, *, on_success: Optional[Callable] = None, - on_failure: Optional[Callable] = None) -> Job: + def create_job( + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + meta: Optional[Dict] = None, + status: JobStatus = JobStatus.QUEUED, + retry: Optional['Retry'] = None, + *, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, + ) -> Job: """Creates a job based on parameters given Args: @@ -461,12 +510,23 @@ class Queue: raise ValueError('Job ttl must be greater than 0') job = self.job_class.create( - func, args=args, kwargs=kwargs, connection=self.connection, - result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, - status=status, description=description, - depends_on=depends_on, timeout=timeout, id=job_id, - origin=self.name, meta=meta, serializer=self.serializer, on_success=on_success, - on_failure=on_failure + func, + args=args, + kwargs=kwargs, + connection=self.connection, + result_ttl=result_ttl, + ttl=ttl, + failure_ttl=failure_ttl, + status=status, + description=description, + depends_on=depends_on, + timeout=timeout, + id=job_id, + origin=self.name, + meta=meta, + serializer=self.serializer, + on_success=on_success, + on_failure=on_failure, ) if retry: @@ -501,10 +561,7 @@ class Queue: # is called from within this method. pipe.watch(job.dependencies_key) - dependencies = job.fetch_dependencies( - watch=True, - pipeline=pipe - ) + dependencies = job.fetch_dependencies(watch=True, pipeline=pipe) pipe.multi() @@ -535,12 +592,25 @@ class Queue: pipeline.multi() # Ensure pipeline in multi mode before returning to caller return job - def enqueue_call(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType'] = None, - job_id: Optional[str] = None, at_front: bool = False, meta: Optional[Dict] = None, - retry: Optional['Retry'] = None, on_success: Optional[Callable[..., Any]] = None, - on_failure: Optional[Callable[..., Any]] = None, pipeline: Optional['Pipeline'] = None) -> Job: + def enqueue_call( + self, + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + depends_on: Optional['JobDependencyType'] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable[..., Any]] = None, + on_failure: Optional[Callable[..., Any]] = None, + pipeline: Optional['Pipeline'] = None, + ) -> Job: """Creates a job to represent the delayed function call and enqueues it. It is much like `.enqueue()`, except that it takes the function's args @@ -567,30 +637,49 @@ class Queue: Returns: Job: The enqueued Job - """ + """ job = self.create_job( - func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, - failure_ttl=failure_ttl, description=description, depends_on=depends_on, - job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout, - retry=retry, on_success=on_success, on_failure=on_failure + func, + args=args, + kwargs=kwargs, + result_ttl=result_ttl, + ttl=ttl, + failure_ttl=failure_ttl, + description=description, + depends_on=depends_on, + job_id=job_id, + meta=meta, + status=JobStatus.QUEUED, + timeout=timeout, + retry=retry, + on_success=on_success, + on_failure=on_failure, ) - job = self.setup_dependencies( - job, - pipeline=pipeline - ) + job = self.setup_dependencies(job, pipeline=pipeline) # If we do not depend on an unfinished job, enqueue the job. if job.get_status(refresh=False) != JobStatus.DEFERRED: return self.enqueue_job(job, pipeline=pipeline, at_front=at_front) return job @staticmethod - def prepare_data(func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, - timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, - failure_ttl: Optional[int] = None, description: Optional[str] = None, job_id: Optional[str] = None, - at_front: bool = False, meta: Optional[Dict] = None, retry: Optional['Retry'] = None, - on_success: Optional[Callable] = None, on_failure: Optional[Callable] = None) -> EnqueueData: + def prepare_data( + func: 'FunctionReferenceType', + args: Union[Tuple, List, None] = None, + kwargs: Optional[Dict] = None, + timeout: Optional[int] = None, + result_ttl: Optional[int] = None, + ttl: Optional[int] = None, + failure_ttl: Optional[int] = None, + description: Optional[str] = None, + job_id: Optional[str] = None, + at_front: bool = False, + meta: Optional[Dict] = None, + retry: Optional['Retry'] = None, + on_success: Optional[Callable] = None, + on_failure: Optional[Callable] = None, + ) -> EnqueueData: """Need this till support dropped for python_version < 3.7, where defaults can be specified for named tuples And can keep this logic within EnqueueData @@ -614,10 +703,20 @@ class Queue: EnqueueData: The EnqueueData """ return EnqueueData( - func, args, kwargs, timeout, - result_ttl, ttl, failure_ttl, - description, job_id, - at_front, meta, retry, on_success, on_failure + func, + args, + kwargs, + timeout, + result_ttl, + ttl, + failure_ttl, + description, + job_id, + at_front, + meta, + retry, + on_success, + on_failure, ) def enqueue_many(self, job_datas: List['EnqueueData'], pipeline: Optional['Pipeline'] = None) -> List[Job]: @@ -635,18 +734,24 @@ class Queue: jobs = [ self.enqueue_job( self.create_job( - job_data.func, args=job_data.args, kwargs=job_data.kwargs, result_ttl=job_data.result_ttl, + job_data.func, + args=job_data.args, + kwargs=job_data.kwargs, + result_ttl=job_data.result_ttl, ttl=job_data.ttl, - failure_ttl=job_data.failure_ttl, description=job_data.description, + failure_ttl=job_data.failure_ttl, + description=job_data.description, depends_on=None, - job_id=job_data.job_id, meta=job_data.meta, status=JobStatus.QUEUED, + job_id=job_data.job_id, + meta=job_data.meta, + status=JobStatus.QUEUED, timeout=job_data.timeout, retry=job_data.retry, on_success=job_data.on_success, - on_failure=job_data.on_failure + on_failure=job_data.on_failure, ), pipeline=pipe, - at_front=job_data.at_front + at_front=job_data.at_front, ) for job_data in job_datas ] @@ -662,7 +767,7 @@ class Queue: Returns: Job: _description_ - """ + """ job.perform() job.set_status(JobStatus.FINISHED) job.save(include_meta=False) @@ -687,8 +792,7 @@ class Queue: kwargs (*kwargs): function kargs """ if not isinstance(f, str) and f.__module__ == '__main__': - raise ValueError('Functions from the __main__ module cannot be processed ' - 'by workers') + raise ValueError('Functions from the __main__ module cannot be processed ' 'by workers') # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30) @@ -711,9 +815,24 @@ class Queue: args = kwargs.pop('args', None) kwargs = kwargs.pop('kwargs', None) - return (f, timeout, description, result_ttl, ttl, failure_ttl, - depends_on, job_id, at_front, meta, retry, on_success, on_failure, - pipeline, args, kwargs) + return ( + f, + timeout, + description, + result_ttl, + ttl, + failure_ttl, + depends_on, + job_id, + at_front, + meta, + retry, + on_success, + on_failure, + pipeline, + args, + kwargs, + ) def enqueue(self, f: 'FunctionReferenceType', *args, **kwargs) -> 'Job': """Creates a job to represent the delayed function call and enqueues it. @@ -727,16 +846,42 @@ class Queue: Returns: job (Job): The created Job """ - (f, timeout, description, result_ttl, ttl, failure_ttl, - depends_on, job_id, at_front, meta, retry, on_success, - on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) + ( + f, + timeout, + description, + result_ttl, + ttl, + failure_ttl, + depends_on, + job_id, + at_front, + meta, + retry, + on_success, + on_failure, + pipeline, + args, + kwargs, + ) = Queue.parse_args(f, *args, **kwargs) return self.enqueue_call( - func=f, args=args, kwargs=kwargs, timeout=timeout, - result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, - description=description, depends_on=depends_on, job_id=job_id, - at_front=at_front, meta=meta, retry=retry, on_success=on_success, on_failure=on_failure, - pipeline=pipeline + func=f, + args=args, + kwargs=kwargs, + timeout=timeout, + result_ttl=result_ttl, + ttl=ttl, + failure_ttl=failure_ttl, + description=description, + depends_on=depends_on, + job_id=job_id, + at_front=at_front, + meta=meta, + retry=retry, + on_success=on_success, + on_failure=on_failure, + pipeline=pipeline, ) def enqueue_at(self, datetime: datetime, f, *args, **kwargs): @@ -749,14 +894,41 @@ class Queue: Returns: _type_: _description_ """ - (f, timeout, description, result_ttl, ttl, failure_ttl, - depends_on, job_id, at_front, meta, retry, on_success, on_failure, - pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) - job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, ttl=ttl, - failure_ttl=failure_ttl, description=description, - depends_on=depends_on, job_id=job_id, meta=meta, retry=retry, - on_success=on_success, on_failure=on_failure) + ( + f, + timeout, + description, + result_ttl, + ttl, + failure_ttl, + depends_on, + job_id, + at_front, + meta, + retry, + on_success, + on_failure, + pipeline, + args, + kwargs, + ) = Queue.parse_args(f, *args, **kwargs) + job = self.create_job( + f, + status=JobStatus.SCHEDULED, + args=args, + kwargs=kwargs, + timeout=timeout, + result_ttl=result_ttl, + ttl=ttl, + failure_ttl=failure_ttl, + description=description, + depends_on=depends_on, + job_id=job_id, + meta=meta, + retry=retry, + on_success=on_success, + on_failure=on_failure, + ) if at_front: job.enqueue_at_front = True return self.schedule_job(job, datetime, pipeline=pipeline) @@ -773,6 +945,7 @@ class Queue: _type_: _description_ """ from .registry import ScheduledJobRegistry + registry = ScheduledJobRegistry(queue=self) pipe = pipeline if pipeline is not None else self.connection.pipeline() @@ -795,8 +968,7 @@ class Queue: Returns: job (Job): The enqueued Job """ - return self.enqueue_at(datetime.now(timezone.utc) + time_delta, - func, *args, **kwargs) + return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs) def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: """Enqueues a job for delayed execution. @@ -838,14 +1010,14 @@ class Queue: return job def run_sync(self, job: 'Job') -> 'Job': - """Run a job synchronously, meaning on the same process the method was called. + """Run a job synchronously, meaning on the same process the method was called. Args: job (Job): The job to run Returns: Job: The job instance - """ + """ with self.connection.pipeline() as pipeline: job.prepare_for_execution('sync', pipeline) @@ -861,7 +1033,9 @@ class Queue: return job - def enqueue_dependents(self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None): + def enqueue_dependents( + self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None + ): """Enqueues all jobs in the given job's dependents set and clears it. When called without a pipeline, this method uses WATCH/MULTI/EXEC. @@ -886,20 +1060,19 @@ class Queue: if pipeline is None: pipe.watch(dependents_key) - dependent_job_ids = {as_text(_id) - for _id in pipe.smembers(dependents_key)} + dependent_job_ids = {as_text(_id) for _id in pipe.smembers(dependents_key)} # There's no dependents if not dependent_job_ids: break jobs_to_enqueue = [ - dependent_job for dependent_job - in self.job_class.fetch_many( - dependent_job_ids, - connection=self.connection, - serializer=self.serializer - ) if dependent_job and dependent_job.dependencies_are_met( + dependent_job + for dependent_job in self.job_class.fetch_many( + dependent_job_ids, connection=self.connection, serializer=self.serializer + ) + if dependent_job + and dependent_job.dependencies_are_met( parent_job=job, pipeline=pipe, exclude_job_id=exclude_job_id, @@ -914,10 +1087,9 @@ class Queue: for dependent in jobs_to_enqueue: enqueue_at_front = dependent.enqueue_at_front or False - registry = DeferredJobRegistry(dependent.origin, - self.connection, - job_class=self.job_class, - serializer=self.serializer) + registry = DeferredJobRegistry( + dependent.origin, self.connection, job_class=self.job_class, serializer=self.serializer + ) registry.remove(dependent, pipeline=pipe) if dependent.origin == self.name: @@ -1000,8 +1172,14 @@ class Queue: return None @classmethod - def dequeue_any(cls, queues: List['Queue'], timeout: int, connection: Optional['Redis'] = None, - job_class: Optional['Job'] = None, serializer: Any = None) -> Tuple['Job', 'Queue']: + def dequeue_any( + cls, + queues: List['Queue'], + timeout: int, + connection: Optional['Redis'] = None, + job_class: Optional['Job'] = None, + serializer: Any = None, + ) -> Tuple['Job', 'Queue']: """Class method returning the job_class instance at the front of the given set of Queues, where the order of the queues is important. @@ -1033,10 +1211,7 @@ class Queue: if result is None: return None queue_key, job_id = map(as_text, result) - queue = cls.from_queue_key(queue_key, - connection=connection, - job_class=job_class, - serializer=serializer) + queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, serializer=serializer) try: job = job_class.fetch(job_id, connection=connection, serializer=serializer) except NoSuchJobError: diff --git a/rq/registry.py b/rq/registry.py index f581776..509bd87 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -23,11 +23,18 @@ class BaseRegistry: Each job is stored as a key in the registry, scored by expiration time (unix timestamp). """ + job_class = Job key_template = 'rq:registry:{0}' - def __init__(self, name: str = 'default', connection: Optional['Redis'] = None, - job_class: Optional[Type['Job']] = None, queue: Optional['Queue'] = None, serializer: Any = None): + def __init__( + self, + name: str = 'default', + connection: Optional['Redis'] = None, + job_class: Optional[Type['Job']] = None, + queue: Optional['Queue'] = None, + serializer: Any = None, + ): if queue: self.name = queue.name self.connection = resolve_connection(queue.connection) @@ -46,8 +53,8 @@ class BaseRegistry: def __eq__(self, other): return ( - self.name == other.name and - self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs + self.name == other.name + and self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs ) def __contains__(self, item: Union[str, 'Job']): @@ -134,8 +141,7 @@ class BaseRegistry: _type_: _description_ """ self.cleanup() - return [as_text(job_id) for job_id in - self.connection.zrange(self.key, start, end)] + return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)] def get_queue(self): """Returns Queue object associated with this registry.""" @@ -175,8 +181,7 @@ class BaseRegistry: raise InvalidJobOperation with self.connection.pipeline() as pipeline: - queue = Queue(job.origin, connection=self.connection, - job_class=self.job_class, serializer=serializer) + queue = Queue(job.origin, connection=self.connection, job_class=self.job_class, serializer=serializer) job.started_at = None job.ended_at = None job._exc_info = '' @@ -195,6 +200,7 @@ class StartedJobRegistry(BaseRegistry): Jobs are added to registry right before they are executed and removed right after completion (success or failure). """ + key_template = 'rq:wip:{0}' def cleanup(self, timestamp: Optional[float] = None): @@ -216,9 +222,7 @@ class StartedJobRegistry(BaseRegistry): with self.connection.pipeline() as pipeline: for job_id in job_ids: try: - job = self.job_class.fetch(job_id, - connection=self.connection, - serializer=self.serializer) + job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer) except NoSuchJobError: continue @@ -246,6 +250,7 @@ class FinishedJobRegistry(BaseRegistry): Registry of jobs that have been completed. Jobs are added to this registry after they have successfully completed for monitoring purposes. """ + key_template = 'rq:finished:{0}' def cleanup(self, timestamp: Optional[float] = None): @@ -263,6 +268,7 @@ class FailedJobRegistry(BaseRegistry): """ Registry of containing failed jobs. """ + key_template = 'rq:failed:{0}' def cleanup(self, timestamp: Optional[float] = None): @@ -275,8 +281,14 @@ class FailedJobRegistry(BaseRegistry): score = timestamp if timestamp is not None else current_timestamp() self.connection.zremrangebyscore(self.key, 0, score) - def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: Optional['Pipeline'] = None, - _save_exc_to_job: bool = False): + def add( + self, + job: 'Job', + ttl=None, + exc_string: str = '', + pipeline: Optional['Pipeline'] = None, + _save_exc_to_job: bool = False, + ): """ Adds a job to a registry with expiry time of now + ttl. `ttl` defaults to DEFAULT_FAILURE_TTL if not specified. @@ -303,6 +315,7 @@ class DeferredJobRegistry(BaseRegistry): """ Registry of deferred jobs (waiting for another job to finish). """ + key_template = 'rq:deferred:{0}' def cleanup(self): @@ -316,6 +329,7 @@ class ScheduledJobRegistry(BaseRegistry): """ Registry of scheduled jobs. """ + key_template = 'rq:scheduled:{0}' def __init__(self, *args, **kwargs): @@ -396,7 +410,7 @@ class ScheduledJobRegistry(BaseRegistry): class CanceledJobRegistry(BaseRegistry): key_template = 'rq:canceled:{0}' - def get_expired_job_ids(self, timestamp: Optional[datetime] = None): + def get_expired_job_ids(self, timestamp: Optional[datetime] = None): raise NotImplementedError def cleanup(self): @@ -412,19 +426,16 @@ def clean_registries(queue: 'Queue'): Args: queue (Queue): The queue to clean """ - registry = FinishedJobRegistry(name=queue.name, - connection=queue.connection, - job_class=queue.job_class, - serializer=queue.serializer) + registry = FinishedJobRegistry( + name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer + ) registry.cleanup() - registry = StartedJobRegistry(name=queue.name, - connection=queue.connection, - job_class=queue.job_class, - serializer=queue.serializer) + registry = StartedJobRegistry( + name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer + ) registry.cleanup() - registry = FailedJobRegistry(name=queue.name, - connection=queue.connection, - job_class=queue.job_class, - serializer=queue.serializer) + registry = FailedJobRegistry( + name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer + ) registry.cleanup() diff --git a/rq/results.py b/rq/results.py index 0df131a..a6dafde 100644 --- a/rq/results.py +++ b/rq/results.py @@ -17,15 +17,22 @@ def get_key(job_id): class Result(object): - class Type(Enum): SUCCESSFUL = 1 FAILED = 2 STOPPED = 3 - def __init__(self, job_id: str, type: Type, connection: Redis, id: Optional[str] = None, - created_at: Optional[datetime] = None, return_value: Optional[Any] = None, - exc_string: Optional[str] = None, serializer=None): + def __init__( + self, + job_id: str, + type: Type, + connection: Redis, + id: Optional[str] = None, + created_at: Optional[datetime] = None, + return_value: Optional[Any] = None, + exc_string: Optional[str] = None, + serializer=None, + ): self.return_value = return_value self.exc_string = exc_string self.type = type @@ -49,16 +56,26 @@ class Result(object): @classmethod def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None): - result = cls(job_id=job.id, type=type, connection=job.connection, - return_value=return_value, - exc_string=exc_string, serializer=job.serializer) + result = cls( + job_id=job.id, + type=type, + connection=job.connection, + return_value=return_value, + exc_string=exc_string, + serializer=job.serializer, + ) result.save(ttl=ttl, pipeline=pipeline) return result @classmethod def create_failure(cls, job, ttl, exc_string, pipeline=None): - result = cls(job_id=job.id, type=cls.Type.FAILED, connection=job.connection, - exc_string=exc_string, serializer=job.serializer) + result = cls( + job_id=job.id, + type=cls.Type.FAILED, + connection=job.connection, + exc_string=exc_string, + serializer=job.serializer, + ) result.save(ttl=ttl, pipeline=pipeline) return result @@ -70,8 +87,7 @@ class Result(object): results = [] for (result_id, payload) in response: results.append( - cls.restore(job.id, result_id.decode(), payload, - connection=job.connection, serializer=serializer) + cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer) ) return results @@ -89,9 +105,7 @@ class Result(object): @classmethod def restore(cls, job_id: str, result_id: str, payload: dict, connection: Redis, serializer=None) -> 'Result': """Create a Result object from given Redis payload""" - created_at = datetime.fromtimestamp( - int(result_id.split('-')[0]) / 1000, tz=timezone.utc - ) + created_at = datetime.fromtimestamp(int(result_id.split('-')[0]) / 1000, tz=timezone.utc) payload = decode_redis_hash(payload) # data, timestamp = payload # result_data = json.loads(data) @@ -106,11 +120,15 @@ class Result(object): if exc_string: exc_string = zlib.decompress(b64decode(exc_string)).decode() - return Result(job_id, Result.Type(int(payload['type'])), connection=connection, - id=result_id, - created_at=created_at, - return_value=return_value, - exc_string=exc_string) + return Result( + job_id, + Result.Type(int(payload['type'])), + connection=connection, + id=result_id, + created_at=created_at, + return_value=return_value, + exc_string=exc_string, + ) @classmethod def fetch(cls, job: Job, serializer=None) -> Optional['Result']: @@ -134,8 +152,7 @@ class Result(object): return None result_id, payload = response[0] - return cls.restore(job.id, result_id.decode(), payload, - connection=job.connection, serializer=serializer) + return cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer) @classmethod def get_key(cls, job_id): diff --git a/rq/scheduler.py b/rq/scheduler.py index de8f26e..da59b0d 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -9,8 +9,7 @@ from multiprocessing import Process from redis import SSLConnection, UnixDomainSocketConnection -from .defaults import (DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, - DEFAULT_SCHEDULER_FALLBACK_PERIOD) +from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, DEFAULT_SCHEDULER_FALLBACK_PERIOD from .job import Job from .logutils import setup_loghandlers from .queue import Queue @@ -35,9 +34,16 @@ class RQScheduler: Status = SchedulerStatus - def __init__(self, queues, connection, interval=1, logging_level=logging.INFO, - date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, serializer=None): + def __init__( + self, + queues, + connection, + interval=1, + logging_level=logging.INFO, + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, + serializer=None, + ): self._queue_names = set(parse_names(queues)) self._acquired_locks = set() self._scheduled_job_registries = [] @@ -59,9 +65,7 @@ class RQScheduler: # the key is necessary. # `path` is not left in the dictionary as that keyword argument is # not expected by `redis.client.Redis` and would raise an exception. - self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop( - 'path' - ) + self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop('path') self.serializer = resolve_serializer(serializer) self._connection = None @@ -158,9 +162,7 @@ class RQScheduler: queue = Queue(registry.name, connection=self.connection, serializer=self.serializer) with self.connection.pipeline() as pipeline: - jobs = Job.fetch_many( - job_ids, connection=self.connection, serializer=self.serializer - ) + jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer) for job in jobs: if job is not None: queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) @@ -181,8 +183,7 @@ class RQScheduler: def heartbeat(self): """Updates the TTL on scheduler keys and the locks""" - self.log.debug("Scheduler sending heartbeat to %s", - ", ".join(self.acquired_locks)) + self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks)) if len(self._queue_names) > 1: with self.connection.pipeline() as pipeline: for name in self._acquired_locks: @@ -194,8 +195,7 @@ class RQScheduler: self.connection.expire(key, self.interval + 60) def stop(self): - self.log.info("Scheduler stopping, releasing locks for %s...", - ','.join(self._queue_names)) + self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names)) self.release_locks() self._status = self.Status.STOPPED @@ -231,15 +231,11 @@ class RQScheduler: def run(scheduler): - scheduler.log.info("Scheduler for %s started with PID %s", - ','.join(scheduler._queue_names), os.getpid()) + scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid()) try: scheduler.work() except: # noqa - scheduler.log.error( - 'Scheduler [PID %s] raised an exception.\n%s', - os.getpid(), traceback.format_exc() - ) + scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc()) raise scheduler.log.info("Scheduler with PID %s has stopped", os.getpid()) diff --git a/rq/serializers.py b/rq/serializers.py index 9e63bc7..b9b7d9c 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -11,7 +11,7 @@ class DefaultSerializer: loads = pickle.loads -class JSONSerializer(): +class JSONSerializer: @staticmethod def dumps(*args, **kwargs): return json.dumps(*args, **kwargs).encode('utf-8') @@ -29,7 +29,7 @@ def resolve_serializer(serializer=None): Args: serializer (Callable): The serializer to resolve. - + Returns: serializer (Callable): An object that implements the SerializerProtocol """ diff --git a/rq/timeouts.py b/rq/timeouts.py index c9f1e44..a1401c5 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -5,6 +5,7 @@ import threading class BaseTimeoutException(Exception): """Base exception for timeouts.""" + pass @@ -12,6 +13,7 @@ class JobTimeoutException(BaseTimeoutException): """Raised when a job takes longer to complete than the allowed maximum timeout value. """ + pass @@ -19,6 +21,7 @@ class HorseMonitorTimeoutException(BaseTimeoutException): """Raised when waiting for a horse exiting takes longer than the maximum timeout value. """ + pass @@ -56,10 +59,8 @@ class BaseDeathPenalty: class UnixSignalDeathPenalty(BaseDeathPenalty): - def handle_death_penalty(self, signum, frame): - raise self._exception('Task exceeded maximum timeout value ' - '({0} seconds)'.format(self._timeout)) + raise self._exception('Task exceeded maximum timeout value ' '({0} seconds)'.format(self._timeout)) def setup_death_penalty(self): """Sets up an alarm signal and a signal handler that raises @@ -85,15 +86,12 @@ class TimerDeathPenalty(BaseDeathPenalty): # Monkey-patch exception with the message ahead of time # since PyThreadState_SetAsyncExc can only take a class def init_with_message(self, *args, **kwargs): # noqa - super(exception, self).__init__( - "Task exceeded maximum timeout value ({0} seconds)".format(timeout) - ) + super(exception, self).__init__("Task exceeded maximum timeout value ({0} seconds)".format(timeout)) self._exception.__init__ = init_with_message def new_timer(self): - """Returns a new timer since timers can only be used once. - """ + """Returns a new timer since timers can only be used once.""" return threading.Timer(self._timeout, self.handle_death_penalty) def handle_death_penalty(self): @@ -111,13 +109,11 @@ class TimerDeathPenalty(BaseDeathPenalty): raise SystemError("PyThreadState_SetAsyncExc failed") def setup_death_penalty(self): - """Starts the timer. - """ + """Starts the timer.""" self._timer = self.new_timer() self._timer.start() def cancel_death_penalty(self): - """Cancels the timer. - """ + """Cancels the timer.""" self._timer.cancel() self._timer = None diff --git a/rq/utils.py b/rq/utils.py index 8993487..1c3fa01 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -41,10 +41,8 @@ class _Colorizer: self.codes["blink"] = esc + "05m" self.codes["overline"] = esc + "06m" - dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", - "purple", "teal", "lightgray"] - light_colors = ["darkgray", "red", "green", "yellow", "blue", - "fuchsia", "turquoise", "white"] + dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"] + light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"] x = 30 for d, l in zip(dark_colors, light_colors): @@ -90,8 +88,10 @@ def make_colorizer(color: str): >>> # You can then use: >>> print("It's either " + green('OK') + ' or ' + red('Oops')) """ + def inner(text): return colorizer.colorize(color, text) + return inner @@ -134,7 +134,7 @@ def compact(lst: List[Any]) -> List[Any]: Returns: object (list): The list without None values - """ + """ return [item for item in lst if item is not None] @@ -149,7 +149,7 @@ def as_text(v: Union[bytes, str]) -> Optional[str]: Returns: value (Optional[str]): Either the decoded string or None - """ + """ if v is None: return None elif isinstance(v, bytes): @@ -169,7 +169,7 @@ def decode_redis_hash(h) -> Dict[str, Any]: Returns: Dict[str, Any]: The decoded Redis data (Dictionary) - """ + """ return dict((as_text(k), h[k]) for k in h) @@ -230,8 +230,7 @@ def utcnow(): def now(): - """Return now in UTC - """ + """Return now in UTC""" return datetime.datetime.now(datetime.timezone.utc) @@ -356,8 +355,7 @@ def str_to_date(date_str: Optional[str]) -> Union[dt.datetime, Any]: def parse_timeout(timeout: Any): - """Transfer all kinds of timeout format to an integer representing seconds - """ + """Transfer all kinds of timeout format to an integer representing seconds""" if not isinstance(timeout, numbers.Integral) and timeout is not None: try: timeout = int(timeout) @@ -367,9 +365,11 @@ def parse_timeout(timeout: Any): try: timeout = int(digit) * unit_second[unit] except (ValueError, KeyError): - raise TimeoutFormatError('Timeout must be an integer or a string representing an integer, or ' - 'a string with format: digits + unit, unit can be "d", "h", "m", "s", ' - 'such as "1h", "23m".') + raise TimeoutFormatError( + 'Timeout must be an integer or a string representing an integer, or ' + 'a string with format: digits + unit, unit can be "d", "h", "m", "s", ' + 'such as "1h", "23m".' + ) return timeout @@ -381,7 +381,7 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]: Args: connection (Redis): The Redis connection. - + Returns: version (Tuple[int, int, int]): A tuple representing the semantic versioning format (eg. (5, 0, 9)) """ @@ -391,7 +391,7 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]: setattr( connection, "__rq_redis_server_version", - tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3]) + tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3]), ) return getattr(connection, "__rq_redis_server_version") except ResponseError: # fakeredis doesn't implement Redis' INFO command @@ -422,7 +422,7 @@ def split_list(a_list: List[Any], segment_size: int): list: The splitted listed """ for i in range(0, len(a_list), segment_size): - yield a_list[i:i + segment_size] + yield a_list[i : i + segment_size] def truncate_long_string(data: str, max_length: Optional[int] = None) -> str: @@ -431,7 +431,7 @@ def truncate_long_string(data: str, max_length: Optional[int] = None) -> str: Args: data (str): The data to truncate max_length (Optional[int], optional): The max length. Defaults to None. - + Returns: truncated (str): The truncated string """ @@ -440,8 +440,9 @@ def truncate_long_string(data: str, max_length: Optional[int] = None) -> str: return (data[:max_length] + '...') if len(data) > max_length else data -def get_call_string(func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], - max_length: Optional[int] = None) -> Optional[str]: +def get_call_string( + func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], max_length: Optional[int] = None +) -> Optional[str]: """ Returns a string representation of the call, formatted as a regular Python function invocation statement. If max_length is not None, truncate diff --git a/rq/worker.py b/rq/worker.py index 35355c7..06a707e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -34,9 +34,15 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command from .utils import as_text from .connections import get_current_connection, push_connection, pop_connection -from .defaults import (CALLBACK_TIMEOUT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL, - DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, - DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) +from .defaults import ( + CALLBACK_TIMEOUT, + DEFAULT_MAINTENANCE_TASK_INTERVAL, + DEFAULT_RESULT_TTL, + DEFAULT_WORKER_TTL, + DEFAULT_JOB_MONITORING_INTERVAL, + DEFAULT_LOGGING_FORMAT, + DEFAULT_LOGGING_DATE_FORMAT, +) from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException from .job import Job, JobStatus from .logutils import setup_loghandlers @@ -46,8 +52,7 @@ from .results import Result from .scheduler import RQScheduler from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty -from .utils import (backend_class, ensure_list, get_version, - make_colorizer, utcformat, utcnow, utcparse, compact) +from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact from .version import VERSION from .worker_registration import clean_worker_registry, get_keys from .serializers import resolve_serializer @@ -55,9 +60,11 @@ from .serializers import resolve_serializer try: from setproctitle import setproctitle as setprocname except ImportError: + def setprocname(*args, **kwargs): # noqa pass + green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') blue = make_colorizer('darkblue') @@ -69,10 +76,9 @@ class StopRequested(Exception): pass - -_signames = dict((getattr(signal, signame), signame) - for signame in dir(signal) - if signame.startswith('SIG') and '_' not in signame) +_signames = dict( + (getattr(signal, signame), signame) for signame in dir(signal) if signame.startswith('SIG') and '_' not in signame +) def signal_name(signum): @@ -118,7 +124,7 @@ class Worker: job_class: Optional[Type['Job']] = None, queue_class: Optional[Type['Queue']] = None, queue: Optional['Queue'] = None, - serializer=None + serializer=None, ) -> List['Worker']: """Returns an iterable of all Workers. @@ -131,11 +137,12 @@ class Worker: connection = get_current_connection() worker_keys = get_keys(queue=queue, connection=connection) - workers = [cls.find_by_key(as_text(key), - connection=connection, - job_class=job_class, - queue_class=queue_class, serializer=serializer) - for key in worker_keys] + workers = [ + cls.find_by_key( + as_text(key), connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer + ) + for key in worker_keys + ] return compact(workers) @classmethod @@ -148,9 +155,8 @@ class Worker: Returns: list_keys (List[str]): A list of worker keys - """ - return [as_text(key) - for key in get_keys(queue=queue, connection=connection)] + """ + return [as_text(key) for key in get_keys(queue=queue, connection=connection)] @classmethod def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None): @@ -166,8 +172,14 @@ class Worker: return len(get_keys(queue=queue, connection=connection)) @classmethod - def find_by_key(cls, worker_key: str, connection: Optional['Redis'] = None, job_class: Type['Job'] = None, - queue_class: Type['Queue'] = None, serializer=None): + def find_by_key( + cls, + worker_key: str, + connection: Optional['Redis'] = None, + job_class: Type['Job'] = None, + queue_class: Type['Queue'] = None, + serializer=None, + ): """Returns a Worker instance, based on the naming conventions for naming the internal Redis keys. Can be used to reverse-lookup Workers by their Redis keys. @@ -182,22 +194,39 @@ class Worker: connection.srem(cls.redis_workers_keys, worker_key) return None - name = worker_key[len(prefix):] - worker = cls([], name, connection=connection, job_class=job_class, - queue_class=queue_class, prepare_for_work=False, serializer=serializer) + name = worker_key[len(prefix) :] + worker = cls( + [], + name, + connection=connection, + job_class=job_class, + queue_class=queue_class, + prepare_for_work=False, + serializer=serializer, + ) worker.refresh() return worker - def __init__(self, queues, name: Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL, - connection: Optional['Redis'] = None, exc_handler=None, exception_handlers=None, - default_worker_ttl=DEFAULT_WORKER_TTL, job_class: Type['Job'] = None, - queue_class=None, log_job_description: bool = True, - job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, - disable_default_exception_handler: bool = False, - prepare_for_work: bool = True, serializer=None): # noqa - + def __init__( + self, + queues, + name: Optional[str] = None, + default_result_ttl=DEFAULT_RESULT_TTL, + connection: Optional['Redis'] = None, + exc_handler=None, + exception_handlers=None, + default_worker_ttl=DEFAULT_WORKER_TTL, + job_class: Type['Job'] = None, + queue_class=None, + log_job_description: bool = True, + job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, + disable_default_exception_handler: bool = False, + prepare_for_work: bool = True, + serializer=None, + ): # noqa + connection = self._set_connection(connection, default_worker_ttl) self.connection = connection self.redis_server_version = None @@ -208,11 +237,12 @@ class Worker: self.python_version = sys.version self.serializer = resolve_serializer(serializer) - queues = [self.queue_class(name=q, - connection=connection, - job_class=self.job_class, serializer=self.serializer) - if isinstance(q, str) else q - for q in ensure_list(queues)] + queues = [ + self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer) + if isinstance(q, str) + else q + for q in ensure_list(queues) + ] self.name: str = name or uuid4().hex self.queues = queues @@ -250,24 +280,14 @@ class Worker: try: connection.client_setname(self.name) except redis.exceptions.ResponseError: - warnings.warn( - 'CLIENT SETNAME command not supported, setting ip_address to unknown', - Warning - ) + warnings.warn('CLIENT SETNAME command not supported, setting ip_address to unknown', Warning) self.ip_address = 'unknown' else: - client_adresses = [ - client['addr'] - for client in connection.client_list() - if client['name'] == self.name - ] + client_adresses = [client['addr'] for client in connection.client_list() if client['name'] == self.name] if len(client_adresses) > 0: self.ip_address = client_adresses[0] else: - warnings.warn( - 'CLIENT LIST command not supported, setting ip_address to unknown', - Warning - ) + warnings.warn('CLIENT LIST command not supported, setting ip_address to unknown', Warning) self.ip_address = 'unknown' else: self.hostname = None @@ -361,8 +381,7 @@ class Worker: def register_birth(self): """Registers its own birth.""" self.log.debug('Registering birth of worker %s', self.name) - if self.connection.exists(self.key) and \ - not self.connection.hexists(self.key, 'death'): + if self.connection.exists(self.key) and not self.connection.hexists(self.key, 'death'): msg = 'There exists an active worker named {0!r} already' raise ValueError(msg.format(self.name)) key = self.key @@ -436,10 +455,7 @@ class Worker: def _set_state(self, state): """Raise a DeprecationWarning if ``worker.state = X`` is used""" - warnings.warn( - "worker.state is deprecated, use worker.set_state() instead.", - DeprecationWarning - ) + warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning) self.set_state(state) def get_state(self): @@ -447,10 +463,7 @@ class Worker: def _get_state(self): """Raise a DeprecationWarning if ``worker.state == X`` is used""" - warnings.warn( - "worker.state is deprecated, use worker.get_state() instead.", - DeprecationWarning - ) + warnings.warn("worker.state is deprecated, use worker.get_state() instead.", DeprecationWarning) return self.get_state() state = property(_get_state, _set_state) @@ -516,8 +529,7 @@ class Worker: return pid, stat def request_force_stop(self, signum, frame): - """Terminates the application (cold shutdown). - """ + """Terminates the application (cold shutdown).""" self.log.warning('Cold shut down') # Take down the horse with the worker @@ -547,8 +559,7 @@ class Worker: if self.get_state() == WorkerStatus.BUSY: self._stop_requested = True self.set_shutdown_requested_date() - self.log.debug('Stopping after current horse is finished. ' - 'Press Ctrl+C again for a cold shutdown.') + self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.') if self.scheduler: self.stop_scheduler() else: @@ -614,8 +625,15 @@ class Worker: def reorder_queues(self, reference_queue): pass - def work(self, burst: bool = False, logging_level: str = "INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, - log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler: bool = False): + def work( + self, + burst: bool = False, + logging_level: str = "INFO", + date_format=DEFAULT_LOGGING_DATE_FORMAT, + log_format=DEFAULT_LOGGING_FORMAT, + max_jobs=None, + with_scheduler: bool = False, + ): """Starts the work loop. Pops and performs all jobs on the current list of queues. When all @@ -635,8 +653,13 @@ class Worker: if with_scheduler: self.scheduler = RQScheduler( - self.queues, connection=self.connection, logging_level=logging_level, - date_format=date_format, log_format=log_format, serializer=self.serializer) + self.queues, + connection=self.connection, + logging_level=logging_level, + date_format=date_format, + log_format=log_format, + serializer=self.serializer, + ) self.scheduler.acquire_locks() # If lock is acquired, start scheduler if self.scheduler.acquired_locks: @@ -676,10 +699,7 @@ class Worker: completed_jobs += 1 if max_jobs is not None: if completed_jobs >= max_jobs: - self.log.info( - "Worker %s: finished executing %d jobs, quitting", - self.key, completed_jobs - ) + self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs) break except redis.exceptions.TimeoutError: @@ -694,10 +714,7 @@ class Worker: raise except: # noqa - self.log.error( - 'Worker %s: found an unhandled exception, quitting...', - self.key, exc_info=True - ) + self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True) break finally: if not self.is_horse: @@ -736,19 +753,20 @@ class Worker: self.run_maintenance_tasks() self.log.debug(f"Dequeueing jobs on queues {self._ordered_queues} and timeout {timeout}") - result = self.queue_class.dequeue_any(self._ordered_queues, timeout, - connection=self.connection, - job_class=self.job_class, - serializer=self.serializer) + result = self.queue_class.dequeue_any( + self._ordered_queues, + timeout, + connection=self.connection, + job_class=self.job_class, + serializer=self.serializer, + ) self.log.debug(f"Dequeued job {result[1]} from {result[0]}") if result is not None: job, queue = result job.redis_server_version = self.get_redis_server_version() if self.log_job_description: - self.log.info( - '%s: %s (%s)', green(queue.name), - blue(job.description), job.id) + self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id) else: self.log.info('%s: %s', green(queue.name), job.id) @@ -756,8 +774,9 @@ class Worker: except DequeueTimeout: pass except redis.exceptions.ConnectionError as conn_err: - self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...', - conn_err, connection_wait_time) + self.log.error( + 'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time + ) time.sleep(connection_wait_time) connection_wait_time *= self.exponential_backoff_factor connection_wait_time = min(connection_wait_time, self.max_connection_wait_time) @@ -782,18 +801,44 @@ class Worker: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, timeout) connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) - self.log.debug('Sent heartbeat to prevent worker timeout. ' - 'Next one should arrive within %s seconds.', timeout) + self.log.debug( + 'Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within %s seconds.', timeout + ) def refresh(self): data = self.connection.hmget( - self.key, 'queues', 'state', 'current_job', 'last_heartbeat', - 'birth', 'failed_job_count', 'successful_job_count', 'total_working_time', - 'current_job_working_time', 'hostname', 'ip_address', 'pid', 'version', 'python_version', + self.key, + 'queues', + 'state', + 'current_job', + 'last_heartbeat', + 'birth', + 'failed_job_count', + 'successful_job_count', + 'total_working_time', + 'current_job_working_time', + 'hostname', + 'ip_address', + 'pid', + 'version', + 'python_version', ) - (queues, state, job_id, last_heartbeat, birth, failed_job_count, - successful_job_count, total_working_time, current_job_working_time, - hostname, ip_address, pid, version, python_version) = data + ( + queues, + state, + job_id, + last_heartbeat, + birth, + failed_job_count, + successful_job_count, + total_working_time, + current_job_working_time, + hostname, + ip_address, + pid, + version, + python_version, + ) = data queues = as_text(queues) self.hostname = as_text(hostname) self.ip_address = as_text(ip_address) @@ -820,10 +865,12 @@ class Worker: self.current_job_working_time = float(as_text(current_job_working_time)) if queues: - self.queues = [self.queue_class(queue, - connection=self.connection, - job_class=self.job_class, serializer=self.serializer) - for queue in queues.split(',')] + self.queues = [ + self.queue_class( + queue, connection=self.connection, job_class=self.job_class, serializer=self.serializer + ) + for queue in queues.split(',') + ] def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None): connection = pipeline if pipeline is not None else self.connection @@ -834,12 +881,10 @@ class Worker: connection.hincrby(self.key, 'successful_job_count', 1) def increment_total_working_time(self, job_execution_time, pipeline): - pipeline.hincrbyfloat(self.key, 'total_working_time', - job_execution_time.total_seconds()) + pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds()) def fork_work_horse(self, job: 'Job', queue: 'Queue'): - """Spawns a work horse to perform the actual work and passes it a job. - """ + """Spawns a work horse to perform the actual work and passes it a job.""" child_pid = os.fork() os.environ['RQ_WORKER_ID'] = self.name os.environ['RQ_JOB_ID'] = job.id @@ -909,24 +954,20 @@ class Worker: elif self._stopped_job_id == job.id: # Work-horse killed deliberately self.log.warning('Job stopped by user, moving job to FailedJobRegistry') - self.handle_job_failure( - job, queue=queue, - exc_string="Job stopped by user, work-horse terminated." - ) + self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.") elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: if not job.ended_at: job.ended_at = utcnow() # Unhandled failure: move the job to the failed queue - self.log.warning(( - 'Moving job to FailedJobRegistry ' - '(work-horse terminated unexpectedly; waitpid returned {})' - ).format(ret_val)) + self.log.warning( + ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format( + ret_val + ) + ) self.handle_job_failure( - job, queue=queue, - exc_string="Work-horse was terminated unexpectedly " - "(waitpid returned %s)" % ret_val + job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val ) def execute_job(self, job: 'Job', queue: 'Queue'): @@ -1009,8 +1050,7 @@ class Worker: msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) - def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, - exc_string=''): + def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''): """ Handles the failure or an executing job by: 1. Setting the job status to failed @@ -1023,10 +1063,7 @@ class Worker: with self.connection.pipeline() as pipeline: if started_job_registry is None: started_job_registry = StartedJobRegistry( - job.origin, - self.connection, - job_class=self.job_class, - serializer=self.serializer + job.origin, self.connection, job_class=self.job_class, serializer=self.serializer ) # check whether a job was stopped intentionally and set the job @@ -1045,14 +1082,19 @@ class Worker: started_job_registry.remove(job, pipeline=pipeline) if not self.disable_default_exception_handler and not retry: - failed_job_registry = FailedJobRegistry(job.origin, job.connection, - job_class=self.job_class, serializer=job.serializer) + failed_job_registry = FailedJobRegistry( + job.origin, job.connection, job_class=self.job_class, serializer=job.serializer + ) # Exception should be saved in job hash if server # doesn't support Redis streams _save_exc_to_job = not self.supports_redis_streams - failed_job_registry.add(job, ttl=job.failure_ttl, - exc_string=exc_string, pipeline=pipeline, - _save_exc_to_job=_save_exc_to_job) + failed_job_registry.add( + job, + ttl=job.failure_ttl, + exc_string=exc_string, + pipeline=pipeline, + _save_exc_to_job=_save_exc_to_job, + ) if self.supports_redis_streams: Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline) with suppress(redis.exceptions.ConnectionError): @@ -1061,9 +1103,7 @@ class Worker: self.set_current_job_id(None, pipeline=pipeline) self.increment_failed_job_count(pipeline) if job.started_at and job.ended_at: - self.increment_total_working_time( - job.ended_at - job.started_at, pipeline - ) + self.increment_total_working_time(job.ended_at - job.started_at, pipeline) if retry: job.retry(queue, pipeline) @@ -1099,9 +1139,7 @@ class Worker: self.set_current_job_id(None, pipeline=pipeline) self.increment_successful_job_count(pipeline=pipeline) - self.increment_total_working_time( - job.ended_at - job.started_at, pipeline # type: ignore - ) + self.increment_total_working_time(job.ended_at - job.started_at, pipeline) # type: ignore result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: @@ -1111,16 +1149,15 @@ class Worker: # doesn't support Redis streams include_result = not self.supports_redis_streams # Don't clobber user's meta dictionary! - job.save(pipeline=pipeline, include_meta=False, - include_result=include_result) + job.save(pipeline=pipeline, include_meta=False, include_result=include_result) if self.supports_redis_streams: - Result.create(job, Result.Type.SUCCESSFUL, return_value=job._result, - ttl=result_ttl, pipeline=pipeline) + Result.create( + job, Result.Type.SUCCESSFUL, return_value=job._result, ttl=result_ttl, pipeline=pipeline + ) finished_job_registry = queue.finished_job_registry finished_job_registry.add(job, result_ttl, pipeline) - job.cleanup(result_ttl, pipeline=pipeline, - remove_from_queue=False) + job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False) self.log.debug('Removing job %s from StartedJobRegistry', job.id) started_job_registry.remove(job, pipeline=pipeline) @@ -1172,9 +1209,7 @@ class Worker: if job.success_callback: self.execute_success_callback(job, rv) - self.handle_job_success(job=job, - queue=queue, - started_job_registry=started_job_registry) + self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry) except: # NOQA self.log.debug(f"Job {job.id} raised an exception.") job.ended_at = utcnow() @@ -1185,15 +1220,13 @@ class Worker: try: self.execute_failure_callback(job) except: # noqa - self.log.error( - 'Worker %s: error while executing failure callback', - self.key, exc_info=True - ) + self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True) exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) - self.handle_job_failure(job=job, exc_string=exc_string, queue=queue, - started_job_registry=started_job_registry) + self.handle_job_failure( + job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry + ) self.handle_exception(job, *exc_info) return False @@ -1237,10 +1270,9 @@ class Worker: # the properties below should be safe however extra.update({'queue': job.origin, 'job_id': job.id}) - + # func_name - self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, - extra=extra) + self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra) for handler in self._exc_handlers: self.log.debug('Invoking exception handler %s', handler) @@ -1322,6 +1354,7 @@ class HerokuWorker(Worker): * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn causes the horse to crash `imminent_shutdown_delay` seconds later """ + imminent_shutdown_delay = 6 frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace'] @@ -1335,10 +1368,7 @@ class HerokuWorker(Worker): def handle_warm_shutdown_request(self): """If horse is alive send it SIGRTMIN""" if self.horse_pid != 0: - self.log.info( - 'Worker %s: warm shut down requested, sending horse SIGRTMIN signal', - self.key - ) + self.log.info('Worker %s: warm shut down requested, sending horse SIGRTMIN signal', self.key) self.kill_horse(sig=signal.SIGRTMIN) else: self.log.warning('Warm shut down requested, no horse found') @@ -1348,8 +1378,9 @@ class HerokuWorker(Worker): self.log.warning('Imminent shutdown, raising ShutDownImminentException immediately') self.request_force_stop_sigrtmin(signum, frame) else: - self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds', - self.imminent_shutdown_delay) + self.log.warning( + 'Imminent shutdown, raising ShutDownImminentException in %d seconds', self.imminent_shutdown_delay + ) signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin) signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin) signal.alarm(self.imminent_shutdown_delay) @@ -1367,7 +1398,7 @@ class RoundRobinWorker(Worker): def reorder_queues(self, reference_queue): pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[:pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] class RandomWorker(Worker):