From d721f0708bd0995daaaf229bc40ac9ad306e3b2f Mon Sep 17 00:00:00 2001 From: Vincent Driessen <vincent@3rdcloud.com> Date: Mon, 14 Nov 2011 21:25:52 +0100 Subject: [PATCH] Refactor the whole Redis connection stuff to be just as easy as in RDB. --- rq/__init__.py | 40 +++------------------------------------- rq/conn.py | 18 ------------------ rq/job.py | 36 ++++++++++++++++++++++++++++++++++++ rq/proxy.py | 30 ++++++++++++++++++++++++++++++ rq/queue.py | 4 ++-- tests/test_rq.py | 26 +++++++++++++++++--------- 6 files changed, 88 insertions(+), 66 deletions(-) create mode 100644 rq/job.py create mode 100644 rq/proxy.py diff --git a/rq/__init__.py b/rq/__init__.py index 5c67d36..9ed8d19 100644 --- a/rq/__init__.py +++ b/rq/__init__.py @@ -1,39 +1,5 @@ -import uuid -from pickle import loads, dumps -from .conn import current_connection, push_connection, pop_connection +from .proxy import conn from .queue import Queue +from .job import job -def to_queue_key(queue_name): - return 'rq:%s' % (queue_name,) - -class DelayedResult(object): - def __init__(self, key): - self.key = key - self._rv = None - - @property - def return_value(self): - if self._rv is None: - rv = current_connection().get(self.key) - if rv is not None: - # cache the result - self._rv = loads(rv) - return self._rv - - -class job(object): - def __init__(self, queue='normal'): - self.queue = queue - - def __call__(self, f): - def delay(*args, **kwargs): - queue_key = to_queue_key(self.queue) - key = '%s:result:%s' % (queue_key, str(uuid.uuid4())) - if f.__module__ == '__main__': - raise ValueError('Functions from the __main__ module cannot be processed by workers.') - s = dumps((f, key, args, kwargs)) - current_connection().rpush(queue_key, s) - return DelayedResult(key) - f.delay = delay - return f - +__all__ = ['conn', 'Queue', 'job'] diff --git a/rq/conn.py b/rq/conn.py index 6ab9f51..e69de29 100644 --- a/rq/conn.py +++ b/rq/conn.py @@ -1,18 +0,0 @@ -from werkzeug.local import LocalStack - -class NoRedisConnectionException(Exception): - pass - -_conn = LocalStack() - -def push_connection(redis_conn): - _conn.push(redis_conn) - -def pop_connection(): - return _conn.pop() - -def current_connection(): - conn = _conn.top - if conn is None: - raise NoRedisConnectionException('Connect to Redis first.') - return conn diff --git a/rq/job.py b/rq/job.py new file mode 100644 index 0000000..615e174 --- /dev/null +++ b/rq/job.py @@ -0,0 +1,36 @@ +import uuid +from pickle import loads, dumps +from .proxy import conn +from .queue import Queue + + +class DelayedResult(object): + def __init__(self, key): + self.key = key + self._rv = None + + @property + def return_value(self): + if self._rv is None: + rv = conn.get(self.key) + if rv is not None: + # cache the result + self._rv = loads(rv) + return self._rv + + +class job(object): + def __init__(self, queue_name='default'): + self.queue = Queue(queue_name) + + def __call__(self, f): + def delay(*args, **kwargs): + rv_key = '%s:result:%s' % (self.queue.key, str(uuid.uuid4())) + if f.__module__ == '__main__': + raise ValueError('Functions from the __main__ module cannot be processed by workers.') + s = dumps((f, rv_key, args, kwargs)) + conn.rpush(self.queue.key, s) + return DelayedResult(rv_key) + f.delay = delay + return f + diff --git a/rq/proxy.py b/rq/proxy.py new file mode 100644 index 0000000..6e6bdfc --- /dev/null +++ b/rq/proxy.py @@ -0,0 +1,30 @@ +import redis + +class NoRedisConnectionException(Exception): + pass + + +class RedisConnectionProxy(object): + def __init__(self): + self.stack = [] + + def _get_current_object(self): + try: + return self.stack[-1] + except IndexError: + msg = 'No Redis connection configured.' + raise NoRedisConnectionException(msg) + + def pop(self): + return self.stack.pop() + + def push(self, db): + self.stack.append(db) + + def __getattr__(self, name): + return getattr(self._get_current_object(), name) + + +conn = RedisConnectionProxy() + +__all__ = ['conn'] diff --git a/rq/queue.py b/rq/queue.py index a35ae3e..2ca5f53 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -1,4 +1,4 @@ -from . import current_connection +from .proxy import conn def to_queue_key(queue_name): return 'rq:%s' % (queue_name,) @@ -17,7 +17,7 @@ class Queue(object): @property def empty(self): - return current_connection().llen(self.key) == 0 + return conn.llen(self.key) == 0 def __str__(self): return self.name diff --git a/tests/test_rq.py b/tests/test_rq.py index d962be4..26c46cb 100644 --- a/tests/test_rq.py +++ b/tests/test_rq.py @@ -1,25 +1,33 @@ import unittest from blinker import signal from redis import Redis -import rq -from rq import Queue +from rq import conn, Queue + class RQTestCase(unittest.TestCase): def setUp(self): super(RQTestCase, self).setUp() - rq.push_connection(Redis()) - self.conn = rq.current_connection() + # Set up connection to Redis + testconn = Redis() + conn.push(testconn) - self.conn.flushdb() + # Flush beforewards (we like our hygiene) + conn.flushdb() signal('setup').send(self) + # Store the connection (for sanity checking) + self.testconn = testconn + def tearDown(self): signal('teardown').send(self) - self.conn.flushdb() - conn = rq.pop_connection() - assert conn == self.conn + # Flush afterwards + conn.flushdb() + + # Pop the connection to Redis + testconn = conn.pop() + assert testconn == self.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.' super(RQTestCase, self).tearDown() @@ -35,7 +43,7 @@ class TestQueue(RQTestCase): q = Queue('my-queue') self.assertEquals(q.empty, True) - self.conn.rpush('rq:my-queue', 'some val') + conn.rpush('rq:my-queue', 'some val') self.assertEquals(q.empty, False)