diff --git a/rq/cli/__init__.py b/rq/cli/__init__.py new file mode 100644 index 0000000..88a454b --- /dev/null +++ b/rq/cli/__init__.py @@ -0,0 +1,6 @@ +# flake8: noqa +from .cli import main + +# TODO: the following imports can be removed when we drop the `rqinfo` and +# `rqworkers` commands in favor of just shipping the `rq` command. +from .cli import info diff --git a/rq/cli/cli.py b/rq/cli/cli.py new file mode 100755 index 0000000..a3cdc1b --- /dev/null +++ b/rq/cli/cli.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +""" +RQ command line tool +""" +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +import sys + +import click +from redis import StrictRedis +from redis.exceptions import ConnectionError +from rq import Connection, get_failed_queue, Queue +from rq.exceptions import InvalidJobOperationError + +from .helpers import refresh, show_both, show_queues, show_workers + +url_option = click.option('--url', '-u', envvar='URL', default='redis://localhost:6379/0', + help='URL describing Redis connection details.') + + +def connect(url): + return StrictRedis.from_url(url) + + +@click.group() +def main(): + """RQ command line tool.""" + pass + + +@main.command() +@url_option +@click.option('--all', '-a', is_flag=True, help='Empty all queues') +@click.argument('queues', nargs=-1) +@click.pass_context +def empty(ctx, url, all, queues): + """Empty given queues.""" + conn = connect(url) + + if all: + queues = Queue.all(connection=conn) + else: + queues = [Queue(queue, connection=conn) for queue in queues] + + if not queues: + click.echo('Nothing to do') + + for queue in queues: + num_jobs = queue.empty() + click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name)) + + +@main.command() +@url_option +@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs') +@click.argument('job_ids', nargs=-1) +@click.pass_context +def requeue(ctx, url, all, job_ids): + """Requeue failed jobs.""" + conn = connect(url) + failed_queue = get_failed_queue(connection=conn) + + if all: + job_ids = failed_queue.job_ids + + if not job_ids: + click.echo('Nothing to do') + sys.exit(0) + + click.echo('Requeueing {0} jobs from failed queue'.format(len(job_ids))) + fail_count = 0 + with click.progressbar(job_ids) as job_ids: + for job_id in job_ids: + try: + failed_queue.requeue(job_id) + except InvalidJobOperationError: + fail_count += 1 + + if fail_count > 0: + click.secho('Unable to requeue {0} jobs from failed queue'.format(fail_count), fg='red') + + +@main.command() +@url_option +@click.option('--path', '-P', default='.', help='Specify the import path.') +@click.option('--interval', '-i', default=None, help='Updates stats every N seconds (default: don\'t poll)') # noqa +@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') # noqa +@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') # noqa +@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') # noqa +@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') # noqa +@click.argument('queues', nargs=-1) +@click.pass_context +def info(ctx, url, path, interval, raw, only_queues, only_workers, by_queue, queues): + """RQ command-line monitor.""" + + if path: + sys.path = path.split(':') + sys.path + + if only_queues: + func = show_queues + elif only_workers: + func = show_workers + else: + func = show_both + + try: + with Connection(connect(url)): + refresh(interval, func, queues, raw, by_queue) + except ConnectionError as e: + click.echo(e) + sys.exit(1) + except KeyboardInterrupt: + click.echo() + sys.exit(0) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py new file mode 100644 index 0000000..c63fe5c --- /dev/null +++ b/rq/cli/helpers.py @@ -0,0 +1,143 @@ +# -*- coding: utf-8 -*- +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +import time +from functools import partial + +import click +from rq import Queue, Worker + +red = partial(click.style, fg='red') +green = partial(click.style, fg='green') +yellow = partial(click.style, fg='yellow') + + +def pad(s, pad_to_length): + """Pads the given string to the given length.""" + return ('%-' + '%ds' % pad_to_length) % (s,) + + +def get_scale(x): + """Finds the lowest scale where x <= scale.""" + scales = [20, 50, 100, 200, 400, 600, 800, 1000] + for scale in scales: + if x <= scale: + return scale + return x + + +def state_symbol(state): + symbols = { + 'busy': red('busy'), + 'idle': green('idle'), + } + try: + return symbols[state] + except KeyError: + return state + + +def show_queues(queues, raw, by_queue): + if queues: + qs = list(map(Queue, queues)) + else: + qs = Queue.all() + + num_jobs = 0 + termwidth, _ = click.get_terminal_size() + chartwidth = min(20, termwidth - 20) + + max_count = 0 + counts = dict() + for q in qs: + count = q.count + counts[q] = count + max_count = max(max_count, count) + scale = get_scale(max_count) + ratio = chartwidth * 1.0 / scale + + for q in qs: + count = counts[q] + if not raw: + chart = green('|' + '█' * int(ratio * count)) + line = '%-12s %s %d' % (q.name, chart, count) + else: + line = 'queue %s %d' % (q.name, count) + click.echo(line) + + num_jobs += count + + # print summary when not in raw mode + if not raw: + click.echo('%d queues, %d jobs total' % (len(qs), num_jobs)) + + +def show_workers(queues, raw, by_queue): + if queues: + qs = list(map(Queue, queues)) + + def any_matching_queue(worker): + def queue_matches(q): + return q in qs + return any(map(queue_matches, worker.queues)) + + # Filter out workers that don't match the queue filter + ws = [w for w in Worker.all() if any_matching_queue(w)] + + def filter_queues(queue_names): + return [qname for qname in queue_names if Queue(qname) in qs] + + else: + qs = Queue.all() + ws = Worker.all() + filter_queues = lambda x: x + + if not by_queue: + for w in ws: + worker_queues = filter_queues(w.queue_names()) + if not raw: + click.echo('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues))) + else: + click.echo('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues))) + else: + # Create reverse lookup table + queues = dict([(q, []) for q in qs]) + for w in ws: + for q in w.queues: + if q not in queues: + continue + queues[q].append(w) + + max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 + for q in queues: + if queues[q]: + queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa + else: + queues_str = '–' + click.echo('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) + + if not raw: + click.echo('%d workers, %d queues' % (len(ws), len(qs))) + + +def show_both(queues, raw, by_queue): + show_queues(queues, raw, by_queue) + if not raw: + click.echo('') + show_workers(queues, raw, by_queue) + if not raw: + click.echo('') + import datetime + click.echo('Updated: %s' % datetime.datetime.now()) + + +def refresh(interval, func, *args): + while True: + if interval: + click.clear() + func(*args) + if interval: + time.sleep(interval) + else: + break diff --git a/rq/scripts/rq_cli.py b/rq/scripts/rq_cli.py deleted file mode 100755 index 8e0490a..0000000 --- a/rq/scripts/rq_cli.py +++ /dev/null @@ -1,80 +0,0 @@ -# -*- coding: utf-8 -*- -""" -RQ command line tool -""" -from __future__ import (absolute_import, division, print_function, - unicode_literals) - -import sys - -import click -import redis -from rq import get_failed_queue, Queue -from rq.exceptions import InvalidJobOperationError - -from .rqinfo import info - - -@click.group() -@click.option('--url', '-u', envvar='URL', help='URL describing Redis connection details.') -@click.pass_context -def main(ctx, url): - """RQ CLI""" - if url is None: - url = "redis://localhost:6379/0" - redis_conn = redis.from_url(url) - - ctx.obj = {} - ctx.obj['connection'] = redis_conn - - -@main.command() -@click.option('--all', '-a', is_flag=True, help='Empty all queues') -@click.argument('queues', nargs=-1) -@click.pass_context -def empty(ctx, all, queues): - """Empty given queues.""" - conn = ctx.obj['connection'] - if all: - queues = Queue.all(connection=conn) - else: - queues = [Queue(queue, connection=conn) for queue in queues] - - if not queues: - click.echo('Nothing to do') - - for queue in queues: - num_jobs = queue.empty() - click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name)) - - -@main.command() -@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs') -@click.argument('job_ids', nargs=-1) -@click.pass_context -def requeue(ctx, all, job_ids): - """Requeue failed jobs.""" - conn = ctx.obj['connection'] - failed_queue = get_failed_queue(connection=conn) - - if all: - job_ids = failed_queue.job_ids - - if not job_ids: - click.echo('Nothing to do') - sys.exit(0) - - click.echo('Requeueing {0} jobs from failed queue'.format(len(job_ids))) - fail_count = 0 - with click.progressbar(job_ids) as job_ids: - for job_id in job_ids: - try: - failed_queue.requeue(job_id) - except InvalidJobOperationError: - fail_count += 1 - - if fail_count > 0: - click.secho('Unable to requeue {0} jobs from failed queue'.format(fail_count), fg='red') - - -main.add_command(info) diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py deleted file mode 100755 index 16c0a32..0000000 --- a/rq/scripts/rqinfo.py +++ /dev/null @@ -1,239 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -from __future__ import (absolute_import, division, print_function, - unicode_literals) - -import argparse -import sys -import time -from functools import partial - -import click -from redis.exceptions import ConnectionError -from rq import Connection, get_failed_queue, Queue, Worker -from rq.scripts import (add_standard_arguments, read_config_file, - setup_default_arguments, setup_redis) - -red = partial(click.style, fg='red') -green = partial(click.style, fg='green') -yellow = partial(click.style, fg='yellow') - - -def pad(s, pad_to_length): - """Pads the given string to the given length.""" - return ('%-' + '%ds' % pad_to_length) % (s,) - - -def get_scale(x): - """Finds the lowest scale where x <= scale.""" - scales = [20, 50, 100, 200, 400, 600, 800, 1000] - for scale in scales: - if x <= scale: - return scale - return x - - -def state_symbol(state): - symbols = { - 'busy': red('busy'), - 'idle': green('idle'), - } - try: - return symbols[state] - except KeyError: - return state - - -def show_queues(queues, raw, by_queue): - if queues: - qs = list(map(Queue, queues)) - else: - qs = Queue.all() - - num_jobs = 0 - termwidth, _ = click.get_terminal_size() - chartwidth = min(20, termwidth - 20) - - max_count = 0 - counts = dict() - for q in qs: - count = q.count - counts[q] = count - max_count = max(max_count, count) - scale = get_scale(max_count) - ratio = chartwidth * 1.0 / scale - - for q in qs: - count = counts[q] - if not raw: - chart = green('|' + '█' * int(ratio * count)) - line = '%-12s %s %d' % (q.name, chart, count) - else: - line = 'queue %s %d' % (q.name, count) - click.echo(line) - - num_jobs += count - - # print summary when not in raw mode - if not raw: - click.echo('%d queues, %d jobs total' % (len(qs), num_jobs)) - - -def show_workers(queues, raw, by_queue): - if queues: - qs = list(map(Queue, queues)) - - def any_matching_queue(worker): - def queue_matches(q): - return q in qs - return any(map(queue_matches, worker.queues)) - - # Filter out workers that don't match the queue filter - ws = [w for w in Worker.all() if any_matching_queue(w)] - - def filter_queues(queue_names): - return [qname for qname in queue_names if Queue(qname) in qs] - - else: - qs = Queue.all() - ws = Worker.all() - filter_queues = lambda x: x - - if not by_queue: - for w in ws: - worker_queues = filter_queues(w.queue_names()) - if not raw: - click.echo('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues))) - else: - click.echo('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues))) - else: - # Create reverse lookup table - queues = dict([(q, []) for q in qs]) - for w in ws: - for q in w.queues: - if q not in queues: - continue - queues[q].append(w) - - max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 - for q in queues: - if queues[q]: - queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa - else: - queues_str = '–' - click.echo('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) - - if not raw: - click.echo('%d workers, %d queues' % (len(ws), len(qs))) - - -def show_both(queues, raw, by_queue): - show_queues(queues, raw, by_queue) - if not raw: - click.echo('') - show_workers(queues, raw, by_queue) - if not raw: - click.echo('') - import datetime - click.echo('Updated: %s' % datetime.datetime.now()) - - -def refresh(val, func, *args): - while True: - if val: - click.clear() - func(*args) - if val: - time.sleep(val) - else: - break - - -@click.command() -@click.option('--path', '-P', default='.', help='Specify the import path.') -@click.option('--interval', '-i', default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa -@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') # noqa -@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') # noqa -@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') # noqa -@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') # noqa -@click.argument('queues', nargs=-1) -@click.pass_context -def info(ctx, path, interval, raw, only_queues, only_workers, by_queue, queues): - """RQ command-line monitor.""" - - if path: - sys.path = path.split(':') + sys.path - - conn = ctx.obj['connection'] - try: - if only_queues: - func = show_queues - elif only_workers: - func = show_workers - else: - func = show_both - - with Connection(conn): - refresh(interval, func, queues, raw, by_queue) - except ConnectionError as e: - click.echo(e) - sys.exit(1) - except KeyboardInterrupt: - click.echo() - sys.exit(0) - - -# TODO: The following code is for backward compatibility, should be removed in future -def parse_args(): - parser = argparse.ArgumentParser(description='RQ command-line monitor.') - add_standard_arguments(parser) - parser.add_argument('--path', '-P', default='.', help='Specify the import path.') - parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa - parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') # noqa - parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') # noqa - parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') # noqa - parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') # noqa - parser.add_argument('--empty-failed-queue', '-X', dest='empty_failed_queue', default=False, action='store_true', help='Empties the failed queue, then quits') # noqa - parser.add_argument('queues', nargs='*', help='The queues to poll') - return parser.parse_args() - - -def main(): - # warn users this command is deprecated, use `rq info` - import warnings - warnings.simplefilter('always', DeprecationWarning) - warnings.warn("This command will be removed in future, " - "use `rq info` instead", DeprecationWarning) - - args = parse_args() - - if args.path: - sys.path = args.path.split(':') + sys.path - - settings = {} - if args.config: - settings = read_config_file(args.config) - - setup_default_arguments(args, settings) - - setup_redis(args) - - try: - if args.empty_failed_queue: - num_jobs = get_failed_queue().empty() - print('{} jobs removed from failed queue'.format(num_jobs)) - else: - if args.only_queues: - func = show_queues - elif args.only_workers: - func = show_workers - else: - func = show_both - - refresh(args.interval, func, args.queues, args.raw, args.by_queue) - except ConnectionError as e: - print(e) - sys.exit(1) - except KeyboardInterrupt: - print() - sys.exit(0) diff --git a/setup.py b/setup.py index 1d84412..6e1d216 100644 --- a/setup.py +++ b/setup.py @@ -41,12 +41,16 @@ setup( zip_safe=False, platforms='any', install_requires=get_dependencies(), - entry_points='''\ - [console_scripts] - rq = rq.scripts.rq_cli:main - rqworker = rq.scripts.rqworker:main - rqinfo = rq.scripts.rqinfo:main - ''', + entry_points={ + 'console_scripts': [ + 'rq = rq.cli:main', + + # NOTE: rqworker/rqinfo are kept for backward-compatibility, + # remove eventually (TODO) + 'rqinfo = rq.cli:info', + 'rqworker = rq.scripts.rqworker:main', # TODO convert to click subcommand + ], + }, classifiers=[ # As from http://pypi.python.org/pypi?%3Aaction=list_classifiers #'Development Status :: 1 - Planning',