From 4925b09aa5553cbac9ccee0d510ec935b88a9848 Mon Sep 17 00:00:00 2001 From: Yaniv Aknin Date: Sat, 26 Jan 2013 18:58:07 +0200 Subject: [PATCH] Set worker ttl and maintain it when idle/taking/finishing jobs This change could use far better test coverage, but I'm not sure how to test it without refactoring more of the code than I think is reasonable in the scope of this work. --- rq/worker.py | 28 ++++++++++++++++++++++------ tests/test_worker.py | 9 +++++++++ 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 60273f2..7f3cd0a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -19,7 +19,7 @@ from .connections import get_current_connection from .job import Job, Status from .utils import make_colorizer from .logutils import setup_loghandlers -from .exceptions import NoQueueError, UnpickleError +from .exceptions import NoQueueError, UnpickleError, DequeueTimeout from .timeouts import death_penalty_after from .version import VERSION @@ -27,6 +27,7 @@ green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') blue = make_colorizer('darkblue') +DEFAULT_WORKER_TTL = 420 DEFAULT_RESULT_TTL = 500 logger = logging.getLogger(__name__) @@ -85,6 +86,7 @@ class Worker(object): if connection is None: connection = get_current_connection() if not connection.exists(worker_key): + connection.srem(cls.redis_workers_keys, worker_key) return None name = worker_key[len(prefix):] @@ -97,8 +99,9 @@ class Worker(object): return worker - def __init__(self, queues, name=None, default_result_ttl=DEFAULT_RESULT_TTL, - connection=None, exc_handler=None): # noqa + def __init__(self, queues, name=None, + default_result_ttl=DEFAULT_RESULT_TTL, connection=None, + exc_handler=None, default_worker_ttl=DEFAULT_WORKER_TTL): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -109,6 +112,7 @@ class Worker(object): self.validate_queues() self._exc_handlers = [] self.default_result_ttl = default_result_ttl + self.default_worker_ttl = default_worker_ttl self._state = 'starting' self._is_horse = False self._horse_pid = 0 @@ -200,6 +204,7 @@ class Worker(object): p.hset(key, 'birth', now) p.hset(key, 'queues', queues) p.sadd(self.redis_workers_keys, key) + p.expire(key, self.default_worker_ttl) p.execute() def register_death(self): @@ -301,10 +306,9 @@ class Worker(object): self.log.info('') self.log.info('*** Listening on %s...' % \ green(', '.join(qnames))) - timeout = None if burst else 0 + timeout = None if burst else self.default_worker_ttl - 60 try: - result = Queue.dequeue_any(self.queues, timeout, \ - connection=self.connection) + result = self.dequeue_job_and_maintain_ttl(timeout) if result is None: break except StopRequested: @@ -322,7 +326,9 @@ class Worker(object): self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id)) + self.connection.expire(self.key, (job.timeout or 180) + 60) self.fork_and_perform_job(job) + self.connection.expire(self.key, self.default_worker_ttl) did_perform_work = True finally: @@ -330,6 +336,16 @@ class Worker(object): self.register_death() return did_perform_work + + def dequeue_job_and_maintain_ttl(self, timeout): + while True: + try: + return Queue.dequeue_any(self.queues, timeout, + connection=self.connection) + except DequeueTimeout: + self.connection.expire(self.key, self.default_worker_ttl) + + def fork_and_perform_job(self, job): """Spawns a work horse to perform the actual work and passes it a job. The worker will wait for the work horse and make sure it executes diff --git a/tests/test_worker.py b/tests/test_worker.py index e3e706e..80a6ede 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,4 +1,5 @@ import os +from time import sleep from tests import RQTestCase, slow from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \ create_file_after_timeout @@ -25,6 +26,14 @@ class TestWorker(RQTestCase): self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') + def test_worker_ttl(self): + """Worker ttl.""" + w = Worker([]) + w.register_birth() # ugly: our test should only call public APIs + [worker_key] = self.testconn.smembers(Worker.redis_workers_keys) + self.assertIsNotNone(self.testconn.ttl(worker_key)) + w.register_death() + def test_work_via_string_argument(self): """Worker processes work fed via string arguments.""" q = Queue('foo')