Workers dequeuing jobs from queues using both Round-Robin and Random strategies (#1425)

* implemented round-robin and random access to queues

* added tests for RoundRobinQueue

* reverted change in gitignore

* removed linebreak

* added tests for random queues

* added documentation for round robin and random queues

* moved round robin strategy to worker

* reverted changes to queue.py

* reverted changes to workers.md

* reverted changes to test_queue

* added tests for RoundRobinWorker and RandomWorker

* added doc for round robin and random workers

* removed f-strings for backward compatibility

* corrected a mistake

* minor changes (code style)

* now using _ordered_queues instead of queues for reordering queues
main
Biel Cardona 4 years ago committed by GitHub
parent dcbbd067f0
commit 08ef54dcf4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -308,6 +308,8 @@ more common requests so far are:
2. Using a job execution model that does not require `os.fork`. 2. Using a job execution model that does not require `os.fork`.
3. The ability to use different concurrency models such as 3. The ability to use different concurrency models such as
`multiprocessing` or `gevent`. `multiprocessing` or `gevent`.
4. Using a custom strategy for dequeuing jobs from different queues.
See [Round Robin and Random strategies](#round-robin-and-random-strategies-for-dequeuing-jobs-from-queues).
You can use the `-w` option to specify a different worker class to use: You can use the `-w` option to specify a different worker class to use:
@ -316,6 +318,20 @@ $ rq worker -w 'path.to.GeventWorker'
``` ```
## Round Robin and Random strategies for dequeuing jobs from queues
In certain circumstances it can be useful that, when a worker is listening to multiple queues,
say `q1`,`q2`,`q3`, the jobs are dequeued using a Round Robin strategy. That is, the 1st
dequeued job is taken from `q1`, the 2nd from `q2`, the 3rd from `q3`, the 4th
from `q1`, the 5th from `q2` and so on. The custom worker class `rq.worker.RoundRobinWorker`
implements this strategy.
In some other circumstances, when a worker is listening to multiple queues, it can be useful
to pull jobs from the different queues randomly. The custom class `rq.worker.RandomWorker`
implements this strategy. In fact, whenever a job is pulled from any queue, the list of queues is
shuffled, so that no queue has more priority than the other ones.
## Custom Job and Queue Classes ## Custom Job and Queue Classes
You can tell the worker to use a custom class for jobs and queues using You can tell the worker to use a custom class for jobs and queues using

@ -16,6 +16,7 @@ import warnings
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from distutils.version import StrictVersion from distutils.version import StrictVersion
from uuid import uuid4 from uuid import uuid4
from random import shuffle
try: try:
from signal import SIGKILL from signal import SIGKILL
@ -198,6 +199,7 @@ class Worker(object):
self.name = name or uuid4().hex self.name = name or uuid4().hex
self.queues = queues self.queues = queues
self.validate_queues() self.validate_queues()
self._ordered_queues = self.queues[:]
self._exc_handlers = [] self._exc_handlers = []
self.default_result_ttl = default_result_ttl self.default_result_ttl = default_result_ttl
@ -525,6 +527,9 @@ class Worker(object):
self.pubsub.unsubscribe() self.pubsub.unsubscribe()
self.pubsub.close() self.pubsub.close()
def reorder_queues(self, reference_queue):
    """Hook for changing the order in which queues are polled.

    The work loop calls this with the queue a job was just dequeued from,
    right before that job is executed. Subclasses may override it to
    rearrange ``self._ordered_queues``, the list handed to ``dequeue_any``.

    The base implementation is a no-op, so the order never changes.

    :param reference_queue: queue that produced the job about to run.
    """
def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT, def work(self, burst=False, logging_level="INFO", date_format=DEFAULT_LOGGING_DATE_FORMAT,
log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False): log_format=DEFAULT_LOGGING_FORMAT, max_jobs=None, with_scheduler=False):
"""Starts the work loop. """Starts the work loop.
@ -581,6 +586,7 @@ class Worker(object):
break break
job, queue = result job, queue = result
self.reorder_queues(reference_queue=queue)
self.execute_job(job, queue) self.execute_job(job, queue)
self.heartbeat() self.heartbeat()
@ -642,7 +648,7 @@ class Worker(object):
if self.should_run_maintenance_tasks: if self.should_run_maintenance_tasks:
self.run_maintenance_tasks() self.run_maintenance_tasks()
result = self.queue_class.dequeue_any(self.queues, timeout, result = self.queue_class.dequeue_any(self._ordered_queues, timeout,
connection=self.connection, connection=self.connection,
job_class=self.job_class, job_class=self.job_class,
serializer=self.serializer) serializer=self.serializer)
@ -1160,3 +1166,21 @@ class HerokuWorker(Worker):
info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties) info = dict((attr, getattr(frame, attr)) for attr in self.frame_properties)
self.log.warning('raising ShutDownImminentException to cancel job...') self.log.warning('raising ShutDownImminentException to cancel job...')
raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info) raise ShutDownImminentException('shut down imminent (signal: %s)' % signal_name(signum), info)
class RoundRobinWorker(Worker):
    """Worker variant that serves its queues in round-robin order."""

    def reorder_queues(self, reference_queue):
        """Rotate the polling order so the queue just served goes last.

        Everything after ``reference_queue`` moves to the front, and
        ``reference_queue`` (with everything before it) moves to the back.

        :param reference_queue: queue the latest job was dequeued from.
        :raises ValueError: if ``reference_queue`` is not in
            ``self._ordered_queues`` (propagated from ``list.index``).
        """
        current_order = self._ordered_queues
        # Index of the first queue that should be polled next time.
        split_at = current_order.index(reference_queue) + 1
        self._ordered_queues = current_order[split_at:] + current_order[:split_at]
