Merge pull request #420 from zhangliyong/cli-rq-worker

Convert `rqworker` to `rq worker` subcommand
main
Vincent Driessen 10 years ago
commit bc6d30e473

@ -3,4 +3,4 @@ from .cli import main
# TODO: the following imports can be removed when we drop the `rqinfo` and # TODO: the following imports can be removed when we drop the `rqinfo` and
# `rqworkers` commands in favor of just shipping the `rq` command. # `rqworkers` commands in favor of just shipping the `rq` command.
from .cli import info from .cli import info, worker

@ -5,22 +5,28 @@ RQ command line tool
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import os
import sys import sys
import click import click
from redis import StrictRedis from redis import StrictRedis
from redis.exceptions import ConnectionError from redis.exceptions import ConnectionError
from rq import Connection, get_failed_queue, Queue from rq import Connection, get_failed_queue, Queue
from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
show_both, show_queues, show_workers)
from .helpers import refresh, show_both, show_queues, show_workers
url_option = click.option('--url', '-u', envvar='URL', default='redis://localhost:6379/0', url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.') help='URL describing Redis connection details.')
def connect(url): def connect(url):
return StrictRedis.from_url(url) return StrictRedis.from_url(url or 'redis://localhost:6379/0')
@click.group() @click.group()
@ -33,8 +39,7 @@ def main():
@url_option @url_option
@click.option('--all', '-a', is_flag=True, help='Empty all queues') @click.option('--all', '-a', is_flag=True, help='Empty all queues')
@click.argument('queues', nargs=-1) @click.argument('queues', nargs=-1)
@click.pass_context def empty(url, all, queues):
def empty(ctx, url, all, queues):
"""Empty given queues.""" """Empty given queues."""
conn = connect(url) conn = connect(url)
@ -55,8 +60,7 @@ def empty(ctx, url, all, queues):
@url_option @url_option
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs') @click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
@click.argument('job_ids', nargs=-1) @click.argument('job_ids', nargs=-1)
@click.pass_context def requeue(url, all, job_ids):
def requeue(ctx, url, all, job_ids):
"""Requeue failed jobs.""" """Requeue failed jobs."""
conn = connect(url) conn = connect(url)
failed_queue = get_failed_queue(connection=conn) failed_queue = get_failed_queue(connection=conn)
@ -84,14 +88,13 @@ def requeue(ctx, url, all, job_ids):
@main.command() @main.command()
@url_option @url_option
@click.option('--path', '-P', default='.', help='Specify the import path.') @click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--interval', '-i', default=None, help='Updates stats every N seconds (default: don\'t poll)') # noqa @click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)')
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') # noqa @click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts')
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') # noqa @click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info')
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') # noqa @click.option('--only-workers', '-W', is_flag=True, help='Show only worker info')
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') # noqa @click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
@click.argument('queues', nargs=-1) @click.argument('queues', nargs=-1)
@click.pass_context def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
def info(ctx, url, path, interval, raw, only_queues, only_workers, by_queue, queues):
"""RQ command-line monitor.""" """RQ command-line monitor."""
if path: if path:
@ -113,3 +116,65 @@ def info(ctx, url, path, interval, raw, only_queues, only_workers, by_queue, que
except KeyboardInterrupt: except KeyboardInterrupt:
click.echo() click.echo()
sys.exit(0) sys.exit(0)
@main.command()
@url_option
@click.option('--config', '-c', help='Module containing RQ settings.')
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use')
@click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use')
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--results-ttl', help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, help='Default worker timeout to be used')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN')
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.argument('queues', nargs=-1)
def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl,
verbose, quiet, sentry_dsn, pid, queues):
"""Starts an RQ worker."""
if path:
sys.path = path.split(':') + sys.path
settings = read_config_file(config) if config else {}
# Worker specific default arguments
url = url or settings.get('REDIS_URL')
queues = queues or settings.get('QUEUES', ['default'])
sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')
if pid:
with open(os.path.expanduser(pid), "w") as fp:
fp.write(str(os.getpid()))
setup_loghandlers_from_args(verbose, quiet)
conn = connect(url)
cleanup_ghosts(conn)
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)
try:
queues = [queue_class(queue, connection=conn) for queue in queues]
w = worker_class(queues,
name=name,
connection=conn,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
job_class=job_class)
# Should we configure Sentry?
if sentry_dsn:
from raven import Client
from rq.contrib.sentry import register_sentry
client = Client(sentry_dsn)
register_sentry(client, w)
w.work(burst=burst)
except ConnectionError as e:
print(e)
sys.exit(1)

