Black style (#1292)

* treewide: apply black style

This PR applies the black code style, adds a black check to the CI, and documents the style in the README. The changes look big, but no functional changes are made at all.

The line length is set to 120, which differs from black's recommended 88. I would suggest sticking to the 88 recommendation and adapting the flake8 check accordingly.
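If the 88-column default were adopted later, the flake8 limit would need to match; a hypothetical adjustment (flake8's standard `--max-line-length` option, the value shown is only illustrative):

    flake8 . --max-line-length 88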

Add the `-S` parameter to `black` to keep single quotes.
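With the 120-character limit chosen here, that gives the invocation used by the new CI step below:

    black -S -l 120 rq/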

Signed-off-by: Paul Spooren <mail@aparcar.org>

* README: add black badge

Helps people find the code style in use.

Signed-off-by: Paul Spooren <mail@aparcar.org>

* CI: check codestyle via black

Automatically run black in the CI to check that the code style is still black.
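To reproduce the CI result locally before pushing, black can presumably be run in check-only mode (`--check` is a standard black flag that reports files that would be reformatted without modifying them):

    black -S -l 120 --check rq/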

Signed-off-by: Paul Spooren <mail@aparcar.org>

---------

Signed-off-by: Paul Spooren <mail@aparcar.org>
Branch: main, commit fdb14df181 (parent cd62b4cb50), authored by Paul Spooren and committed by GitHub.

@@ -25,7 +25,7 @@ jobs:
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
-         pip install flake8
+         pip install flake8 black
      - name: Lint with flake8
        run: |
@@ -33,3 +33,7 @@ jobs:
          flake8 . --select=E9,F63,F7,F82 --show-source
          # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
          flake8 . --exit-zero --max-complexity=5
+     - name: Lint with black
+       run: |
+         black -S -l 120 rq/

@@ -8,6 +8,8 @@ RQ requires Redis >= 3.0.0.
[![Build status](https://github.com/rq/rq/workflows/Test%20rq/badge.svg)](https://github.com/rq/rq/actions?query=workflow%3A%22Test+rq%22)
[![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq)
[![Coverage](https://codecov.io/gh/rq/rq/branch/master/graph/badge.svg)](https://codecov.io/gh/rq/rq)
+[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
Full documentation can be found [here][d].

@@ -1,7 +1,6 @@
# flake8: noqa
-from .connections import (Connection, get_current_connection, pop_connection,
-    push_connection, use_connection)
+from .connections import Connection, get_current_connection, pop_connection, push_connection, use_connection
from .job import cancel_job, get_current_job, requeue_job, Retry
from .queue import Queue
from .version import VERSION

@@ -10,22 +10,34 @@ import click
from redis.exceptions import ConnectionError
from rq import Connection, Retry, __version__ as version
-from rq.cli.helpers import (read_config_file, refresh,
-    setup_loghandlers_from_args,
-    show_both, show_queues, show_workers, CliConfig, parse_function_args,
-    parse_schedule)
+from rq.cli.helpers import (
+    read_config_file,
+    refresh,
+    setup_loghandlers_from_args,
+    show_both,
+    show_queues,
+    show_workers,
+    CliConfig,
+    parse_function_args,
+    parse_schedule,
+)
from rq.contrib.legacy import cleanup_ghosts
-from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
-    DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS,
-    DEFAULT_RESULT_TTL, DEFAULT_WORKER_TTL,
-    DEFAULT_JOB_MONITORING_INTERVAL,
-    DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT,
-    DEFAULT_SERIALIZER_CLASS)
+from rq.defaults import (
+    DEFAULT_CONNECTION_CLASS,
+    DEFAULT_JOB_CLASS,
+    DEFAULT_QUEUE_CLASS,
+    DEFAULT_WORKER_CLASS,
+    DEFAULT_RESULT_TTL,
+    DEFAULT_WORKER_TTL,
+    DEFAULT_JOB_MONITORING_INTERVAL,
+    DEFAULT_LOGGING_FORMAT,
+    DEFAULT_LOGGING_DATE_FORMAT,
+    DEFAULT_SERIALIZER_CLASS,
+)
from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries
from rq.utils import import_attribute, get_call_string, make_colorizer
-from rq.suspension import (suspend as connection_suspend,
-    resume as connection_resume, is_suspended)
+from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended
from rq.worker_registration import clean_worker_registry
from rq.job import JobStatus
@@ -39,35 +51,26 @@ click.disable_unicode_literals_warning = True
shared_options = [
-    click.option('--url', '-u',
-        envvar='RQ_REDIS_URL',
-        help='URL describing Redis connection details.'),
-    click.option('--config', '-c',
-        envvar='RQ_CONFIG',
-        help='Module containing RQ settings.'),
-    click.option('--worker-class', '-w',
-        envvar='RQ_WORKER_CLASS',
-        default=DEFAULT_WORKER_CLASS,
-        help='RQ Worker class to use'),
-    click.option('--job-class', '-j',
-        envvar='RQ_JOB_CLASS',
-        default=DEFAULT_JOB_CLASS,
-        help='RQ Job class to use'),
-    click.option('--queue-class',
-        envvar='RQ_QUEUE_CLASS',
-        default=DEFAULT_QUEUE_CLASS,
-        help='RQ Queue class to use'),
-    click.option('--connection-class',
-        envvar='RQ_CONNECTION_CLASS',
-        default=DEFAULT_CONNECTION_CLASS,
-        help='Redis client class to use'),
-    click.option('--path', '-P',
-        default=['.'],
-        help='Specify the import path.',
-        multiple=True),
-    click.option('--serializer', '-S',
-        default=DEFAULT_SERIALIZER_CLASS,
-        help='Path to serializer, defaults to rq.serializers.DefaultSerializer')
+    click.option('--url', '-u', envvar='RQ_REDIS_URL', help='URL describing Redis connection details.'),
+    click.option('--config', '-c', envvar='RQ_CONFIG', help='Module containing RQ settings.'),
+    click.option(
+        '--worker-class', '-w', envvar='RQ_WORKER_CLASS', default=DEFAULT_WORKER_CLASS, help='RQ Worker class to use'
+    ),
+    click.option('--job-class', '-j', envvar='RQ_JOB_CLASS', default=DEFAULT_JOB_CLASS, help='RQ Job class to use'),
+    click.option('--queue-class', envvar='RQ_QUEUE_CLASS', default=DEFAULT_QUEUE_CLASS, help='RQ Queue class to use'),
+    click.option(
+        '--connection-class',
+        envvar='RQ_CONNECTION_CLASS',
+        default=DEFAULT_CONNECTION_CLASS,
+        help='Redis client class to use',
+    ),
+    click.option('--path', '-P', default=['.'], help='Specify the import path.', multiple=True),
+    click.option(
+        '--serializer',
+        '-S',
+        default=DEFAULT_SERIALIZER_CLASS,
+        help='Path to serializer, defaults to rq.serializers.DefaultSerializer',
+    ),
]
@@ -100,15 +103,16 @@ def empty(cli_config, all, queues, serializer, **options):
"""Empty given queues."""
if all:
-queues = cli_config.queue_class.all(connection=cli_config.connection,
-    job_class=cli_config.job_class,
-    serializer=serializer)
+queues = cli_config.queue_class.all(
+    connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
+)
else:
-queues = [cli_config.queue_class(queue,
-    connection=cli_config.connection,
-    job_class=cli_config.job_class,
-    serializer=serializer)
-    for queue in queues]
+queues = [
+    cli_config.queue_class(
+        queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
+    )
+    for queue in queues
+]
if not queues:
click.echo('Nothing to do')
@@ -127,10 +131,9 @@ def empty(cli_config, all, queues, serializer, **options):
def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options):
"""Requeue failed jobs."""
-failed_job_registry = FailedJobRegistry(queue,
-    connection=cli_config.connection,
-    job_class=job_class,
-    serializer=serializer)
+failed_job_registry = FailedJobRegistry(
+    queue, connection=cli_config.connection, job_class=job_class, serializer=serializer
+)
if all:
job_ids = failed_job_registry.get_job_ids()
@@ -159,8 +162,7 @@ def requeue(cli_config, queue, all, job_class, serializer, job_ids, **options):
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
@click.argument('queues', nargs=-1)
@pass_cli_config
-def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
-    **options):
+def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, **options):
"""RQ command-line monitor."""
if only_queues:
@@ -182,8 +184,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
clean_registries(queue)
clean_worker_registry(queue)
-refresh(interval, func, qs, raw, by_queue,
-    cli_config.queue_class, cli_config.worker_class)
+refresh(interval, func, qs, raw, by_queue, cli_config.queue_class, cli_config.worker_class)
except ConnectionError as e:
click.echo(e)
sys.exit(1)
@@ -200,7 +201,12 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--name', '-n', help='Specify a different name')
@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used')
-@click.option('--job-monitoring-interval', type=int, default=DEFAULT_JOB_MONITORING_INTERVAL, help='Default job monitoring interval to be used')
+@click.option(
+    '--job-monitoring-interval',
+    type=int,
+    default=DEFAULT_JOB_MONITORING_INTERVAL,
+    help='Default job monitoring interval to be used',
+)
@click.option('--disable-job-desc-logging', is_flag=True, help='Turn off description logging.')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@@ -215,11 +221,31 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
@click.argument('queues', nargs=-1)
@pass_cli_config
-def worker(cli_config, burst, logging_level, name, results_ttl,
-    worker_ttl, job_monitoring_interval, disable_job_desc_logging,
-    verbose, quiet, sentry_ca_certs, sentry_debug, sentry_dsn,
-    exception_handler, pid, disable_default_exception_handler, max_jobs,
-    with_scheduler, queues, log_format, date_format, serializer, **options):
+def worker(
+    cli_config,
+    burst,
+    logging_level,
+    name,
+    results_ttl,
+    worker_ttl,
+    job_monitoring_interval,
+    disable_job_desc_logging,
+    verbose,
+    quiet,
+    sentry_ca_certs,
+    sentry_debug,
+    sentry_dsn,
+    exception_handler,
+    pid,
+    disable_default_exception_handler,
+    max_jobs,
+    with_scheduler,
+    queues,
+    log_format,
+    date_format,
+    serializer,
+    **options
+):
"""Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
@@ -245,38 +271,46 @@ def worker(cli_config, burst, logging_level, name, results_ttl,
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
sys.exit(1)
-queues = [cli_config.queue_class(queue,
-    connection=cli_config.connection,
-    job_class=cli_config.job_class,
-    serializer=serializer)
-    for queue in queues]
+queues = [
+    cli_config.queue_class(
+        queue, connection=cli_config.connection, job_class=cli_config.job_class, serializer=serializer
+    )
+    for queue in queues
+]
worker = cli_config.worker_class(
-    queues, name=name, connection=cli_config.connection,
-    default_worker_ttl=worker_ttl, default_result_ttl=results_ttl,
+    queues,
+    name=name,
+    connection=cli_config.connection,
+    default_worker_ttl=worker_ttl,
+    default_result_ttl=results_ttl,
    job_monitoring_interval=job_monitoring_interval,
-    job_class=cli_config.job_class, queue_class=cli_config.queue_class,
+    job_class=cli_config.job_class,
+    queue_class=cli_config.queue_class,
    exception_handlers=exception_handlers or None,
    disable_default_exception_handler=disable_default_exception_handler,
    log_job_description=not disable_job_desc_logging,
-    serializer=serializer
+    serializer=serializer,
)
# Should we configure Sentry?
if sentry_dsn:
-sentry_opts = {
-    "ca_certs": sentry_ca_certs,
-    "debug": sentry_debug
-}
+sentry_opts = {"ca_certs": sentry_ca_certs, "debug": sentry_debug}
from rq.contrib.sentry import register_sentry
register_sentry(sentry_dsn, **sentry_opts)
# if --verbose or --quiet, override --logging_level
if verbose or quiet:
logging_level = None
-worker.work(burst=burst, logging_level=logging_level,
-    date_format=date_format, log_format=log_format,
-    max_jobs=max_jobs, with_scheduler=with_scheduler)
+worker.work(
+    burst=burst,
+    logging_level=logging_level,
+    date_format=date_format,
+    log_format=log_format,
+    max_jobs=max_jobs,
+    with_scheduler=with_scheduler,
+)
except ConnectionError as e:
print(e)
sys.exit(1)
@@ -296,7 +330,9 @@ def suspend(cli_config, duration, **options):
if duration:
msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
-automatically resume""".format(duration)
+automatically resume""".format(
+    duration
+)
click.echo(msg)
else:
click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed")
@@ -312,27 +348,51 @@ def resume(cli_config, **options):
@main.command()
@click.option('--queue', '-q', help='The name of the queue.', default='default')
-@click.option('--timeout',
-    help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.')
+@click.option(
+    '--timeout', help='Specifies the maximum runtime of the job before it is interrupted and marked as failed.'
+)
@click.option('--result-ttl', help='Specifies how long successful jobs and their results are kept.')
@click.option('--ttl', help='Specifies the maximum queued time of the job before it is discarded.')
@click.option('--failure-ttl', help='Specifies how long failed jobs are kept.')
@click.option('--description', help='Additional description of the job')
-@click.option('--depends-on', help='Specifies another job id that must complete before this job will be queued.',
-    multiple=True)
+@click.option(
+    '--depends-on', help='Specifies another job id that must complete before this job will be queued.', multiple=True
+)
@click.option('--job-id', help='The id of this job')
@click.option('--at-front', is_flag=True, help='Will place the job at the front of the queue, instead of the end')
@click.option('--retry-max', help='Maximum amount of retries', default=0, type=int)
@click.option('--retry-interval', help='Interval between retries in seconds', multiple=True, type=int, default=[0])
@click.option('--schedule-in', help='Delay until the function is enqueued (e.g. 10s, 5m, 2d).')
-@click.option('--schedule-at', help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without '
-    'timezone (e.g. 2021-05-27T21:45:00).')
+@click.option(
+    '--schedule-at',
+    help='Schedule job to be enqueued at a certain time formatted in ISO 8601 without '
+    'timezone (e.g. 2021-05-27T21:45:00).',
+)
@click.option('--quiet', is_flag=True, help='Only logs errors.')
@click.argument('function')
@click.argument('arguments', nargs=-1)
@pass_cli_config
-def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, description, depends_on, job_id, at_front,
-    retry_max, retry_interval, schedule_in, schedule_at, quiet, serializer, function, arguments, **options):
+def enqueue(
+    cli_config,
+    queue,
+    timeout,
+    result_ttl,
+    ttl,
+    failure_ttl,
+    description,
+    depends_on,
+    job_id,
+    at_front,
+    retry_max,
+    retry_interval,
+    schedule_in,
+    schedule_at,
+    quiet,
+    serializer,
+    function,
+    arguments,
+    **options
+):
"""Enqueues a job from the command line"""
args, kwargs = parse_function_args(arguments)
function_string = get_call_string(function, args, kwargs)
@@ -348,11 +408,37 @@ def enqueue(cli_config, queue, timeout, result_ttl, ttl, failure_ttl, descriptio
queue = cli_config.queue_class(queue, serializer=serializer)
if schedule is None:
-job = queue.enqueue_call(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl,
-    description, depends_on, job_id, at_front, None, retry)
+job = queue.enqueue_call(
+    function,
+    args,
+    kwargs,
+    timeout,
+    result_ttl,
+    ttl,
+    failure_ttl,
+    description,
+    depends_on,
+    job_id,
+    at_front,
+    None,
+    retry,
+)
else:
-job = queue.create_job(function, args, kwargs, timeout, result_ttl, ttl, failure_ttl,
-    description, depends_on, job_id, None, JobStatus.SCHEDULED, retry)
+job = queue.create_job(
+    function,
+    args,
+    kwargs,
+    timeout,
+    result_ttl,
+    ttl,
+    failure_ttl,
+    description,
+    depends_on,
+    job_id,
+    None,
+    JobStatus.SCHEDULED,
+    retry,
+)
queue.schedule_job(job, schedule)
if not quiet:

@@ -13,8 +13,7 @@ from shutil import get_terminal_size
import click
from redis import Redis
from redis.sentinel import Sentinel
-from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
-    DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
+from rq.defaults import DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS, DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS
from rq.logutils import setup_loghandlers
from rq.utils import import_attribute, parse_timeout
from rq.worker import WorkerStatus
@@ -27,9 +26,7 @@ yellow = partial(click.style, fg='yellow')
def read_config_file(module):
"""Reads all UPPERCASE variables defined in the given module file."""
settings = importlib.import_module(module)
-return dict([(k, v)
-    for k, v in settings.__dict__.items()
-    if k.upper() == k])
+return dict([(k, v) for k, v in settings.__dict__.items() if k.upper() == k])
def get_redis_from_config(settings, connection_class=Redis):
@@ -50,8 +47,7 @@ def get_redis_from_config(settings, connection_class=Redis):
ssl = settings['SENTINEL'].get('SSL', False)
arguments = {'password': password, 'ssl': ssl}
sn = Sentinel(
-    instances, socket_timeout=socket_timeout, password=password,
-    db=db, ssl=ssl, sentinel_kwargs=arguments
+    instances, socket_timeout=socket_timeout, password=password, db=db, ssl=ssl, sentinel_kwargs=arguments
)
return sn.master_for(master_name)
@@ -168,9 +164,7 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
for queue in queue_dict:
if queue_dict[queue]:
queues_str = ", ".join(
-    sorted(
-        map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue])
-    )
+    sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queue_dict[queue]))
)
else:
queues_str = ''
@@ -188,6 +182,7 @@ def show_both(queues, raw, by_queue, queue_class, worker_class):
if not raw:
click.echo('')
import datetime
click.echo('Updated: %s' % datetime.datetime.now())
@@ -261,9 +256,11 @@ def parse_function_arg(argument, arg_pos):
try:
value = literal_eval(value)
except Exception:
-raise click.BadParameter('Unable to eval %s as Python object. See '
+raise click.BadParameter(
+    'Unable to eval %s as Python object. See '
    'https://docs.python.org/3/library/ast.html#ast.literal_eval'
-    % (keyword or '%s. non keyword argument' % arg_pos))
+    % (keyword or '%s. non keyword argument' % arg_pos)
+)
return keyword, value
@@ -294,9 +291,19 @@ def parse_schedule(schedule_in, schedule_at):
class CliConfig:
"""A helper class to be used with click commands, to handle shared options"""
-def __init__(self, url=None, config=None, worker_class=DEFAULT_WORKER_CLASS,
-    job_class=DEFAULT_JOB_CLASS, queue_class=DEFAULT_QUEUE_CLASS,
-    connection_class=DEFAULT_CONNECTION_CLASS, path=None, *args, **kwargs):
+def __init__(
+    self,
+    url=None,
+    config=None,
+    worker_class=DEFAULT_WORKER_CLASS,
+    job_class=DEFAULT_JOB_CLASS,
+    queue_class=DEFAULT_QUEUE_CLASS,
+    connection_class=DEFAULT_CONNECTION_CLASS,
+    path=None,
+    *args,
+    **kwargs
+):
self._connection = None
self.url = url
self.config = config
@@ -331,9 +338,7 @@ class CliConfig:
self._connection = self.connection_class.from_url(self.url)
elif self.config:
settings = read_config_file(self.config) if self.config else {}
-self._connection = get_redis_from_config(settings,
-    self.connection_class)
+self._connection = get_redis_from_config(settings, self.connection_class)
else:
-self._connection = get_redis_from_config(os.environ,
-    self.connection_class)
+self._connection = get_redis_from_config(os.environ, self.connection_class)
return self._connection

@@ -30,8 +30,9 @@ def Connection(connection: Optional['Redis'] = None): # noqa
Args:
connection (Optional[Redis], optional): A Redis Connection instance. Defaults to None.
"""
-warnings.warn("The Conneciton context manager is deprecated. Use the `connection` parameter instead.",
-    DeprecationWarning)
+warnings.warn(
+    "The Conneciton context manager is deprecated. Use the `connection` parameter instead.", DeprecationWarning
+)
if connection is None:
connection = Redis()
push_connection(connection)
@@ -39,9 +40,9 @@ def Connection(connection: Optional['Redis'] = None): # noqa
yield
finally:
popped = pop_connection()
-assert popped == connection, \
-    'Unexpected Redis connection was popped off the stack. ' \
-    'Check your Redis connection setup.'
+assert popped == connection, (
+    'Unexpected Redis connection was popped off the stack. ' 'Check your Redis connection setup.'
+)
def push_connection(redis: 'Redis'):
@@ -72,8 +73,7 @@ def use_connection(redis: Optional['Redis'] = None):
Args:
redis (Optional[Redis], optional): A Redis Connection. Defaults to None.
"""
-assert len(_connection_stack) <= 1, \
-    'You should not mix Connection contexts with use_connection()'
+assert len(_connection_stack) <= 1, 'You should not mix Connection contexts with use_connection()'
release_local(_connection_stack)
if redis is None:
@@ -118,5 +118,4 @@ def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis':
_connection_stack = LocalStack()
-__all__ = ['Connection', 'get_current_connection', 'push_connection',
-    'pop_connection', 'use_connection']
+__all__ = ['Connection', 'get_current_connection', 'push_connection', 'pop_connection', 'use_connection']

@@ -4,4 +4,5 @@ def register_sentry(sentry_dsn, **opts):
"""
import sentry_sdk
from sentry_sdk.integrations.rq import RqIntegration
sentry_sdk.init(sentry_dsn, integrations=[RqIntegration()], **opts)

@@ -13,12 +13,23 @@ from .utils import backend_class
class job: # noqa
queue_class = Queue
-def __init__(self, queue: Union['Queue', str], connection: Optional['Redis'] = None, timeout: Optional[int] = None,
-    result_ttl: int = DEFAULT_RESULT_TTL, ttl: Optional[int] = None,
-    queue_class: Optional['Queue'] = None, depends_on: Optional[List[Any]] = None, at_front: Optional[bool] = None,
-    meta: Optional[Dict[Any, Any]] = None, description: Optional[str] = None, failure_ttl: Optional[int] = None,
-    retry: Optional['Retry'] = None, on_failure: Optional[Callable[..., Any]] = None,
-    on_success: Optional[Callable[..., Any]] = None):
+def __init__(
+    self,
+    queue: Union['Queue', str],
+    connection: Optional['Redis'] = None,
+    timeout: Optional[int] = None,
+    result_ttl: int = DEFAULT_RESULT_TTL,
+    ttl: Optional[int] = None,
+    queue_class: Optional['Queue'] = None,
+    depends_on: Optional[List[Any]] = None,
+    at_front: Optional[bool] = None,
+    meta: Optional[Dict[Any, Any]] = None,
+    description: Optional[str] = None,
+    failure_ttl: Optional[int] = None,
+    retry: Optional['Retry'] = None,
+    on_failure: Optional[Callable[..., Any]] = None,
+    on_success: Optional[Callable[..., Any]] = None,
+):
"""A decorator that adds a ``delay`` method to the decorated function,
which in turn creates a RQ job when called. Accepts a required
``queue`` argument that can be either a ``Queue`` instance or a string
@@ -68,8 +79,7 @@ class job: # noqa
@wraps(f)
def delay(*args, **kwargs):
if isinstance(self.queue, str):
-queue = self.queue_class(name=self.queue,
-    connection=self.connection)
+queue = self.queue_class(name=self.queue, connection=self.connection)
else:
queue = self.queue
@@ -83,10 +93,23 @@ class job: # noqa
if not at_front:
at_front = self.at_front
-return queue.enqueue_call(f, args=args, kwargs=kwargs,
-    timeout=self.timeout, result_ttl=self.result_ttl,
-    ttl=self.ttl, depends_on=depends_on, job_id=job_id, at_front=at_front,
-    meta=self.meta, description=self.description, failure_ttl=self.failure_ttl,
-    retry=self.retry, on_failure=self.on_failure, on_success=self.on_success)
+return queue.enqueue_call(
+    f,
+    args=args,
+    kwargs=kwargs,
+    timeout=self.timeout,
+    result_ttl=self.result_ttl,
+    ttl=self.ttl,
+    depends_on=depends_on,
+    job_id=job_id,
+    at_front=at_front,
+    meta=self.meta,
+    description=self.description,
+    failure_ttl=self.failure_ttl,
+    retry=self.retry,
+    on_failure=self.on_failure,
+    on_success=self.on_success,
+)
f.delay = delay
return f

@@ -88,4 +88,3 @@ DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
Uses Python's default attributes as defined
https://docs.python.org/3/library/logging.html#logrecord-attributes
"""

@@ -8,8 +8,7 @@ from collections.abc import Iterable
from datetime import datetime, timedelta, timezone
from enum import Enum
from redis import WatchError
-from typing import (TYPE_CHECKING, Any, Callable, Dict, Iterable, List,
-    Optional, Tuple, Union)
+from typing import TYPE_CHECKING, Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
from uuid import uuid4
@@ -20,18 +19,27 @@ if TYPE_CHECKING:
from redis.client import Pipeline
from .connections import resolve_connection
-from .exceptions import (DeserializationError, InvalidJobOperation,
-    NoSuchJobError)
+from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError
from .local import LocalStack
from .serializers import resolve_serializer
from .types import FunctionReferenceType, JobDependencyType
-from .utils import (as_text, decode_redis_hash, ensure_list, get_call_string,
-    get_version, import_attribute, parse_timeout, str_to_date,
-    utcformat, utcnow)
+from .utils import (
+    as_text,
+    decode_redis_hash,
+    ensure_list,
+    get_call_string,
+    get_version,
+    import_attribute,
+    parse_timeout,
+    str_to_date,
+    utcformat,
+    utcnow,
+)
class JobStatus(str, Enum):
"""The Status of Job within its lifecycle at any given time."""
QUEUED = 'queued'
FINISHED = 'finished'
FAILED = 'failed'
@@ -59,11 +67,7 @@ class Dependency:
ValueError: If the `jobs` param has anything different than `str` or `Job` class or the job list is empty
"""
dependent_jobs = ensure_list(jobs)
-if not all(
-    isinstance(job, Job) or isinstance(job, str)
-    for job in dependent_jobs
-    if job
-):
+if not all(isinstance(job, Job) or isinstance(job, str) for job in dependent_jobs if job):
raise ValueError("jobs: must contain objects of type Job and/or strings representing Job ids")
elif len(dependent_jobs) < 1:
raise ValueError("jobs: cannot be empty.")
@@ -79,8 +83,7 @@ yet been evaluated.
"""
-def cancel_job(job_id: str, connection: Optional['Redis'] = None,
-    serializer=None, enqueue_dependents: bool = False):
+def cancel_job(job_id: str, connection: Optional['Redis'] = None, serializer=None, enqueue_dependents: bool = False):
"""Cancels the job with the given job ID, preventing execution.
Use with caution. This will discard any job info (i.e. it can't be requeued later).
@@ -105,11 +108,9 @@ def get_current_job(connection: Optional['Redis'] = None, job_class: Optional['J
job (Optional[Job]): The current Job running
"""
if connection:
-warnings.warn("connection argument for get_current_job is deprecated.",
-    DeprecationWarning)
+warnings.warn("connection argument for get_current_job is deprecated.", DeprecationWarning)
if job_class:
-warnings.warn("job_class argument for get_current_job is deprecated.",
-    DeprecationWarning)
+warnings.warn("job_class argument for get_current_job is deprecated.", DeprecationWarning)
return _job_stack.top
@@ -130,19 +131,31 @@ def requeue_job(job_id: str, connection: 'Redis', serializer=None) -> 'Job':
class Job:
"""A Job is just a convenient datastructure to pass around job (meta) data."""
redis_job_namespace_prefix = 'rq:job:'
@classmethod
-def create(cls, func: FunctionReferenceType, args: Union[List[Any], Optional[Tuple]] = None,
-    kwargs: Optional[Dict[str, Any]] = None, connection: Optional['Redis'] = None,
-    result_ttl: Optional[int] = None, ttl: Optional[int] = None,
-    status: Optional[JobStatus] = None, description: Optional[str] =None,
+def create(
+    cls,
+    func: FunctionReferenceType,
+    args: Union[List[Any], Optional[Tuple]] = None,
+    kwargs: Optional[Dict[str, Any]] = None,
+    connection: Optional['Redis'] = None,
+    result_ttl: Optional[int] = None,
+    ttl: Optional[int] = None,
+    status: Optional[JobStatus] = None,
+    description: Optional[str] = None,
    depends_on: Optional[JobDependencyType] = None,
-    timeout: Optional[int] = None, id: Optional[str] = None,
-    origin=None, meta: Optional[Dict[str, Any]] = None,
-    failure_ttl: Optional[int] = None, serializer=None, *,
+    timeout: Optional[int] = None,
+    id: Optional[str] = None,
+    origin=None,
+    meta: Optional[Dict[str, Any]] = None,
+    failure_ttl: Optional[int] = None,
+    serializer=None,
+    *,
    on_success: Optional[Callable[..., Any]] = None,
-    on_failure: Optional[Callable[..., Any]] = None) -> 'Job':
+    on_failure: Optional[Callable[..., Any]] = None
+) -> 'Job':
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
@@ -247,10 +260,7 @@ class Job:
depends_on_list = depends_on.dependencies
else:
depends_on_list = ensure_list(depends_on)
-job._dependency_ids = [
-    dep.id if isinstance(dep, Job) else dep
-    for dep in depends_on_list
-]
+job._dependency_ids = [dep.id if isinstance(dep, Job) else dep for dep in depends_on_list]
return job
@@ -261,6 +271,7 @@ class Job:
position (Optional[int]): The position
"""
from .queue import Queue
if self.origin:
q = Queue(name=self.origin, connection=self.connection)
return q.get_job_position(self._id)
@@ -580,17 +591,14 @@ class Job:
self.enqueue_at_front: Optional[bool] = None
from .results import Result
self._cached_result: Optional[Result] = None
def __repr__(self): # noqa # pragma: no cover
-return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
-    self._id,
-    self.enqueued_at)
+return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, self._id, self.enqueued_at)
def __str__(self):
-return '<{0} {1}: {2}>'.format(self.__class__.__name__,
-    self.id,
-    self.description)
+return '<{0} {1}: {2}>'.format(self.__class__.__name__, self.id, self.description)
def __eq__(self, other): # noqa
return isinstance(other, self.__class__) and self.id == other.id
@@ -694,10 +702,11 @@ class Job:
connection = pipeline if pipeline is not None else self.connection
if watch and self._dependency_ids:
-connection.watch(*[self.key_for(dependency_id)
-    for dependency_id in self._dependency_ids])
+connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids])
-dependencies_list = self.fetch_many(self._dependency_ids, connection=self.connection, serializer=self.serializer)
+dependencies_list = self.fetch_many(
+    self._dependency_ids, connection=self.connection, serializer=self.serializer
+)
jobs = [job for job in dependencies_list if job]
return jobs
@@ -706,8 +715,7 @@
"""
Get the latest result and returns `exc_info` only if the latest result is a failure.
"""
-warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.",
-    DeprecationWarning)
+warnings.warn("job.exc_info is deprecated, use job.latest_result() instead.", DeprecationWarning)
from .results import Result
@@ -730,6 +738,7 @@
result (Optional[Any]): The job return value.
"""
from .results import Result
if refresh:
self._cached_result = None
@@ -762,8 +771,7 @@
seconds by default).
"""
-warnings.warn("job.result is deprecated, use job.return_value instead.",
-    DeprecationWarning)
+warnings.warn("job.result is deprecated, use job.return_value instead.", DeprecationWarning)
from .results import Result
@@ -789,6 +797,7 @@
all_results (List[Result]): A list of 'Result' objects
"""
from .results import Result
return Result.all(self, serializer=self.serializer)
def latest_result(self) -> Optional['Result']:
@@ -799,6 +808,7 @@
"""
"""Returns the latest Result object"""
from .results import Result
return Result.fetch_latest(self, serializer=self.serializer)
def restore(self, raw_data) -> Any:
@@ -849,8 +859,7 @@
dep_ids = obj.get('dependency_ids')
dep_id = obj.get('dependency_id') # for backwards compatibility
-self._dependency_ids = (json.loads(dep_ids.decode()) if dep_ids
-    else [dep_id.decode()] if dep_id else [])
+self._dependency_ids = json.loads(dep_ids.decode()) if dep_ids else [dep_id.decode()] if dep_id else []
allow_failures = obj.get('allow_dependency_failures')
self.allow_dependency_failures = bool(int(allow_failures)) if allow_failures else None
self.enqueue_at_front = bool(int(obj['enqueue_at_front'])) if 'enqueue_at_front' in obj else None
@@ -902,7 +911,7 @@ class Job:
'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '',
'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '',
-'worker_name': self.worker_name or ''
+'worker_name': self.worker_name or '',
}
if self.retries_left is not None:
@@ -948,8 +957,7 @@ class Job:
return obj
-def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True,
-    include_result: bool = True):
+def save(self, pipeline: Optional['Pipeline'] = None, include_meta: bool = True, include_result: bool = True):
"""Dumps the current job instance to its corresponding Redis key.
Exclude saving the `meta` dictionary by setting
@@ -1016,15 +1024,13 @@ class Job:
raise InvalidJobOperation("Cannot cancel already canceled job: {}".format(self.get_id()))
from .registry import CanceledJobRegistry
from .queue import Queue
pipe = pipeline or self.connection.pipeline()
while True:
try:
q = Queue(
-    name=self.origin,
-    connection=self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer
+    name=self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
)
self.set_status(JobStatus.CANCELED, pipeline=pipe)
@@ -1033,16 +1039,10 @@ class Job:
if pipeline is None:
pipe.watch(self.dependents_key)
q.enqueue_dependents(self, pipeline=pipeline, exclude_job_id=self.id)
-self._remove_from_registries(
-    pipeline=pipe,
-    remove_from_queue=True
-)
+self._remove_from_registries(pipeline=pipe, remove_from_queue=True)
registry = CanceledJobRegistry(
-    self.origin,
-    self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer
+    self.origin, self.connection, job_class=self.__class__, serializer=self.serializer
)
registry.add(self, pipeline=pipe)
if pipeline is None:
@@ -1070,41 +1070,43 @@ class Job:
def _remove_from_registries(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True):
from .registry import BaseRegistry
if remove_from_queue:
from .queue import Queue
q = Queue(name=self.origin, connection=self.connection, serializer=self.serializer)
q.remove(self, pipeline=pipeline)
registry: BaseRegistry
if self.is_finished:
from .registry import FinishedJobRegistry
-registry = FinishedJobRegistry(self.origin,
-    connection=self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer)
+registry = FinishedJobRegistry(
+    self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+)
registry.remove(self, pipeline=pipeline)
elif self.is_deferred:
from .registry import DeferredJobRegistry
-registry = DeferredJobRegistry(self.origin,
-    connection=self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer)
+registry = DeferredJobRegistry(
+    self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+)
registry.remove(self, pipeline=pipeline)
elif self.is_started:
from .registry import StartedJobRegistry
-registry = StartedJobRegistry(self.origin,
-    connection=self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer)
+registry = StartedJobRegistry(
+    self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+)
registry.remove(self, pipeline=pipeline)
elif self.is_scheduled:
from .registry import ScheduledJobRegistry
-registry = ScheduledJobRegistry(self.origin,
-    connection=self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer)
+registry = ScheduledJobRegistry(
+    self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+)
registry.remove(self, pipeline=pipeline)
elif self.is_failed or self.is_stopped:
@@ -1112,13 +1114,15 @@ class Job:
elif self.is_canceled:
from .registry import CanceledJobRegistry
-registry = CanceledJobRegistry(self.origin, connection=self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer)
+registry = CanceledJobRegistry(
+    self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+)
registry.remove(self, pipeline=pipeline)
-def delete(self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True,
-    delete_dependents: bool = False):
+def delete(
+    self, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True, delete_dependents: bool = False
+):
"""Cancels the job and deletes the job hash from Redis. Jobs depending
on this job can optionally be deleted as well.
@@ -1184,7 +1188,7 @@ class Job:
'last_heartbeat': utcformat(self.last_heartbeat),
'status': self._status,
'started_at': utcformat(self.started_at), # type: ignore
-'worker_name': worker_name
+'worker_name': worker_name,
}
if self.get_redis_server_version() >= (4, 0, 0):
pipeline.hset(self.key, mapping=mapping)
@@ -1244,8 +1248,7 @@ class Job:
call_repr = get_call_string(self.func_name, self.args, self.kwargs, max_length=75)
return call_repr
-def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None,
-    remove_from_queue: bool = True):
+def cleanup(self, ttl: Optional[int] = None, pipeline: Optional['Pipeline'] = None, remove_from_queue: bool = True):
"""Prepare job for eventual deletion (if needed).
This method is usually called after successful execution.
How long we persist the job and its result depends on the value of ttl:
@@ -1271,16 +1274,18 @@ class Job:
@property
def started_job_registry(self):
from .registry import StartedJobRegistry
-return StartedJobRegistry(self.origin, connection=self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer)
+return StartedJobRegistry(
+    self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+)
@property
def failed_job_registry(self):
from .registry import FailedJobRegistry
-return FailedJobRegistry(self.origin, connection=self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer)
+return FailedJobRegistry(
+    self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+)
def get_retry_interval(self) -> int:
"""Returns the desired retry interval.
@@ -1332,10 +1337,9 @@ class Job:
"""
from .registry import DeferredJobRegistry
-registry = DeferredJobRegistry(self.origin,
-    connection=self.connection,
-    job_class=self.__class__,
-    serializer=self.serializer)
+registry = DeferredJobRegistry(
+    self.origin, connection=self.connection, job_class=self.__class__, serializer=self.serializer
+)
registry.add(self, pipeline=pipeline)
connection = pipeline if pipeline is not None else self.connection
@@ -1348,11 +1352,14 @@ class Job:
@property
def dependency_ids(self) -> List[bytes]:
dependencies = self.connection.smembers(self.dependencies_key)
-return [Job.key_for(_id.decode())
-    for _id in dependencies]
+return [Job.key_for(_id.decode()) for _id in dependencies]
-def dependencies_are_met(self, parent_job: Optional['Job'] = None, pipeline: Optional['Pipeline'] = None,
-    exclude_job_id: Optional[str] = None) -> bool:
+def dependencies_are_met(
+    self,
+    parent_job: Optional['Job'] = None,
+    pipeline: Optional['Pipeline'] = None,
+    exclude_job_id: Optional[str] = None,
+) -> bool:
"""Returns a boolean indicating if all of this job's dependencies are `FINISHED`
If a pipeline is passed, all dependencies are WATCHed.
@@ -1373,8 +1380,7 @@ class Job:
connection = pipeline if pipeline is not None else self.connection
if pipeline is not None:
-connection.watch(*[self.key_for(dependency_id)
-    for dependency_id in self._dependency_ids])
+connection.watch(*[self.key_for(dependency_id) for dependency_id in self._dependency_ids])
dependencies_ids = {_id.decode() for _id in connection.smembers(self.dependencies_key)}
@@ -1407,12 +1413,7 @@ class Job:
if self.allow_dependency_failures:
allowed_statuses.append(JobStatus.FAILED)
-return all(
-    status.decode() in allowed_statuses
-    for status
-    in dependencies_statuses
-    if status
-)
+return all(status.decode() in allowed_statuses for status in dependencies_statuses if status)
_job_stack = LocalStack()

@@ -122,6 +122,7 @@ class LocalStack:
def _set__ident_func__(self, value): # noqa
object.__setattr__(self._local, '__ident_func__', value)
__ident_func__ = property(_get__ident_func__, _set__ident_func__)
del _get__ident_func__, _set__ident_func__
@@ -131,6 +132,7 @@
if rv is None:
raise RuntimeError('object unbound')
return rv
return LocalProxy(_lookup)
def push(self, obj):
@@ -223,10 +225,7 @@ class LocalManager:
release_local(local)
def __repr__(self):
-return '<%s storages: %d>' % (
-    self.__class__.__name__,
-    len(self.locals)
-)
+return '<%s storages: %d>' % (self.__class__.__name__, len(self.locals))
class LocalProxy:
@@ -264,6 +263,7 @@ class LocalProxy:
.. versionchanged:: 0.6.1
The class can be instanciated with a callable as well now.
"""
__slots__ = ('__local', '__dict__', '__name__')
def __init__(self, local, name=None):

@@ -3,12 +3,15 @@ import sys
from typing import Union
from rq.utils import ColorizingStreamHandler
-from rq.defaults import (DEFAULT_LOGGING_FORMAT,
-    DEFAULT_LOGGING_DATE_FORMAT)
+from rq.defaults import DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT
-def setup_loghandlers(level: Union[int, str, None] = None, date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
-    log_format: str = DEFAULT_LOGGING_FORMAT, name: str = 'rq.worker'):
+def setup_loghandlers(
+    level: Union[int, str, None] = None,
+    date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
+    log_format: str = DEFAULT_LOGGING_FORMAT,
+    name: str = 'rq.worker',
+):
"""Sets up a log handler.
Args:

@@ -30,11 +30,27 @@ blue = make_colorizer('darkblue')
logger = logging.getLogger("rq.queue")
-class EnqueueData(namedtuple('EnqueueData', ["func", "args", "kwargs", "timeout",
-    "result_ttl", "ttl", "failure_ttl",
-    "description", "job_id",
-    "at_front", "meta", "retry", "on_success", "on_failure"])):
+class EnqueueData(
+    namedtuple(
+        'EnqueueData',
+        [
+            "func",
+            "args",
+            "kwargs",
+            "timeout",
+            "result_ttl",
+            "ttl",
+            "failure_ttl",
+            "description",
+            "job_id",
+            "at_front",
+            "meta",
+            "retry",
+            "on_success",
+            "on_failure",
+        ],
+    )
+):
"""Helper type to use when calling enqueue_many
NOTE: Does not support `depends_on` yet.
"""
@@ -50,7 +66,9 @@ class Queue:
redis_queues_keys: str = 'rq:queues'
@classmethod
-def all(cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None) -> List['Queue']:
+def all(
+    cls, connection: Optional['Redis'] = None, job_class: Optional[Type['Job']] = None, serializer=None
+) -> List['Queue']:
"""Returns an iterable of all Queues.
Args:
@@ -64,17 +82,22 @@ class Queue:
connection = resolve_connection(connection)
def to_queue(queue_key):
-return cls.from_queue_key(as_text(queue_key),
-    connection=connection,
-    job_class=job_class, serializer=serializer)
+return cls.from_queue_key(
+    as_text(queue_key), connection=connection, job_class=job_class, serializer=serializer
+)
all_registerd_queues = connection.smembers(cls.redis_queues_keys)
all_queues = [to_queue(rq_key) for rq_key in all_registerd_queues if rq_key]
return all_queues
@classmethod
-def from_queue_key(cls, queue_key: str, connection: Optional['Redis'] = None,
-    job_class: Optional['Job'] = None, serializer: Any = None) -> 'Queue':
+def from_queue_key(
+    cls,
+    queue_key: str,
+    connection: Optional['Redis'] = None,
+    job_class: Optional['Job'] = None,
+    serializer: Any = None,
+) -> 'Queue':
"""Returns a Queue instance, based on the naming conventions for naming
the internal Redis keys. Can be used to reverse-lookup Queues by their
Redis keys.
@ -97,8 +120,16 @@ class Queue:
name = queue_key[len(prefix) :] name = queue_key[len(prefix) :]
return cls(name, connection=connection, job_class=job_class, serializer=serializer) return cls(name, connection=connection, job_class=job_class, serializer=serializer)
def __init__(self, name: str = 'default', default_timeout: Optional[int] = None, connection: Optional['Redis'] = None, def __init__(
is_async: bool = True, job_class: Union[str, Type['Job'], None] = None, serializer: Any = None, **kwargs): self,
name: str = 'default',
default_timeout: Optional[int] = None,
connection: Optional['Redis'] = None,
is_async: bool = True,
job_class: Union[str, Type['Job'], None] = None,
serializer: Any = None,
**kwargs,
):
"""Initializes a Queue object. """Initializes a Queue object.
Args: Args:
@ -201,7 +232,11 @@ class Queue:
count = count + 1 count = count + 1
end end
return count return count
""".format(self.job_class.redis_job_namespace_prefix).encode("utf-8") """.format(
self.job_class.redis_job_namespace_prefix
).encode(
"utf-8"
)
script = self.connection.register_script(script) script = self.connection.register_script(script)
return script(keys=[self.key]) return script(keys=[self.key])
@ -293,11 +328,7 @@ class Queue:
end = offset + (length - 1) end = offset + (length - 1)
else: else:
end = length end = length
job_ids = [ job_ids = [as_text(job_id) for job_id in self.connection.lrange(self.key, start, end)]
as_text(job_id)
for job_id
in self.connection.lrange(self.key, start, end)
]
self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.") self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.")
return job_ids return job_ids
@ -333,18 +364,21 @@ class Queue:
def failed_job_registry(self): def failed_job_registry(self):
"""Returns this queue's FailedJobRegistry.""" """Returns this queue's FailedJobRegistry."""
from rq.registry import FailedJobRegistry from rq.registry import FailedJobRegistry
return FailedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) return FailedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@property @property
def started_job_registry(self): def started_job_registry(self):
"""Returns this queue's StartedJobRegistry.""" """Returns this queue's StartedJobRegistry."""
from rq.registry import StartedJobRegistry from rq.registry import StartedJobRegistry
return StartedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) return StartedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@property @property
def finished_job_registry(self): def finished_job_registry(self):
"""Returns this queue's FinishedJobRegistry.""" """Returns this queue's FinishedJobRegistry."""
from rq.registry import FinishedJobRegistry from rq.registry import FinishedJobRegistry
# TODO: Why was job_class only omitted here before? Was it intentional? # TODO: Why was job_class only omitted here before? Was it intentional?
return FinishedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) return FinishedJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@ -352,18 +386,21 @@ class Queue:
def deferred_job_registry(self): def deferred_job_registry(self):
"""Returns this queue's DeferredJobRegistry.""" """Returns this queue's DeferredJobRegistry."""
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
return DeferredJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) return DeferredJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@property @property
def scheduled_job_registry(self): def scheduled_job_registry(self):
"""Returns this queue's ScheduledJobRegistry.""" """Returns this queue's ScheduledJobRegistry."""
from rq.registry import ScheduledJobRegistry from rq.registry import ScheduledJobRegistry
return ScheduledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) return ScheduledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
@property @property
def canceled_job_registry(self): def canceled_job_registry(self):
"""Returns this queue's CanceledJobRegistry.""" """Returns this queue's CanceledJobRegistry."""
from rq.registry import CanceledJobRegistry from rq.registry import CanceledJobRegistry
return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer) return CanceledJobRegistry(queue=self, job_class=self.job_class, serializer=self.serializer)
def remove(self, job_or_id: Union['Job', str], pipeline: Optional['Pipeline'] = None): def remove(self, job_or_id: Union['Job', str], pipeline: Optional['Pipeline'] = None):
@ -387,8 +424,7 @@ class Queue:
"""Removes all "dead" jobs from the queue by cycling through it, """Removes all "dead" jobs from the queue by cycling through it,
while guaranteeing FIFO semantics. while guaranteeing FIFO semantics.
""" """
COMPACT_QUEUE = '{0}_compact:{1}'.format( COMPACT_QUEUE = '{0}_compact:{1}'.format(self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa
self.redis_queue_namespace_prefix, uuid.uuid4()) # noqa
self.connection.rename(self.key, COMPACT_QUEUE) self.connection.rename(self.key, COMPACT_QUEUE)
while True: while True:
@ -414,12 +450,25 @@ class Queue:
result = connection.rpush(self.key, job_id) result = connection.rpush(self.key, job_id)
self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.")
def create_job(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, def create_job(
timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, self,
failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType']=None, func: 'FunctionReferenceType',
job_id: Optional[str] = None, meta: Optional[Dict] = None, status: JobStatus = JobStatus.QUEUED, args: Union[Tuple, List, None] = None,
retry: Optional['Retry'] = None, *, on_success: Optional[Callable] = None, kwargs: Optional[Dict] = None,
on_failure: Optional[Callable] = None) -> Job: timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
meta: Optional[Dict] = None,
status: JobStatus = JobStatus.QUEUED,
retry: Optional['Retry'] = None,
*,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
) -> Job:
"""Creates a job based on parameters given """Creates a job based on parameters given
Args: Args:
@ -461,12 +510,23 @@ class Queue:
raise ValueError('Job ttl must be greater than 0') raise ValueError('Job ttl must be greater than 0')
job = self.job_class.create( job = self.job_class.create(
func, args=args, kwargs=kwargs, connection=self.connection, func,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, args=args,
status=status, description=description, kwargs=kwargs,
depends_on=depends_on, timeout=timeout, id=job_id, connection=self.connection,
origin=self.name, meta=meta, serializer=self.serializer, on_success=on_success, result_ttl=result_ttl,
on_failure=on_failure ttl=ttl,
failure_ttl=failure_ttl,
status=status,
description=description,
depends_on=depends_on,
timeout=timeout,
id=job_id,
origin=self.name,
meta=meta,
serializer=self.serializer,
on_success=on_success,
on_failure=on_failure,
) )
if retry: if retry:
@ -501,10 +561,7 @@ class Queue:
# is called from within this method. # is called from within this method.
pipe.watch(job.dependencies_key) pipe.watch(job.dependencies_key)
dependencies = job.fetch_dependencies( dependencies = job.fetch_dependencies(watch=True, pipeline=pipe)
watch=True,
pipeline=pipe
)
pipe.multi() pipe.multi()
@ -535,12 +592,25 @@ class Queue:
pipeline.multi() # Ensure pipeline in multi mode before returning to caller pipeline.multi() # Ensure pipeline in multi mode before returning to caller
return job return job
def enqueue_call(self, func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, def enqueue_call(
timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, self,
failure_ttl: Optional[int] = None, description: Optional[str] = None, depends_on: Optional['JobDependencyType'] = None, func: 'FunctionReferenceType',
job_id: Optional[str] = None, at_front: bool = False, meta: Optional[Dict] = None, args: Union[Tuple, List, None] = None,
retry: Optional['Retry'] = None, on_success: Optional[Callable[..., Any]] = None, kwargs: Optional[Dict] = None,
on_failure: Optional[Callable[..., Any]] = None, pipeline: Optional['Pipeline'] = None) -> Job: timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
depends_on: Optional['JobDependencyType'] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable[..., Any]] = None,
on_failure: Optional[Callable[..., Any]] = None,
pipeline: Optional['Pipeline'] = None,
) -> Job:
"""Creates a job to represent the delayed function call and enqueues it. """Creates a job to represent the delayed function call and enqueues it.
It is much like `.enqueue()`, except that it takes the function's args It is much like `.enqueue()`, except that it takes the function's args
@ -570,27 +640,46 @@ class Queue:
""" """
job = self.create_job( job = self.create_job(
func, args=args, kwargs=kwargs, result_ttl=result_ttl, ttl=ttl, func,
failure_ttl=failure_ttl, description=description, depends_on=depends_on, args=args,
job_id=job_id, meta=meta, status=JobStatus.QUEUED, timeout=timeout, kwargs=kwargs,
retry=retry, on_success=on_success, on_failure=on_failure result_ttl=result_ttl,
ttl=ttl,
failure_ttl=failure_ttl,
description=description,
depends_on=depends_on,
job_id=job_id,
meta=meta,
status=JobStatus.QUEUED,
timeout=timeout,
retry=retry,
on_success=on_success,
on_failure=on_failure,
) )
job = self.setup_dependencies( job = self.setup_dependencies(job, pipeline=pipeline)
job,
pipeline=pipeline
)
# If we do not depend on an unfinished job, enqueue the job. # If we do not depend on an unfinished job, enqueue the job.
if job.get_status(refresh=False) != JobStatus.DEFERRED: if job.get_status(refresh=False) != JobStatus.DEFERRED:
return self.enqueue_job(job, pipeline=pipeline, at_front=at_front) return self.enqueue_job(job, pipeline=pipeline, at_front=at_front)
return job return job
@staticmethod @staticmethod
def prepare_data(func: 'FunctionReferenceType', args: Union[Tuple, List, None] = None, kwargs: Optional[Dict] = None, def prepare_data(
timeout: Optional[int] = None, result_ttl: Optional[int] = None, ttl: Optional[int] = None, func: 'FunctionReferenceType',
failure_ttl: Optional[int] = None, description: Optional[str] = None, job_id: Optional[str] = None, args: Union[Tuple, List, None] = None,
at_front: bool = False, meta: Optional[Dict] = None, retry: Optional['Retry'] = None, kwargs: Optional[Dict] = None,
on_success: Optional[Callable] = None, on_failure: Optional[Callable] = None) -> EnqueueData: timeout: Optional[int] = None,
result_ttl: Optional[int] = None,
ttl: Optional[int] = None,
failure_ttl: Optional[int] = None,
description: Optional[str] = None,
job_id: Optional[str] = None,
at_front: bool = False,
meta: Optional[Dict] = None,
retry: Optional['Retry'] = None,
on_success: Optional[Callable] = None,
on_failure: Optional[Callable] = None,
) -> EnqueueData:
"""Needed until support for python_version < 3.7 is dropped, at which point defaults can be specified for named tuples """Needed until support for python_version < 3.7 is dropped, at which point defaults can be specified for named tuples
and this logic can be kept within EnqueueData and this logic can be kept within EnqueueData
@ -614,10 +703,20 @@ class Queue:
EnqueueData: The EnqueueData EnqueueData: The EnqueueData
""" """
return EnqueueData( return EnqueueData(
func, args, kwargs, timeout, func,
result_ttl, ttl, failure_ttl, args,
description, job_id, kwargs,
at_front, meta, retry, on_success, on_failure timeout,
result_ttl,
ttl,
failure_ttl,
description,
job_id,
at_front,
meta,
retry,
on_success,
on_failure,
) )
def enqueue_many(self, job_datas: List['EnqueueData'], pipeline: Optional['Pipeline'] = None) -> List[Job]: def enqueue_many(self, job_datas: List['EnqueueData'], pipeline: Optional['Pipeline'] = None) -> List[Job]:
@ -635,18 +734,24 @@ class Queue:
jobs = [ jobs = [
self.enqueue_job( self.enqueue_job(
self.create_job( self.create_job(
job_data.func, args=job_data.args, kwargs=job_data.kwargs, result_ttl=job_data.result_ttl, job_data.func,
args=job_data.args,
kwargs=job_data.kwargs,
result_ttl=job_data.result_ttl,
ttl=job_data.ttl, ttl=job_data.ttl,
failure_ttl=job_data.failure_ttl, description=job_data.description, failure_ttl=job_data.failure_ttl,
description=job_data.description,
depends_on=None, depends_on=None,
job_id=job_data.job_id, meta=job_data.meta, status=JobStatus.QUEUED, job_id=job_data.job_id,
meta=job_data.meta,
status=JobStatus.QUEUED,
timeout=job_data.timeout, timeout=job_data.timeout,
retry=job_data.retry, retry=job_data.retry,
on_success=job_data.on_success, on_success=job_data.on_success,
on_failure=job_data.on_failure on_failure=job_data.on_failure,
), ),
pipeline=pipe, pipeline=pipe,
at_front=job_data.at_front at_front=job_data.at_front,
) )
for job_data in job_datas for job_data in job_datas
] ]
@ -687,8 +792,7 @@ class Queue:
kwargs (**kwargs): function kwargs kwargs (**kwargs): function kwargs
""" """
if not isinstance(f, str) and f.__module__ == '__main__': if not isinstance(f, str) and f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed ' raise ValueError('Functions from the __main__ module cannot be processed ' 'by workers')
'by workers')
# Detect explicit invocations, i.e. of the form: # Detect explicit invocations, i.e. of the form:
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30) # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, job_timeout=30)
@ -711,9 +815,24 @@ class Queue:
args = kwargs.pop('args', None) args = kwargs.pop('args', None)
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return (f, timeout, description, result_ttl, ttl, failure_ttl, return (
depends_on, job_id, at_front, meta, retry, on_success, on_failure, f,
pipeline, args, kwargs) timeout,
description,
result_ttl,
ttl,
failure_ttl,
depends_on,
job_id,
at_front,
meta,
retry,
on_success,
on_failure,
pipeline,
args,
kwargs,
)
def enqueue(self, f: 'FunctionReferenceType', *args, **kwargs) -> 'Job': def enqueue(self, f: 'FunctionReferenceType', *args, **kwargs) -> 'Job':
"""Creates a job to represent the delayed function call and enqueues it. """Creates a job to represent the delayed function call and enqueues it.
@ -727,16 +846,42 @@ class Queue:
Returns: Returns:
job (Job): The created Job job (Job): The created Job
""" """
(f, timeout, description, result_ttl, ttl, failure_ttl, (
depends_on, job_id, at_front, meta, retry, on_success, f,
on_failure, pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) timeout,
description,
result_ttl,
ttl,
failure_ttl,
depends_on,
job_id,
at_front,
meta,
retry,
on_success,
on_failure,
pipeline,
args,
kwargs,
) = Queue.parse_args(f, *args, **kwargs)
return self.enqueue_call( return self.enqueue_call(
func=f, args=args, kwargs=kwargs, timeout=timeout, func=f,
result_ttl=result_ttl, ttl=ttl, failure_ttl=failure_ttl, args=args,
description=description, depends_on=depends_on, job_id=job_id, kwargs=kwargs,
at_front=at_front, meta=meta, retry=retry, on_success=on_success, on_failure=on_failure, timeout=timeout,
pipeline=pipeline result_ttl=result_ttl,
ttl=ttl,
failure_ttl=failure_ttl,
description=description,
depends_on=depends_on,
job_id=job_id,
at_front=at_front,
meta=meta,
retry=retry,
on_success=on_success,
on_failure=on_failure,
pipeline=pipeline,
) )
def enqueue_at(self, datetime: datetime, f, *args, **kwargs): def enqueue_at(self, datetime: datetime, f, *args, **kwargs):
@ -749,14 +894,41 @@ class Queue:
Returns: Returns:
_type_: _description_ _type_: _description_
""" """
(f, timeout, description, result_ttl, ttl, failure_ttl, (
depends_on, job_id, at_front, meta, retry, on_success, on_failure, f,
pipeline, args, kwargs) = Queue.parse_args(f, *args, **kwargs) timeout,
job = self.create_job(f, status=JobStatus.SCHEDULED, args=args, kwargs=kwargs, description,
timeout=timeout, result_ttl=result_ttl, ttl=ttl, result_ttl,
failure_ttl=failure_ttl, description=description, ttl,
depends_on=depends_on, job_id=job_id, meta=meta, retry=retry, failure_ttl,
on_success=on_success, on_failure=on_failure) depends_on,
job_id,
at_front,
meta,
retry,
on_success,
on_failure,
pipeline,
args,
kwargs,
) = Queue.parse_args(f, *args, **kwargs)
job = self.create_job(
f,
status=JobStatus.SCHEDULED,
args=args,
kwargs=kwargs,
timeout=timeout,
result_ttl=result_ttl,
ttl=ttl,
failure_ttl=failure_ttl,
description=description,
depends_on=depends_on,
job_id=job_id,
meta=meta,
retry=retry,
on_success=on_success,
on_failure=on_failure,
)
if at_front: if at_front:
job.enqueue_at_front = True job.enqueue_at_front = True
return self.schedule_job(job, datetime, pipeline=pipeline) return self.schedule_job(job, datetime, pipeline=pipeline)
@ -773,6 +945,7 @@ class Queue:
_type_: _description_ _type_: _description_
""" """
from .registry import ScheduledJobRegistry from .registry import ScheduledJobRegistry
registry = ScheduledJobRegistry(queue=self) registry = ScheduledJobRegistry(queue=self)
pipe = pipeline if pipeline is not None else self.connection.pipeline() pipe = pipeline if pipeline is not None else self.connection.pipeline()
@ -795,8 +968,7 @@ class Queue:
Returns: Returns:
job (Job): The enqueued Job job (Job): The enqueued Job
""" """
return self.enqueue_at(datetime.now(timezone.utc) + time_delta, return self.enqueue_at(datetime.now(timezone.utc) + time_delta, func, *args, **kwargs)
func, *args, **kwargs)
def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job: def enqueue_job(self, job: 'Job', pipeline: Optional['Pipeline'] = None, at_front: bool = False) -> Job:
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
@ -861,7 +1033,9 @@ class Queue:
return job return job
def enqueue_dependents(self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None): def enqueue_dependents(
self, job: 'Job', pipeline: Optional['Pipeline'] = None, exclude_job_id: Optional[str] = None
):
"""Enqueues all jobs in the given job's dependents set and clears it. """Enqueues all jobs in the given job's dependents set and clears it.
When called without a pipeline, this method uses WATCH/MULTI/EXEC. When called without a pipeline, this method uses WATCH/MULTI/EXEC.
@ -886,20 +1060,19 @@ class Queue:
if pipeline is None: if pipeline is None:
pipe.watch(dependents_key) pipe.watch(dependents_key)
dependent_job_ids = {as_text(_id) dependent_job_ids = {as_text(_id) for _id in pipe.smembers(dependents_key)}
for _id in pipe.smembers(dependents_key)}
# There are no dependents # There are no dependents
if not dependent_job_ids: if not dependent_job_ids:
break break
jobs_to_enqueue = [ jobs_to_enqueue = [
dependent_job for dependent_job dependent_job
in self.job_class.fetch_many( for dependent_job in self.job_class.fetch_many(
dependent_job_ids, dependent_job_ids, connection=self.connection, serializer=self.serializer
connection=self.connection, )
serializer=self.serializer if dependent_job
) if dependent_job and dependent_job.dependencies_are_met( and dependent_job.dependencies_are_met(
parent_job=job, parent_job=job,
pipeline=pipe, pipeline=pipe,
exclude_job_id=exclude_job_id, exclude_job_id=exclude_job_id,
@ -914,10 +1087,9 @@ class Queue:
for dependent in jobs_to_enqueue: for dependent in jobs_to_enqueue:
enqueue_at_front = dependent.enqueue_at_front or False enqueue_at_front = dependent.enqueue_at_front or False
registry = DeferredJobRegistry(dependent.origin, registry = DeferredJobRegistry(
self.connection, dependent.origin, self.connection, job_class=self.job_class, serializer=self.serializer
job_class=self.job_class, )
serializer=self.serializer)
registry.remove(dependent, pipeline=pipe) registry.remove(dependent, pipeline=pipe)
if dependent.origin == self.name: if dependent.origin == self.name:
@ -1000,8 +1172,14 @@ class Queue:
return None return None
@classmethod @classmethod
def dequeue_any(cls, queues: List['Queue'], timeout: int, connection: Optional['Redis'] = None, def dequeue_any(
job_class: Optional['Job'] = None, serializer: Any = None) -> Tuple['Job', 'Queue']: cls,
queues: List['Queue'],
timeout: int,
connection: Optional['Redis'] = None,
job_class: Optional['Job'] = None,
serializer: Any = None,
) -> Tuple['Job', 'Queue']:
"""Class method returning the job_class instance at the front of the given """Class method returning the job_class instance at the front of the given
set of Queues, where the order of the queues is important. set of Queues, where the order of the queues is important.
@ -1033,10 +1211,7 @@ class Queue:
if result is None: if result is None:
return None return None
queue_key, job_id = map(as_text, result) queue_key, job_id = map(as_text, result)
queue = cls.from_queue_key(queue_key, queue = cls.from_queue_key(queue_key, connection=connection, job_class=job_class, serializer=serializer)
connection=connection,
job_class=job_class,
serializer=serializer)
try: try:
job = job_class.fetch(job_id, connection=connection, serializer=serializer) job = job_class.fetch(job_id, connection=connection, serializer=serializer)
except NoSuchJobError: except NoSuchJobError:

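A hedged usage sketch for the `prepare_data()`/`enqueue_many()` pair reformatted above; `my_app.tasks.send_email` is a hypothetical, importable task function (jobs defined in `__main__` are rejected, as the hunk above notes):

    from redis import Redis
    from rq import Queue
    from my_app.tasks import send_email  # hypothetical module; not part of RQ

    q = Queue(connection=Redis())
    jobs = q.enqueue_many(
        [
            Queue.prepare_data(send_email, args=("alice@example.com",), job_id="mail-1"),
            Queue.prepare_data(send_email, args=("bob@example.com",), at_front=True),
        ]
    )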
@ -23,11 +23,18 @@ class BaseRegistry:
Each job is stored as a key in the registry, scored by expiration time Each job is stored as a key in the registry, scored by expiration time
(unix timestamp). (unix timestamp).
""" """
job_class = Job job_class = Job
key_template = 'rq:registry:{0}' key_template = 'rq:registry:{0}'
def __init__(self, name: str = 'default', connection: Optional['Redis'] = None, def __init__(
job_class: Optional[Type['Job']] = None, queue: Optional['Queue'] = None, serializer: Any = None): self,
name: str = 'default',
connection: Optional['Redis'] = None,
job_class: Optional[Type['Job']] = None,
queue: Optional['Queue'] = None,
serializer: Any = None,
):
if queue: if queue:
self.name = queue.name self.name = queue.name
self.connection = resolve_connection(queue.connection) self.connection = resolve_connection(queue.connection)
@ -46,8 +53,8 @@ class BaseRegistry:
def __eq__(self, other): def __eq__(self, other):
return ( return (
self.name == other.name and self.name == other.name
self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs and self.connection.connection_pool.connection_kwargs == other.connection.connection_pool.connection_kwargs
) )
def __contains__(self, item: Union[str, 'Job']): def __contains__(self, item: Union[str, 'Job']):
@ -134,8 +141,7 @@ class BaseRegistry:
_type_: _description_ _type_: _description_
""" """
self.cleanup() self.cleanup()
return [as_text(job_id) for job_id in return [as_text(job_id) for job_id in self.connection.zrange(self.key, start, end)]
self.connection.zrange(self.key, start, end)]
def get_queue(self): def get_queue(self):
"""Returns Queue object associated with this registry.""" """Returns Queue object associated with this registry."""
@ -175,8 +181,7 @@ class BaseRegistry:
raise InvalidJobOperation raise InvalidJobOperation
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
queue = Queue(job.origin, connection=self.connection, queue = Queue(job.origin, connection=self.connection, job_class=self.job_class, serializer=serializer)
job_class=self.job_class, serializer=serializer)
job.started_at = None job.started_at = None
job.ended_at = None job.ended_at = None
job._exc_info = '' job._exc_info = ''
@ -195,6 +200,7 @@ class StartedJobRegistry(BaseRegistry):
Jobs are added to registry right before they are executed and removed Jobs are added to registry right before they are executed and removed
right after completion (success or failure). right after completion (success or failure).
""" """
key_template = 'rq:wip:{0}' key_template = 'rq:wip:{0}'
def cleanup(self, timestamp: Optional[float] = None): def cleanup(self, timestamp: Optional[float] = None):
@ -216,9 +222,7 @@ class StartedJobRegistry(BaseRegistry):
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for job_id in job_ids: for job_id in job_ids:
try: try:
job = self.job_class.fetch(job_id, job = self.job_class.fetch(job_id, connection=self.connection, serializer=self.serializer)
connection=self.connection,
serializer=self.serializer)
except NoSuchJobError: except NoSuchJobError:
continue continue
@ -246,6 +250,7 @@ class FinishedJobRegistry(BaseRegistry):
Registry of jobs that have been completed. Jobs are added to this Registry of jobs that have been completed. Jobs are added to this
registry after they have successfully completed for monitoring purposes. registry after they have successfully completed for monitoring purposes.
""" """
key_template = 'rq:finished:{0}' key_template = 'rq:finished:{0}'
def cleanup(self, timestamp: Optional[float] = None): def cleanup(self, timestamp: Optional[float] = None):
@ -263,6 +268,7 @@ class FailedJobRegistry(BaseRegistry):
""" """
Registry containing failed jobs. Registry containing failed jobs.
""" """
key_template = 'rq:failed:{0}' key_template = 'rq:failed:{0}'
def cleanup(self, timestamp: Optional[float] = None): def cleanup(self, timestamp: Optional[float] = None):
@ -275,8 +281,14 @@ class FailedJobRegistry(BaseRegistry):
score = timestamp if timestamp is not None else current_timestamp() score = timestamp if timestamp is not None else current_timestamp()
self.connection.zremrangebyscore(self.key, 0, score) self.connection.zremrangebyscore(self.key, 0, score)
def add(self, job: 'Job', ttl=None, exc_string: str = '', pipeline: Optional['Pipeline'] = None, def add(
_save_exc_to_job: bool = False): self,
job: 'Job',
ttl=None,
exc_string: str = '',
pipeline: Optional['Pipeline'] = None,
_save_exc_to_job: bool = False,
):
""" """
Adds a job to a registry with expiry time of now + ttl. Adds a job to a registry with expiry time of now + ttl.
`ttl` defaults to DEFAULT_FAILURE_TTL if not specified. `ttl` defaults to DEFAULT_FAILURE_TTL if not specified.
@ -303,6 +315,7 @@ class DeferredJobRegistry(BaseRegistry):
""" """
Registry of deferred jobs (waiting for another job to finish). Registry of deferred jobs (waiting for another job to finish).
""" """
key_template = 'rq:deferred:{0}' key_template = 'rq:deferred:{0}'
def cleanup(self): def cleanup(self):
@ -316,6 +329,7 @@ class ScheduledJobRegistry(BaseRegistry):
""" """
Registry of scheduled jobs. Registry of scheduled jobs.
""" """
key_template = 'rq:scheduled:{0}' key_template = 'rq:scheduled:{0}'
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -412,19 +426,16 @@ def clean_registries(queue: 'Queue'):
Args: Args:
queue (Queue): The queue to clean queue (Queue): The queue to clean
""" """
registry = FinishedJobRegistry(name=queue.name, registry = FinishedJobRegistry(
connection=queue.connection, name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer
job_class=queue.job_class, )
serializer=queue.serializer)
registry.cleanup() registry.cleanup()
registry = StartedJobRegistry(name=queue.name, registry = StartedJobRegistry(
connection=queue.connection, name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer
job_class=queue.job_class, )
serializer=queue.serializer)
registry.cleanup() registry.cleanup()
registry = FailedJobRegistry(name=queue.name, registry = FailedJobRegistry(
connection=queue.connection, name=queue.name, connection=queue.connection, job_class=queue.job_class, serializer=queue.serializer
job_class=queue.job_class, )
serializer=queue.serializer)
registry.cleanup() registry.cleanup()

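A short sketch of the registry helpers touched above, assuming a reachable local Redis:

    from redis import Redis
    from rq import Queue
    from rq.registry import FailedJobRegistry, clean_registries

    queue = Queue("default", connection=Redis())
    clean_registries(queue)  # expire stale finished/started/failed entries

    failed = FailedJobRegistry(queue=queue)
    print(failed.get_job_ids())  # ids currently scored in the failed registry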
@ -17,15 +17,22 @@ def get_key(job_id):
class Result(object): class Result(object):
class Type(Enum): class Type(Enum):
SUCCESSFUL = 1 SUCCESSFUL = 1
FAILED = 2 FAILED = 2
STOPPED = 3 STOPPED = 3
def __init__(self, job_id: str, type: Type, connection: Redis, id: Optional[str] = None, def __init__(
created_at: Optional[datetime] = None, return_value: Optional[Any] = None, self,
exc_string: Optional[str] = None, serializer=None): job_id: str,
type: Type,
connection: Redis,
id: Optional[str] = None,
created_at: Optional[datetime] = None,
return_value: Optional[Any] = None,
exc_string: Optional[str] = None,
serializer=None,
):
self.return_value = return_value self.return_value = return_value
self.exc_string = exc_string self.exc_string = exc_string
self.type = type self.type = type
@ -49,16 +56,26 @@ class Result(object):
@classmethod @classmethod
def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None): def create(cls, job, type, ttl, return_value=None, exc_string=None, pipeline=None):
result = cls(job_id=job.id, type=type, connection=job.connection, result = cls(
job_id=job.id,
type=type,
connection=job.connection,
return_value=return_value, return_value=return_value,
exc_string=exc_string, serializer=job.serializer) exc_string=exc_string,
serializer=job.serializer,
)
result.save(ttl=ttl, pipeline=pipeline) result.save(ttl=ttl, pipeline=pipeline)
return result return result
@classmethod @classmethod
def create_failure(cls, job, ttl, exc_string, pipeline=None): def create_failure(cls, job, ttl, exc_string, pipeline=None):
result = cls(job_id=job.id, type=cls.Type.FAILED, connection=job.connection, result = cls(
exc_string=exc_string, serializer=job.serializer) job_id=job.id,
type=cls.Type.FAILED,
connection=job.connection,
exc_string=exc_string,
serializer=job.serializer,
)
result.save(ttl=ttl, pipeline=pipeline) result.save(ttl=ttl, pipeline=pipeline)
return result return result
@ -70,8 +87,7 @@ class Result(object):
results = [] results = []
for (result_id, payload) in response: for (result_id, payload) in response:
results.append( results.append(
cls.restore(job.id, result_id.decode(), payload, cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
connection=job.connection, serializer=serializer)
) )
return results return results
@ -89,9 +105,7 @@ class Result(object):
@classmethod @classmethod
def restore(cls, job_id: str, result_id: str, payload: dict, connection: Redis, serializer=None) -> 'Result': def restore(cls, job_id: str, result_id: str, payload: dict, connection: Redis, serializer=None) -> 'Result':
"""Create a Result object from given Redis payload""" """Create a Result object from given Redis payload"""
created_at = datetime.fromtimestamp( created_at = datetime.fromtimestamp(int(result_id.split('-')[0]) / 1000, tz=timezone.utc)
int(result_id.split('-')[0]) / 1000, tz=timezone.utc
)
payload = decode_redis_hash(payload) payload = decode_redis_hash(payload)
# data, timestamp = payload # data, timestamp = payload
# result_data = json.loads(data) # result_data = json.loads(data)
@ -106,11 +120,15 @@ class Result(object):
if exc_string: if exc_string:
exc_string = zlib.decompress(b64decode(exc_string)).decode() exc_string = zlib.decompress(b64decode(exc_string)).decode()
return Result(job_id, Result.Type(int(payload['type'])), connection=connection, return Result(
job_id,
Result.Type(int(payload['type'])),
connection=connection,
id=result_id, id=result_id,
created_at=created_at, created_at=created_at,
return_value=return_value, return_value=return_value,
exc_string=exc_string) exc_string=exc_string,
)
@classmethod @classmethod
def fetch(cls, job: Job, serializer=None) -> Optional['Result']: def fetch(cls, job: Job, serializer=None) -> Optional['Result']:
@ -134,8 +152,7 @@ class Result(object):
return None return None
result_id, payload = response[0] result_id, payload = response[0]
return cls.restore(job.id, result_id.decode(), payload, return cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
connection=job.connection, serializer=serializer)
@classmethod @classmethod
def get_key(cls, job_id): def get_key(cls, job_id):

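Rough usage sketch for the `Result` helper reformatted above; "some-job-id" is a placeholder for an existing job id:

    from redis import Redis
    from rq.job import Job
    from rq.results import Result

    job = Job.fetch("some-job-id", connection=Redis())  # placeholder id
    latest = Result.fetch(job)  # newest stored Result, or None
    if latest is not None and latest.type == Result.Type.SUCCESSFUL:
        print(latest.return_value)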
@ -9,8 +9,7 @@ from multiprocessing import Process
from redis import SSLConnection, UnixDomainSocketConnection from redis import SSLConnection, UnixDomainSocketConnection
from .defaults import (DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, DEFAULT_SCHEDULER_FALLBACK_PERIOD
DEFAULT_SCHEDULER_FALLBACK_PERIOD)
from .job import Job from .job import Job
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
from .queue import Queue from .queue import Queue
@ -35,9 +34,16 @@ class RQScheduler:
Status = SchedulerStatus Status = SchedulerStatus
def __init__(self, queues, connection, interval=1, logging_level=logging.INFO, def __init__(
self,
queues,
connection,
interval=1,
logging_level=logging.INFO,
date_format=DEFAULT_LOGGING_DATE_FORMAT, date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT, serializer=None): log_format=DEFAULT_LOGGING_FORMAT,
serializer=None,
):
self._queue_names = set(parse_names(queues)) self._queue_names = set(parse_names(queues))
self._acquired_locks = set() self._acquired_locks = set()
self._scheduled_job_registries = [] self._scheduled_job_registries = []
@ -59,9 +65,7 @@ class RQScheduler:
# the key is necessary. # the key is necessary.
# `path` is not left in the dictionary as that keyword argument is # `path` is not left in the dictionary as that keyword argument is
# not expected by `redis.client.Redis` and would raise an exception. # not expected by `redis.client.Redis` and would raise an exception.
self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop( self._connection_kwargs['unix_socket_path'] = self._connection_kwargs.pop('path')
'path'
)
self.serializer = resolve_serializer(serializer) self.serializer = resolve_serializer(serializer)
self._connection = None self._connection = None
@ -158,9 +162,7 @@ class RQScheduler:
queue = Queue(registry.name, connection=self.connection, serializer=self.serializer) queue = Queue(registry.name, connection=self.connection, serializer=self.serializer)
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
jobs = Job.fetch_many( jobs = Job.fetch_many(job_ids, connection=self.connection, serializer=self.serializer)
job_ids, connection=self.connection, serializer=self.serializer
)
for job in jobs: for job in jobs:
if job is not None: if job is not None:
queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front)) queue.enqueue_job(job, pipeline=pipeline, at_front=bool(job.enqueue_at_front))
@ -181,8 +183,7 @@ class RQScheduler:
def heartbeat(self): def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks""" """Updates the TTL on scheduler keys and the locks"""
self.log.debug("Scheduler sending heartbeat to %s", self.log.debug("Scheduler sending heartbeat to %s", ", ".join(self.acquired_locks))
", ".join(self.acquired_locks))
if len(self._queue_names) > 1: if len(self._queue_names) > 1:
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for name in self._acquired_locks: for name in self._acquired_locks:
@ -194,8 +195,7 @@ class RQScheduler:
self.connection.expire(key, self.interval + 60) self.connection.expire(key, self.interval + 60)
def stop(self): def stop(self):
self.log.info("Scheduler stopping, releasing locks for %s...", self.log.info("Scheduler stopping, releasing locks for %s...", ','.join(self._queue_names))
','.join(self._queue_names))
self.release_locks() self.release_locks()
self._status = self.Status.STOPPED self._status = self.Status.STOPPED
@ -231,15 +231,11 @@ class RQScheduler:
def run(scheduler): def run(scheduler):
scheduler.log.info("Scheduler for %s started with PID %s", scheduler.log.info("Scheduler for %s started with PID %s", ','.join(scheduler._queue_names), os.getpid())
','.join(scheduler._queue_names), os.getpid())
try: try:
scheduler.work() scheduler.work()
except: # noqa except: # noqa
scheduler.log.error( scheduler.log.error('Scheduler [PID %s] raised an exception.\n%s', os.getpid(), traceback.format_exc())
'Scheduler [PID %s] raised an exception.\n%s',
os.getpid(), traceback.format_exc()
)
raise raise
scheduler.log.info("Scheduler with PID %s has stopped", os.getpid()) scheduler.log.info("Scheduler with PID %s has stopped", os.getpid())

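A sketch of driving `RQScheduler` directly, using only the attributes visible in this diff (in practice it is usually started via `worker.work(with_scheduler=True)`):

    from redis import Redis
    from rq.scheduler import RQScheduler

    scheduler = RQScheduler(["default"], connection=Redis(), interval=5)
    scheduler.acquire_locks()  # take the per-queue scheduler locks
    if scheduler.acquired_locks:
        scheduler.work()  # blocks, moving due jobs onto their queues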
@ -11,7 +11,7 @@ class DefaultSerializer:
loads = pickle.loads loads = pickle.loads
class JSONSerializer(): class JSONSerializer:
@staticmethod @staticmethod
def dumps(*args, **kwargs): def dumps(*args, **kwargs):
return json.dumps(*args, **kwargs).encode('utf-8') return json.dumps(*args, **kwargs).encode('utf-8')

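Opting into the `JSONSerializer` shown above instead of the pickle-based default is a one-liner; a sketch:

    from redis import Redis
    from rq import Queue
    from rq.serializers import JSONSerializer

    q = Queue("default", connection=Redis(), serializer=JSONSerializer)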
@ -5,6 +5,7 @@ import threading
class BaseTimeoutException(Exception): class BaseTimeoutException(Exception):
"""Base exception for timeouts.""" """Base exception for timeouts."""
pass pass
@ -12,6 +13,7 @@ class JobTimeoutException(BaseTimeoutException):
"""Raised when a job takes longer to complete than the allowed maximum """Raised when a job takes longer to complete than the allowed maximum
timeout value. timeout value.
""" """
pass pass
@ -19,6 +21,7 @@ class HorseMonitorTimeoutException(BaseTimeoutException):
"""Raised when waiting for a horse to exit takes longer than the maximum """Raised when waiting for a horse to exit takes longer than the maximum
timeout value. timeout value.
""" """
pass pass
@ -56,10 +59,8 @@ class BaseDeathPenalty:
class UnixSignalDeathPenalty(BaseDeathPenalty): class UnixSignalDeathPenalty(BaseDeathPenalty):
def handle_death_penalty(self, signum, frame): def handle_death_penalty(self, signum, frame):
raise self._exception('Task exceeded maximum timeout value ' raise self._exception('Task exceeded maximum timeout value ' '({0} seconds)'.format(self._timeout))
'({0} seconds)'.format(self._timeout))
def setup_death_penalty(self): def setup_death_penalty(self):
"""Sets up an alarm signal and a signal handler that raises """Sets up an alarm signal and a signal handler that raises
@ -85,15 +86,12 @@ class TimerDeathPenalty(BaseDeathPenalty):
# Monkey-patch exception with the message ahead of time # Monkey-patch exception with the message ahead of time
# since PyThreadState_SetAsyncExc can only take a class # since PyThreadState_SetAsyncExc can only take a class
def init_with_message(self, *args, **kwargs): # noqa def init_with_message(self, *args, **kwargs): # noqa
super(exception, self).__init__( super(exception, self).__init__("Task exceeded maximum timeout value ({0} seconds)".format(timeout))
"Task exceeded maximum timeout value ({0} seconds)".format(timeout)
)
self._exception.__init__ = init_with_message self._exception.__init__ = init_with_message
def new_timer(self): def new_timer(self):
"""Returns a new timer since timers can only be used once. """Returns a new timer since timers can only be used once."""
"""
return threading.Timer(self._timeout, self.handle_death_penalty) return threading.Timer(self._timeout, self.handle_death_penalty)
def handle_death_penalty(self): def handle_death_penalty(self):
@ -111,13 +109,11 @@ class TimerDeathPenalty(BaseDeathPenalty):
raise SystemError("PyThreadState_SetAsyncExc failed") raise SystemError("PyThreadState_SetAsyncExc failed")
def setup_death_penalty(self): def setup_death_penalty(self):
"""Starts the timer. """Starts the timer."""
"""
self._timer = self.new_timer() self._timer = self.new_timer()
self._timer.start() self._timer.start()
def cancel_death_penalty(self): def cancel_death_penalty(self):
"""Cancels the timer. """Cancels the timer."""
"""
self._timer.cancel() self._timer.cancel()
self._timer = None self._timer = None

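The death-penalty classes above are used as context managers inside the worker; a rough sketch with an assumed `(timeout, exception)` constructor and a hypothetical `crunch()` workload:

    from rq.timeouts import JobTimeoutException, UnixSignalDeathPenalty

    def crunch():  # hypothetical long-running call
        while True:
            pass

    with UnixSignalDeathPenalty(5, JobTimeoutException):  # constructor signature assumed
        crunch()  # JobTimeoutException is raised after ~5 seconds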
@ -41,10 +41,8 @@ class _Colorizer:
self.codes["blink"] = esc + "05m" self.codes["blink"] = esc + "05m"
self.codes["overline"] = esc + "06m" self.codes["overline"] = esc + "06m"
dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue", "purple", "teal", "lightgray"]
"purple", "teal", "lightgray"] light_colors = ["darkgray", "red", "green", "yellow", "blue", "fuchsia", "turquoise", "white"]
light_colors = ["darkgray", "red", "green", "yellow", "blue",
"fuchsia", "turquoise", "white"]
x = 30 x = 30
for d, l in zip(dark_colors, light_colors): for d, l in zip(dark_colors, light_colors):
@ -90,8 +88,10 @@ def make_colorizer(color: str):
>>> # You can then use: >>> # You can then use:
>>> print("It's either " + green('OK') + ' or ' + red('Oops')) >>> print("It's either " + green('OK') + ' or ' + red('Oops'))
""" """
def inner(text): def inner(text):
return colorizer.colorize(color, text) return colorizer.colorize(color, text)
return inner return inner
@ -230,8 +230,7 @@ def utcnow():
def now(): def now():
"""Return now in UTC """Return now in UTC"""
"""
return datetime.datetime.now(datetime.timezone.utc) return datetime.datetime.now(datetime.timezone.utc)
@ -356,8 +355,7 @@ def str_to_date(date_str: Optional[str]) -> Union[dt.datetime, Any]:
def parse_timeout(timeout: Any): def parse_timeout(timeout: Any):
"""Transfers all kinds of timeout formats to an integer representing seconds """Transfers all kinds of timeout formats to an integer representing seconds"""
"""
if not isinstance(timeout, numbers.Integral) and timeout is not None: if not isinstance(timeout, numbers.Integral) and timeout is not None:
try: try:
timeout = int(timeout) timeout = int(timeout)
@ -367,9 +365,11 @@ def parse_timeout(timeout: Any):
try: try:
timeout = int(digit) * unit_second[unit] timeout = int(digit) * unit_second[unit]
except (ValueError, KeyError): except (ValueError, KeyError):
raise TimeoutFormatError('Timeout must be an integer or a string representing an integer, or ' raise TimeoutFormatError(
'Timeout must be an integer or a string representing an integer, or '
'a string with format: digits + unit, unit can be "d", "h", "m", "s", ' 'a string with format: digits + unit, unit can be "d", "h", "m", "s", '
'such as "1h", "23m".') 'such as "1h", "23m".'
)
return timeout return timeout
@ -391,7 +391,7 @@ def get_version(connection: 'Redis') -> Tuple[int, int, int]:
setattr( setattr(
connection, connection,
"__rq_redis_server_version", "__rq_redis_server_version",
tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3]) tuple(int(i) for i in connection.info("server")["redis_version"].split('.')[:3]),
) )
return getattr(connection, "__rq_redis_server_version") return getattr(connection, "__rq_redis_server_version")
except ResponseError: # fakeredis doesn't implement Redis' INFO command except ResponseError: # fakeredis doesn't implement Redis' INFO command
@ -440,8 +440,9 @@ def truncate_long_string(data: str, max_length: Optional[int] = None) -> str:
return (data[:max_length] + '...') if len(data) > max_length else data return (data[:max_length] + '...') if len(data) > max_length else data
def get_call_string(func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], def get_call_string(
max_length: Optional[int] = None) -> Optional[str]: func_name: Optional[str], args: Any, kwargs: Dict[Any, Any], max_length: Optional[int] = None
) -> Optional[str]:
""" """
Returns a string representation of the call, formatted as a regular Returns a string representation of the call, formatted as a regular
Python function invocation statement. If max_length is not None, truncate Python function invocation statement. If max_length is not None, truncate

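Small demonstration of the utility functions reflowed above; the values are illustrative:

    from rq.utils import get_call_string, parse_timeout

    parse_timeout(15)    # -> 15 (integers pass through)
    parse_timeout("2h")  # -> 7200 seconds
    print(get_call_string("send_email", ("alice@example.com",), {"retry": 2}, max_length=60))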
@ -34,9 +34,15 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
from .utils import as_text from .utils import as_text
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (CALLBACK_TIMEOUT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL, from .defaults import (
DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, CALLBACK_TIMEOUT,
DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) DEFAULT_MAINTENANCE_TASK_INTERVAL,
DEFAULT_RESULT_TTL,
DEFAULT_WORKER_TTL,
DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_DATE_FORMAT,
)
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus from .job import Job, JobStatus
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
@ -46,8 +52,7 @@ from .results import Result
from .scheduler import RQScheduler from .scheduler import RQScheduler
from .suspension import is_suspended from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import (backend_class, ensure_list, get_version, from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact
make_colorizer, utcformat, utcnow, utcparse, compact)
from .version import VERSION from .version import VERSION
from .worker_registration import clean_worker_registry, get_keys from .worker_registration import clean_worker_registry, get_keys
from .serializers import resolve_serializer from .serializers import resolve_serializer
@ -55,9 +60,11 @@ from .serializers import resolve_serializer
try: try:
from setproctitle import setproctitle as setprocname from setproctitle import setproctitle as setprocname
except ImportError: except ImportError:
def setprocname(*args, **kwargs): # noqa def setprocname(*args, **kwargs): # noqa
pass pass
green = make_colorizer('darkgreen') green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow') yellow = make_colorizer('darkyellow')
blue = make_colorizer('darkblue') blue = make_colorizer('darkblue')
@ -69,10 +76,9 @@ class StopRequested(Exception):
pass pass
_signames = dict(
_signames = dict((getattr(signal, signame), signame) (getattr(signal, signame), signame) for signame in dir(signal) if signame.startswith('SIG') and '_' not in signame
for signame in dir(signal) )
if signame.startswith('SIG') and '_' not in signame)
def signal_name(signum): def signal_name(signum):
@ -118,7 +124,7 @@ class Worker:
job_class: Optional[Type['Job']] = None, job_class: Optional[Type['Job']] = None,
queue_class: Optional[Type['Queue']] = None, queue_class: Optional[Type['Queue']] = None,
queue: Optional['Queue'] = None, queue: Optional['Queue'] = None,
serializer=None serializer=None,
) -> List['Worker']: ) -> List['Worker']:
"""Returns an iterable of all Workers. """Returns an iterable of all Workers.
@ -131,11 +137,12 @@ class Worker:
connection = get_current_connection() connection = get_current_connection()
worker_keys = get_keys(queue=queue, connection=connection) worker_keys = get_keys(queue=queue, connection=connection)
workers = [cls.find_by_key(as_text(key), workers = [
connection=connection, cls.find_by_key(
job_class=job_class, as_text(key), connection=connection, job_class=job_class, queue_class=queue_class, serializer=serializer
queue_class=queue_class, serializer=serializer) )
for key in worker_keys] for key in worker_keys
]
return compact(workers) return compact(workers)
@classmethod @classmethod
@ -149,8 +156,7 @@ class Worker:
Returns: Returns:
list_keys (List[str]): A list of worker keys list_keys (List[str]): A list of worker keys
""" """
return [as_text(key) return [as_text(key) for key in get_keys(queue=queue, connection=connection)]
for key in get_keys(queue=queue, connection=connection)]
@classmethod @classmethod
def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None): def count(cls, connection: Optional['Redis'] = None, queue: Optional['Queue'] = None):
@ -166,8 +172,14 @@ class Worker:
return len(get_keys(queue=queue, connection=connection)) return len(get_keys(queue=queue, connection=connection))
@classmethod @classmethod
def find_by_key(cls, worker_key: str, connection: Optional['Redis'] = None, job_class: Type['Job'] = None, def find_by_key(
queue_class: Type['Queue'] = None, serializer=None): cls,
worker_key: str,
connection: Optional['Redis'] = None,
job_class: Type['Job'] = None,
queue_class: Type['Queue'] = None,
serializer=None,
):
"""Returns a Worker instance, based on the naming conventions for """Returns a Worker instance, based on the naming conventions for
naming the internal Redis keys. Can be used to reverse-lookup Workers naming the internal Redis keys. Can be used to reverse-lookup Workers
by their Redis keys. by their Redis keys.
@ -183,20 +195,37 @@ class Worker:
return None return None
name = worker_key[len(prefix) :] name = worker_key[len(prefix) :]
worker = cls([], name, connection=connection, job_class=job_class, worker = cls(
queue_class=queue_class, prepare_for_work=False, serializer=serializer) [],
name,
connection=connection,
job_class=job_class,
queue_class=queue_class,
prepare_for_work=False,
serializer=serializer,
)
worker.refresh() worker.refresh()
return worker return worker
def __init__(self, queues, name: Optional[str] = None, default_result_ttl=DEFAULT_RESULT_TTL, def __init__(
connection: Optional['Redis'] = None, exc_handler=None, exception_handlers=None, self,
default_worker_ttl=DEFAULT_WORKER_TTL, job_class: Type['Job'] = None, queues,
queue_class=None, log_job_description: bool = True, name: Optional[str] = None,
default_result_ttl=DEFAULT_RESULT_TTL,
connection: Optional['Redis'] = None,
exc_handler=None,
exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL,
job_class: Type['Job'] = None,
queue_class=None,
log_job_description: bool = True,
job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL, job_monitoring_interval=DEFAULT_JOB_MONITORING_INTERVAL,
disable_default_exception_handler: bool = False, disable_default_exception_handler: bool = False,
prepare_for_work: bool = True, serializer=None): # noqa prepare_for_work: bool = True,
serializer=None,
): # noqa
connection = self._set_connection(connection, default_worker_ttl) connection = self._set_connection(connection, default_worker_ttl)
self.connection = connection self.connection = connection
@ -208,11 +237,12 @@ class Worker:
self.python_version = sys.version self.python_version = sys.version
self.serializer = resolve_serializer(serializer) self.serializer = resolve_serializer(serializer)
queues = [self.queue_class(name=q, queues = [
connection=connection, self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer)
job_class=self.job_class, serializer=self.serializer) if isinstance(q, str)
if isinstance(q, str) else q else q
for q in ensure_list(queues)] for q in ensure_list(queues)
]
self.name: str = name or uuid4().hex self.name: str = name or uuid4().hex
self.queues = queues self.queues = queues
@ -250,24 +280,14 @@ class Worker:
try: try:
connection.client_setname(self.name) connection.client_setname(self.name)
except redis.exceptions.ResponseError: except redis.exceptions.ResponseError:
warnings.warn( warnings.warn('CLIENT SETNAME command not supported, setting ip_address to unknown', Warning)
'CLIENT SETNAME command not supported, setting ip_address to unknown',
Warning
)
self.ip_address = 'unknown' self.ip_address = 'unknown'
else: else:
client_adresses = [ client_adresses = [client['addr'] for client in connection.client_list() if client['name'] == self.name]
client['addr']
for client in connection.client_list()
if client['name'] == self.name
]
if len(client_adresses) > 0: if len(client_adresses) > 0:
self.ip_address = client_adresses[0] self.ip_address = client_adresses[0]
else: else:
warnings.warn( warnings.warn('CLIENT LIST command not supported, setting ip_address to unknown', Warning)
'CLIENT LIST command not supported, setting ip_address to unknown',
Warning
)
self.ip_address = 'unknown' self.ip_address = 'unknown'
else: else:
self.hostname = None self.hostname = None
@ -361,8 +381,7 @@ class Worker:
def register_birth(self): def register_birth(self):
"""Registers its own birth.""" """Registers its own birth."""
self.log.debug('Registering birth of worker %s', self.name) self.log.debug('Registering birth of worker %s', self.name)
if self.connection.exists(self.key) and \ if self.connection.exists(self.key) and not self.connection.hexists(self.key, 'death'):
not self.connection.hexists(self.key, 'death'):
msg = 'There exists an active worker named {0!r} already' msg = 'There exists an active worker named {0!r} already'
raise ValueError(msg.format(self.name)) raise ValueError(msg.format(self.name))
key = self.key key = self.key
@ -436,10 +455,7 @@ class Worker:
def _set_state(self, state): def _set_state(self, state):
"""Raise a DeprecationWarning if ``worker.state = X`` is used""" """Raise a DeprecationWarning if ``worker.state = X`` is used"""
warnings.warn( warnings.warn("worker.state is deprecated, use worker.set_state() instead.", DeprecationWarning)
"worker.state is deprecated, use worker.set_state() instead.",
DeprecationWarning
)
self.set_state(state) self.set_state(state)
def get_state(self): def get_state(self):
@ -447,10 +463,7 @@ class Worker:
def _get_state(self): def _get_state(self):
"""Raise a DeprecationWarning if ``worker.state == X`` is used""" """Raise a DeprecationWarning if ``worker.state == X`` is used"""
warnings.warn( warnings.warn("worker.state is deprecated, use worker.get_state() instead.", DeprecationWarning)
"worker.state is deprecated, use worker.get_state() instead.",
DeprecationWarning
)
return self.get_state() return self.get_state()
state = property(_get_state, _set_state) state = property(_get_state, _set_state)
@ -516,8 +529,7 @@ class Worker:
return pid, stat return pid, stat
def request_force_stop(self, signum, frame): def request_force_stop(self, signum, frame):
"""Terminates the application (cold shutdown). """Terminates the application (cold shutdown)."""
"""
self.log.warning('Cold shut down') self.log.warning('Cold shut down')
# Take down the horse with the worker # Take down the horse with the worker
@ -547,8 +559,7 @@ class Worker:
if self.get_state() == WorkerStatus.BUSY: if self.get_state() == WorkerStatus.BUSY:
self._stop_requested = True self._stop_requested = True
self.set_shutdown_requested_date() self.set_shutdown_requested_date()
self.log.debug('Stopping after current horse is finished. ' self.log.debug('Stopping after current horse is finished. ' 'Press Ctrl+C again for a cold shutdown.')
'Press Ctrl+C again for a cold shutdown.')
if self.scheduler: if self.scheduler:
self.stop_scheduler() self.stop_scheduler()
else: else:
@ -614,8 +625,15 @@ class Worker:
def reorder_queues(self, reference_queue): def reorder_queues(self, reference_queue):
pass pass
def work(self, burst: bool = False, logging_level: str = "INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, def work(
log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler: bool = False): self,
burst: bool = False,
logging_level: str = "INFO",
date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT,
max_jobs=None,
with_scheduler: bool = False,
):
"""Starts the work loop. """Starts the work loop.
Pops and performs all jobs on the current list of queues. When all Pops and performs all jobs on the current list of queues. When all
@ -635,8 +653,13 @@ class Worker:
if with_scheduler: if with_scheduler:
self.scheduler = RQScheduler( self.scheduler = RQScheduler(
self.queues, connection=self.connection, logging_level=logging_level, self.queues,
date_format=date_format, log_format=log_format, serializer=self.serializer) connection=self.connection,
logging_level=logging_level,
date_format=date_format,
log_format=log_format,
serializer=self.serializer,
)
self.scheduler.acquire_locks() self.scheduler.acquire_locks()
# If lock is acquired, start scheduler # If lock is acquired, start scheduler
if self.scheduler.acquired_locks: if self.scheduler.acquired_locks:
@@ -676,10 +699,7 @@ class Worker:
                     completed_jobs += 1
                     if max_jobs is not None:
                         if completed_jobs >= max_jobs:
-                            self.log.info(
-                                "Worker %s: finished executing %d jobs, quitting",
-                                self.key, completed_jobs
-                            )
+                            self.log.info("Worker %s: finished executing %d jobs, quitting", self.key, completed_jobs)
                             break

                 except redis.exceptions.TimeoutError:
@@ -694,10 +714,7 @@ class Worker:
                     raise

                 except:  # noqa
-                    self.log.error(
-                        'Worker %s: found an unhandled exception, quitting...',
-                        self.key, exc_info=True
-                    )
+                    self.log.error('Worker %s: found an unhandled exception, quitting...', self.key, exc_info=True)
                     break
         finally:
             if not self.is_horse:
@@ -736,19 +753,20 @@ class Worker:
                     self.run_maintenance_tasks()

                 self.log.debug(f"Dequeueing jobs on queues {self._ordered_queues} and timeout {timeout}")
-                result = self.queue_class.dequeue_any(self._ordered_queues, timeout,
-                                                      connection=self.connection,
-                                                      job_class=self.job_class,
-                                                      serializer=self.serializer)
+                result = self.queue_class.dequeue_any(
+                    self._ordered_queues,
+                    timeout,
+                    connection=self.connection,
+                    job_class=self.job_class,
+                    serializer=self.serializer,
+                )
                 self.log.debug(f"Dequeued job {result[1]} from {result[0]}")
                 if result is not None:
                     job, queue = result
                     job.redis_server_version = self.get_redis_server_version()
                     if self.log_job_description:
-                        self.log.info(
-                            '%s: %s (%s)', green(queue.name),
-                            blue(job.description), job.id)
+                        self.log.info('%s: %s (%s)', green(queue.name), blue(job.description), job.id)
                     else:
                         self.log.info('%s: %s', green(queue.name), job.id)
@@ -756,8 +774,9 @@ class Worker:
             except DequeueTimeout:
                 pass

             except redis.exceptions.ConnectionError as conn_err:
-                self.log.error('Could not connect to Redis instance: %s Retrying in %d seconds...',
-                               conn_err, connection_wait_time)
+                self.log.error(
+                    'Could not connect to Redis instance: %s Retrying in %d seconds...', conn_err, connection_wait_time
+                )
                 time.sleep(connection_wait_time)
                 connection_wait_time *= self.exponential_backoff_factor
                 connection_wait_time = min(connection_wait_time, self.max_connection_wait_time)
@@ -782,18 +801,44 @@ class Worker:
         connection = pipeline if pipeline is not None else self.connection
         connection.expire(self.key, timeout)
         connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
-        self.log.debug('Sent heartbeat to prevent worker timeout. '
-                       'Next one should arrive within %s seconds.', timeout)
+        self.log.debug(
+            'Sent heartbeat to prevent worker timeout. ' 'Next one should arrive within %s seconds.', timeout
+        )

     def refresh(self):
         data = self.connection.hmget(
-            self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
-            'birth', 'failed_job_count', 'successful_job_count', 'total_working_time',
-            'current_job_working_time', 'hostname', 'ip_address', 'pid', 'version', 'python_version',
+            self.key,
+            'queues',
+            'state',
+            'current_job',
+            'last_heartbeat',
+            'birth',
+            'failed_job_count',
+            'successful_job_count',
+            'total_working_time',
+            'current_job_working_time',
+            'hostname',
+            'ip_address',
+            'pid',
+            'version',
+            'python_version',
         )
-        (queues, state, job_id, last_heartbeat, birth, failed_job_count,
-         successful_job_count, total_working_time, current_job_working_time,
-         hostname, ip_address, pid, version, python_version) = data
+        (
+            queues,
+            state,
+            job_id,
+            last_heartbeat,
+            birth,
+            failed_job_count,
+            successful_job_count,
+            total_working_time,
+            current_job_working_time,
+            hostname,
+            ip_address,
+            pid,
+            version,
+            python_version,
+        ) = data
         queues = as_text(queues)
         self.hostname = as_text(hostname)
         self.ip_address = as_text(ip_address)
@@ -820,10 +865,12 @@ class Worker:
         self.current_job_working_time = float(as_text(current_job_working_time))

         if queues:
-            self.queues = [self.queue_class(queue,
-                                            connection=self.connection,
-                                            job_class=self.job_class, serializer=self.serializer)
-                           for queue in queues.split(',')]
+            self.queues = [
+                self.queue_class(
+                    queue, connection=self.connection, job_class=self.job_class, serializer=self.serializer
+                )
+                for queue in queues.split(',')
+            ]

     def increment_failed_job_count(self, pipeline: Optional['Pipeline'] = None):
         connection = pipeline if pipeline is not None else self.connection
@@ -834,12 +881,10 @@ class Worker:
         connection.hincrby(self.key, 'successful_job_count', 1)

     def increment_total_working_time(self, job_execution_time, pipeline):
-        pipeline.hincrbyfloat(self.key, 'total_working_time',
-                              job_execution_time.total_seconds())
+        pipeline.hincrbyfloat(self.key, 'total_working_time', job_execution_time.total_seconds())

     def fork_work_horse(self, job: 'Job', queue: 'Queue'):
-        """Spawns a work horse to perform the actual work and passes it a job.
-        """
+        """Spawns a work horse to perform the actual work and passes it a job."""
         child_pid = os.fork()
         os.environ['RQ_WORKER_ID'] = self.name
         os.environ['RQ_JOB_ID'] = job.id
@@ -909,24 +954,20 @@ class Worker:
         elif self._stopped_job_id == job.id:
             # Work-horse killed deliberately
             self.log.warning('Job stopped by user, moving job to FailedJobRegistry')
-            self.handle_job_failure(
-                job, queue=queue,
-                exc_string="Job stopped by user, work-horse terminated."
-            )
+            self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")

         elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
             if not job.ended_at:
                 job.ended_at = utcnow()
             # Unhandled failure: move the job to the failed queue
-            self.log.warning((
-                'Moving job to FailedJobRegistry '
-                '(work-horse terminated unexpectedly; waitpid returned {})'
-            ).format(ret_val))
+            self.log.warning(
+                ('Moving job to FailedJobRegistry ' '(work-horse terminated unexpectedly; waitpid returned {})').format(
+                    ret_val
+                )
+            )

             self.handle_job_failure(
-                job, queue=queue,
-                exc_string="Work-horse was terminated unexpectedly "
-                           "(waitpid returned %s)" % ret_val
+                job, queue=queue, exc_string="Work-horse was terminated unexpectedly " "(waitpid returned %s)" % ret_val
             )

     def execute_job(self, job: 'Job', queue: 'Queue'):
@@ -1009,8 +1050,7 @@ class Worker:
         msg = 'Processing {0} from {1} since {2}'
         self.procline(msg.format(job.func_name, job.origin, time.time()))

-    def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None,
-                           exc_string=''):
+    def handle_job_failure(self, job: 'Job', queue: 'Queue', started_job_registry=None, exc_string=''):
         """
         Handles the failure or an executing job by:
         1. Setting the job status to failed
@@ -1023,10 +1063,7 @@ class Worker:
         with self.connection.pipeline() as pipeline:
             if started_job_registry is None:
                 started_job_registry = StartedJobRegistry(
-                    job.origin,
-                    self.connection,
-                    job_class=self.job_class,
-                    serializer=self.serializer
+                    job.origin, self.connection, job_class=self.job_class, serializer=self.serializer
                 )

             # check whether a job was stopped intentionally and set the job
@@ -1045,14 +1082,19 @@ class Worker:
                 started_job_registry.remove(job, pipeline=pipeline)

             if not self.disable_default_exception_handler and not retry:
-                failed_job_registry = FailedJobRegistry(job.origin, job.connection,
-                                                        job_class=self.job_class, serializer=job.serializer)
+                failed_job_registry = FailedJobRegistry(
+                    job.origin, job.connection, job_class=self.job_class, serializer=job.serializer
+                )
                 # Exception should be saved in job hash if server
                 # doesn't support Redis streams
                 _save_exc_to_job = not self.supports_redis_streams
-                failed_job_registry.add(job, ttl=job.failure_ttl,
-                                        exc_string=exc_string, pipeline=pipeline,
-                                        _save_exc_to_job=_save_exc_to_job)
+                failed_job_registry.add(
+                    job,
+                    ttl=job.failure_ttl,
+                    exc_string=exc_string,
+                    pipeline=pipeline,
+                    _save_exc_to_job=_save_exc_to_job,
+                )
                 if self.supports_redis_streams:
                     Result.create_failure(job, job.failure_ttl, exc_string=exc_string, pipeline=pipeline)
             with suppress(redis.exceptions.ConnectionError):
@@ -1061,9 +1103,7 @@ class Worker:
             self.set_current_job_id(None, pipeline=pipeline)
             self.increment_failed_job_count(pipeline)
             if job.started_at and job.ended_at:
-                self.increment_total_working_time(
-                    job.ended_at - job.started_at, pipeline
-                )
+                self.increment_total_working_time(job.ended_at - job.started_at, pipeline)

             if retry:
                 job.retry(queue, pipeline)
@@ -1099,9 +1139,7 @@ class Worker:

                     self.set_current_job_id(None, pipeline=pipeline)
                     self.increment_successful_job_count(pipeline=pipeline)
-                    self.increment_total_working_time(
-                        job.ended_at - job.started_at, pipeline  # type: ignore
-                    )
+                    self.increment_total_working_time(job.ended_at - job.started_at, pipeline)  # type: ignore

                     result_ttl = job.get_result_ttl(self.default_result_ttl)
                     if result_ttl != 0:
@@ -1111,16 +1149,15 @@ class Worker:
                         # doesn't support Redis streams
                        include_result = not self.supports_redis_streams
                         # Don't clobber user's meta dictionary!
-                        job.save(pipeline=pipeline, include_meta=False,
-                                 include_result=include_result)
+                        job.save(pipeline=pipeline, include_meta=False, include_result=include_result)
                         if self.supports_redis_streams:
-                            Result.create(job, Result.Type.SUCCESSFUL, return_value=job._result,
-                                          ttl=result_ttl, pipeline=pipeline)
+                            Result.create(
+                                job, Result.Type.SUCCESSFUL, return_value=job._result, ttl=result_ttl, pipeline=pipeline
+                            )
                         finished_job_registry = queue.finished_job_registry
                         finished_job_registry.add(job, result_ttl, pipeline)

-                    job.cleanup(result_ttl, pipeline=pipeline,
-                                remove_from_queue=False)
+                    job.cleanup(result_ttl, pipeline=pipeline, remove_from_queue=False)
                     self.log.debug('Removing job %s from StartedJobRegistry', job.id)
                     started_job_registry.remove(job, pipeline=pipeline)
@@ -1172,9 +1209,7 @@ class Worker:
            if job.success_callback:
                self.execute_success_callback(job, rv)

-            self.handle_job_success(job=job,
-                                    queue=queue,
-                                    started_job_registry=started_job_registry)
+            self.handle_job_success(job=job, queue=queue, started_job_registry=started_job_registry)
        except:  # NOQA
            self.log.debug(f"Job {job.id} raised an exception.")
            job.ended_at = utcnow()
@@ -1185,15 +1220,13 @@ class Worker:
            try:
                self.execute_failure_callback(job)
            except:  # noqa
-                self.log.error(
-                    'Worker %s: error while executing failure callback',
-                    self.key, exc_info=True
-                )
+                self.log.error('Worker %s: error while executing failure callback', self.key, exc_info=True)
                exc_info = sys.exc_info()
                exc_string = ''.join(traceback.format_exception(*exc_info))

-            self.handle_job_failure(job=job, exc_string=exc_string, queue=queue,
-                                    started_job_registry=started_job_registry)
+            self.handle_job_failure(
+                job=job, exc_string=exc_string, queue=queue, started_job_registry=started_job_registry
+            )
            self.handle_exception(job, *exc_info)
            return False
@@ -1239,8 +1272,7 @@ class Worker:
             extra.update({'queue': job.origin, 'job_id': job.id})

         # func_name
-        self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string,
-                       extra=extra)
+        self.log.error(f'[Job {job.id}]: exception raised while executing ({func_name})\n' + exc_string, extra=extra)

         for handler in self._exc_handlers:
             self.log.debug('Invoking exception handler %s', handler)
@@ -1322,6 +1354,7 @@ class HerokuWorker(Worker):
     * sends SIGRTMIN to work horses on SIGTERM to the main process which in turn
       causes the horse to crash `imminent_shutdown_delay` seconds later
     """
+
     imminent_shutdown_delay = 6
     frame_properties = ['f_code', 'f_lasti', 'f_lineno', 'f_locals', 'f_trace']
@@ -1335,10 +1368,7 @@ class HerokuWorker(Worker):
     def handle_warm_shutdown_request(self):
         """If horse is alive send it SIGRTMIN"""
         if self.horse_pid != 0:
-            self.log.info(
-                'Worker %s: warm shut down requested, sending horse SIGRTMIN signal',
-                self.key
-            )
+            self.log.info('Worker %s: warm shut down requested, sending horse SIGRTMIN signal', self.key)
             self.kill_horse(sig=signal.SIGRTMIN)
         else:
             self.log.warning('Warm shut down requested, no horse found')
@@ -1348,8 +1378,9 @@ class HerokuWorker(Worker):
             self.log.warning('Imminent shutdown, raising ShutDownImminentException immediately')
             self.request_force_stop_sigrtmin(signum, frame)
         else:
-            self.log.warning('Imminent shutdown, raising ShutDownImminentException in %d seconds',
-                             self.imminent_shutdown_delay)
+            self.log.warning(
+                'Imminent shutdown, raising ShutDownImminentException in %d seconds', self.imminent_shutdown_delay
+            )
             signal.signal(signal.SIGRTMIN, self.request_force_stop_sigrtmin)
             signal.signal(signal.SIGALRM, self.request_force_stop_sigrtmin)
             signal.alarm(self.imminent_shutdown_delay)
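
For orientation only: the reformatted `work()` signature above keeps the same keyword arguments, so existing callers are unaffected. A minimal sketch of invoking it, assuming a local Redis and a queue named 'default' (both illustrative, not part of this change):

    from redis import Redis
    from rq import Queue, Worker

    redis = Redis.from_url('redis://localhost:6379/0')  # assumed connection URL
    queue = Queue('default', connection=redis)          # assumed queue name
    worker = Worker([queue], connection=redis)
    # Keyword arguments mirror the work() signature reformatted in this diff.
    worker.work(burst=True, with_scheduler=True, max_jobs=5, logging_level='INFO')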
