diff --git a/rq/cli/__init__.py b/rq/cli/__init__.py index 88a454b..821f9d7 100644 --- a/rq/cli/__init__.py +++ b/rq/cli/__init__.py @@ -3,4 +3,4 @@ from .cli import main # TODO: the following imports can be removed when we drop the `rqinfo` and # `rqworkers` commands in favor of just shipping the `rq` command. -from .cli import info +from .cli import info, worker diff --git a/rq/cli/cli.py b/rq/cli/cli.py index a3cdc1b..faf0686 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -5,15 +5,21 @@ RQ command line tool from __future__ import (absolute_import, division, print_function, unicode_literals) +import os import sys import click from redis import StrictRedis from redis.exceptions import ConnectionError + from rq import Connection, get_failed_queue, Queue +from rq.contrib.legacy import cleanup_ghosts from rq.exceptions import InvalidJobOperationError +from rq.utils import import_attribute + +from .helpers import (read_config_file, refresh, setup_loghandlers_from_args, + show_both, show_queues, show_workers) -from .helpers import refresh, show_both, show_queues, show_workers url_option = click.option('--url', '-u', envvar='URL', default='redis://localhost:6379/0', help='URL describing Redis connection details.') @@ -113,3 +119,71 @@ def info(ctx, url, path, interval, raw, only_queues, only_workers, by_queue, que except KeyboardInterrupt: click.echo() sys.exit(0) + + +@main.command() +@url_option +@click.option('--config', '-c', help='Module containing RQ settings.') +@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)') # noqa +@click.option('--name', '-n', help='Specify a different name') +@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use') +@click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use') +@click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use') +@click.option('--path', '-P', default='.', help='Specify the import path.') +@click.option('--results-ttl', help='Default results timeout to be used') +@click.option('--worker-ttl', type=int, help='Default worker timeout to be used') +@click.option('--verbose', '-v', is_flag=True, help='Show more output') +@click.option('--quiet', '-q', is_flag=True, help='Show less output') +@click.option('--sentry-dsn', help='Report exceptions to this Sentry DSN') +@click.option('--pid', help='Write the process ID number to a file at the specified path') +@click.argument('queues', nargs=-1) +def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, + worker_ttl, verbose, quiet, sentry_dsn, pid, queues): + """Starts an RQ worker.""" + + if path: + sys.path = path.split(':') + sys.path + + settings = {} + if config: + settings = read_config_file(config) + + conn = connect(url) + # Worker specific default arguments + if not queues: + queues = settings.get('QUEUES', ['default']) + + if sentry_dsn is None: + sentry_dsn = settings.get('SENTRY_DSN', + os.environ.get('SENTRY_DSN', None)) + + if pid: + with open(os.path.expanduser(pid), "w") as fp: + fp.write(str(os.getpid())) + + setup_loghandlers_from_args(verbose, quiet) + + cleanup_ghosts(conn) + worker_class = import_attribute(worker_class) + queue_class = import_attribute(queue_class) + + try: + queues = [queue_class(queue, connection=conn) for queue in queues] + w = worker_class(queues, + name=name, + connection=conn, + default_worker_ttl=worker_ttl, + default_result_ttl=results_ttl, + job_class=job_class) + + # Should we configure Sentry? + if sentry_dsn: + from raven import Client + from rq.contrib.sentry import register_sentry + client = Client(sentry_dsn) + register_sentry(client, w) + + w.work(burst=burst) + except ConnectionError as e: + print(e) + sys.exit(1) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index c63fe5c..f25cc81 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -2,17 +2,27 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +import importlib import time from functools import partial import click from rq import Queue, Worker +from rq.logutils import setup_loghandlers red = partial(click.style, fg='red') green = partial(click.style, fg='green') yellow = partial(click.style, fg='yellow') +def read_config_file(module): + """Reads all UPPERCASE variables defined in the given module file.""" + settings = importlib.import_module(module) + return dict([(k, v) + for k, v in settings.__dict__.items() + if k.upper() == k]) + + def pad(s, pad_to_length): """Pads the given string to the given length.""" return ('%-' + '%ds' % pad_to_length) % (s,) @@ -141,3 +151,16 @@ def refresh(interval, func, *args): time.sleep(interval) else: break + + +def setup_loghandlers_from_args(verbose, quiet): + if verbose and quiet: + raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.") + + if verbose: + level = 'DEBUG' + elif quiet: + level = 'WARNING' + else: + level = 'INFO' + setup_loghandlers(level) diff --git a/setup.py b/setup.py index 6e1d216..f2a8574 100644 --- a/setup.py +++ b/setup.py @@ -48,7 +48,7 @@ setup( # NOTE: rqworker/rqinfo are kept for backward-compatibility, # remove eventually (TODO) 'rqinfo = rq.cli:info', - 'rqworker = rq.scripts.rqworker:main', # TODO convert to click subcommand + 'rqworker = rq.cli:worker', ], }, classifiers=[