diff --git a/rq/registry.py b/rq/registry.py index 1180d4f..ded86da 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -8,8 +8,9 @@ from .utils import current_timestamp class BaseRegistry(object): """ - Base implementation of job registry, implemented in Redis sorted set. Each job - is stored as a key in the registry, scored by expiration time (unix timestamp). + Base implementation of a job registry, implemented in Redis sorted set. + Each job is stored as a key in the registry, scored by expiration time + (unix timestamp). """ def __init__(self, name='default', connection=None): @@ -134,3 +135,11 @@ class DeferredJobRegistry(BaseRegistry): automatically called by `count()` and `get_job_ids()` methods implemented in BaseRegistry.""" pass + + +def clean_registries(queue): + """Cleans StartedJobRegistry and FinishedJobRegistry of a queue.""" + registry = FinishedJobRegistry(name=queue.name, connection=queue.connection) + registry.cleanup() + registry = StartedJobRegistry(name=queue.name, connection=queue.connection) + registry.cleanup() diff --git a/rq/worker.py b/rq/worker.py index 39e8ce6..e53aff4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -2,6 +2,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +from datetime import timedelta import errno import logging import os @@ -20,7 +21,7 @@ from .exceptions import DequeueTimeout, NoQueueError from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import get_failed_queue, Queue -from .registry import FinishedJobRegistry, StartedJobRegistry +from .registry import clean_registries, FinishedJobRegistry, StartedJobRegistry from .suspension import is_suspended from .timeouts import UnixSignalDeathPenalty from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse @@ -145,6 +146,7 @@ class Worker(object): self._stopped = False self.log = logger self.failed_queue = get_failed_queue(connection=self.connection) + self.maintenance_date = None # By default, push the "move-to-failed-queue" exception handler onto # the stack @@ -408,6 +410,9 @@ class Worker(object): try: self.check_for_suspension(burst) + if self.should_run_maintenance_tasks: + self.clean_registries() + if self.stopped: self.log.info('Stopping on request.') break @@ -614,7 +619,7 @@ class Worker(object): 'arguments': job.args, 'kwargs': job.kwargs, 'queue': job.origin, - }) + }) for handler in reversed(self._exc_handlers): self.log.debug('Invoking exception handler %s' % (handler,)) @@ -652,6 +657,21 @@ class Worker(object): """The hash does not take the database/connection into account""" return hash(self.name) + def clean_registries(self): + """Runs maintenance jobs on each Queue's registries.""" + for queue in self.queues: + clean_registries(queue) + self.maintenance_date = utcnow() + + @property + def should_run_maintenance_tasks(self): + """Maintenance tasks should run on first startup or every hour.""" + if self.maintenance_date is None: + return True + if (utcnow() - self.maintenance_date) > timedelta(seconds=3600): + return True + return False + class SimpleWorker(Worker): def main_work_horse(self, *args, **kwargs): diff --git a/tests/test_registry.py b/tests/test_registry.py index 628636a..9bb1856 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -6,8 +6,8 @@ from rq.job import Job, JobStatus from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.registry import (DeferredJobRegistry, FinishedJobRegistry, - StartedJobRegistry) +from rq.registry import (clean_registries, DeferredJobRegistry, + FinishedJobRegistry, StartedJobRegistry) from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -107,6 +107,21 @@ class TestRegistry(RQTestCase): self.assertEqual(self.registry.count, 2) self.assertEqual(len(self.registry), 2) + def test_clean_registries(self): + """clean_registries() cleans Started and Finished job registries.""" + + queue = Queue(connection=self.testconn) + + finished_job_registry = FinishedJobRegistry(connection=self.testconn) + self.testconn.zadd(finished_job_registry.key, 1, 'foo') + + started_job_registry = StartedJobRegistry(connection=self.testconn) + self.testconn.zadd(started_job_registry.key, 1, 'foo') + + clean_registries(queue) + self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0) + self.assertEqual(self.testconn.zcard(started_job_registry.key), 0) + class TestFinishedJobRegistry(RQTestCase): diff --git a/tests/test_worker.py b/tests/test_worker.py index c74d36a..62d2876 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import os +from datetime import timedelta from time import sleep from tests import RQTestCase, slow @@ -16,6 +17,7 @@ from rq.exceptions import NoQueueError from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry from rq.suspension import resume, suspend +from rq.utils import utcnow class CustomJob(Job): @@ -436,3 +438,43 @@ class TestWorker(RQTestCase): death_date = w.death_date self.assertIsNotNone(death_date) self.assertEquals(type(death_date).__name__, 'datetime') + + def test_clean_queue_registries(self): + """worker.clean_registries sets maintenance_date and cleans registries.""" + foo_queue = Queue('foo', connection=self.testconn) + foo_registry = StartedJobRegistry('foo', connection=self.testconn) + self.testconn.zadd(foo_registry.key, 1, 'foo') + self.assertEqual(self.testconn.zcard(foo_registry.key), 1) + + bar_queue = Queue('bar', connection=self.testconn) + bar_registry = StartedJobRegistry('bar', connection=self.testconn) + self.testconn.zadd(bar_registry.key, 1, 'bar') + self.assertEqual(self.testconn.zcard(bar_registry.key), 1) + + worker = Worker([foo_queue, bar_queue]) + self.assertEqual(worker.maintenance_date, None) + worker.clean_registries() + self.assertNotEqual(worker.maintenance_date, None) + self.assertEqual(self.testconn.zcard(foo_registry.key), 0) + self.assertEqual(self.testconn.zcard(bar_registry.key), 0) + + def test_should_run_maintenance_tasks(self): + """Workers should run maintenance tasks on startup and every hour.""" + queue = Queue(connection=self.testconn) + worker = Worker(queue) + self.assertTrue(worker.should_run_maintenance_tasks) + + worker.maintenance_date = utcnow() + self.assertFalse(worker.should_run_maintenance_tasks) + worker.maintenance_date = utcnow() - timedelta(seconds=3700) + self.assertTrue(worker.should_run_maintenance_tasks) + + def test_worker_calls_clean_registries(self): + """Worker calls clean_registries when run.""" + queue = Queue(connection=self.testconn) + registry = StartedJobRegistry(connection=self.testconn) + self.testconn.zadd(registry.key, 1, 'foo') + + worker = Worker(queue, connection=self.testconn) + worker.work(burst=True) + self.assertEqual(self.testconn.zcard(registry.key), 0)