Convert rqworker to 'rq worker' subcommand

main
zhangliyong
parent 7b434a32eb
commit 842f27294a

@ -3,4 +3,4 @@ from .cli import main
# TODO: the following imports can be removed when we drop the `rqinfo` and # TODO: the following imports can be removed when we drop the `rqinfo` and
# `rqworkers` commands in favor of just shipping the `rq` command. # `rqworkers` commands in favor of just shipping the `rq` command.
from .cli import info from .cli import info, worker

@ -5,15 +5,21 @@ RQ command line tool
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import os
import sys import sys
import click import click
from redis import StrictRedis from redis import StrictRedis
from redis.exceptions import ConnectionError from redis.exceptions import ConnectionError
from rq import Connection, get_failed_queue, Queue from rq import Connection, get_failed_queue, Queue
from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
show_both, show_queues, show_workers)
from .helpers import refresh, show_both, show_queues, show_workers
url_option = click.option('--url', '-u', envvar='URL', default='redis://localhost:6379/0', url_option = click.option('--url', '-u', envvar='URL', default='redis://localhost:6379/0',
help='URL describing Redis connection details.') help='URL describing Redis connection details.')
@ -113,3 +119,71 @@ def info(ctx, url, path, interval, raw, only_queues, only_workers, by_queue, que
except KeyboardInterrupt: except KeyboardInterrupt:
click.echo() click.echo()
sys.exit(0) sys.exit(0)
@main.command()
@url_option
@click.option('--config', '-c', help='Module containing RQ settings.')
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)') # noqa
@click.option('--name', '-n', help='Specify a different name')
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use')
@click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use')
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--results-ttl', help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, help='Default worker timeout to be used')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--sentry-dsn', help='Report exceptions to this Sentry DSN')
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.argument('queues', nargs=-1)
def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl,
worker_ttl, verbose, quiet, sentry_dsn, pid, queues):
"""Starts an RQ worker."""
if path:
sys.path = path.split(':') + sys.path
settings = {}
if config:
settings = read_config_file(config)
conn = connect(url)
# Worker specific default arguments
if not queues:
queues = settings.get('QUEUES', ['default'])
if sentry_dsn is None:
sentry_dsn = settings.get('SENTRY_DSN',
os.environ.get('SENTRY_DSN', None))
if pid:
with open(os.path.expanduser(pid), "w") as fp:
fp.write(str(os.getpid()))
setup_loghandlers_from_args(verbose, quiet)
cleanup_ghosts(conn)
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)
try:
queues = [queue_class(queue, connection=conn) for queue in queues]
w = worker_class(queues,
name=name,
connection=conn,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
job_class=job_class)
# Should we configure Sentry?
if sentry_dsn:
from raven import Client
from rq.contrib.sentry import register_sentry
client = Client(sentry_dsn)
register_sentry(client, w)
w.work(burst=burst)
except ConnectionError as e:
print(e)
sys.exit(1)

@ -2,17 +2,27 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import importlib
import time import time
from functools import partial from functools import partial
import click import click
from rq import Queue, Worker from rq import Queue, Worker
from rq.logutils import setup_loghandlers
red = partial(click.style, fg='red') red = partial(click.style, fg='red')
green = partial(click.style, fg='green') green = partial(click.style, fg='green')
yellow = partial(click.style, fg='yellow') yellow = partial(click.style, fg='yellow')
def read_config_file(module):
"""Reads all UPPERCASE variables defined in the given module file."""
settings = importlib.import_module(module)
return dict([(k, v)
for k, v in settings.__dict__.items()
if k.upper() == k])
def pad(s, pad_to_length): def pad(s, pad_to_length):
"""Pads the given string to the given length.""" """Pads the given string to the given length."""
return ('%-' + '%ds' % pad_to_length) % (s,) return ('%-' + '%ds' % pad_to_length) % (s,)
@ -141,3 +151,16 @@ def refresh(interval, func, *args):
time.sleep(interval) time.sleep(interval)
else: else:
break break
def setup_loghandlers_from_args(verbose, quiet):
if verbose and quiet:
raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.")
if verbose:
level = 'DEBUG'
elif quiet:
level = 'WARNING'
else:
level = 'INFO'
setup_loghandlers(level)

@ -48,7 +48,7 @@ setup(
# NOTE: rqworker/rqinfo are kept for backward-compatibility, # NOTE: rqworker/rqinfo are kept for backward-compatibility,
# remove eventually (TODO) # remove eventually (TODO)
'rqinfo = rq.cli:info', 'rqinfo = rq.cli:info',
'rqworker = rq.scripts.rqworker:main', # TODO convert to click subcommand 'rqworker = rq.cli:worker',
], ],
}, },
classifiers=[ classifiers=[

Loading…
Cancel
Save