Added worker.clean_registries().

main
Selwin Ong 10 years ago
parent faf9d3e668
commit 5782ac10c4

@ -20,7 +20,7 @@ from .exceptions import DequeueTimeout, NoQueueError
from .job import Job, JobStatus from .job import Job, JobStatus
from .logutils import setup_loghandlers from .logutils import setup_loghandlers
from .queue import get_failed_queue, Queue from .queue import get_failed_queue, Queue
from .registry import FinishedJobRegistry, StartedJobRegistry from .registry import clean_registries, FinishedJobRegistry, StartedJobRegistry
from .suspension import is_suspended from .suspension import is_suspended
from .timeouts import UnixSignalDeathPenalty from .timeouts import UnixSignalDeathPenalty
from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse
@ -146,6 +146,7 @@ class Worker(object):
self._stopped = False self._stopped = False
self.log = logger self.log = logger
self.failed_queue = get_failed_queue(connection=self.connection) self.failed_queue = get_failed_queue(connection=self.connection)
self.maintenance_date = None
# By default, push the "move-to-failed-queue" exception handler onto # By default, push the "move-to-failed-queue" exception handler onto
# the stack # the stack
@ -646,6 +647,12 @@ class Worker(object):
"""The hash does not take the database/connection into account""" """The hash does not take the database/connection into account"""
return hash(self.name) return hash(self.name)
def clean_registries(self):
"""Runs maintenance jobs on each Queue's registries."""
for queue in self.queues:
clean_registries(queue)
self.maintenance_date = utcnow()
class SimpleWorker(Worker): class SimpleWorker(Worker):
def main_work_horse(self, *args, **kwargs): def main_work_horse(self, *args, **kwargs):

@ -401,3 +401,22 @@ class TestWorker(RQTestCase):
death_date = w.death_date death_date = w.death_date
self.assertIsNotNone(death_date) self.assertIsNotNone(death_date)
self.assertEquals(type(death_date).__name__, 'datetime') self.assertEquals(type(death_date).__name__, 'datetime')
def test_clean_queue_registries(self):
"""worker.clean_registries sets maintenance_date and cleans registries."""
foo_queue = Queue('foo', connection=self.testconn)
foo_registry = StartedJobRegistry('foo', connection=self.testconn)
self.testconn.zadd(foo_registry.key, 1, 'foo')
self.assertEqual(self.testconn.zcard(foo_registry.key), 1)
bar_queue = Queue('bar', connection=self.testconn)
bar_registry = StartedJobRegistry('bar', connection=self.testconn)
self.testconn.zadd(bar_registry.key, 1, 'bar')
self.assertEqual(self.testconn.zcard(bar_registry.key), 1)
worker = Worker([foo_queue, bar_queue])
self.assertEqual(worker.maintenance_date, None)
worker.clean_registries()
self.assertNotEqual(worker.maintenance_date, None)
self.assertEqual(self.testconn.zcard(foo_registry.key), 0)
self.assertEqual(self.testconn.zcard(bar_registry.key), 0)

Loading…
Cancel
Save