Merge pull request #534 from selwin/registry-maintenance

Worker automatically cleans job registries every hour
main
Selwin Ong 10 years ago
commit f370f79819

@ -8,8 +8,9 @@ from .utils import current_timestamp
class BaseRegistry(object): class BaseRegistry(object):
""" """
Base implementation of job registry, implemented in Redis sorted set. Each job Base implementation of a job registry, implemented in Redis sorted set.
is stored as a key in the registry, scored by expiration time (unix timestamp). Each job is stored as a key in the registry, scored by expiration time
(unix timestamp).
""" """
def __init__(self, name='default', connection=None): def __init__(self, name='default', connection=None):
@ -134,3 +135,11 @@ class DeferredJobRegistry(BaseRegistry):
automatically called by `count()` and `get_job_ids()` methods automatically called by `count()` and `get_job_ids()` methods
implemented in BaseRegistry.""" implemented in BaseRegistry."""
pass pass
def clean_registries(queue):
"""Cleans StartedJobRegistry and FinishedJobRegistry of a queue."""
registry = FinishedJobRegistry(name=queue.name, connection=queue.connection)
registry.cleanup()
registry = StartedJobRegistry(name=queue.name, connection=queue.connection)
registry.cleanup()

@ -2,6 +2,7 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
from datetime import timedelta
import errno import errno
import logging import logging
import os import os
@ -20,7 +21,7 @@ from .exceptions import DequeueTimeout, NoQueueError
from .job import Job, JobStatus from .job import Job, JobStatus
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
from .queue import get_failed_queue, Queue from .queue import get_failed_queue, Queue
from .registry import FinishedJobRegistry, StartedJobRegistry from .registry import clean_registries, FinishedJobRegistry, StartedJobRegistry
from .suspension import is_suspended from .suspension import is_suspended
from .timeouts import UnixSignalDeathPenalty from .timeouts import UnixSignalDeathPenalty
from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse
@ -145,6 +146,7 @@ class Worker(object):
self._stopped = False self._stopped = False
self.log = logger self.log = logger
self.failed_queue = get_failed_queue(connection=self.connection) self.failed_queue = get_failed_queue(connection=self.connection)
self.maintenance_date = None
# By default, push the "move-to-failed-queue" exception handler onto # By default, push the "move-to-failed-queue" exception handler onto
# the stack # the stack
@ -408,6 +410,9 @@ class Worker(object):
try: try:
self.check_for_suspension(burst) self.check_for_suspension(burst)
if self.should_run_maintenance_tasks:
self.clean_registries()
if self.stopped: if self.stopped:
self.log.info('Stopping on request.') self.log.info('Stopping on request.')
break break
@ -652,6 +657,21 @@ class Worker(object):
"""The hash does not take the database/connection into account""" """The hash does not take the database/connection into account"""
return hash(self.name) return hash(self.name)
def clean_registries(self):
"""Runs maintenance jobs on each Queue's registries."""
for queue in self.queues:
clean_registries(queue)
self.maintenance_date = utcnow()
@property
def should_run_maintenance_tasks(self):
"""Maintenance tasks should run on first startup or every hour."""
if self.maintenance_date is None:
return True
if (utcnow() - self.maintenance_date) > timedelta(seconds=3600):
return True
return False
class SimpleWorker(Worker): class SimpleWorker(Worker):
def main_work_horse(self, *args, **kwargs): def main_work_horse(self, *args, **kwargs):

@ -6,8 +6,8 @@ from rq.job import Job, JobStatus
from rq.queue import FailedQueue, Queue from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp from rq.utils import current_timestamp
from rq.worker import Worker from rq.worker import Worker
from rq.registry import (DeferredJobRegistry, FinishedJobRegistry, from rq.registry import (clean_registries, DeferredJobRegistry,
StartedJobRegistry) FinishedJobRegistry, StartedJobRegistry)
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello from tests.fixtures import div_by_zero, say_hello
@ -107,6 +107,21 @@ class TestRegistry(RQTestCase):
self.assertEqual(self.registry.count, 2) self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2) self.assertEqual(len(self.registry), 2)
def test_clean_registries(self):
"""clean_registries() cleans Started and Finished job registries."""
queue = Queue(connection=self.testconn)
finished_job_registry = FinishedJobRegistry(connection=self.testconn)
self.testconn.zadd(finished_job_registry.key, 1, 'foo')
started_job_registry = StartedJobRegistry(connection=self.testconn)
self.testconn.zadd(started_job_registry.key, 1, 'foo')
clean_registries(queue)
self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
self.assertEqual(self.testconn.zcard(started_job_registry.key), 0)
class TestFinishedJobRegistry(RQTestCase): class TestFinishedJobRegistry(RQTestCase):

@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import os import os
from datetime import timedelta
from time import sleep from time import sleep
from tests import RQTestCase, slow from tests import RQTestCase, slow
@ -16,6 +17,7 @@ from rq.exceptions import NoQueueError
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend from rq.suspension import resume, suspend
from rq.utils import utcnow
class CustomJob(Job): class CustomJob(Job):
@ -436,3 +438,43 @@ class TestWorker(RQTestCase):
death_date = w.death_date death_date = w.death_date
self.assertIsNotNone(death_date) self.assertIsNotNone(death_date)
self.assertEquals(type(death_date).__name__, 'datetime') self.assertEquals(type(death_date).__name__, 'datetime')
def test_clean_queue_registries(self):
"""worker.clean_registries sets maintenance_date and cleans registries."""
foo_queue = Queue('foo', connection=self.testconn)
foo_registry = StartedJobRegistry('foo', connection=self.testconn)
self.testconn.zadd(foo_registry.key, 1, 'foo')
self.assertEqual(self.testconn.zcard(foo_registry.key), 1)
bar_queue = Queue('bar', connection=self.testconn)
bar_registry = StartedJobRegistry('bar', connection=self.testconn)
self.testconn.zadd(bar_registry.key, 1, 'bar')
self.assertEqual(self.testconn.zcard(bar_registry.key), 1)
worker = Worker([foo_queue, bar_queue])
self.assertEqual(worker.maintenance_date, None)
worker.clean_registries()
self.assertNotEqual(worker.maintenance_date, None)
self.assertEqual(self.testconn.zcard(foo_registry.key), 0)
self.assertEqual(self.testconn.zcard(bar_registry.key), 0)
def test_should_run_maintenance_tasks(self):
"""Workers should run maintenance tasks on startup and every hour."""
queue = Queue(connection=self.testconn)
worker = Worker(queue)
self.assertTrue(worker.should_run_maintenance_tasks)
worker.maintenance_date = utcnow()
self.assertFalse(worker.should_run_maintenance_tasks)
worker.maintenance_date = utcnow() - timedelta(seconds=3700)
self.assertTrue(worker.should_run_maintenance_tasks)
def test_worker_calls_clean_registries(self):
"""Worker calls clean_registries when run."""
queue = Queue(connection=self.testconn)
registry = StartedJobRegistry(connection=self.testconn)
self.testconn.zadd(registry.key, 1, 'foo')
worker = Worker(queue, connection=self.testconn)
worker.work(burst=True)
self.assertEqual(self.testconn.zcard(registry.key), 0)

Loading…
Cancel
Save