class RandomWorker(Worker):
    """Worker variant that polls its queues in a randomly shuffled order."""

    def reorder_queues(self, reference_queue):
        """Shuffle the polling order in place.

        ``reference_queue`` is accepted to match the hook signature but is
        not used: the whole list is reshuffled regardless of which queue
        produced the last job.

        :param reference_queue: queue the latest job was dequeued from (ignored).
        """
        shuffle(self._ordered_queues)

@ -38,7 +38,7 @@ from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegist
from rq.suspension import resume, suspend from rq.suspension import resume, suspend
from rq.utils import utcnow from rq.utils import utcnow
from rq.version import VERSION from rq.version import VERSION
from rq.worker import HerokuWorker, WorkerStatus from rq.worker import HerokuWorker, WorkerStatus, RoundRobinWorker, RandomWorker
from rq.serializers import JSONSerializer from rq.serializers import JSONSerializer
class CustomJob(Job): class CustomJob(Job):
@ -1362,3 +1362,58 @@ class TestExceptionHandlerMessageEncoding(RQTestCase):
def test_handle_exception_handles_non_ascii_in_exception_message(self): def test_handle_exception_handles_non_ascii_in_exception_message(self):
"""worker.handle_exception doesn't crash on non-ascii in exception message.""" """worker.handle_exception doesn't crash on non-ascii in exception message."""
self.worker.handle_exception(Mock(), *self.exc_info) self.worker.handle_exception(Mock(), *self.exc_info)
class TestRoundRobinWorker(RQTestCase):
    """Checks the order in which RoundRobinWorker starts jobs."""

    def test_round_robin(self):
        """Three jobs on each of five queues are started one queue at a time, in rotation."""
        queues = [Queue('q%d' % n) for n in range(5)]
        for queue_no, queue in enumerate(queues):
            for job_no in range(3):
                queue.enqueue(say_pid,
                              job_id='q%d_%d' % (queue_no, job_no))

        worker = RoundRobinWorker(queues)
        worker.work(burst=True)

        # Collect (job id, start time) pairs in enqueue order, then sort by start time.
        job_ids = ['q%d_%d' % (queue_no, job_no)
                   for queue_no in range(5)
                   for job_no in range(3)]
        started = [(job_id, Job.fetch(job_id).started_at) for job_id in job_ids]
        started.sort(key=lambda pair: pair[1])
        sorted_ids = [job_id for job_id, _ in started]

        # Round robin: first job of every queue, then second of every queue, and so on.
        expected = ['q%d_%d' % (queue_no, job_no)
                    for job_no in range(3)
                    for queue_no in range(5)]
        self.assertEqual(expected, sorted_ids)
class TestRandomWorker(RQTestCase):
    """Checks that RandomWorker does not follow a fixed queue order."""

    def test_random_worker(self):
        """Job start order is neither round-robin nor serial (nor their reverses), yet every job ran."""
        queues = [Queue('q%d' % n) for n in range(5)]
        for queue_no, queue in enumerate(queues):
            for job_no in range(3):
                queue.enqueue(say_pid,
                              job_id='q%d_%d' % (queue_no, job_no))

        worker = RandomWorker(queues)
        worker.work(burst=True)

        # Collect (job id, start time) pairs in enqueue order, then sort by start time.
        job_ids = ['q%d_%d' % (queue_no, job_no)
                   for queue_no in range(5)
                   for job_no in range(3)]
        started = [(job_id, Job.fetch(job_id).started_at) for job_id in job_ids]
        started.sort(key=lambda pair: pair[1])
        sorted_ids = [job_id for job_id, _ in started]

        # Orders a deterministic strategy would produce.
        expected_rr = ['q%d_%d' % (queue_no, job_no)
                       for job_no in range(3)
                       for queue_no in range(5)]
        expected_ser = ['q%d_%d' % (queue_no, job_no)
                        for queue_no in range(5)
                        for job_no in range(3)]

        self.assertNotEqual(sorted_ids, expected_rr)
        self.assertNotEqual(sorted_ids, expected_ser)
        self.assertNotEqual(sorted_ids, expected_rr[::-1])
        self.assertNotEqual(sorted_ids, expected_ser[::-1])

        # Whatever the order, the set of executed jobs must be complete.
        self.assertEqual(sorted(sorted_ids), sorted(expected_ser))

Loading…
Cancel
Save