Restructure new CLI modules.

A few things have changed.  First of all, there is no separate copy of
the argparse-based `rqinfo` anymore.  It now fully utilizes the new
Click subcommand.  In other words: `rqinfo` and `rq info` both invoke
the same function under the hood.

In order to support this, the main command group now does NOT take
a `url` option and initializes the connection.  Besides supporting this
alias pattern, this change was useful for two more reasons: (1) it
allows us to add subcommands that don't need the Redis server running in
the future, and (2) it makes the `--url` option an option underneath
each subcommand.  This avoids command invocations that look like this:

    $ rq --url <url> info --more --flags

And instead allows us to pass the URL to each subcommand where it's
deemed necessary:

    $ rq info --url <url> --more --flags

Which is much friendlier to use/remember.
main
Vincent Driessen 10 years ago
parent 652cd71d2b
commit b5fbc3992b

@ -0,0 +1,6 @@
# flake8: noqa
from .cli import main
# TODO: the following imports can be removed when we drop the `rqinfo` and
# `rqworkers` commands in favor of just shipping the `rq` command.
from .cli import info

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
"""
RQ command line tool
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import sys
import click
from redis import StrictRedis
from redis.exceptions import ConnectionError
from rq import Connection, get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError
from .helpers import refresh, show_both, show_queues, show_workers
url_option = click.option('--url', '-u', envvar='URL', default='redis://localhost:6379/0',
help='URL describing Redis connection details.')
def connect(url):
return StrictRedis.from_url(url)
@click.group()
def main():
"""RQ command line tool."""
pass
@main.command()
@url_option
@click.option('--all', '-a', is_flag=True, help='Empty all queues')
@click.argument('queues', nargs=-1)
@click.pass_context
def empty(ctx, url, all, queues):
"""Empty given queues."""
conn = connect(url)
if all:
queues = Queue.all(connection=conn)
else:
queues = [Queue(queue, connection=conn) for queue in queues]
if not queues:
click.echo('Nothing to do')
for queue in queues:
num_jobs = queue.empty()
click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name))
@main.command()
@url_option
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
@click.argument('job_ids', nargs=-1)
@click.pass_context
def requeue(ctx, url, all, job_ids):
"""Requeue failed jobs."""
conn = connect(url)
failed_queue = get_failed_queue(connection=conn)
if all:
job_ids = failed_queue.job_ids
if not job_ids:
click.echo('Nothing to do')
sys.exit(0)
click.echo('Requeueing {0} jobs from failed queue'.format(len(job_ids)))
fail_count = 0
with click.progressbar(job_ids) as job_ids:
for job_id in job_ids:
try:
failed_queue.requeue(job_id)
except InvalidJobOperationError:
fail_count += 1
if fail_count > 0:
click.secho('Unable to requeue {0} jobs from failed queue'.format(fail_count), fg='red')
@main.command()
@url_option
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--interval', '-i', default=None, help='Updates stats every N seconds (default: don\'t poll)') # noqa
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') # noqa
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') # noqa
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') # noqa
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') # noqa
@click.argument('queues', nargs=-1)
@click.pass_context
def info(ctx, url, path, interval, raw, only_queues, only_workers, by_queue, queues):
"""RQ command-line monitor."""
if path:
sys.path = path.split(':') + sys.path
if only_queues:
func = show_queues
elif only_workers:
func = show_workers
else:
func = show_both
try:
with Connection(connect(url)):
refresh(interval, func, queues, raw, by_queue)
except ConnectionError as e:
click.echo(e)
sys.exit(1)
except KeyboardInterrupt:
click.echo()
sys.exit(0)

@ -0,0 +1,143 @@
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import time
from functools import partial
import click
from rq import Queue, Worker
red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
yellow = partial(click.style, fg='yellow')
def pad(s, pad_to_length):
"""Pads the given string to the given length."""
return ('%-' + '%ds' % pad_to_length) % (s,)
def get_scale(x):
"""Finds the lowest scale where x <= scale."""
scales = [20, 50, 100, 200, 400, 600, 800, 1000]
for scale in scales:
if x <= scale:
return scale
return x
def state_symbol(state):
symbols = {
'busy': red('busy'),
'idle': green('idle'),
}
try:
return symbols[state]
except KeyError:
return state
def show_queues(queues, raw, by_queue):
if queues:
qs = list(map(Queue, queues))
else:
qs = Queue.all()
num_jobs = 0
termwidth, _ = click.get_terminal_size()
chartwidth = min(20, termwidth - 20)
max_count = 0
counts = dict()
for q in qs:
count = q.count
counts[q] = count
max_count = max(max_count, count)
scale = get_scale(max_count)
ratio = chartwidth * 1.0 / scale
for q in qs:
count = counts[q]
if not raw:
chart = green('|' + '' * int(ratio * count))
line = '%-12s %s %d' % (q.name, chart, count)
else:
line = 'queue %s %d' % (q.name, count)
click.echo(line)
num_jobs += count
# print summary when not in raw mode
if not raw:
click.echo('%d queues, %d jobs total' % (len(qs), num_jobs))
def show_workers(queues, raw, by_queue):
if queues:
qs = list(map(Queue, queues))
def any_matching_queue(worker):
def queue_matches(q):
return q in qs
return any(map(queue_matches, worker.queues))
# Filter out workers that don't match the queue filter
ws = [w for w in Worker.all() if any_matching_queue(w)]
def filter_queues(queue_names):
return [qname for qname in queue_names if Queue(qname) in qs]
else:
qs = Queue.all()
ws = Worker.all()
filter_queues = lambda x: x
if not by_queue:
for w in ws:
worker_queues = filter_queues(w.queue_names())
if not raw:
click.echo('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues)))
else:
click.echo('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues)))
else:
# Create reverse lookup table
queues = dict([(q, []) for q in qs])
for w in ws:
for q in w.queues:
if q not in queues:
continue
queues[q].append(w)
max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0
for q in queues:
if queues[q]:
queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa
else:
queues_str = ''
click.echo('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str))
if not raw:
click.echo('%d workers, %d queues' % (len(ws), len(qs)))
def show_both(queues, raw, by_queue):
show_queues(queues, raw, by_queue)
if not raw:
click.echo('')
show_workers(queues, raw, by_queue)
if not raw:
click.echo('')
import datetime
click.echo('Updated: %s' % datetime.datetime.now())
def refresh(interval, func, *args):
while True:
if interval:
click.clear()
func(*args)
if interval:
time.sleep(interval)
else:
break

@ -1,80 +0,0 @@
# -*- coding: utf-8 -*-
"""
RQ command line tool
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import sys
import click
import redis
from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError
from .rqinfo import info
@click.group()
@click.option('--url', '-u', envvar='URL', help='URL describing Redis connection details.')
@click.pass_context
def main(ctx, url):
"""RQ CLI"""
if url is None:
url = "redis://localhost:6379/0"
redis_conn = redis.from_url(url)
ctx.obj = {}
ctx.obj['connection'] = redis_conn
@main.command()
@click.option('--all', '-a', is_flag=True, help='Empty all queues')
@click.argument('queues', nargs=-1)
@click.pass_context
def empty(ctx, all, queues):
"""Empty given queues."""
conn = ctx.obj['connection']
if all:
queues = Queue.all(connection=conn)
else:
queues = [Queue(queue, connection=conn) for queue in queues]
if not queues:
click.echo('Nothing to do')
for queue in queues:
num_jobs = queue.empty()
click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name))
@main.command()
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
@click.argument('job_ids', nargs=-1)
@click.pass_context
def requeue(ctx, all, job_ids):
"""Requeue failed jobs."""
conn = ctx.obj['connection']
failed_queue = get_failed_queue(connection=conn)
if all:
job_ids = failed_queue.job_ids
if not job_ids:
click.echo('Nothing to do')
sys.exit(0)
click.echo('Requeueing {0} jobs from failed queue'.format(len(job_ids)))
fail_count = 0
with click.progressbar(job_ids) as job_ids:
for job_id in job_ids:
try:
failed_queue.requeue(job_id)
except InvalidJobOperationError:
fail_count += 1
if fail_count > 0:
click.secho('Unable to requeue {0} jobs from failed queue'.format(fail_count), fg='red')
main.add_command(info)

