diff --git a/requirements.txt b/requirements.txt index 7800f0f..539b9a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ redis +click diff --git a/rq/queue.py b/rq/queue.py index 22aa160..86f0406 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -138,10 +138,10 @@ class Queue(object): def remove(self, job_or_id, pipeline=None): """Removes Job from queue, accepts either a Job instance or ID.""" job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id - + if pipeline is not None: pipeline.lrem(self.key, 0, job_id) - + return self.connection._lrem(self.key, 0, job_id) def compact(self): diff --git a/rq/scripts/rq_cli.py b/rq/scripts/rq_cli.py new file mode 100755 index 0000000..8e0490a --- /dev/null +++ b/rq/scripts/rq_cli.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +""" +RQ command line tool +""" +from __future__ import (absolute_import, division, print_function, + unicode_literals) + +import sys + +import click +import redis +from rq import get_failed_queue, Queue +from rq.exceptions import InvalidJobOperationError + +from .rqinfo import info + + +@click.group() +@click.option('--url', '-u', envvar='URL', help='URL describing Redis connection details.') +@click.pass_context +def main(ctx, url): + """RQ CLI""" + if url is None: + url = "redis://localhost:6379/0" + redis_conn = redis.from_url(url) + + ctx.obj = {} + ctx.obj['connection'] = redis_conn + + +@main.command() +@click.option('--all', '-a', is_flag=True, help='Empty all queues') +@click.argument('queues', nargs=-1) +@click.pass_context +def empty(ctx, all, queues): + """Empty given queues.""" + conn = ctx.obj['connection'] + if all: + queues = Queue.all(connection=conn) + else: + queues = [Queue(queue, connection=conn) for queue in queues] + + if not queues: + click.echo('Nothing to do') + + for queue in queues: + num_jobs = queue.empty() + click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name)) + + +@main.command() +@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs') +@click.argument('job_ids', nargs=-1) +@click.pass_context +def requeue(ctx, all, job_ids): + """Requeue failed jobs.""" + conn = ctx.obj['connection'] + failed_queue = get_failed_queue(connection=conn) + + if all: + job_ids = failed_queue.job_ids + + if not job_ids: + click.echo('Nothing to do') + sys.exit(0) + + click.echo('Requeueing {0} jobs from failed queue'.format(len(job_ids))) + fail_count = 0 + with click.progressbar(job_ids) as job_ids: + for job_id in job_ids: + try: + failed_queue.requeue(job_id) + except InvalidJobOperationError: + fail_count += 1 + + if fail_count > 0: + click.secho('Unable to requeue {0} jobs from failed queue'.format(fail_count), fg='red') + + +main.add_command(info) diff --git a/rq/scripts/rqinfo.py b/rq/scripts/rqinfo.py index 48d318a..16c0a32 100755 --- a/rq/scripts/rqinfo.py +++ b/rq/scripts/rqinfo.py @@ -4,19 +4,19 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import argparse -import os import sys import time +from functools import partial +import click from redis.exceptions import ConnectionError -from rq import get_failed_queue, Queue, Worker +from rq import Connection, get_failed_queue, Queue, Worker from rq.scripts import (add_standard_arguments, read_config_file, setup_default_arguments, setup_redis) -from rq.utils import gettermsize, make_colorizer -red = make_colorizer('darkred') -green = make_colorizer('darkgreen') -yellow = make_colorizer('darkyellow') +red = partial(click.style, fg='red') +green = partial(click.style, fg='green') +yellow = partial(click.style, fg='yellow') def pad(s, pad_to_length): @@ -44,14 +44,14 @@ def state_symbol(state): return state -def show_queues(args): - if len(args.queues): - qs = list(map(Queue, args.queues)) +def show_queues(queues, raw, by_queue): + if queues: + qs = list(map(Queue, queues)) else: qs = Queue.all() num_jobs = 0 - termwidth, _ = gettermsize() + termwidth, _ = click.get_terminal_size() chartwidth = min(20, termwidth - 20) max_count = 0 @@ -65,23 +65,23 @@ def show_queues(args): for q in qs: count = counts[q] - if not args.raw: + if not raw: chart = green('|' + '█' * int(ratio * count)) line = '%-12s %s %d' % (q.name, chart, count) else: line = 'queue %s %d' % (q.name, count) - print(line) + click.echo(line) num_jobs += count - # Print summary when not in raw mode - if not args.raw: - print('%d queues, %d jobs total' % (len(qs), num_jobs)) + # print summary when not in raw mode + if not raw: + click.echo('%d queues, %d jobs total' % (len(qs), num_jobs)) -def show_workers(args): - if len(args.queues): - qs = list(map(Queue, args.queues)) +def show_workers(queues, raw, by_queue): + if queues: + qs = list(map(Queue, queues)) def any_matching_queue(worker): def queue_matches(q): @@ -99,13 +99,13 @@ def show_workers(args): ws = Worker.all() filter_queues = lambda x: x - if not args.by_queue: + if not by_queue: for w in ws: worker_queues = filter_queues(w.queue_names()) - if not args.raw: - print('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues))) + if not raw: + click.echo('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues))) else: - print('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues))) + click.echo('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues))) else: # Create reverse lookup table queues = dict([(q, []) for q in qs]) @@ -121,23 +121,69 @@ def show_workers(args): queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa else: queues_str = '–' - print('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) + click.echo('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) - if not args.raw: - print('%d workers, %d queues' % (len(ws), len(qs))) + if not raw: + click.echo('%d workers, %d queues' % (len(ws), len(qs))) -def show_both(args): - show_queues(args) - if not args.raw: - print('') - show_workers(args) - if not args.raw: - print('') +def show_both(queues, raw, by_queue): + show_queues(queues, raw, by_queue) + if not raw: + click.echo('') + show_workers(queues, raw, by_queue) + if not raw: + click.echo('') import datetime - print('Updated: %s' % datetime.datetime.now()) + click.echo('Updated: %s' % datetime.datetime.now()) +def refresh(val, func, *args): + while True: + if val: + click.clear() + func(*args) + if val: + time.sleep(val) + else: + break + + +@click.command() +@click.option('--path', '-P', default='.', help='Specify the import path.') +@click.option('--interval', '-i', default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa +@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') # noqa +@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') # noqa +@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') # noqa +@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') # noqa +@click.argument('queues', nargs=-1) +@click.pass_context +def info(ctx, path, interval, raw, only_queues, only_workers, by_queue, queues): + """RQ command-line monitor.""" + + if path: + sys.path = path.split(':') + sys.path + + conn = ctx.obj['connection'] + try: + if only_queues: + func = show_queues + elif only_workers: + func = show_workers + else: + func = show_both + + with Connection(conn): + refresh(interval, func, queues, raw, by_queue) + except ConnectionError as e: + click.echo(e) + sys.exit(1) + except KeyboardInterrupt: + click.echo() + sys.exit(0) + + +# TODO: The following code is for backward compatibility, should be removed in future def parse_args(): parser = argparse.ArgumentParser(description='RQ command-line monitor.') add_standard_arguments(parser) @@ -152,18 +198,13 @@ def parse_args(): return parser.parse_args() -def interval(val, func, args): - while True: - if val and sys.stdout.isatty(): - os.system('clear') - func(args) - if val and sys.stdout.isatty(): - time.sleep(val) - else: - break - - def main(): + # warn users this command is deprecated, use `rq info` + import warnings + warnings.simplefilter('always', DeprecationWarning) + warnings.warn("This command will be removed in future, " + "use `rq info` instead", DeprecationWarning) + args = parse_args() if args.path: @@ -189,7 +230,7 @@ def main(): else: func = show_both - interval(args.interval, func, args) + refresh(args.interval, func, args.queues, args.raw, args.by_queue) except ConnectionError as e: print(e) sys.exit(1) diff --git a/rq/scripts/rqworker.py b/rq/scripts/rqworker.py index b6371fd..c238c8a 100644 --- a/rq/scripts/rqworker.py +++ b/rq/scripts/rqworker.py @@ -4,8 +4,6 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import argparse -import logging -import logging.config import os import sys diff --git a/rq/utils.py b/rq/utils.py index d85235e..d875b26 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -11,38 +11,11 @@ from __future__ import (absolute_import, division, print_function, import importlib import datetime import logging -import os import sys from .compat import is_python_version, as_text -def gettermsize(): - def ioctl_GWINSZ(fd): - try: - import fcntl - import struct - import termios - cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) - except: - return None - return cr - cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) - if not cr: - try: - fd = os.open(os.ctermid(), os.O_RDONLY) - cr = ioctl_GWINSZ(fd) - os.close(fd) - except: - pass - if not cr: - try: - cr = (os.environ['LINES'], os.environ['COLUMNS']) - except: - cr = (25, 80) - return int(cr[1]), int(cr[0]) - - class _Colorizer(object): def __init__(self): esc = "\x1b[" diff --git a/setup.py b/setup.py index a49b126..1d84412 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ def get_version(): def get_dependencies(): - deps = ['redis >= 2.7.0'] + deps = ['redis >= 2.7.0', 'click >= 3.0'] if sys.version_info < (2, 7) or \ (sys.version_info >= (3, 0) and sys.version_info < (3, 1)): deps += ['importlib'] @@ -43,6 +43,7 @@ setup( install_requires=get_dependencies(), entry_points='''\ [console_scripts] + rq = rq.scripts.rq_cli:main rqworker = rq.scripts.rqworker:main rqinfo = rq.scripts.rqinfo:main ''', diff --git a/tests/test_scripts.py b/tests/test_scripts.py index 88689c3..8c5e0fc 100644 --- a/tests/test_scripts.py +++ b/tests/test_scripts.py @@ -2,8 +2,14 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +from click.testing import CliRunner +from rq import get_failed_queue from rq.compat import is_python_version -from rq.scripts import read_config_file +from rq.job import Job +from rq.scripts import read_config_file, rq_cli + +from tests import RQTestCase +from tests.fixtures import div_by_zero if is_python_version((2, 7), (3, 2)): from unittest import TestCase @@ -16,3 +22,38 @@ class TestScripts(TestCase): settings = read_config_file("tests.dummy_settings") self.assertIn("REDIS_HOST", settings) self.assertEqual(settings['REDIS_HOST'], "testhost.example.com") + + +class TestRQCli(RQTestCase): + """Test rq_cli script""" + def setUp(self): + super(TestRQCli, self).setUp() + db_num = self.testconn.connection_pool.connection_kwargs['db'] + self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num + + job = Job.create(func=div_by_zero, args=(1, 2, 3)) + job.origin = 'fake' + job.save() + get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa + + def test_empty(self): + """rq -u empty -y""" + runner = CliRunner() + result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'empty', "-y"]) + self.assertEqual(result.exit_code, 0) + self.assertEqual(result.output, '1 jobs removed from failed queue\n') + + def test_requeue(self): + """rq -u requeue""" + runner = CliRunner() + result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'requeue', '-a']) + self.assertEqual(result.exit_code, 0) + self.assertIn('Requeueing 1 jobs from FailedQueue', result.output) + self.assertIn('Unable to requeue 0 jobs from FailedQueue', result.output) + + def test_info(self): + """rq -u info -i 0""" + runner = CliRunner() + result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'info', '-i 0']) + self.assertEqual(result.exit_code, 0) + self.assertIn('1 queues, 1 jobs total', result.output)