From a77c3d910428d43e734d7e5aa3c21d7993b7eaf8 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 15 Nov 2011 01:04:44 +0100 Subject: [PATCH] Support quitting when all work is done (i.e. queue is empty). --- rq/daemon.py | 4 ++-- rq/worker.py | 10 ++++++++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/rq/daemon.py b/rq/daemon.py index f891b63..d77cc0e 100644 --- a/rq/daemon.py +++ b/rq/daemon.py @@ -4,7 +4,7 @@ except ImportError: from logging import Logger from .worker import Worker -def run_daemon(queue_keys, rv_ttl=500): +def run_daemon(queue_keys, rv_ttl=500, quit_when_done=False): """Simple implementation of a Redis queue worker, based on http://flask.pocoo.org/snippets/73/ @@ -17,4 +17,4 @@ def run_daemon(queue_keys, rv_ttl=500): for key in queue_keys: log.info('- %s' % (key,)) - worker.work() + worker.work(quit_when_done) diff --git a/rq/worker.py b/rq/worker.py index 58709d5..f24548b 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -45,10 +45,16 @@ class Worker(object): self.log.debug(message) procname.setprocname('rq: %s' % (message,)) - def work(self): + def work(self, quit_when_done=False): while True: self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) - queue, msg = conn.blpop(self.queue_keys()) + if quit_when_done: + value = conn.lpop(self.queue_keys()) + if value is None: + break # No more work, so quitting + queue, msg = value + else: + queue, msg = conn.blpop(self.queue_keys()) self.fork_and_perform_job(queue, msg) def fork_and_perform_job(self, queue, msg):