@@ -5,24 +5,25 @@ RQ command line tool
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
+from functools import update_wrapper
 import os
 import sys
 
 import click
-from redis import StrictRedis
 from redis.exceptions import ConnectionError
 
-from rq import Connection, get_failed_queue, Queue
+from rq import Connection, get_failed_queue, __version__ as version
+from rq.cli.helpers import (read_config_file, refresh,
+                            setup_loghandlers_from_args,
+                            show_both, show_queues, show_workers, CliConfig)
 from rq.contrib.legacy import cleanup_ghosts
+from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
+                         DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
 from rq.exceptions import InvalidJobOperationError
 from rq.utils import import_attribute
 from rq.suspension import (suspend as connection_suspend,
                            resume as connection_resume, is_suspended)
 
-from .helpers import (get_redis_from_config, read_config_file, refresh,
-                      setup_loghandlers_from_args, show_both, show_queues,
-                      show_workers)
-
 # Disable the warning that Click displays (as of Click version 5.0) when users
 # use unicode_literals in Python 2.
@@ -30,42 +31,72 @@ from .helpers import (get_redis_from_config, read_config_file, refresh,
 click.disable_unicode_literals_warning = True
 
 
-url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
-                          help='URL describing Redis connection details.')
-
-config_option = click.option('--config', '-c',
-                             help='Module containing RQ settings.')
-
-
-def connect(url, config=None, connection_class=StrictRedis):
-    if url:
-        return connection_class.from_url(url)
-
-    settings = read_config_file(config) if config else {}
-    return get_redis_from_config(settings, connection_class)
+shared_options = [
+    click.option('--url', '-u',
+                 envvar='RQ_REDIS_URL',
+                 help='URL describing Redis connection details.'),
+    click.option('--config', '-c',
+                 envvar='RQ_CONFIG',
+                 help='Module containing RQ settings.'),
+    click.option('--worker-class', '-w',
+                 envvar='RQ_WORKER_CLASS',
+                 default=DEFAULT_WORKER_CLASS,
+                 help='RQ Worker class to use'),
+    click.option('--job-class', '-j',
+                 envvar='RQ_JOB_CLASS',
+                 default=DEFAULT_JOB_CLASS,
+                 help='RQ Job class to use'),
+    click.option('--queue-class',
+                 envvar='RQ_QUEUE_CLASS',
+                 default=DEFAULT_QUEUE_CLASS,
+                 help='RQ Queue class to use'),
+    click.option('--connection-class',
+                 envvar='RQ_CONNECTION_CLASS',
+                 default=DEFAULT_CONNECTION_CLASS,
+                 help='Redis client class to use'),
+]
+
+
+def pass_cli_config(func):
+    # add all the shared options to the command
+    for option in shared_options:
+        func = option(func)
+
+    # pass the cli config object into the command
+    def wrapper(*args, **kwargs):
+        ctx = click.get_current_context()
+        cli_config = CliConfig(**kwargs)
+        return ctx.invoke(func, cli_config, *args[1:], **kwargs)
+
+    return update_wrapper(wrapper, func)
 
 
 @click.group()
 @click.version_option(version)
 def main():
     """RQ command line tool."""
     pass
 
 
 @main.command()
-@url_option
 @click.option('--all', '-a', is_flag=True, help='Empty all queues')
 @click.argument('queues', nargs=-1)
-def empty(url, all, queues):
+@pass_cli_config
+def empty(cli_config, all, queues, **options):
     """Empty given queues."""
-    conn = connect(url)
-
     if all:
-        queues = Queue.all(connection=conn)
+        queues = cli_config.queue_class.all(connection=cli_config.connection,
+                                            job_class=cli_config.job_class)
     else:
-        queues = [Queue(queue, connection=conn) for queue in queues]
+        queues = [cli_config.queue_class(queue,
+                                         connection=cli_config.connection,
+                                         job_class=cli_config.job_class)
+                  for queue in queues]
 
     if not queues:
         click.echo('Nothing to do')
         sys.exit(0)
 
     for queue in queues:
         num_jobs = queue.empty()
@@ -73,13 +104,14 @@ def empty(url, all, queues):
 @main.command()
-@url_option
 @click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
 @click.argument('job_ids', nargs=-1)
-def requeue(url, all, job_ids):
+@pass_cli_config
+def requeue(cli_config, all, job_class, job_ids, **options):
     """Requeue failed jobs."""
-    conn = connect(url)
-    failed_queue = get_failed_queue(connection=conn)
+    failed_queue = get_failed_queue(connection=cli_config.connection,
+                                    job_class=cli_config.job_class)
 
     if all:
         job_ids = failed_queue.job_ids
@@ -102,8 +134,6 @@ def requeue(url, all, job_ids):
 @main.command()
-@url_option
-@config_option
 @click.option('--path', '-P', default='.', help='Specify the import path.')
 @click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)')
 @click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts')
@@ -111,7 +141,9 @@ def requeue(url, all, job_ids):
 @click.option('--only-workers', '-W', is_flag=True, help='Show only worker info')
 @click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
 @click.argument('queues', nargs=-1)
