New dequeue strategy (#1806)

* New dequeue strategy

This implements a new parameter `dequeue_strategy` that
should replace the `RoundRobinWorker` and `RandomWorker`.
Changes includes: feature, docs, tests, deprecation warning.

* Fix dequeue strategy name

* Black & Fix warning

* feat: tests, warnings, refactor naming

* feat: improve worker check

* fix: revert to str subclass

* fix: dequeue strategy into bootstrap

* org: move DequeueStrategy to worker

* refactor: round robin naming

* fix: naming

* fix: type annotation

* fix: typo

* refactor: remove kwarg from worker's init

* fix: typo

* move `dequeue_strategy` from `bootstrap()` into `work()`
main
lowercase00 2 years ago committed by GitHub
parent 0ba3971d55
commit 654649743c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -69,6 +69,7 @@ In addition to `--burst`, `rq worker` also accepts these arguments:
* `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'` * `--date-format`: Datetime format for the worker logs, defaults to `'%H:%M:%S'`
* `--disable-job-desc-logging`: Turn off job description logging. * `--disable-job-desc-logging`: Turn off job description logging.
* `--max-jobs`: Maximum number of jobs to execute. * `--max-jobs`: Maximum number of jobs to execute.
* `--dequeue-strategy`: The strategy to dequeue jobs from multiple queues (one of `default`, `random` or `round_robin`, defaults to `default`)
_New in version 1.8.0._ _New in version 1.8.0._
* `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer") * `--serializer`: Path to serializer object (e.g "rq.serializers.DefaultSerializer" or "rq.serializers.JSONSerializer")
@ -317,19 +318,21 @@ $ rq worker -w 'path.to.GeventWorker'
The default worker considers the order of queues as their priority order, The default worker considers the order of queues as their priority order,
and if a task is pending in a higher priority queue and if a task is pending in a higher priority queue
it will be selected before any other in queues with lower priority. it will be selected before any other in queues with lower priority (the `default` behavior).
To choose the strategy that should be used, `rq` provides the `--dequeue-strategy / -ds` option.
In certain circumstances it can be useful that a when a worker is listening to multiple queues, In certain circumstances it can be useful that a when a worker is listening to multiple queues,
say `q1`,`q2`,`q3`, the jobs are dequeued using a Round Robin strategy. That is, the 1st say `q1`,`q2`,`q3`, the jobs are dequeued using a Round Robin strategy. That is, the 1st
dequeued job is taken from `q1`, the 2nd from `q2`, the 3rd from `q3`, the 4th dequeued job is taken from `q1`, the 2nd from `q2`, the 3rd from `q3`, the 4th
from `q1`, the 5th from `q2` and so on. The custom worker class `rq.worker.RoundRobinWorker` from `q1`, the 5th from `q2` and so on. To implement this strategy use `-ds round_robin` argument.
implements this strategy.
In some other circumstances, when a worker is listening to multiple queues, it can be useful In other circumstances, it can be useful to pull jobs from the different queues randomly.
to pull jobs from the different queues randomly. The custom class `rq.worker.RandomWorker` To implement this strategy use `-ds random` argument.
implements this strategy. In fact, whenever a job is pulled from any queue, the list of queues is In fact, whenever a job is pulled from any queue with the `random` strategy, the list of queues is
shuffled, so that no queue has more priority than the other ones. shuffled, so that no queue has more priority than the other ones.
Deprecation Warning: Those strategies were formely being implemented by using the custom classes `rq.worker.RoundRobinWorker`
and `rq.worker.RandomWorker`. As the `--dequeue-strategy` argument allows for this option to be used with any worker, those worker classes are deprecated and will be removed from future versions.
## Custom Job and Queue Classes ## Custom Job and Queue Classes

@ -5,6 +5,7 @@ RQ command line tool
from functools import update_wrapper from functools import update_wrapper
import os import os
import sys import sys
import warnings
import click import click
from redis.exceptions import ConnectionError from redis.exceptions import ConnectionError
@ -174,7 +175,6 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
try: try:
with Connection(cli_config.connection): with Connection(cli_config.connection):
if queues: if queues:
qs = list(map(cli_config.queue_class, queues)) qs = list(map(cli_config.queue_class, queues))
else: else:
@ -226,6 +226,7 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute') @click.option('--max-idle-time', type=int, default=None, help='Maximum seconds to stay alive without jobs to execute')
@click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler') @click.option('--with-scheduler', '-s', is_flag=True, help='Run worker with scheduler')
@click.option('--serializer', '-S', default=None, help='Run worker with custom serializer') @click.option('--serializer', '-S', default=None, help='Run worker with custom serializer')
@click.option('--dequeue-strategy', '-ds', default='default', help='Sets a custom stratey to dequeue from multiple queues')
@click.argument('queues', nargs=-1) @click.argument('queues', nargs=-1)
@pass_cli_config @pass_cli_config
def worker( def worker(
@ -253,7 +254,8 @@ def worker(
log_format, log_format,
date_format, date_format,
serializer, serializer,
**options dequeue_strategy,
**options,
): ):
"""Starts an RQ worker.""" """Starts an RQ worker."""
settings = read_config_file(cli_config.config) if cli_config.config else {} settings = read_config_file(cli_config.config) if cli_config.config else {}
@ -268,6 +270,17 @@ def worker(
with open(os.path.expanduser(pid), "w") as fp: with open(os.path.expanduser(pid), "w") as fp:
fp.write(str(os.getpid())) fp.write(str(os.getpid()))
worker_name = cli_config.worker_class.__qualname__
if worker_name in ["RoundRobinWorker", "RandomWorker"]:
strategy_alternative = "random" if worker_name == "RandomWorker" else "round_robin"
msg = f"WARNING: The {worker_name} is deprecated. Use the --dequeue-strategy / -ds option with the {strategy_alternative} argument to set the strategy."
warnings.warn(msg, DeprecationWarning)
click.secho(msg, fg='yellow')
if dequeue_strategy not in ("default", "random", "round_robin"):
click.secho("ERROR: Dequeue Strategy can only be one of `default`, `random` or `round_robin`.", err=True, fg='red')
sys.exit(1)
setup_loghandlers_from_args(verbose, quiet, date_format, log_format) setup_loghandlers_from_args(verbose, quiet, date_format, log_format)
try: try:
@ -299,7 +312,7 @@ def worker(
exception_handlers=exception_handlers or None, exception_handlers=exception_handlers or None,
disable_default_exception_handler=disable_default_exception_handler, disable_default_exception_handler=disable_default_exception_handler,
log_job_description=not disable_job_desc_logging, log_job_description=not disable_job_desc_logging,
serializer=serializer, serializer=serializer
) )
# Should we configure Sentry? # Should we configure Sentry?
@ -321,6 +334,7 @@ def worker(
max_jobs=max_jobs, max_jobs=max_jobs,
max_idle_time=max_idle_time, max_idle_time=max_idle_time,
with_scheduler=with_scheduler, with_scheduler=with_scheduler,
dequeue_strategy=dequeue_strategy
) )
except ConnectionError as e: except ConnectionError as e:
print(e) print(e)
@ -402,7 +416,7 @@ def enqueue(
serializer, serializer,
function, function,
arguments, arguments,
**options **options,
): ):
"""Enqueues a job from the command line""" """Enqueues a job from the command line"""
args, kwargs = parse_function_args(arguments) args, kwargs = parse_function_args(arguments)

@ -100,7 +100,6 @@ def state_symbol(state):
def show_queues(queues, raw, by_queue, queue_class, worker_class): def show_queues(queues, raw, by_queue, queue_class, worker_class):
num_jobs = 0 num_jobs = 0
termwidth = get_terminal_size().columns termwidth = get_terminal_size().columns
chartwidth = min(20, termwidth - 20) chartwidth = min(20, termwidth - 20)
@ -141,7 +140,6 @@ def show_workers(queues, raw, by_queue, queue_class, worker_class):
workers.add(worker) workers.add(worker)
if not by_queue: if not by_queue:
for worker in workers: for worker in workers:
queue_names = ', '.join(worker.queue_names()) queue_names = ', '.join(worker.queue_names())
name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid) name = '%s (%s %s %s)' % (worker.name, worker.hostname, worker.ip_address, worker.pid)

@ -1322,9 +1322,8 @@ class Job:
# for backward compatibility # for backward compatibility
if self.supports_redis_streams: if self.supports_redis_streams:
from .results import Result from .results import Result
Result.create(
self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline Result.create(self, Result.Type.SUCCESSFUL, return_value=self._result, ttl=result_ttl, pipeline=pipeline)
)
if result_ttl != 0: if result_ttl != 0:
finished_job_registry = self.finished_job_registry finished_job_registry = self.finished_job_registry
@ -1344,6 +1343,7 @@ class Job:
) )
if self.supports_redis_streams: if self.supports_redis_streams:
from .results import Result from .results import Result
Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline) Result.create_failure(self, self.failure_ttl, exc_string=exc_string, pipeline=pipeline)
def get_retry_interval(self) -> int: def get_retry_interval(self) -> int:

@ -1067,7 +1067,6 @@ class Queue:
dependents_key = job.dependents_key dependents_key = job.dependents_key
while True: while True:
try: try:
# if a pipeline is passed, the caller is responsible for calling WATCH # if a pipeline is passed, the caller is responsible for calling WATCH
# to ensure all jobs are enqueued # to ensure all jobs are enqueued

@ -85,7 +85,7 @@ class Result(object):
# response = job.connection.zrange(cls.get_key(job.id), 0, 10, desc=True, withscores=True) # response = job.connection.zrange(cls.get_key(job.id), 0, 10, desc=True, withscores=True)
response = job.connection.xrevrange(cls.get_key(job.id), '+', '-') response = job.connection.xrevrange(cls.get_key(job.id), '+', '-')
results = [] results = []
for (result_id, payload) in response: for result_id, payload in response:
results.append( results.append(
cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer) cls.restore(job.id, result_id.decode(), payload, connection=job.connection, serializer=serializer)
) )

@ -96,7 +96,6 @@ def make_colorizer(color: str):
class ColorizingStreamHandler(logging.StreamHandler): class ColorizingStreamHandler(logging.StreamHandler):
levels = { levels = {
logging.WARNING: make_colorizer('darkyellow'), logging.WARNING: make_colorizer('darkyellow'),
logging.ERROR: make_colorizer('darkred'), logging.ERROR: make_colorizer('darkred'),

@ -11,12 +11,12 @@ import sys
import time import time
import traceback import traceback
import warnings import warnings
from datetime import timedelta from datetime import timedelta
from enum import Enum from enum import Enum
from uuid import uuid4
from random import shuffle from random import shuffle
from typing import Any, Callable, List, Optional, TYPE_CHECKING, Tuple, Type, Union from typing import (TYPE_CHECKING, Any, Callable, List, Optional, Tuple, Type,
Union)
from uuid import uuid4
if TYPE_CHECKING: if TYPE_CHECKING:
from redis import Redis from redis import Redis
@ -26,7 +26,9 @@ try:
from signal import SIGKILL from signal import SIGKILL
except ImportError: except ImportError:
from signal import SIGTERM as SIGKILL from signal import SIGTERM as SIGKILL
from contextlib import suppress from contextlib import suppress
import redis.exceptions import redis.exceptions
from . import worker_registration from . import worker_registration
@ -43,17 +45,20 @@ from .defaults import (
DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_DATE_FORMAT,
) )
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
from .job import Job, JobStatus from .job import Job, JobStatus
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
from .queue import Queue from .queue import Queue
from .registry import StartedJobRegistry, clean_registries from .registry import StartedJobRegistry, clean_registries
from .scheduler import RQScheduler from .scheduler import RQScheduler
from .serializers import resolve_serializer
from .suspension import is_suspended from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty
from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text from .utils import backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse, compact, as_text
from .version import VERSION from .version import VERSION
from .serializers import resolve_serializer from .serializers import resolve_serializer
try: try:
from setproctitle import setproctitle as setprocname from setproctitle import setproctitle as setprocname
except ImportError: except ImportError:
@ -91,6 +96,12 @@ def signal_name(signum):
return 'SIG_UNKNOWN' return 'SIG_UNKNOWN'
class DequeueStrategy(str, Enum):
DEFAULT = "default"
ROUND_ROBIN = "round_robin"
RANDOM = "random"
class WorkerStatus(str, Enum): class WorkerStatus(str, Enum):
STARTED = 'started' STARTED = 'started'
SUSPENDED = 'suspended' SUSPENDED = 'suspended'
@ -283,6 +294,7 @@ class Worker:
self.scheduler: Optional[RQScheduler] = None self.scheduler: Optional[RQScheduler] = None
self.pubsub = None self.pubsub = None
self.pubsub_thread = None self.pubsub_thread = None
self._dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
self.disable_default_exception_handler = disable_default_exception_handler self.disable_default_exception_handler = disable_default_exception_handler
@ -671,20 +683,36 @@ class Worker:
self.pubsub.unsubscribe() self.pubsub.unsubscribe()
self.pubsub.close() self.pubsub.close()
def reorder_queues(self, reference_queue): def reorder_queues(self, reference_queue: 'Queue'):
"""Method placeholder to workers that implement some reordering strategy. """Reorder the queues according to the strategy.
`pass` here means that the queue will remain with the same job order. As this can be defined both in the `Worker` initialization or in the `work` method,
it doesn't take the strategy directly, but rather uses the private `_dequeue_strategy` attribute.
Args: Args:
reference_queue (Union[Queue, str]): The queue reference_queue (Union[Queue, str]): The queues to reorder
""" """
pass if self._dequeue_strategy is None:
self._dequeue_strategy = DequeueStrategy.DEFAULT
if self._dequeue_strategy not in ("default", "random", "round_robin"):
raise ValueError(
f"Dequeue strategy {self._dequeue_strategy} is not allowed. Use `default`, `random` or `round_robin`."
)
if self._dequeue_strategy == DequeueStrategy.DEFAULT:
return
if self._dequeue_strategy == DequeueStrategy.ROUND_ROBIN:
pos = self._ordered_queues.index(reference_queue)
self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
return
if self._dequeue_strategy == DequeueStrategy.RANDOM:
shuffle(self._ordered_queues)
return
def bootstrap( def bootstrap(
self, self,
logging_level: str = "INFO", logging_level: str = "INFO",
date_format: str = DEFAULT_LOGGING_DATE_FORMAT, date_format: str = DEFAULT_LOGGING_DATE_FORMAT,
log_format: str = DEFAULT_LOGGING_FORMAT, log_format: str = DEFAULT_LOGGING_FORMAT
): ):
"""Bootstraps the worker. """Bootstraps the worker.
Runs the basic tasks that should run when the worker actually starts working. Runs the basic tasks that should run when the worker actually starts working.
@ -749,6 +777,7 @@ class Worker:
max_jobs: Optional[int] = None, max_jobs: Optional[int] = None,
max_idle_time: Optional[int] = None, max_idle_time: Optional[int] = None,
with_scheduler: bool = False, with_scheduler: bool = False,
dequeue_strategy: DequeueStrategy = DequeueStrategy.DEFAULT
) -> bool: ) -> bool:
"""Starts the work loop. """Starts the work loop.
@ -767,11 +796,13 @@ class Worker:
max_jobs (Optional[int], optional): Max number of jobs. Defaults to None. max_jobs (Optional[int], optional): Max number of jobs. Defaults to None.
max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None. max_idle_time (Optional[int], optional): Max seconds for worker to be idle. Defaults to None.
with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False. with_scheduler (bool, optional): Whether to run the scheduler in a separate process. Defaults to False.
dequeue_strategy (DequeueStrategy, optional): Which strategy to use to dequeue jobs. Defaults to DequeueStrategy.DEFAULT
Returns: Returns:
worked (bool): Will return True if any job was processed, False otherwise. worked (bool): Will return True if any job was processed, False otherwise.
""" """
self.bootstrap(logging_level, date_format, log_format) self.bootstrap(logging_level, date_format, log_format)
self._dequeue_strategy = dequeue_strategy
completed_jobs = 0 completed_jobs = 0
if with_scheduler: if with_scheduler:
self._start_scheduler(burst, logging_level, date_format, log_format) self._start_scheduler(burst, logging_level, date_format, log_format)
@ -1588,7 +1619,7 @@ class RoundRobinWorker(Worker):
def reorder_queues(self, reference_queue): def reorder_queues(self, reference_queue):
pos = self._ordered_queues.index(reference_queue) pos = self._ordered_queues.index(reference_queue)
self._ordered_queues = self._ordered_queues[pos + 1 :] + self._ordered_queues[: pos + 1] self._ordered_queues = self._ordered_queues[pos + 1:] + self._ordered_queues[: pos + 1]
class RandomWorker(Worker): class RandomWorker(Worker):

@ -86,7 +86,6 @@ def clean_worker_registry(queue: 'Queue'):
keys = list(get_keys(queue)) keys = list(get_keys(queue))
with queue.connection.pipeline() as pipeline: with queue.connection.pipeline() as pipeline:
for key in keys: for key in keys:
pipeline.exists(key) pipeline.exists(key)
results = pipeline.execute() results = pipeline.execute()

@ -326,6 +326,21 @@ class TestRQCli(RQTestCase):
result = runner.invoke(main, args + ['--quiet', '--verbose']) result = runner.invoke(main, args + ['--quiet', '--verbose'])
self.assertNotEqual(result.exit_code, 0) self.assertNotEqual(result.exit_code, 0)
def test_worker_dequeue_strategy(self):
"""--quiet and --verbose logging options are supported"""
runner = CliRunner()
args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'random']
result = runner.invoke(main, args)
self.assert_normal_execution(result)
args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'round_robin']
result = runner.invoke(main, args)
self.assert_normal_execution(result)
args = ['worker', '-u', self.redis_url, '-b', '--dequeue-strategy', 'wrong']
result = runner.invoke(main, args)
self.assertEqual(result.exit_code, 1)
def test_exception_handlers(self): def test_exception_handlers(self):
"""rq worker -u <url> -b --exception-handler <handler>""" """rq worker -u <url> -b --exception-handler <handler>"""
connection = Redis.from_url(self.redis_url) connection = Redis.from_url(self.redis_url)

@ -1136,6 +1136,59 @@ class TestWorker(RQTestCase):
worker = Worker.find_by_key(w2.key) worker = Worker.find_by_key(w2.key)
self.assertEqual(worker.python_version, python_version) self.assertEqual(worker.python_version, python_version)
def test_dequeue_random_strategy(self):
qs = [Queue('q%d' % i) for i in range(5)]
for i in range(5):
for j in range(3):
qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
w = Worker(qs)
w.work(burst=True, dequeue_strategy="random")
start_times = []
for i in range(5):
for j in range(3):
job = Job.fetch('q%d_%d' % (i, j))
start_times.append(('q%d_%d' % (i, j), job.started_at))
sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
sorted_ids = [tup[0] for tup in sorted_by_time]
expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)]
expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)]
self.assertNotEqual(sorted_ids, expected_rr)
self.assertNotEqual(sorted_ids, expected_ser)
expected_rr.reverse()
expected_ser.reverse()
self.assertNotEqual(sorted_ids, expected_rr)
self.assertNotEqual(sorted_ids, expected_ser)
sorted_ids.sort()
expected_ser.sort()
self.assertEqual(sorted_ids, expected_ser)
def test_dequeue_round_robin(self):
qs = [Queue('q%d' % i) for i in range(5)]
for i in range(5):
for j in range(3):
qs[i].enqueue(say_pid, job_id='q%d_%d' % (i, j))
w = Worker(qs)
w.work(burst=True, dequeue_strategy="round_robin")
start_times = []
for i in range(5):
for j in range(3):
job = Job.fetch('q%d_%d' % (i, j))
start_times.append(('q%d_%d' % (i, j), job.started_at))
sorted_by_time = sorted(start_times, key=lambda tup: tup[1])
sorted_ids = [tup[0] for tup in sorted_by_time]
expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0',
'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1',
'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2']
self.assertEqual(expected, sorted_ids)
def wait_and_kill_work_horse(pid, time_to_wait=0.0): def wait_and_kill_work_horse(pid, time_to_wait=0.0):
time.sleep(time_to_wait) time.sleep(time_to_wait)

Loading…
Cancel
Save