Merge branch 'master' of github.com:rq/rq

main
Selwin Ong 4 years ago
commit 6fab48845f

@ -177,9 +177,14 @@ class RQScheduler(object):
def stop(self):
self.log.info("Scheduler stopping, releasing locks for %s...",
','.join(self._queue_names))
self.release_locks()
self._status = self.Status.STOPPED
def release_locks(self):
"""Release acquired locks"""
keys = [self.get_locking_key(name) for name in self._queue_names]
self.connection.delete(*keys)
self._status = self.Status.STOPPED
self._acquired_locks = set()
def start(self):
self._status = self.Status.STARTED

@ -3,7 +3,6 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import errno
import json
import logging
import os
import random
@ -551,6 +550,7 @@ class Worker(object):
# before working. Otherwise, start scheduler in a separate process
if burst:
self.scheduler.enqueue_scheduled_jobs()
self.scheduler.release_locks()
else:
self.scheduler.start()

@ -268,6 +268,7 @@ class TestWorker(RQTestCase):
worker = Worker(queues=[queue], connection=self.testconn)
worker.work(burst=True, with_scheduler=True)
self.assertIsNotNone(worker.scheduler)
self.assertIsNone(self.testconn.get(worker.scheduler.get_locking_key('default')))
@mock.patch.object(RQScheduler, 'acquire_locks')
def test_run_maintenance_tasks(self, mocked):

Loading…
Cancel
Save