@ -2,17 +2,27 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import importlib
import time import time
from functools import partial from functools import partial
import click import click
from rq import Queue, Worker from rq import Queue, Worker
from rq.logutils import setup_loghandlers
red = partial(click.style, fg='red') red = partial(click.style, fg='red')
green = partial(click.style, fg='green') green = partial(click.style, fg='green')
yellow = partial(click.style, fg='yellow') yellow = partial(click.style, fg='yellow')
def read_config_file(module):
"""Reads all UPPERCASE variables defined in the given module file."""
settings = importlib.import_module(module)
return dict([(k, v)
for k, v in settings.__dict__.items()
if k.upper() == k])
def pad(s, pad_to_length): def pad(s, pad_to_length):
"""Pads the given string to the given length.""" """Pads the given string to the given length."""
return ('%-' + '%ds' % pad_to_length) % (s,) return ('%-' + '%ds' % pad_to_length) % (s,)
@ -141,3 +151,16 @@ def refresh(interval, func, *args):
time.sleep(interval) time.sleep(interval)
else: else:
break break
def setup_loghandlers_from_args(verbose, quiet):
if verbose and quiet:
raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.")
if verbose:
level = 'DEBUG'
elif quiet:
level = 'WARNING'
else:
level = 'INFO'
setup_loghandlers(level)

@ -11,7 +11,7 @@ from rq import Worker
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def cleanup_ghosts(): def cleanup_ghosts(conn=None):
""" """
RQ versions < 0.3.6 suffered from a race condition where workers, when RQ versions < 0.3.6 suffered from a race condition where workers, when
abruptly terminated, did not have a chance to clean up their worker abruptly terminated, did not have a chance to clean up their worker
@ -21,8 +21,8 @@ def cleanup_ghosts():
This function will clean up any of such legacy ghosted workers. This function will clean up any of such legacy ghosted workers.
""" """
conn = get_current_connection() conn = conn if conn else get_current_connection()
for worker in Worker.all(): for worker in Worker.all(connection=conn):
if conn._ttl(worker.key) == -1: if conn._ttl(worker.key) == -1:
ttl = worker.default_worker_ttl ttl = worker.default_worker_ttl
conn.expire(worker.key, ttl) conn.expire(worker.key, ttl)

@ -2,7 +2,6 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import types
import inspect import inspect
import warnings import warnings
from functools import partial from functools import partial

