Add more tolerance to scheduler heartbeat (#1555)

main
Selwin Ong 3 years ago committed by GitHub
parent c556106a38
commit cc70cacc1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -106,7 +106,7 @@ class RQScheduler:
pid = os.getpid() pid = os.getpid()
self.log.info("Trying to acquire locks for %s", ", ".join(self._queue_names)) self.log.info("Trying to acquire locks for %s", ", ".join(self._queue_names))
for name in self._queue_names: for name in self._queue_names:
if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=60): if self.connection.set(self.get_locking_key(name), pid, nx=True, ex=self.interval + 60):
successful_locks.add(name) successful_locks.add(name)
# Always reset _scheduled_job_registries when acquiring locks # Always reset _scheduled_job_registries when acquiring locks
@ -186,11 +186,11 @@ class RQScheduler:
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for name in self._queue_names: for name in self._queue_names:
key = self.get_locking_key(name) key = self.get_locking_key(name)
pipeline.expire(key, self.interval + 5) pipeline.expire(key, self.interval + 60)
pipeline.execute() pipeline.execute()
else: else:
key = self.get_locking_key(next(iter(self._queue_names))) key = self.get_locking_key(next(iter(self._queue_names)))
self.connection.expire(key, self.interval + 5) self.connection.expire(key, self.interval + 60)
def stop(self): def stop(self):
self.log.info("Scheduler stopping, releasing locks for %s...", self.log.info("Scheduler stopping, releasing locks for %s...",

@ -198,8 +198,8 @@ class TestScheduler(RQTestCase):
pipeline.expire(locking_key_2, 1000) pipeline.expire(locking_key_2, 1000)
scheduler.heartbeat() scheduler.heartbeat()
self.assertEqual(self.testconn.ttl(locking_key_1), 6) self.assertEqual(self.testconn.ttl(locking_key_1), 61)
self.assertEqual(self.testconn.ttl(locking_key_1), 6) self.assertEqual(self.testconn.ttl(locking_key_1), 61)
# scheduler.stop() releases locks and sets status to STOPPED # scheduler.stop() releases locks and sets status to STOPPED
scheduler._status = scheduler.Status.WORKING scheduler._status = scheduler.Status.WORKING
@ -213,7 +213,7 @@ class TestScheduler(RQTestCase):
scheduler.acquire_locks() scheduler.acquire_locks()
self.testconn.expire(locking_key_1, 1000) self.testconn.expire(locking_key_1, 1000)
scheduler.heartbeat() scheduler.heartbeat()
self.assertEqual(self.testconn.ttl(locking_key_1), 6) self.assertEqual(self.testconn.ttl(locking_key_1), 61)
def test_enqueue_scheduled_jobs(self): def test_enqueue_scheduled_jobs(self):
"""Scheduler can enqueue scheduled jobs""" """Scheduler can enqueue scheduled jobs"""

Loading…
Cancel
Save