-def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, queues):
+@pass_cli_config
+def info(cli_config, path, interval, raw, only_queues, only_workers, by_queue, queues,
+         **options):
     """RQ command-line monitor."""
 
     if path:
@@ -125,8 +157,9 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
         func = show_both
 
     try:
-        with Connection(connect(url, config)):
-            refresh(interval, func, queues, raw, by_queue)
+        with Connection(cli_config.connection):
+            refresh(interval, func, queues, raw, by_queue,
+                    cli_config.queue_class, cli_config.worker_class)
     except ConnectionError as e:
         click.echo(e)
         sys.exit(1)
@@ -136,14 +169,8 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
 @main.command()
-@url_option
-@config_option
 @click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
 @click.option('--name', '-n', help='Specify a different name')
-@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
-@click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use')
-@click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use')
-@click.option('--connection-class', default='redis.StrictRedis', help='Redis client class to use')
 @click.option('--path', '-P', default='.', help='Specify the import path.')
 @click.option('--results-ttl', type=int, help='Default results timeout to be used')
 @click.option('--worker-ttl', type=int, help='Default worker timeout to be used')
@@ -153,14 +180,16 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
 @click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
 @click.option('--pid', help='Write the process ID number to a file at the specified path')
 @click.argument('queues', nargs=-1)
-def worker(url, config, burst, name, worker_class, job_class, queue_class, connection_class, path, results_ttl,
-           worker_ttl, verbose, quiet, sentry_dsn, exception_handler, pid, queues):
+@pass_cli_config
+def worker(cli_config, burst, name, path, results_ttl,
+           worker_ttl, verbose, quiet, sentry_dsn, exception_handler,
+           pid, queues, **options):
     """Starts an RQ worker."""
 
     if path:
         sys.path = path.split(':') + sys.path
 
-    settings = read_config_file(config) if config else {}
+    settings = read_config_file(cli_config.config) if cli_config.config else {}
     # Worker specific default arguments
     queues = queues or settings.get('QUEUES', ['default'])
     sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')
@@ -171,30 +200,29 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, conne
     setup_loghandlers_from_args(verbose, quiet)
 
-    connection_class = import_attribute(connection_class)
-    conn = connect(url, config, connection_class)
-    cleanup_ghosts(conn)
-    worker_class = import_attribute(worker_class)
-    queue_class = import_attribute(queue_class)
-    exception_handlers = []
-    for h in exception_handler:
-        exception_handlers.append(import_attribute(h))
-
-    if is_suspended(conn):
-        click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
-        sys.exit(1)
-
     try:
-        queues = [queue_class(queue, connection=conn) for queue in queues]
-        w = worker_class(queues,
-                         name=name,
-                         connection=conn,
-                         default_worker_ttl=worker_ttl,
-                         default_result_ttl=results_ttl,
-                         job_class=job_class,
-                         queue_class=queue_class,
-                         exception_handlers=exception_handlers or None)
+        cleanup_ghosts(cli_config.connection)
+        exception_handlers = []
+        for h in exception_handler:
+            exception_handlers.append(import_attribute(h))
+
+        if is_suspended(cli_config.connection):
+            click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
+            sys.exit(1)
+
+        queues = [cli_config.queue_class(queue,
+                                         connection=cli_config.connection,
+                                         job_class=cli_config.job_class)
+                  for queue in queues]
+        worker = cli_config.worker_class(queues,
+                                         name=name,
+                                         connection=cli_config.connection,
+                                         default_worker_ttl=worker_ttl,
+                                         default_result_ttl=results_ttl,
+                                         job_class=cli_config.job_class,
+                                         queue_class=cli_config.queue_class,
+                                         exception_handlers=exception_handlers or None)
 
         # Should we configure Sentry?
         if sentry_dsn:
@@ -202,26 +230,25 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, conne
             from raven.transport.http import HTTPTransport
             from rq.contrib.sentry import register_sentry
 
             client = Client(sentry_dsn, transport=HTTPTransport)
-            register_sentry(client, w)
+            register_sentry(client, worker)
 
-        w.work(burst=burst)
+        worker.work(burst=burst)
     except ConnectionError as e:
         print(e)
         sys.exit(1)
 
 
 @main.command()
-@url_option
-@config_option
 @click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
-def suspend(url, config, duration):
+@pass_cli_config
+def suspend(cli_config, duration, **options):
     """Suspends all workers, to resume run `rq resume`"""
 
     if duration is not None and duration < 1:
         click.echo("Duration must be an integer greater than 1")
         sys.exit(1)
 
-    connection = connect(url, config)
-    connection_suspend(connection, duration)
+    connection_suspend(cli_config.connection, duration)
 
     if duration:
         msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
@@ -232,10 +259,8 @@ def suspend(url, config, duration):
 @main.command()
-@url_option
-@config_option
-def resume(url, config):
+@pass_cli_config
+def resume(cli_config, **options):
     """Resumes processing of queues, that where suspended with `rq suspend`"""
-    connection = connect(url, config)
-    connection_resume(connection)
+    connection_resume(cli_config.connection)
     click.echo("Resuming workers.")