@ -1,65 +0,0 @@
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import importlib
import os
from functools import partial
from warnings import warn
import redis
from rq import use_connection
from rq.utils import first
def add_standard_arguments(parser):
parser.add_argument('--config', '-c', default=None,
help='Module containing RQ settings.')
parser.add_argument('--url', '-u', default=None,
help='URL describing Redis connection details. '
'Overrides other connection arguments if supplied.')
parser.add_argument('--host', '-H', default=None,
help='The Redis hostname (default: localhost)')
parser.add_argument('--port', '-p', default=None,
help='The Redis portnumber (default: 6379)')
parser.add_argument('--db', '-d', type=int, default=None,
help='The Redis database (default: 0)')
parser.add_argument('--password', '-a', default=None,
help='The Redis password (default: None)')
parser.add_argument('--socket', '-s', default=None,
help='The Redis Unix socket')
def read_config_file(module):
"""Reads all UPPERCASE variables defined in the given module file."""
settings = importlib.import_module(module)
return dict([(k, v)
for k, v in settings.__dict__.items()
if k.upper() == k])
def setup_default_arguments(args, settings):
""" Sets up args from settings or defaults """
args.url = first([args.url, settings.get('REDIS_URL'), os.environ.get('RQ_REDIS_URL')])
if (args.host or args.port or args.socket or args.db or args.password):
warn('Host, port, db, password options for Redis will not be '
'supported in future versions of RQ. '
'Please use `REDIS_URL` or `--url` instead.', DeprecationWarning)
strict_first = partial(first, key=lambda obj: obj is not None)
args.host = strict_first([args.host, settings.get('REDIS_HOST'), os.environ.get('RQ_REDIS_HOST'), 'localhost'])
args.port = int(strict_first([args.port, settings.get('REDIS_PORT'), os.environ.get('RQ_REDIS_PORT'), 6379]))
args.socket = strict_first([args.socket, settings.get('REDIS_SOCKET'), os.environ.get('RQ_REDIS_SOCKET'), None])
args.db = strict_first([args.db, settings.get('REDIS_DB'), os.environ.get('RQ_REDIS_DB'), 0])
args.password = strict_first([args.password, settings.get('REDIS_PASSWORD'), os.environ.get('RQ_REDIS_PASSWORD')])
def setup_redis(args):
if args.url is not None:
redis_conn = redis.StrictRedis.from_url(args.url)
else:
redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db,
password=args.password, unix_socket_path=args.socket)
use_connection(redis_conn)

@ -1,102 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import (absolute_import, division, print_function,
unicode_literals)
import argparse
import os
import sys
from redis.exceptions import ConnectionError
from rq.contrib.legacy import cleanup_ghosts
from rq.logutils import setup_loghandlers
from rq.scripts import (add_standard_arguments, read_config_file,
setup_default_arguments, setup_redis)
from rq.utils import import_attribute
def parse_args():
parser = argparse.ArgumentParser(description='Starts an RQ worker.')
add_standard_arguments(parser)
parser.add_argument('--burst', '-b', action='store_true', default=False, help='Run in burst mode (quit after all work is done)') # noqa
parser.add_argument('--name', '-n', default=None, help='Specify a different name')
parser.add_argument('--worker-class', '-w', action='store', default='rq.Worker', help='RQ Worker class to use')
parser.add_argument('--job-class', '-j', action='store', default='rq.job.Job', help='RQ Job class to use')
parser.add_argument('--queue-class', action='store', default='rq.Queue', help='RQ Queue class to use')
parser.add_argument('--path', '-P', default='.', help='Specify the import path.')
parser.add_argument('--results-ttl', default=None, help='Default results timeout to be used')
parser.add_argument('--worker-ttl', type=int, default=None, help='Default worker timeout to be used')
parser.add_argument('--verbose', '-v', action='store_true', default=False, help='Show more output')
parser.add_argument('--quiet', '-q', action='store_true', default=False, help='Show less output')
parser.add_argument('--sentry-dsn', action='store', default=None, metavar='URL', help='Report exceptions to this Sentry DSN') # noqa
parser.add_argument('--pid', action='store', default=None,
help='Write the process ID number to a file at the specified path')
parser.add_argument('queues', nargs='*', help='The queues to listen on (default: \'default\')')
return parser.parse_args()
def setup_loghandlers_from_args(args):
if args.verbose and args.quiet:
raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.")
if args.verbose:
level = 'DEBUG'
elif args.quiet:
level = 'WARNING'
else:
level = 'INFO'
setup_loghandlers(level)
def main():
args = parse_args()
if args.path:
sys.path = args.path.split(':') + sys.path
settings = {}
if args.config:
settings = read_config_file(args.config)
setup_default_arguments(args, settings)
# Worker specific default arguments
if not args.queues:
args.queues = settings.get('QUEUES', ['default'])
if args.sentry_dsn is None:
args.sentry_dsn = settings.get('SENTRY_DSN',
os.environ.get('SENTRY_DSN', None))
if args.pid:
with open(os.path.expanduser(args.pid), "w") as fp:
fp.write(str(os.getpid()))
setup_loghandlers_from_args(args)
setup_redis(args)
cleanup_ghosts()
worker_class = import_attribute(args.worker_class)
queue_class = import_attribute(args.queue_class)
try:
queues = list(map(queue_class, args.queues))
w = worker_class(queues,
name=args.name,
default_worker_ttl=args.worker_ttl,
default_result_ttl=args.results_ttl,
job_class=args.job_class)
# Should we configure Sentry?
if args.sentry_dsn:
from raven import Client
from rq.contrib.sentry import register_sentry
client = Client(args.sentry_dsn)
register_sentry(client, w)
w.work(burst=args.burst)
except ConnectionError as e:
print(e)
sys.exit(1)

