Merge branch 'zhangliyong-cli-rq'

main
Vincent Driessen 10 years ago
commit 652cd71d2b

@ -1 +1,2 @@
redis redis
click

@ -138,10 +138,10 @@ class Queue(object):
def remove(self, job_or_id, pipeline=None): def remove(self, job_or_id, pipeline=None):
"""Removes Job from queue, accepts either a Job instance or ID.""" """Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
if pipeline is not None: if pipeline is not None:
pipeline.lrem(self.key, 0, job_id) pipeline.lrem(self.key, 0, job_id)
return self.connection._lrem(self.key, 0, job_id) return self.connection._lrem(self.key, 0, job_id)
def compact(self): def compact(self):

@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
"""
RQ command line tool
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import sys
import click
import redis
from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError
from .rqinfo import info
@click.group()
@click.option('--url', '-u', envvar='URL', help='URL describing Redis connection details.')
@click.pass_context
def main(ctx, url):
"""RQ CLI"""
if url is None:
url = "redis://localhost:6379/0"
redis_conn = redis.from_url(url)
ctx.obj = {}
ctx.obj['connection'] = redis_conn
@main.command()
@click.option('--all', '-a', is_flag=True, help='Empty all queues')
@click.argument('queues', nargs=-1)
@click.pass_context
def empty(ctx, all, queues):
"""Empty given queues."""
conn = ctx.obj['connection']
if all:
queues = Queue.all(connection=conn)
else:
queues = [Queue(queue, connection=conn) for queue in queues]
if not queues:
click.echo('Nothing to do')
for queue in queues:
num_jobs = queue.empty()
click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name))
@main.command()
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
@click.argument('job_ids', nargs=-1)
@click.pass_context
def requeue(ctx, all, job_ids):
"""Requeue failed jobs."""
conn = ctx.obj['connection']
failed_queue = get_failed_queue(connection=conn)
if all:
job_ids = failed_queue.job_ids
if not job_ids:
click.echo('Nothing to do')
sys.exit(0)
click.echo('Requeueing {0} jobs from failed queue'.format(len(job_ids)))
fail_count = 0
with click.progressbar(job_ids) as job_ids:
for job_id in job_ids:
try:
failed_queue.requeue(job_id)
except InvalidJobOperationError:
fail_count += 1
if fail_count > 0:
click.secho('Unable to requeue {0} jobs from failed queue'.format(fail_count), fg='red')
main.add_command(info)

