Fix various flake8 complaints.

main
Vincent Driessen 10 years ago
parent 3eabe76690
commit fc803f5d0d

@ -138,10 +138,10 @@ class Queue(object):
def remove(self, job_or_id, pipeline=None):
"""Removes Job from queue, accepts either a Job instance or ID."""
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
if pipeline is not None:
pipeline.lrem(self.key, 0, job_id)
return self.connection._lrem(self.key, 0, job_id)
def compact(self):

@ -15,8 +15,7 @@ from .rqinfo import info
@click.group()
@click.option('--url', '-u', envvar='URL',
help='URL describing Redis connection details.')
@click.option('--url', '-u', envvar='URL', help='URL describing Redis connection details.')
@click.pass_context
def main(ctx, url):
"""RQ CLI"""
@ -29,8 +28,7 @@ def main(ctx, url):
@main.command()
@click.option('--yes', '-y', is_flag=True,
help='Empty failed queue by default')
@click.option('--yes', '-y', is_flag=True, help='Empty failed queue by default')
@click.argument('queues', nargs=-1)
@click.pass_context
def empty(ctx, yes, queues):
@ -51,8 +49,7 @@ def empty(ctx, yes, queues):
conn = ctx.obj['connection']
queues = [Queue(queue, connection=conn) for queue in queues]
if not queues:
if yes or click.confirm('Do you want to empty failed queue?',
abort=True):
if yes or click.confirm('Do you want to empty failed queue?', abort=True):
queues = (get_failed_queue(connection=conn),)
for queue in queues:
num_jobs = queue.empty()
@ -61,8 +58,7 @@ def empty(ctx, yes, queues):
@main.command()
@click.option('--all', '-a', 'is_all', is_flag=True,
help='Requeue all failed jobs')
@click.option('--all', '-a', 'is_all', is_flag=True, help='Requeue all failed jobs')
@click.argument('job_ids', nargs=-1)
@click.pass_context
def requeue(ctx, is_all, job_ids):

@ -3,18 +3,17 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import argparse
import sys
import time
import click
import argparse
from functools import partial
import click
from redis.exceptions import ConnectionError
from rq import get_failed_queue, Queue, Worker, Connection
from rq import Connection, get_failed_queue, Queue, Worker
from rq.scripts import (add_standard_arguments, read_config_file,
setup_default_arguments, setup_redis)
red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
yellow = partial(click.style, fg='yellow')
@ -184,7 +183,7 @@ def info(ctx, path, interval, raw, only_queues, only_workers, by_queue, queues):
sys.exit(0)
### The following code is for backward compatibility, will be removed in future
# TODO: The following code is for backward compatibility, should be removed in future
def parse_args():
parser = argparse.ArgumentParser(description='RQ command-line monitor.')
add_standard_arguments(parser)
@ -198,12 +197,13 @@ def parse_args():
parser.add_argument('queues', nargs='*', help='The queues to poll')
return parser.parse_args()
def main():
# warn users this command is deprecated, use `rq info`
import warnings
warnings.simplefilter('always', DeprecationWarning)
warnings.warn("This command will be remove in future, "
"use `rq info` instead", DeprecationWarning)
warnings.warn("This command will be removed in future, "
"use `rq info` instead", DeprecationWarning)
args = parse_args()

@ -4,8 +4,6 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import argparse
import logging
import logging.config
import os
import sys

@ -39,25 +39,21 @@ class TestRQCli(RQTestCase):
def test_empty(self):
"""rq -u <url> empty -y"""
runner = CliRunner()
result = runner.invoke(rq_cli.main,
['-u', self.redis_url, 'empty', "-y"])
result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'empty', "-y"])
self.assertEqual(result.exit_code, 0)
self.assertEqual(result.output, '1 jobs removed from failed queue\n')
def test_requeue(self):
"""rq -u <url> requeue"""
runner = CliRunner()
result = runner.invoke(rq_cli.main,
['-u', self.redis_url, 'requeue', '-a'])
result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'requeue', '-a'])
self.assertEqual(result.exit_code, 0)
self.assertIn('Requeueing 1 jobs from FailedQueue', result.output)
self.assertIn('Unable to requeue 0 jobs from FailedQueue',
result.output)
self.assertIn('Unable to requeue 0 jobs from FailedQueue', result.output)
def test_info(self):
"""rq -u <url> info -i 0"""
runner = CliRunner()
result = runner.invoke(rq_cli.main,
['-u', self.redis_url, 'info', '-i 0'])
result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'info', '-i 0'])
self.assertEqual(result.exit_code, 0)
self.assertIn('1 queues, 1 jobs total', result.output)

Loading…
Cancel
Save