@ -48,7 +48,7 @@ setup(
# NOTE: rqworker/rqinfo are kept for backward-compatibility, # NOTE: rqworker/rqinfo are kept for backward-compatibility,
# remove eventually (TODO) # remove eventually (TODO)
'rqinfo = rq.cli:info', 'rqinfo = rq.cli:info',
'rqworker = rq.scripts.rqworker:main', # TODO convert to click subcommand 'rqworker = rq.cli:worker',
], ],
}, },
classifiers=[ classifiers=[

@ -7,7 +7,7 @@ from rq import get_failed_queue
from rq.compat import is_python_version from rq.compat import is_python_version
from rq.job import Job from rq.job import Job
from rq.cli import main from rq.cli import main
from rq.scripts import read_config_file from rq.cli.helpers import read_config_file
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import div_by_zero from tests.fixtures import div_by_zero
@ -38,22 +38,28 @@ class TestRQCli(RQTestCase):
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
def test_empty(self): def test_empty(self):
"""rq -u <url> empty -y""" """rq empty -u <url> failed"""
runner = CliRunner() runner = CliRunner()
result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed']) result = runner.invoke(main, ['empty', '-u', self.redis_url, 'failed'])
self.assertEqual(result.exit_code, 0) self.assertEqual(result.exit_code, 0)
self.assertEqual(result.output.strip(), '1 jobs removed from failed queue') self.assertEqual(result.output.strip(), '1 jobs removed from failed queue')
def test_requeue(self): def test_requeue(self):
"""rq -u <url> requeue""" """rq requeue -u <url> --all"""
runner = CliRunner() runner = CliRunner()
result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all']) result = runner.invoke(main, ['requeue', '-u', self.redis_url, '--all'])
self.assertEqual(result.exit_code, 0) self.assertEqual(result.exit_code, 0)
self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue') self.assertEqual(result.output.strip(), 'Requeueing 1 jobs from failed queue')
def test_info(self): def test_info(self):
"""rq -u <url> info -i 0""" """rq info -u <url>"""
runner = CliRunner() runner = CliRunner()
result = runner.invoke(main, ['info', '-u', self.redis_url]) result = runner.invoke(main, ['info', '-u', self.redis_url])
self.assertEqual(result.exit_code, 0) self.assertEqual(result.exit_code, 0)
self.assertIn('1 queues, 1 jobs total', result.output) self.assertIn('1 queues, 1 jobs total', result.output)
def test_worker(self):
"""rq worker -u <url> -b"""
runner = CliRunner()
result = runner.invoke(main, ['worker', '-u', self.redis_url, '-b'])
self.assertEqual(result.exit_code, 0)

@ -346,5 +346,5 @@ class TestJob(RQTestCase):
job = queue.enqueue(say_hello, job_id="1234") job = queue.enqueue(say_hello, job_id="1234")
self.assertEqual(job.id, "1234") self.assertEqual(job.id, "1234")
job.perform() job.perform()
self.assertRaises(TypeError, queue.enqueue, say_hello, job_id=1234) self.assertRaises(TypeError, queue.enqueue, say_hello, job_id=1234)

Loading…
Cancel
Save