From 08ef54dcf49d5eeb8319c9ad27e4405807e1e421 Mon Sep 17 00:00:00 2001 From: Biel Cardona Date: Wed, 31 Mar 2021 03:15:34 +0200 Subject: [PATCH] Workers dequeuing jobs from queues using both Round-Robin and Random strategies (#1425) * implemented round-robin and random access to queues * added tests for RoundRobinQueue * reverted change in gitignore * removed linebreak * added tests for random queues * added documentation for round robin and random queues * moved round robin strategy to worker * reverted changes to queue.py * reverted changes to workers.md * reverted changes to test_queue * added tests for RoundRobinWorker and RandomWorker * added doc for round robin and random workers * removed f-strings for backward compatibility * corrected a mistake * minor changes (code style) * now using _ordered_queues instead of queues for reordering queues --- docs/docs/workers.md | 16 +++++++++++++ rq/worker.py | 26 +++++++++++++++++++- tests/test_worker.py | 57 +++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 97 insertions(+), 2 deletions(-) diff --git a/docs/docs/workers.md b/docs/docs/workers.md index 921e496..298c451 100644 --- a/docs/docs/workers.md +++ b/docs/docs/workers.md @@ -308,6 +308,8 @@ more common requests so far are: 2. Using a job execution model that does not require `os.fork`. 3. The ability to use different concurrency models such as `multiprocessing` or `gevent`. +4. Using a custom strategy for dequeuing jobs from different queues. + See [link](#Round-Robin-and-Random-strategies-for-dequeuing-jobs-from-queues). You can use the `-w` option to specify a different worker class to use: @@ -316,6 +318,20 @@ $ rq worker -w 'path.to.GeventWorker' ``` +## Round Robin and Random strategies for dequeuing jobs from queues + +In certain circumstances it can be useful that a when a worker is listening to multiple queues, +say `q1`,`q2`,`q3`, the jobs are dequeued using a Round Robin strategy. That is, the 1st +dequeued job is taken from `q1`, the 2nd from `q2`, the 3rd from `q3`, the 4th +from `q1`, the 5th from `q2` and so on. The custom worker class `rq.worker.RoundRobinWorker` +implements this strategy. + +In some other circumstances, when a worker is listening to multiple queues, it can be useful +to pull jobs from the different queues randomly. The custom class `rq.worker.RoundRobinWorker` +implements this strategy. In fact, whenever a job is pulled from any queue, the list of queues is +shuffled, so that no queue has more priority than the other ones. + + ## Custom Job and Queue Classes You can tell the worker to use a custom class for jobs and queues using diff --git a/rq/worker.py b/rq/worker.py index d2b400f..437a3ae 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -16,6 +16,7 @@ import warnings from datetime import datetime, timedelta, timezone from distutils.version import StrictVersion from uuid import uuid4 +from random import shuffle try: from signal import SIGKILL @@ -198,6 +199,7 @@ class Worker(object): self.name = name or uuid4().hex self.queues = queues self.validate_queues() + self._ordered_queues = self.queues[:] self._exc_handlers = [] self.default_result_ttl = default_result_ttl @@ -525,6 +527,9 @@ class Worker(object): self.pubsub.unsubscribe() self.pubsub.close() + def reorder_queues(self, reference_queue): + pass + def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False): """Starts the work loop. @@ -581,6 +586,7 @@ class Worker(object): break job, queue = result + self.reorder_queues(reference_queue=queue) self.execute_job(job, queue) self.heartbeat() @@ -642,7 +648,7 @@ class Worker(object): if self.should_run_maintenance_tasks: self.run_maintenance_tasks() - result = self.queue_class.dequeue_any(self.queues, timeout, + result = self.queue_class.dequeue_any(self._ordered_queues, timeout, connection=self.connection, job_class=self.job_class, serializer=self.serializer) @@ -1160,3 +1166,21 @@ class HerokuWorker(Worker): info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties) self.log.warning('raising ShutDownImminentException to cancel job...') raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info) + + +class RoundRobinWorker(Worker): + """ + Modified version of Worker that dequeues jobs from the queues using a round-robin strategy. + """ + def reorder_queues(self, reference_queue): + pos = self._ordered_queues.index(reference_queue) + self._ordered_queues = self._ordered_queues[pos+1:] + self._ordered_queues[:pos+1] + + +class RandomWorker(Worker): + """ + Modified version of Worker that dequeues jobs from the queues using a random strategy. + """ + + def reorder_queues(self, reference_queue): + shuffle(self._ordered_queues) diff --git a/tests/test_worker.py b/tests/test_worker.py index 1cf2be1..c7d9ae6 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -38,7 +38,7 @@ from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegist from rq.suspension import resume, suspend from rq.utils import utcnow from rq.version import VERSION -from rq.worker import HerokuWorker, WorkerStatus +from rq.worker import HerokuWorker, WorkerStatus, RoundRobinWorker, RandomWorker from rq.serializers import JSONSerializer class CustomJob(Job): @@ -1362,3 +1362,58 @@ class TestExceptionHandlerMessageEncoding(RQTestCase): def test_handle_exception_handles_non_ascii_in_exception_message(self): """worker.handle_exception doesn't crash on non-ascii in exception message.""" self.worker.handle_exception(Mock(), *self.exc_info) + + +class TestRoundRobinWorker(RQTestCase): + def test_round_robin(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, + job_id='q%d_%d' % (i, j)) + + w = RoundRobinWorker(qs) + w.work(burst=True) + start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected = ['q0_0', 'q1_0', 'q2_0', 'q3_0', 'q4_0', + 'q0_1', 'q1_1', 'q2_1', 'q3_1', 'q4_1', + 'q0_2', 'q1_2', 'q2_2', 'q3_2', 'q4_2'] + self.assertEqual(expected, sorted_ids) + + +class TestRandomWorker(RQTestCase): + def test_random_worker(self): + qs = [Queue('q%d' % i) for i in range(5)] + + for i in range(5): + for j in range(3): + qs[i].enqueue(say_pid, + job_id='q%d_%d' % (i, j)) + + w = RandomWorker(qs) + w.work(burst=True) + start_times = [] + for i in range(5): + for j in range(3): + job = Job.fetch('q%d_%d' % (i, j)) + start_times.append(('q%d_%d' % (i, j), job.started_at)) + sorted_by_time = sorted(start_times, key=lambda tup: tup[1]) + sorted_ids = [tup[0] for tup in sorted_by_time] + expected_rr = ['q%d_%d' % (i, j) for j in range(3) for i in range(5)] + expected_ser = ['q%d_%d' % (i, j) for i in range(5) for j in range(3)] + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + expected_rr.reverse() + expected_ser.reverse() + self.assertNotEqual(sorted_ids, expected_rr) + self.assertNotEqual(sorted_ids, expected_ser) + sorted_ids.sort() + expected_ser.sort() + self.assertEqual(sorted_ids, expected_ser)