diff --git a/rq/worker.py b/rq/worker.py index a4cd529..adeb56e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -2,6 +2,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +from datetime import timedelta import errno import logging import os @@ -403,6 +404,9 @@ class Worker(object): try: self.check_for_suspension(burst) + if self.should_run_maintenance_tasks: + self.clean_registries() + if self.stopped: self.log.info('Stopping on request.') break @@ -609,7 +613,7 @@ class Worker(object): 'arguments': job.args, 'kwargs': job.kwargs, 'queue': job.origin, - }) + }) for handler in reversed(self._exc_handlers): self.log.debug('Invoking exception handler %s' % (handler,)) @@ -653,6 +657,15 @@ class Worker(object): clean_registries(queue) self.maintenance_date = utcnow() + @property + def should_run_maintenance_tasks(self): + """Maintenance tasks should run on first startup or every hour.""" + if self.maintenance_date is None: + return True + if (utcnow() - self.maintenance_date) > timedelta(seconds=3600): + return True + return False + class SimpleWorker(Worker): def main_work_horse(self, *args, **kwargs): diff --git a/tests/test_worker.py b/tests/test_worker.py index 961e96b..3fc44bb 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import os +from datetime import timedelta from time import sleep from tests import RQTestCase, slow @@ -15,6 +16,7 @@ from rq.compat import as_text from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry from rq.suspension import resume, suspend +from rq.utils import utcnow class CustomJob(Job): @@ -420,3 +422,24 @@ class TestWorker(RQTestCase): self.assertNotEqual(worker.maintenance_date, None) self.assertEqual(self.testconn.zcard(foo_registry.key), 0) self.assertEqual(self.testconn.zcard(bar_registry.key), 0) + + def test_should_run_maintenance_tasks(self): + """Workers should run maintenance tasks on startup and every hour.""" + queue = Queue(connection=self.testconn) + worker = Worker(queue) + self.assertTrue(worker.should_run_maintenance_tasks) + + worker.maintenance_date = utcnow() + self.assertFalse(worker.should_run_maintenance_tasks) + worker.maintenance_date = utcnow() - timedelta(seconds=3700) + self.assertTrue(worker.should_run_maintenance_tasks) + + def test_worker_calls_clean_registries(self): + """Worker calls clean_registries when run.""" + queue = Queue(connection=self.testconn) + registry = StartedJobRegistry(connection=self.testconn) + self.testconn.zadd(registry.key, 1, 'foo') + + worker = Worker(queue, connection=self.testconn) + worker.work(burst=True) + self.assertEqual(self.testconn.zcard(registry.key), 0)