Restart scheduler process if the process is not alive (#1764)

main
Oleg 2 years ago committed by GitHub
parent 55f833ab6f
commit 4bd0f12ec9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,4 +10,5 @@ DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds
DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S' DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S'
DEFAULT_SCHEDULER_FALLBACK_PERIOD = 120 DEFAULT_SCHEDULER_FALLBACK_PERIOD = 120
DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s'
DEFAULT_MAINTENANCE_TASK_INTERVAL = 10 * 60
CALLBACK_TIMEOUT = 60 CALLBACK_TIMEOUT = 60

@ -118,7 +118,7 @@ class RQScheduler:
# If auto_start is requested and scheduler is not started, # If auto_start is requested and scheduler is not started,
# run self.start() # run self.start()
if self._acquired_locks and auto_start: if self._acquired_locks and auto_start:
if not self._process: if not self._process or not self._process.is_alive():
self.start() self.start()
return successful_locks return successful_locks

@ -32,7 +32,7 @@ from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command
from .utils import as_text from .utils import as_text
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (CALLBACK_TIMEOUT, DEFAULT_RESULT_TTL, from .defaults import (CALLBACK_TIMEOUT, DEFAULT_MAINTENANCE_TASK_INTERVAL, DEFAULT_RESULT_TTL,
DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_WORKER_TTL, DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT) DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_DATE_FORMAT)
from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException from .exceptions import DeserializationError, DequeueTimeout, ShutDownImminentException
@ -546,7 +546,7 @@ class Worker:
""" """
# No need to try to start scheduler on first run # No need to try to start scheduler on first run
if self.last_cleaned_at: if self.last_cleaned_at:
if self.scheduler and not self.scheduler._process: if self.scheduler and (not self.scheduler._process or not self.scheduler._process.is_alive()):
self.scheduler.acquire_locks(auto_start=True) self.scheduler.acquire_locks(auto_start=True)
self.clean_registries() self.clean_registries()
@ -1240,7 +1240,7 @@ class Worker:
"""Maintenance tasks should run on first startup or every 10 minutes.""" """Maintenance tasks should run on first startup or every 10 minutes."""
if self.last_cleaned_at is None: if self.last_cleaned_at is None:
return True return True
if (utcnow() - self.last_cleaned_at) > timedelta(minutes=10): if (utcnow() - self.last_cleaned_at) > timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL):
return True return True
return False return False

@ -11,6 +11,7 @@ from rq.scheduler import RQScheduler
from rq.serializers import JSONSerializer from rq.serializers import JSONSerializer
from rq.utils import current_timestamp from rq.utils import current_timestamp
from rq.worker import Worker from rq.worker import Worker
from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL
from tests import RQTestCase, find_empty_redis_database, ssl_test from tests import RQTestCase, find_empty_redis_database, ssl_test
@ -139,7 +140,7 @@ class TestScheduler(RQTestCase):
# scheduler.should_reacquire_locks always returns False if # scheduler.should_reacquire_locks always returns False if
# scheduler.acquired_locks and scheduler._queue_names are the same # scheduler.acquired_locks and scheduler._queue_names are the same
self.assertFalse(scheduler.should_reacquire_locks) self.assertFalse(scheduler.should_reacquire_locks)
scheduler.lock_acquisition_time = datetime.now() - timedelta(minutes=16) scheduler.lock_acquisition_time = datetime.now() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL+6)
self.assertFalse(scheduler.should_reacquire_locks) self.assertFalse(scheduler.should_reacquire_locks)
scheduler._queue_names = set(['default', 'foo']) scheduler._queue_names = set(['default', 'foo'])
@ -176,11 +177,24 @@ class TestScheduler(RQTestCase):
self.assertEqual(mocked.call_count, 1) self.assertEqual(mocked.call_count, 1)
# If process has started, scheduler.start() won't be called # If process has started, scheduler.start() won't be called
running_process = mock.MagicMock()
running_process.is_alive.return_value = True
scheduler = RQScheduler(['auto-start2'], self.testconn) scheduler = RQScheduler(['auto-start2'], self.testconn)
scheduler._process = 1 scheduler._process = running_process
with mock.patch.object(scheduler, 'start') as mocked: with mock.patch.object(scheduler, 'start') as mocked:
scheduler.acquire_locks(auto_start=True) scheduler.acquire_locks(auto_start=True)
self.assertEqual(mocked.call_count, 0) self.assertEqual(mocked.call_count, 0)
self.assertEqual(running_process.is_alive.call_count, 1)
# If the process has stopped for some reason, the scheduler should restart
scheduler = RQScheduler(['auto-start3'], self.testconn)
stopped_process = mock.MagicMock()
stopped_process.is_alive.return_value = False
scheduler._process = stopped_process
with mock.patch.object(scheduler, 'start') as mocked:
scheduler.acquire_locks(auto_start=True)
self.assertEqual(mocked.call_count, 1)
self.assertEqual(stopped_process.is_alive.call_count, 1)
def test_heartbeat(self): def test_heartbeat(self):
"""Test that heartbeat updates locking keys TTL""" """Test that heartbeat updates locking keys TTL"""
@ -272,15 +286,32 @@ class TestWorker(RQTestCase):
worker.run_maintenance_tasks() worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 0) self.assertEqual(mocked.call_count, 0)
# if scheduler object exists and it's a first start, acquire locks should not run
worker.last_cleaned_at = None worker.last_cleaned_at = None
worker.scheduler = RQScheduler([queue], connection=self.testconn) worker.scheduler = RQScheduler([queue], connection=self.testconn)
worker.run_maintenance_tasks() worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 0) self.assertEqual(mocked.call_count, 0)
# the scheduler exists and it's NOT a first start; since the process doesn't exist,
# should call acquire_locks to start the process
worker.last_cleaned_at = datetime.now() worker.last_cleaned_at = datetime.now()
worker.run_maintenance_tasks() worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 1) self.assertEqual(mocked.call_count, 1)
# the scheduler exists, the process exists, but the process is not alive
running_process = mock.MagicMock()
running_process.is_alive.return_value = False
worker.scheduler._process = running_process
worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 2)
self.assertEqual(running_process.is_alive.call_count, 1)
# the scheduler exists, the process exists, and it is alive. acquire_locks shouldn't run
running_process.is_alive.return_value = True
worker.run_maintenance_tasks()
self.assertEqual(mocked.call_count, 2)
self.assertEqual(running_process.is_alive.call_count, 2)
def test_work(self): def test_work(self):
queue = Queue(connection=self.testconn) queue = Queue(connection=self.testconn)
worker = Worker(queues=[queue], connection=self.testconn) worker = Worker(queues=[queue], connection=self.testconn)

Loading…
Cancel
Save