Refactor the whole Redis connection stuff to be just as easy as in RDB.

main
Vincent Driessen 14 years ago
parent 5c4163400e
commit d721f0708b

@ -1,39 +1,5 @@
import uuid from .proxy import conn
from pickle import loads, dumps
from .conn import current_connection, push_connection, pop_connection
from .queue import Queue from .queue import Queue
from .job import job
def to_queue_key(queue_name): __all__ = ['conn', 'Queue', 'job']
return 'rq:%s' % (queue_name,)
class DelayedResult(object):
def __init__(self, key):
self.key = key
self._rv = None
@property
def return_value(self):
if self._rv is None:
rv = current_connection().get(self.key)
if rv is not None:
# cache the result
self._rv = loads(rv)
return self._rv
class job(object):
def __init__(self, queue='normal'):
self.queue = queue
def __call__(self, f):
def delay(*args, **kwargs):
queue_key = to_queue_key(self.queue)
key = '%s:result:%s' % (queue_key, str(uuid.uuid4()))
if f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed by workers.')
s = dumps((f, key, args, kwargs))
current_connection().rpush(queue_key, s)
return DelayedResult(key)
f.delay = delay
return f

@ -1,18 +0,0 @@
from werkzeug.local import LocalStack
class NoRedisConnectionException(Exception):
pass
_conn = LocalStack()
def push_connection(redis_conn):
_conn.push(redis_conn)
def pop_connection():
return _conn.pop()
def current_connection():
conn = _conn.top
if conn is None:
raise NoRedisConnectionException('Connect to Redis first.')
return conn

@ -0,0 +1,36 @@
import uuid
from pickle import loads, dumps
from .proxy import conn
from .queue import Queue
class DelayedResult(object):
def __init__(self, key):
self.key = key
self._rv = None
@property
def return_value(self):
if self._rv is None:
rv = conn.get(self.key)
if rv is not None:
# cache the result
self._rv = loads(rv)
return self._rv
class job(object):
def __init__(self, queue_name='default'):
self.queue = Queue(queue_name)
def __call__(self, f):
def delay(*args, **kwargs):
rv_key = '%s:result:%s' % (self.queue.key, str(uuid.uuid4()))
if f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed by workers.')
s = dumps((f, rv_key, args, kwargs))
conn.rpush(self.queue.key, s)
return DelayedResult(rv_key)
f.delay = delay
return f

@ -0,0 +1,30 @@
import redis
class NoRedisConnectionException(Exception):
pass
class RedisConnectionProxy(object):
def __init__(self):
self.stack = []
def _get_current_object(self):
try:
return self.stack[-1]
except IndexError:
msg = 'No Redis connection configured.'
raise NoRedisConnectionException(msg)
def pop(self):
return self.stack.pop()
def push(self, db):
self.stack.append(db)
def __getattr__(self, name):
return getattr(self._get_current_object(), name)
conn = RedisConnectionProxy()
__all__ = ['conn']

@ -1,4 +1,4 @@
from . import current_connection from .proxy import conn
def to_queue_key(queue_name): def to_queue_key(queue_name):
return 'rq:%s' % (queue_name,) return 'rq:%s' % (queue_name,)
@ -17,7 +17,7 @@ class Queue(object):
@property @property
def empty(self): def empty(self):
return current_connection().llen(self.key) == 0 return conn.llen(self.key) == 0
def __str__(self): def __str__(self):
return self.name return self.name

@ -1,25 +1,33 @@
import unittest import unittest
from blinker import signal from blinker import signal
from redis import Redis from redis import Redis
import rq from rq import conn, Queue
from rq import Queue
class RQTestCase(unittest.TestCase): class RQTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
super(RQTestCase, self).setUp() super(RQTestCase, self).setUp()
rq.push_connection(Redis()) # Set up connection to Redis
self.conn = rq.current_connection() testconn = Redis()
conn.push(testconn)
self.conn.flushdb() # Flush beforewards (we like our hygiene)
conn.flushdb()
signal('setup').send(self) signal('setup').send(self)
# Store the connection (for sanity checking)
self.testconn = testconn
def tearDown(self): def tearDown(self):
signal('teardown').send(self) signal('teardown').send(self)
self.conn.flushdb() # Flush afterwards
conn = rq.pop_connection() conn.flushdb()
assert conn == self.conn
# Pop the connection to Redis
testconn = conn.pop()
assert testconn == self.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.'
super(RQTestCase, self).tearDown() super(RQTestCase, self).tearDown()
@ -35,7 +43,7 @@ class TestQueue(RQTestCase):
q = Queue('my-queue') q = Queue('my-queue')
self.assertEquals(q.empty, True) self.assertEquals(q.empty, True)
self.conn.rpush('rq:my-queue', 'some val') conn.rpush('rq:my-queue', 'some val')
self.assertEquals(q.empty, False) self.assertEquals(q.empty, False)

Loading…
Cancel
Save