Scheduler should release and heartbeat only acquired locks (#1914)

* Scheduler should release and heartbeat only acquired locks.

* Added tests for heartbeat and release only acquired locks.

* Changed test description to correct one.
main
xzander 2 years ago committed by GitHub
parent ea063edf0a
commit bdbc9a4f9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -171,24 +171,24 @@ class RQScheduler:
def heartbeat(self): def heartbeat(self):
"""Updates the TTL on scheduler keys and the locks""" """Updates the TTL on scheduler keys and the locks"""
self.log.debug('Scheduler sending heartbeat to %s', ', '.join(self.acquired_locks)) self.log.debug('Scheduler sending heartbeat to %s', ', '.join(self.acquired_locks))
if len(self._queue_names) > 1: if len(self._acquired_locks) > 1:
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
for name in self._acquired_locks: for name in self._acquired_locks:
key = self.get_locking_key(name) key = self.get_locking_key(name)
pipeline.expire(key, self.interval + 60) pipeline.expire(key, self.interval + 60)
pipeline.execute() pipeline.execute()
else: elif self._acquired_locks:
key = self.get_locking_key(next(iter(self._queue_names))) key = self.get_locking_key(next(iter(self._acquired_locks)))
self.connection.expire(key, self.interval + 60) self.connection.expire(key, self.interval + 60)
def stop(self): def stop(self):
self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._queue_names)) self.log.info('Scheduler stopping, releasing locks for %s...', ', '.join(self._acquired_locks))
self.release_locks() self.release_locks()
self._status = self.Status.STOPPED self._status = self.Status.STOPPED
def release_locks(self): def release_locks(self):
"""Release acquired locks""" """Release acquired locks"""
keys = [self.get_locking_key(name) for name in self._queue_names] keys = [self.get_locking_key(name) for name in self._acquired_locks]
self.connection.delete(*keys) self.connection.delete(*keys)
self._acquired_locks = set() self._acquired_locks = set()

@ -204,6 +204,33 @@ class TestScheduler(RQTestCase):
self.assertEqual(mocked.call_count, 1) self.assertEqual(mocked.call_count, 1)
self.assertEqual(stopped_process.is_alive.call_count, 1) self.assertEqual(stopped_process.is_alive.call_count, 1)
def test_lock_release(self):
"""Test that scheduler.release_locks() only releases acquired locks"""
name_1 = 'lock-test-1'
name_2 = 'lock-test-2'
scheduler_1 = RQScheduler([name_1], self.testconn)
self.assertEqual(scheduler_1.acquire_locks(), {name_1})
self.assertEqual(scheduler_1._acquired_locks, {name_1})
# Only name_2 is returned since name_1 is already locked
scheduler_1_2 = RQScheduler([name_1, name_2], self.testconn)
self.assertEqual(scheduler_1_2.acquire_locks(), {name_2})
self.assertEqual(scheduler_1_2._acquired_locks, {name_2})
self.assertTrue(self.testconn.exists(scheduler_1.get_locking_key(name_1)))
self.assertTrue(self.testconn.exists(scheduler_1_2.get_locking_key(name_1)))
self.assertTrue(self.testconn.exists(scheduler_1_2.get_locking_key(name_2)))
scheduler_1_2.release_locks()
self.assertEqual(scheduler_1_2._acquired_locks, set())
self.assertEqual(scheduler_1._acquired_locks, {name_1})
self.assertTrue(self.testconn.exists(scheduler_1.get_locking_key(name_1)))
self.assertTrue(self.testconn.exists(scheduler_1_2.get_locking_key(name_1)))
self.assertFalse(self.testconn.exists(scheduler_1_2.get_locking_key(name_2)))
def test_queue_scheduler_pid(self): def test_queue_scheduler_pid(self):
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
scheduler = RQScheduler( scheduler = RQScheduler(
@ -219,25 +246,33 @@ class TestScheduler(RQTestCase):
"""Test that heartbeat updates locking keys TTL""" """Test that heartbeat updates locking keys TTL"""
name_1 = 'lock-test-1' name_1 = 'lock-test-1'
name_2 = 'lock-test-2' name_2 = 'lock-test-2'
scheduler = RQScheduler([name_1, name_2], self.testconn) name_3 = 'lock-test-3'
scheduler = RQScheduler([name_3], self.testconn)
scheduler.acquire_locks()
scheduler = RQScheduler([name_1, name_2, name_3], self.testconn)
scheduler.acquire_locks() scheduler.acquire_locks()
locking_key_1 = RQScheduler.get_locking_key(name_1) locking_key_1 = RQScheduler.get_locking_key(name_1)
locking_key_2 = RQScheduler.get_locking_key(name_2) locking_key_2 = RQScheduler.get_locking_key(name_2)
locking_key_3 = RQScheduler.get_locking_key(name_3)
with self.testconn.pipeline() as pipeline: with self.testconn.pipeline() as pipeline:
pipeline.expire(locking_key_1, 1000) pipeline.expire(locking_key_1, 1000)
pipeline.expire(locking_key_2, 1000) pipeline.expire(locking_key_2, 1000)
pipeline.expire(locking_key_3, 1000)
pipeline.execute()
scheduler.heartbeat() scheduler.heartbeat()
self.assertEqual(self.testconn.ttl(locking_key_1), 61) self.assertEqual(self.testconn.ttl(locking_key_1), 61)
self.assertEqual(self.testconn.ttl(locking_key_1), 61) self.assertEqual(self.testconn.ttl(locking_key_2), 61)
self.assertEqual(self.testconn.ttl(locking_key_3), 1000)
# scheduler.stop() releases locks and sets status to STOPPED # scheduler.stop() releases locks and sets status to STOPPED
scheduler._status = scheduler.Status.WORKING scheduler._status = scheduler.Status.WORKING
scheduler.stop() scheduler.stop()
self.assertFalse(self.testconn.exists(locking_key_1)) self.assertFalse(self.testconn.exists(locking_key_1))
self.assertFalse(self.testconn.exists(locking_key_2)) self.assertFalse(self.testconn.exists(locking_key_2))
self.assertTrue(self.testconn.exists(locking_key_3))
self.assertEqual(scheduler.status, scheduler.Status.STOPPED) self.assertEqual(scheduler.status, scheduler.Status.STOPPED)
# Heartbeat also works properly for schedulers with a single queue # Heartbeat also works properly for schedulers with a single queue

Loading…
Cancel
Save