From faf9d3e66838a71e5b2e28add0a503e3fdc4fa03 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 23 May 2015 08:46:00 +0700 Subject: [PATCH 1/3] Added clean_registries(queue) function to clean job registries related to that queue. --- rq/registry.py | 13 +++++++++++-- tests/test_registry.py | 19 +++++++++++++++++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/rq/registry.py b/rq/registry.py index 1180d4f..ded86da 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -8,8 +8,9 @@ from .utils import current_timestamp class BaseRegistry(object): """ - Base implementation of job registry, implemented in Redis sorted set. Each job - is stored as a key in the registry, scored by expiration time (unix timestamp). + Base implementation of a job registry, implemented in Redis sorted set. + Each job is stored as a key in the registry, scored by expiration time + (unix timestamp). """ def __init__(self, name='default', connection=None): @@ -134,3 +135,11 @@ class DeferredJobRegistry(BaseRegistry): automatically called by `count()` and `get_job_ids()` methods implemented in BaseRegistry.""" pass + + +def clean_registries(queue): + """Cleans StartedJobRegistry and FinishedJobRegistry of a queue.""" + registry = FinishedJobRegistry(name=queue.name, connection=queue.connection) + registry.cleanup() + registry = StartedJobRegistry(name=queue.name, connection=queue.connection) + registry.cleanup() diff --git a/tests/test_registry.py b/tests/test_registry.py index 628636a..9bb1856 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -6,8 +6,8 @@ from rq.job import Job, JobStatus from rq.queue import FailedQueue, Queue from rq.utils import current_timestamp from rq.worker import Worker -from rq.registry import (DeferredJobRegistry, FinishedJobRegistry, - StartedJobRegistry) +from rq.registry import (clean_registries, DeferredJobRegistry, + FinishedJobRegistry, StartedJobRegistry) from tests import RQTestCase from tests.fixtures import div_by_zero, say_hello @@ -107,6 +107,21 @@ class TestRegistry(RQTestCase): self.assertEqual(self.registry.count, 2) self.assertEqual(len(self.registry), 2) + def test_clean_registries(self): + """clean_registries() cleans Started and Finished job registries.""" + + queue = Queue(connection=self.testconn) + + finished_job_registry = FinishedJobRegistry(connection=self.testconn) + self.testconn.zadd(finished_job_registry.key, 1, 'foo') + + started_job_registry = StartedJobRegistry(connection=self.testconn) + self.testconn.zadd(started_job_registry.key, 1, 'foo') + + clean_registries(queue) + self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0) + self.assertEqual(self.testconn.zcard(started_job_registry.key), 0) + class TestFinishedJobRegistry(RQTestCase): From 5782ac10c40c84c495ee1033e78652084cab1078 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 23 May 2015 09:01:25 +0700 Subject: [PATCH 2/3] Added worker.clean_registries(). --- rq/worker.py | 9 ++++++++- tests/test_worker.py | 19 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index fa3c49b..a4cd529 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -20,7 +20,7 @@ from .exceptions import DequeueTimeout, NoQueueError from .job import Job, JobStatus from .logutils import setup_loghandlers from .queue import get_failed_queue, Queue -from .registry import FinishedJobRegistry, StartedJobRegistry +from .registry import clean_registries, FinishedJobRegistry, StartedJobRegistry from .suspension import is_suspended from .timeouts import UnixSignalDeathPenalty from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse @@ -146,6 +146,7 @@ class Worker(object): self._stopped = False self.log = logger self.failed_queue = get_failed_queue(connection=self.connection) + self.maintenance_date = None # By default, push the "move-to-failed-queue" exception handler onto # the stack @@ -646,6 +647,12 @@ class Worker(object): """The hash does not take the database/connection into account""" return hash(self.name) + def clean_registries(self): + """Runs maintenance jobs on each Queue's registries.""" + for queue in self.queues: + clean_registries(queue) + self.maintenance_date = utcnow() + class SimpleWorker(Worker): def main_work_horse(self, *args, **kwargs): diff --git a/tests/test_worker.py b/tests/test_worker.py index 6f89f4a..961e96b 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -401,3 +401,22 @@ class TestWorker(RQTestCase): death_date = w.death_date self.assertIsNotNone(death_date) self.assertEquals(type(death_date).__name__, 'datetime') + + def test_clean_queue_registries(self): + """worker.clean_registries sets maintenance_date and cleans registries.""" + foo_queue = Queue('foo', connection=self.testconn) + foo_registry = StartedJobRegistry('foo', connection=self.testconn) + self.testconn.zadd(foo_registry.key, 1, 'foo') + self.assertEqual(self.testconn.zcard(foo_registry.key), 1) + + bar_queue = Queue('bar', connection=self.testconn) + bar_registry = StartedJobRegistry('bar', connection=self.testconn) + self.testconn.zadd(bar_registry.key, 1, 'bar') + self.assertEqual(self.testconn.zcard(bar_registry.key), 1) + + worker = Worker([foo_queue, bar_queue]) + self.assertEqual(worker.maintenance_date, None) + worker.clean_registries() + self.assertNotEqual(worker.maintenance_date, None) + self.assertEqual(self.testconn.zcard(foo_registry.key), 0) + self.assertEqual(self.testconn.zcard(bar_registry.key), 0) From c3767e28e2ca43a6aad93e6947f00f205c431b5a Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 23 May 2015 10:08:04 +0700 Subject: [PATCH 3/3] Worker now runs maintenance tasks every hour and on startup. --- rq/worker.py | 15 ++++++++++++++- tests/test_worker.py | 23 +++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index a4cd529..adeb56e 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -2,6 +2,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +from datetime import timedelta import errno import logging import os @@ -403,6 +404,9 @@ class Worker(object): try: self.check_for_suspension(burst) + if self.should_run_maintenance_tasks: + self.clean_registries() + if self.stopped: self.log.info('Stopping on request.') break @@ -609,7 +613,7 @@ class Worker(object): 'arguments': job.args, 'kwargs': job.kwargs, 'queue': job.origin, - }) + }) for handler in reversed(self._exc_handlers): self.log.debug('Invoking exception handler %s' % (handler,)) @@ -653,6 +657,15 @@ class Worker(object): clean_registries(queue) self.maintenance_date = utcnow() + @property + def should_run_maintenance_tasks(self): + """Maintenance tasks should run on first startup or every hour.""" + if self.maintenance_date is None: + return True + if (utcnow() - self.maintenance_date) > timedelta(seconds=3600): + return True + return False + class SimpleWorker(Worker): def main_work_horse(self, *args, **kwargs): diff --git a/tests/test_worker.py b/tests/test_worker.py index 961e96b..3fc44bb 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -3,6 +3,7 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) import os +from datetime import timedelta from time import sleep from tests import RQTestCase, slow @@ -15,6 +16,7 @@ from rq.compat import as_text from rq.job import Job, JobStatus from rq.registry import StartedJobRegistry from rq.suspension import resume, suspend +from rq.utils import utcnow class CustomJob(Job): @@ -420,3 +422,24 @@ class TestWorker(RQTestCase): self.assertNotEqual(worker.maintenance_date, None) self.assertEqual(self.testconn.zcard(foo_registry.key), 0) self.assertEqual(self.testconn.zcard(bar_registry.key), 0) + + def test_should_run_maintenance_tasks(self): + """Workers should run maintenance tasks on startup and every hour.""" + queue = Queue(connection=self.testconn) + worker = Worker(queue) + self.assertTrue(worker.should_run_maintenance_tasks) + + worker.maintenance_date = utcnow() + self.assertFalse(worker.should_run_maintenance_tasks) + worker.maintenance_date = utcnow() - timedelta(seconds=3700) + self.assertTrue(worker.should_run_maintenance_tasks) + + def test_worker_calls_clean_registries(self): + """Worker calls clean_registries when run.""" + queue = Queue(connection=self.testconn) + registry = StartedJobRegistry(connection=self.testconn) + self.testconn.zadd(registry.key, 1, 'foo') + + worker = Worker(queue, connection=self.testconn) + worker.work(burst=True) + self.assertEqual(self.testconn.zcard(registry.key), 0)