From 518db8c24b1e3edcea2c5a02ca58954710ded49f Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 14 Nov 2011 14:17:30 +0100 Subject: [PATCH] Add better connection management. To start using RQ, push a Redis connection up its stack, like so: from rq import push_connection push_connection(Redis()) --- rq/__init__.py | 9 ++++----- rq/conn.py | 18 ++++++++++++++++++ rq/queue.py | 6 ++++++ rq/worker.py | 10 +++++----- 4 files changed, 33 insertions(+), 10 deletions(-) create mode 100644 rq/conn.py diff --git a/rq/__init__.py b/rq/__init__.py index c125891..5c67d36 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,6 +1,7 @@ import uuid from pickle import loads, dumps -from rdb import conn +from .conn import current_connection, push_connection, pop_connection +from .queue import Queue def to_queue_key(queue_name): return 'rq:%s' % (queue_name,) @@ -13,7 +14,7 @@ class DelayedResult(object): @property def return_value(self): if self._rv is None: - rv = conn.get(self.key) + rv = current_connection().get(self.key) if rv is not None: # cache the result self._rv = loads(rv) @@ -31,10 +32,8 @@ class job(object): if f.__module__ == '__main__': raise ValueError('Functions from the __main__ module cannot be processed by workers.') s = dumps((f, key, args, kwargs)) - conn.rpush(queue_key, s) + current_connection().rpush(queue_key, s) return DelayedResult(key) f.delay = delay return f - - diff --git a/rq/conn.py b/rq/conn.py new file mode 100644 index 0000000..6ab9f51 --- /dev/null +++ b/rq/conn.py @@ -0,0 +1,18 @@ +from werkzeug.local import LocalStack + +class NoRedisConnectionException(Exception): + pass + +_conn = LocalStack() + +def push_connection(redis_conn): + _conn.push(redis_conn) + +def pop_connection(): + return _conn.pop() + +def current_connection(): + conn = _conn.top + if conn is None: + raise NoRedisConnectionException('Connect to Redis first.') + return conn diff --git a/rq/queue.py b/rq/queue.py index f5e015b..a35ae3e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,3 +1,5 @@ +from . import current_connection + def to_queue_key(queue_name): return 'rq:%s' % (queue_name,) @@ -13,5 +15,9 @@ class Queue(object): def key(self): return self._key + @property + def empty(self): + return current_connection().llen(self.key) == 0 + def __str__(self): return self.name diff --git a/rq/worker.py b/rq/worker.py index 76636d1..99198ae 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -5,8 +5,8 @@ import time import procname from logbook import Logger from pickle import loads, dumps -from rdb import conn from .queue import Queue +from .conn import current_connection class NoQueueError(Exception): pass @@ -45,7 +45,7 @@ class Worker(object): def work(self): while True: self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) - queue, msg = conn.blpop(self.queue_keys()) + queue, msg = current_connection().blpop(self.queue_keys()) self.fork_and_perform_job(queue, msg) def fork_and_perform_job(self, queue, msg): @@ -80,7 +80,7 @@ class Worker(object): else: self.log.info('Job ended normally without result') if rv is not None: - p = conn.pipeline() - conn.set(key, dumps(rv)) - conn.expire(key, self.rv_ttl) + p = current_connection().pipeline() + p.set(key, dumps(rv)) + p.expire(key, self.rv_ttl) p.execute()