Worker now runs maintenance tasks every hour and on startup.

main
Selwin Ong 10 years ago
parent 5782ac10c4
commit c3767e28e2

@ -2,6 +2,7 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
from datetime import timedelta
import errno import errno
import logging import logging
import os import os
@ -403,6 +404,9 @@ class Worker(object):
try: try:
self.check_for_suspension(burst) self.check_for_suspension(burst)
if self.should_run_maintenance_tasks:
self.clean_registries()
if self.stopped: if self.stopped:
self.log.info('Stopping on request.') self.log.info('Stopping on request.')
break break
@ -609,7 +613,7 @@ class Worker(object):
'arguments': job.args, 'arguments': job.args,
'kwargs': job.kwargs, 'kwargs': job.kwargs,
'queue': job.origin, 'queue': job.origin,
}) })
for handler in reversed(self._exc_handlers): for handler in reversed(self._exc_handlers):
self.log.debug('Invoking exception handler %s' % (handler,)) self.log.debug('Invoking exception handler %s' % (handler,))
@ -653,6 +657,15 @@ class Worker(object):
clean_registries(queue) clean_registries(queue)
self.maintenance_date = utcnow() self.maintenance_date = utcnow()
@property
def should_run_maintenance_tasks(self):
"""Maintenance tasks should run on first startup or every hour."""
if self.maintenance_date is None:
return True
if (utcnow() - self.maintenance_date) > timedelta(seconds=3600):
return True
return False
class SimpleWorker(Worker): class SimpleWorker(Worker):
def main_work_horse(self, *args, **kwargs): def main_work_horse(self, *args, **kwargs):

@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import os import os
from datetime import timedelta
from time import sleep from time import sleep
from tests import RQTestCase, slow from tests import RQTestCase, slow
@ -15,6 +16,7 @@ from rq.compat import as_text
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend from rq.suspension import resume, suspend
from rq.utils import utcnow
class CustomJob(Job): class CustomJob(Job):
@ -420,3 +422,24 @@ class TestWorker(RQTestCase):
self.assertNotEqual(worker.maintenance_date, None) self.assertNotEqual(worker.maintenance_date, None)
self.assertEqual(self.testconn.zcard(foo_registry.key), 0) self.assertEqual(self.testconn.zcard(foo_registry.key), 0)
self.assertEqual(self.testconn.zcard(bar_registry.key), 0) self.assertEqual(self.testconn.zcard(bar_registry.key), 0)
def test_should_run_maintenance_tasks(self):
"""Workers should run maintenance tasks on startup and every hour."""
queue = Queue(connection=self.testconn)
worker = Worker(queue)
self.assertTrue(worker.should_run_maintenance_tasks)
worker.maintenance_date = utcnow()
self.assertFalse(worker.should_run_maintenance_tasks)
worker.maintenance_date = utcnow() - timedelta(seconds=3700)
self.assertTrue(worker.should_run_maintenance_tasks)
def test_worker_calls_clean_registries(self):
"""Worker calls clean_registries when run."""
queue = Queue(connection=self.testconn)
registry = StartedJobRegistry(connection=self.testconn)
self.testconn.zadd(registry.key, 1, 'foo')
worker = Worker(queue, connection=self.testconn)
worker.work(burst=True)
self.assertEqual(self.testconn.zcard(registry.key), 0)

Loading…
Cancel
Save