@ -4,19 +4,19 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import argparse import argparse
import os
import sys import sys
import time import time
from functools import partial
import click
from redis.exceptions import ConnectionError from redis.exceptions import ConnectionError
from rq import get_failed_queue, Queue, Worker from rq import Connection, get_failed_queue, Queue, Worker
from rq.scripts import (add_standard_arguments, read_config_file, from rq.scripts import (add_standard_arguments, read_config_file,
setup_default_arguments, setup_redis) setup_default_arguments, setup_redis)
from rq.utils import gettermsize, make_colorizer
red = make_colorizer('darkred') red = partial(click.style, fg='red')
green = make_colorizer('darkgreen') green = partial(click.style, fg='green')
yellow = make_colorizer('darkyellow') yellow = partial(click.style, fg='yellow')
def pad(s, pad_to_length): def pad(s, pad_to_length):
@ -44,14 +44,14 @@ def state_symbol(state):
return state return state
def show_queues(args): def show_queues(queues, raw, by_queue):
if len(args.queues): if queues:
qs = list(map(Queue, args.queues)) qs = list(map(Queue, queues))
else: else:
qs = Queue.all() qs = Queue.all()
num_jobs = 0 num_jobs = 0
termwidth, _ = gettermsize() termwidth, _ = click.get_terminal_size()
chartwidth = min(20, termwidth - 20) chartwidth = min(20, termwidth - 20)
max_count = 0 max_count = 0
@ -65,23 +65,23 @@ def show_queues(args):
for q in qs: for q in qs:
count = counts[q] count = counts[q]
if not args.raw: if not raw:
chart = green('|' + '' * int(ratio * count)) chart = green('|' + '' * int(ratio * count))
line = '%-12s %s %d' % (q.name, chart, count) line = '%-12s %s %d' % (q.name, chart, count)
else: else:
line = 'queue %s %d' % (q.name, count) line = 'queue %s %d' % (q.name, count)
print(line) click.echo(line)
num_jobs += count num_jobs += count
# Print summary when not in raw mode # print summary when not in raw mode
if not args.raw: if not raw:
print('%d queues, %d jobs total' % (len(qs), num_jobs)) click.echo('%d queues, %d jobs total' % (len(qs), num_jobs))
def show_workers(args): def show_workers(queues, raw, by_queue):
if len(args.queues): if queues:
qs = list(map(Queue, args.queues)) qs = list(map(Queue, queues))
def any_matching_queue(worker): def any_matching_queue(worker):
def queue_matches(q): def queue_matches(q):
@ -99,13 +99,13 @@ def show_workers(args):
ws = Worker.all() ws = Worker.all()
filter_queues = lambda x: x filter_queues = lambda x: x
if not args.by_queue: if not by_queue:
for w in ws: for w in ws:
worker_queues = filter_queues(w.queue_names()) worker_queues = filter_queues(w.queue_names())
if not args.raw: if not raw:
print('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues))) click.echo('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues)))
else: else:
print('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues))) click.echo('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues)))
else: else:
# Create reverse lookup table # Create reverse lookup table
queues = dict([(q, []) for q in qs]) queues = dict([(q, []) for q in qs])
@ -121,23 +121,69 @@ def show_workers(args):
queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa
else: else:
queues_str = '' queues_str = ''
print('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) click.echo('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str))
if not args.raw: if not raw:
print('%d workers, %d queues' % (len(ws), len(qs))) click.echo('%d workers, %d queues' % (len(ws), len(qs)))
def show_both(args): def show_both(queues, raw, by_queue):
show_queues(args) show_queues(queues, raw, by_queue)
if not args.raw: if not raw:
print('') click.echo('')
show_workers(args) show_workers(queues, raw, by_queue)
if not args.raw: if not raw:
print('') click.echo('')
import datetime import datetime
print('Updated: %s' % datetime.datetime.now()) click.echo('Updated: %s' % datetime.datetime.now())
def refresh(val, func, *args):
while True:
if val:
click.clear()
func(*args)
if val:
time.sleep(val)
else:
break
@click.command()
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--interval', '-i', default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') # noqa
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') # noqa
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') # noqa
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') # noqa
@click.argument('queues', nargs=-1)
@click.pass_context
def info(ctx, path, interval, raw, only_queues, only_workers, by_queue, queues):
"""RQ command-line monitor."""
if path:
sys.path = path.split(':') + sys.path
conn = ctx.obj['connection']
try:
if only_queues:
func = show_queues
elif only_workers:
func = show_workers
else:
func = show_both
with Connection(conn):
refresh(interval, func, queues, raw, by_queue)
except ConnectionError as e:
click.echo(e)
sys.exit(1)
except KeyboardInterrupt:
click.echo()
sys.exit(0)
# TODO: The following code is for backward compatibility, should be removed in future
def parse_args(): def parse_args():
parser = argparse.ArgumentParser(description='RQ command-line monitor.') parser = argparse.ArgumentParser(description='RQ command-line monitor.')
add_standard_arguments(parser) add_standard_arguments(parser)
@ -152,18 +198,13 @@ def parse_args():
return parser.parse_args() return parser.parse_args()
def interval(val, func, args):
while True:
if val and sys.stdout.isatty():
os.system('clear')
func(args)
if val and sys.stdout.isatty():
time.sleep(val)
else:
break
def main(): def main():
# warn users this command is deprecated, use `rq info`
import warnings
warnings.simplefilter('always', DeprecationWarning)
warnings.warn("This command will be removed in future, "
"use `rq info` instead", DeprecationWarning)
args = parse_args() args = parse_args()
if args.path: if args.path:
@ -189,7 +230,7 @@ def main():
else: else:
func = show_both func = show_both
interval(args.interval, func, args) refresh(args.interval, func, args.queues, args.raw, args.by_queue)
except ConnectionError as e: except ConnectionError as e:
print(e) print(e)
sys.exit(1) sys.exit(1)