@ -1,239 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import argparse
import sys
import time
from functools import partial
import click
from redis.exceptions import ConnectionError
from rq import Connection, get_failed_queue, Queue, Worker
from rq.scripts import (add_standard_arguments, read_config_file,
setup_default_arguments, setup_redis)
red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
yellow = partial(click.style, fg='yellow')
def pad(s, pad_to_length):
"""Pads the given string to the given length."""
return ('%-' + '%ds' % pad_to_length) % (s,)
def get_scale(x):
"""Finds the lowest scale where x <= scale."""
scales = [20, 50, 100, 200, 400, 600, 800, 1000]
for scale in scales:
if x <= scale:
return scale
return x
def state_symbol(state):
symbols = {
'busy': red('busy'),
'idle': green('idle'),
}
try:
return symbols[state]
except KeyError:
return state
def show_queues(queues, raw, by_queue):
if queues:
qs = list(map(Queue, queues))
else:
qs = Queue.all()
num_jobs = 0
termwidth, _ = click.get_terminal_size()
chartwidth = min(20, termwidth - 20)
max_count = 0
counts = dict()
for q in qs:
count = q.count
counts[q] = count
max_count = max(max_count, count)
scale = get_scale(max_count)
ratio = chartwidth * 1.0 / scale
for q in qs:
count = counts[q]
if not raw:
chart = green('|' + '' * int(ratio * count))
line = '%-12s %s %d' % (q.name, chart, count)
else:
line = 'queue %s %d' % (q.name, count)
click.echo(line)
num_jobs += count
# print summary when not in raw mode
if not raw:
click.echo('%d queues, %d jobs total' % (len(qs), num_jobs))
def show_workers(queues, raw, by_queue):
if queues:
qs = list(map(Queue, queues))
def any_matching_queue(worker):
def queue_matches(q):
return q in qs
return any(map(queue_matches, worker.queues))
# Filter out workers that don't match the queue filter
ws = [w for w in Worker.all() if any_matching_queue(w)]
def filter_queues(queue_names):
return [qname for qname in queue_names if Queue(qname) in qs]
else:
qs = Queue.all()
ws = Worker.all()
filter_queues = lambda x: x
if not by_queue:
for w in ws:
worker_queues = filter_queues(w.queue_names())
if not raw:
click.echo('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues)))
else:
click.echo('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues)))
else:
# Create reverse lookup table
queues = dict([(q, []) for q in qs])
for w in ws:
for q in w.queues:
if q not in queues:
continue
queues[q].append(w)
max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0
for q in queues:
if queues[q]:
queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa
else:
queues_str = ''
click.echo('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str))
if not raw:
click.echo('%d workers, %d queues' % (len(ws), len(qs)))
def show_both(queues, raw, by_queue):
show_queues(queues, raw, by_queue)
if not raw:
click.echo('')
show_workers(queues, raw, by_queue)
if not raw:
click.echo('')
import datetime
click.echo('Updated: %s' % datetime.datetime.now())
def refresh(val, func, *args):
while True:
if val:
click.clear()
func(*args)
if val:
time.sleep(val)
else:
break
@click.command()
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--interval', '-i', default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') # noqa
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') # noqa
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') # noqa
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') # noqa
@click.argument('queues', nargs=-1)
@click.pass_context
def info(ctx, path, interval, raw, only_queues, only_workers, by_queue, queues):
"""RQ command-line monitor."""
if path:
sys.path = path.split(':') + sys.path
conn = ctx.obj['connection']
try:
if only_queues:
func = show_queues
elif only_workers:
func = show_workers
else:
func = show_both
with Connection(conn):
refresh(interval, func, queues, raw, by_queue)
except ConnectionError as e:
click.echo(e)
sys.exit(1)
except KeyboardInterrupt:
click.echo()
sys.exit(0)
# TODO: The following code is for backward compatibility, should be removed in future
def parse_args():
parser = argparse.ArgumentParser(description='RQ command-line monitor.')
add_standard_arguments(parser)
parser.add_argument('--path', '-P', default='.', help='Specify the import path.')
parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa
parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') # noqa
parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') # noqa
parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') # noqa
parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') # noqa
parser.add_argument('--empty-failed-queue', '-X', dest='empty_failed_queue', default=False, action='store_true', help='Empties the failed queue, then quits') # noqa
parser.add_argument('queues', nargs='*', help='The queues to poll')
return parser.parse_args()
def main():
# warn users this command is deprecated, use `rq info`
import warnings
warnings.simplefilter('always', DeprecationWarning)
warnings.warn("This command will be removed in future, "
"use `rq info` instead", DeprecationWarning)
args = parse_args()
if args.path:
sys.path = args.path.split(':') + sys.path
settings = {}
if args.config:
settings = read_config_file(args.config)
setup_default_arguments(args, settings)
setup_redis(args)
try:
if args.empty_failed_queue:
num_jobs = get_failed_queue().empty()
print('{} jobs removed from failed queue'.format(num_jobs))
else:
if args.only_queues:
func = show_queues
elif args.only_workers:
func = show_workers
else:
func = show_both
refresh(args.interval, func, args.queues, args.raw, args.by_queue)
except ConnectionError as e:
print(e)
sys.exit(1)
except KeyboardInterrupt:
print()
sys.exit(0)

@ -41,12 +41,16 @@ setup(
zip_safe=False,
platforms='any',
install_requires=get_dependencies(),
entry_points='''\
[console_scripts]
rq = rq.scripts.rq_cli:main
rqworker = rq.scripts.rqworker:main
rqinfo = rq.scripts.rqinfo:main
''',
entry_points={
'console_scripts': [
'rq = rq.cli:main',
# NOTE: rqworker/rqinfo are kept for backward-compatibility,
# remove eventually (TODO)
'rqinfo = rq.cli:info',
'rqworker = rq.scripts.rqworker:main', # TODO convert to click subcommand
],
},
classifiers=[
# As from http://pypi.python.org/pypi?%3Aaction=list_classifiers
#'Development Status :: 1 - Planning',

Loading…
Cancel
Save