Configurable maintenance task interval (#1823)

* Configurable maintenance task interval

* pass to worker

* rename parameter

* rename

* rename

* test
main
Rony Lutsky 2 years ago committed by GitHub
parent 42ac7d4150
commit 41406db3eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -32,7 +32,7 @@ from rq.defaults import (
DEFAULT_JOB_MONITORING_INTERVAL, DEFAULT_JOB_MONITORING_INTERVAL,
DEFAULT_LOGGING_FORMAT, DEFAULT_LOGGING_FORMAT,
DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_DATE_FORMAT,
DEFAULT_SERIALIZER_CLASS, DEFAULT_SERIALIZER_CLASS, DEFAULT_MAINTENANCE_TASK_INTERVAL,
) )
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
from rq.registry import FailedJobRegistry, clean_registries from rq.registry import FailedJobRegistry, clean_registries
@ -200,7 +200,13 @@ def info(cli_config, interval, raw, only_queues, only_workers, by_queue, queues,
@click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs') @click.option('--date-format', type=str, default=DEFAULT_LOGGING_DATE_FORMAT, help='Set the date format of the logs')
@click.option('--name', '-n', help='Specify a different name') @click.option('--name', '-n', help='Specify a different name')
@click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used') @click.option('--results-ttl', type=int, default=DEFAULT_RESULT_TTL, help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Default worker timeout to be used') @click.option('--worker-ttl', type=int, default=DEFAULT_WORKER_TTL, help='Worker timeout to be used')
@click.option(
'--maintenance-interval',
type=int,
default=DEFAULT_MAINTENANCE_TASK_INTERVAL,
help='Maintenance task interval (in seconds) to be used'
)
@click.option( @click.option(
'--job-monitoring-interval', '--job-monitoring-interval',
type=int, type=int,
@ -228,6 +234,7 @@ def worker(
name, name,
results_ttl, results_ttl,
worker_ttl, worker_ttl,
maintenance_interval,
job_monitoring_interval, job_monitoring_interval,
disable_job_desc_logging, disable_job_desc_logging,
verbose, verbose,
@ -283,6 +290,7 @@ def worker(
connection=cli_config.connection, connection=cli_config.connection,
default_worker_ttl=worker_ttl, default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl, default_result_ttl=results_ttl,
maintenance_interval=maintenance_interval,
job_monitoring_interval=job_monitoring_interval, job_monitoring_interval=job_monitoring_interval,
job_class=cli_config.job_class, job_class=cli_config.job_class,
queue_class=cli_config.queue_class, queue_class=cli_config.queue_class,

@ -226,6 +226,7 @@ class Worker:
exc_handler=None, exc_handler=None,
exception_handlers=None, exception_handlers=None,
default_worker_ttl=DEFAULT_WORKER_TTL, default_worker_ttl=DEFAULT_WORKER_TTL,
maintenance_interval: int = DEFAULT_MAINTENANCE_TASK_INTERVAL,
job_class: Type['Job'] = None, job_class: Type['Job'] = None,
queue_class=None, queue_class=None,
log_job_description: bool = True, log_job_description: bool = True,
@ -238,6 +239,7 @@ class Worker:
self.default_result_ttl = default_result_ttl self.default_result_ttl = default_result_ttl
self.worker_ttl = default_worker_ttl self.worker_ttl = default_worker_ttl
self.job_monitoring_interval = job_monitoring_interval self.job_monitoring_interval = job_monitoring_interval
self.maintenance_interval = maintenance_interval
connection = self._set_connection(connection) connection = self._set_connection(connection)
self.connection = connection self.connection = connection
@ -1485,7 +1487,7 @@ class Worker:
"""Maintenance tasks should run on first startup or every 10 minutes.""" """Maintenance tasks should run on first startup or every 10 minutes."""
if self.last_cleaned_at is None: if self.last_cleaned_at is None:
return True return True
if (utcnow() - self.last_cleaned_at) > timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL): if (utcnow() - self.last_cleaned_at) > timedelta(seconds=self.maintenance_interval):
return True return True
return False return False

@ -19,6 +19,7 @@ import pytest
from unittest import mock from unittest import mock
from unittest.mock import Mock from unittest.mock import Mock
from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import ( from tests.fixtures import (
access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing, access_self, create_file, create_file_after_timeout, create_file_after_timeout_and_setsid, div_by_zero, do_nothing,
@ -936,7 +937,15 @@ class TestWorker(RQTestCase):
worker.last_cleaned_at = utcnow() worker.last_cleaned_at = utcnow()
self.assertFalse(worker.should_run_maintenance_tasks) self.assertFalse(worker.should_run_maintenance_tasks)
worker.last_cleaned_at = utcnow() - timedelta(seconds=3700) worker.last_cleaned_at = utcnow() - timedelta(seconds=DEFAULT_MAINTENANCE_TASK_INTERVAL + 100)
self.assertTrue(worker.should_run_maintenance_tasks)
# custom maintenance_interval
worker = Worker(queue, maintenance_interval=10)
self.assertTrue(worker.should_run_maintenance_tasks)
worker.last_cleaned_at = utcnow()
self.assertFalse(worker.should_run_maintenance_tasks)
worker.last_cleaned_at = utcnow() - timedelta(seconds=11)
self.assertTrue(worker.should_run_maintenance_tasks) self.assertTrue(worker.should_run_maintenance_tasks)
def test_worker_calls_clean_registries(self): def test_worker_calls_clean_registries(self):

Loading…
Cancel
Save