@ -4,8 +4,6 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import argparse import argparse
import logging
import logging.config
import os import os
import sys import sys

@ -11,38 +11,11 @@ from __future__ import (absolute_import, division, print_function,
import importlib import importlib
import datetime import datetime
import logging import logging
import os
import sys import sys
from .compat import is_python_version, as_text from .compat import is_python_version, as_text
def gettermsize():
def ioctl_GWINSZ(fd):
try:
import fcntl
import struct
import termios
cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
except:
return None
return cr
cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
if not cr:
try:
fd = os.open(os.ctermid(), os.O_RDONLY)
cr = ioctl_GWINSZ(fd)
os.close(fd)
except:
pass
if not cr:
try:
cr = (os.environ['LINES'], os.environ['COLUMNS'])
except:
cr = (25, 80)
return int(cr[1]), int(cr[0])
class _Colorizer(object): class _Colorizer(object):
def __init__(self): def __init__(self):
esc = "\x1b[" esc = "\x1b["

@ -17,7 +17,7 @@ def get_version():
def get_dependencies(): def get_dependencies():
deps = ['redis >= 2.7.0'] deps = ['redis >= 2.7.0', 'click >= 3.0']
if sys.version_info < (2, 7) or \ if sys.version_info < (2, 7) or \
(sys.version_info >= (3, 0) and sys.version_info < (3, 1)): (sys.version_info >= (3, 0) and sys.version_info < (3, 1)):
deps += ['importlib'] deps += ['importlib']
@ -43,6 +43,7 @@ setup(
install_requires=get_dependencies(), install_requires=get_dependencies(),
entry_points='''\ entry_points='''\
[console_scripts] [console_scripts]
rq = rq.scripts.rq_cli:main
rqworker = rq.scripts.rqworker:main rqworker = rq.scripts.rqworker:main
rqinfo = rq.scripts.rqinfo:main rqinfo = rq.scripts.rqinfo:main
''', ''',

@ -2,8 +2,14 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
from click.testing import CliRunner
from rq import get_failed_queue
from rq.compat import is_python_version from rq.compat import is_python_version
from rq.scripts import read_config_file from rq.job import Job
from rq.scripts import read_config_file, rq_cli
from tests import RQTestCase
from tests.fixtures import div_by_zero
if is_python_version((2, 7), (3, 2)): if is_python_version((2, 7), (3, 2)):
from unittest import TestCase from unittest import TestCase
@ -16,3 +22,38 @@ class TestScripts(TestCase):
settings = read_config_file("tests.dummy_settings") settings = read_config_file("tests.dummy_settings")
self.assertIn("REDIS_HOST", settings) self.assertIn("REDIS_HOST", settings)
self.assertEqual(settings['REDIS_HOST'], "testhost.example.com") self.assertEqual(settings['REDIS_HOST'], "testhost.example.com")
class TestRQCli(RQTestCase):
"""Test rq_cli script"""
def setUp(self):
super(TestRQCli, self).setUp()
db_num = self.testconn.connection_pool.connection_kwargs['db']
self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num
job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
def test_empty(self):
"""rq -u <url> empty -y"""
runner = CliRunner()
result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'empty', "-y"])
self.assertEqual(result.exit_code, 0)
self.assertEqual(result.output, '1 jobs removed from failed queue\n')
def test_requeue(self):
"""rq -u <url> requeue"""
runner = CliRunner()
result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'requeue', '-a'])
self.assertEqual(result.exit_code, 0)
self.assertIn('Requeueing 1 jobs from FailedQueue', result.output)
self.assertIn('Unable to requeue 0 jobs from FailedQueue', result.output)
def test_info(self):
"""rq -u <url> info -i 0"""
runner = CliRunner()
result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'info', '-i 0'])
self.assertEqual(result.exit_code, 0)
self.assertIn('1 queues, 1 jobs total', result.output)

Loading…
Cancel
Save