diff --git a/docs/docs/workers.md b/docs/docs/workers.md index d19451f..52b4ff3 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -69,6 +69,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments: * `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'` * `--disable-job-desc-logging`: Turn off job description logging. * `--max-jobs`: Maximum number of jobs to execute. +* `--dequeue-strategy`: The strategy to dequeue jobs from multiple queues (one of `default`, `random` or `round_robin`, defaults to `default`) _New in version 1.8.0._ * `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer") @@ -317,19 +318,21 @@ $ rq worker -w 'path.to.GeventWorker' The default worker considers the order of queues as their priority order, and if a task is pending in a higher priority queue -it will be selected before any other in queues with lower priority. +it will be selected before any other in queues with lower priority (the `default` behavior). +To choose the strategy that should be used, `rq` provides the `--dequeue-strategy / -ds` option. In certain circumstances it can be useful that a when a worker is listening to multiple queues, say `q1`,`q2`,`q3`, the jobs are dequeued using a Round Robin strategy. That is, the 1st dequeued job is taken from `q1`, the 2nd from `q2`, the 3rd from `q3`, the 4th -from `q1`, the 5th from `q2` and so on. The custom worker class `rq.worker.RoundRobinWorker` -implements this strategy. +from `q1`, the 5th from `q2` and so on. To implement this strategy use `-ds round_robin` argument. -In some other circumstances, when a worker is listening to multiple queues, it can be useful -to pull jobs from the different queues randomly. The custom class `rq.worker.RandomWorker` -implements this strategy. In fact, whenever a job is pulled from any queue, the list of queues is +In other circumstances, it can be useful to pull jobs from the different queues randomly. +To implement this strategy use `-ds random` argument. +In fact, whenever a job is pulled from any queue with the `random` strategy, the list of queues is shuffled, so that no queue has more priority than the other ones. +Deprecation Warning: Those strategies were formely being implemented by using the custom classes `rq.worker.RoundRobinWorker` +and `rq.worker.RandomWorker`. As the `--dequeue-strategy` argument allows for this option to be used with any worker, those worker classes are deprecated and will be removed from future versions. ## Custom Job and Queue Classes diff --git a/rq/cli/cli.py b/rq/cli/cli.py index 1bf9151..a2851aa 100755 --- a/rq/cli/cli.py +++ b/rq/cli/cli.py @@ -5,6 +5,7 @@ RQ command line tool from functools import update_wrapper import os import sys +import warnings import click from redis.exceptions import ConnectionError @@ -174,7 +175,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, try: with Connection(cli_config.connection): - if queues: qs = list(map(cli_config.queue_class, queues)) else: @@ -226,6 +226,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues, @click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') +@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues') @click.argument('queues', nargs=-1) @pass_cli_config def worker( @@ -253,7 +254,8 @@ def worker( log_format, date_format, serializer, - **options + dequeue_strategy, + **options, ): """Starts an RQ worker.""" settings = read_config_file(cli_config.config) if cli_config.config else {} @@ -268,6 +270,17 @@ def worker( with open(os.path.expanduser(pid), "w") as fp: fp.write(str(os.getpid())) + worker_name = cli_config.worker_class.__qualname__ + if worker_name in ["RoundRobinWorker", "RandomWorker"]: + strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin" + msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy." + warnings.warn(msg, DeprecationWarning) + click.secho(msg, fg='yellow') + + if dequeue_strategy not in ("default", "random", "round_robin"): + click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red') + sys.exit(1) + setup_loghandlers_from_args(verbose, quiet, date_format, log_format) try: @@ -299,7 +312,7 @@ def worker( exception_handlers=exception_handlers or None, disable_default_exception_handler=disable_default_exception_handler, log_job_description=not disable_job_desc_logging, - serializer=serializer, + serializer=serializer ) # Should we configure Sentry? @@ -321,6 +334,7 @@ def worker( max_jobs=max_jobs, max_idle_time=max_idle_time, with_scheduler=with_scheduler, + dequeue_strategy=dequeue_strategy ) except ConnectionError as e: print(e) @@ -402,7 +416,7 @@ def enqueue( serializer, function, arguments, - **options + **options, ): """Enqueues a job from the command line""" args, kwargs = parse_function_args(arguments) diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py index fb20109..53bc019 100644 --- a/rq/cli/helpers.py +++ b/rq/cli/helpers.py @@ -100,7 +100,6 @@ def state_symbol(state): def show_queues(queues, raw, by_queue, queue_class, worker_class): - num_jobs = 0 termwidth = get_terminal_size().columns chartwidth = min(20, termwidth - 20) @@ -141,7 +140,6 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class): workers.add(worker) if not by_queue: - for worker in workers: queue_names = ', '.join(worker.queue_names()) name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid) diff --git a/rq/job.py b/rq/job.py index 0dcbf28..6e0333d 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1322,9 +1322,8 @@ class Job: # for backward compatibility if self.supports_redis_streams: from .results import Result - Result.create( - self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline - ) + + Result.create(self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline) if result_ttl != 0: finished_job_registry = self.finished_job_registry @@ -1344,6 +1343,7 @@ class Job: ) if self.supports_redis_streams: from .results import Result + Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline) def get_retry_interval(self) -> int: diff --git a/rq/queue.py b/rq/queue.py index 45d6a40..8564786 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1067,7 +1067,6 @@ class Queue: dependents_key = job.dependents_key while True: - try: # if a pipeline is passed, the caller is responsible for calling WATCH # to ensure all jobs are enqueued diff --git a/rq/results.py b/rq/results.py index 8ff770d..317b290 100644 --- a/rq/results.py +++ b/rq/results.py @@ -85,7 +85,7 @@ class Result(object): # response = job.connection.zrange(cls.get_key(job.id), 0, 10, desc=True, withscores=True) response = job.connection.xrevrange(cls.get_key(job.id), '+', '-') results = [] - for (result_id, payload) in response: + for result_id, payload in response: results.append( cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer) ) diff --git a/rq/utils.py b/rq/utils.py index a304fd3..9cd1255 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -96,7 +96,6 @@ def make_colorizer(color: str): class ColorizingStreamHandler(logging.StreamHandler): - levels = { logging.WARNING: make_colorizer('darkyellow'), logging.ERROR: make_colorizer('darkred'), diff --git a/rq/worker.py b/rq/worker.py index cb63c65..2ccfb78 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -11,12 +11,12 @@ import sys import time import traceback import warnings - from datetime import timedelta from enum import Enum -from uuid import uuid4 from random import shuffle -from typing import Any, Callable, List, Optional, TYPE_CHECKING, Tuple, Type, Union +from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type, + Union) +from uuid import uuid4 if TYPE_CHECKING: from redis import Redis @@ -26,7 +26,9 @@ try: from signal import SIGKILL except ImportError: from signal import SIGTERM as SIGKILL + from contextlib import suppress + import redis.exceptions from . import worker_registration @@ -43,17 +45,20 @@ from .defaults import ( DEFAULT_LOGGING_DATE_FORMAT, ) from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException + from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import Queue from .registry import StartedJobRegistry, clean_registries from .scheduler import RQScheduler +from .serializers import resolve_serializer from .suspension import is_suspended from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text from .version import VERSION from .serializers import resolve_serializer + try: from setproctitle import setproctitle as setprocname except ImportError: @@ -91,6 +96,12 @@ def signal_name(signum): return 'SIG_UNKNOWN' +class DequeueStrategy(str, Enum): + DEFAULT = "default" + ROUND_ROBIN = "round_robin" + RANDOM = "random" + + class WorkerStatus(str, Enum): STARTED = 'started' SUSPENDED = 'suspended' @@ -283,6 +294,7 @@ class Worker: self.scheduler: Optional[RQScheduler] = None self.pubsub = None self.pubsub_thread = None + self._dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT self.disable_default_exception_handler = disable_default_exception_handler @@ -671,20 +683,36 @@ class Worker: self.pubsub.unsubscribe() self.pubsub.close() - def reorder_queues(self, reference_queue): - """Method placeholder to workers that implement some reordering strategy. - `pass` here means that the queue will remain with the same job order. + def reorder_queues(self, reference_queue: 'Queue'): + """Reorder the queues according to the strategy. + As this can be defined both in the `Worker` initialization or in the `work` method, + it doesn't take the strategy directly, but rather uses the private `_dequeue_strategy` attribute. Args: - reference_queue (Union[Queue, str]): The queue + reference_queue (Union[Queue, str]): The queues to reorder """ - pass + if self._dequeue_strategy is None: + self._dequeue_strategy = DequeueStrategy.DEFAULT + + if self._dequeue_strategy not in ("default", "random", "round_robin"): + raise ValueError( + f"Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`." + ) + if self._dequeue_strategy == DequeueStrategy.DEFAULT: + return + if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN: + pos = self._ordered_queues.index(reference_queue) + self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] + return + if self._dequeue_strategy == DequeueStrategy.RANDOM: + shuffle(self._ordered_queues) + return def bootstrap( self, logging_level: str = "INFO", date_format: str = DEFAULT_LOGGING_DATE_FORMAT, - log_format: str = DEFAULT_LOGGING_FORMAT, + log_format: str = DEFAULT_LOGGING_FORMAT ): """Bootstraps the worker. Runs the basic tasks that should run when the worker actually starts working. @@ -749,6 +777,7 @@ class Worker: max_jobs: Optional[int] = None, max_idle_time: Optional[int] = None, with_scheduler: bool = False, + dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT ) -> bool: """Starts the work loop. @@ -767,11 +796,13 @@ class Worker: max_jobs (Optional[int], optional): Max number of jobs. Defaults to None. max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None. with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False. + dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. Defaults to DequeueStrategy.DEFAULT Returns: worked (bool): Will return True if any job was processed, False otherwise. """ self.bootstrap(logging_level, date_format, log_format) + self._dequeue_strategy = dequeue_strategy completed_jobs = 0 if with_scheduler: self._start_scheduler(burst, logging_level, date_format, log_format) @@ -1588,7 +1619,7 @@ class RoundRobinWorker(Worker): def reorder_queues(self, reference_queue): pos = self._ordered_queues.index(reference_queue) - self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] + self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1] class RandomWorker(Worker): diff --git a/rq/worker_registration.py b/rq/worker_registration.py index 0f31f1d..fe4dc04 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -86,7 +86,6 @@ def clean_worker_registry(queue: 'Queue'): keys = list(get_keys(queue)) with queue.connection.pipeline() as pipeline: - for key in keys: pipeline.exists(key) results = pipeline.execute() diff --git a/tests/test_cli.py b/tests/test_cli.py index 07b9c39..0cdca78 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -326,6 +326,21 @@ class TestRQCli(RQTestCase): result = runner.invoke(main, args + ['--quiet', '--verbose']) self.assertNotEqual(result.exit_code, 0) + def test_worker_dequeue_strategy(self): + """--quiet and --verbose logging options are supported""" + runner = CliRunner() + args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'random'] + result = runner.invoke(main, args) + self.assert_normal_execution(result) + + args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'round_robin'] + result = runner.invoke(main, args) + self.assert_normal_execution(result) + + args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'wrong'] + result = runner.invoke(main, args) + self.assertEqual(result.exit_code, 1) + def test_exception_handlers(self): """rq worker -u -b --exception-handler """ connection = Redis.from_url(self.redis_url) diff --git a/tests/test_worker.py b/tests/test_worker.py index 239252c..dfa0f1d 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1136,6 +1136,59 @@ class TestWorker(RQTestCase): worker = Worker.find_by_key(w2.key) self.assertEqual(worker.python_version, python_version) + def test_dequeue_random_strategy(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) + + w = Worker(qs) + w.work(burst=True, dequeue_strategy="random") + + start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)] + expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)] + + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + expected_rr.reverse() + expected_ser.reverse() + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + sorted_ids.sort() + expected_ser.sort() + self.assertEqual(sorted_ids, expected_ser) + + def test_dequeue_round_robin(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j)) + + w = Worker(qs) + w.work(burst=True, dequeue_strategy="round_robin") + + start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0', + 'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1', + 'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2'] + + self.assertEqual(expected, sorted_ids) + def wait_and_kill_work_horse(pid, time_to_wait=0.0): time.sleep(time_to_wait)