Add better connection management.

To start using RQ, push a Redis connection up its stack, like so:

    from rq import push_connection
    push_connection(Redis())
main
Vincent Driessen 13 years ago
parent d8d388c841
commit 518db8c24b

@ -1,6 +1,7 @@
import uuid import uuid
from pickle import loads, dumps from pickle import loads, dumps
from rdb import conn from .conn import current_connection, push_connection, pop_connection
from .queue import Queue
def to_queue_key(queue_name): def to_queue_key(queue_name):
return 'rq:%s' % (queue_name,) return 'rq:%s' % (queue_name,)
@ -13,7 +14,7 @@ class DelayedResult(object):
@property @property
def return_value(self): def return_value(self):
if self._rv is None: if self._rv is None:
rv = conn.get(self.key) rv = current_connection().get(self.key)
if rv is not None: if rv is not None:
# cache the result # cache the result
self._rv = loads(rv) self._rv = loads(rv)
@ -31,10 +32,8 @@ class job(object):
if f.__module__ == '__main__': if f.__module__ == '__main__':
raise ValueError('Functions from the __main__ module cannot be processed by workers.') raise ValueError('Functions from the __main__ module cannot be processed by workers.')
s = dumps((f, key, args, kwargs)) s = dumps((f, key, args, kwargs))
conn.rpush(queue_key, s) current_connection().rpush(queue_key, s)
return DelayedResult(key) return DelayedResult(key)
f.delay = delay f.delay = delay
return f return f

@ -0,0 +1,18 @@
from werkzeug.local import LocalStack
class NoRedisConnectionException(Exception):
pass
_conn = LocalStack()
def push_connection(redis_conn):
_conn.push(redis_conn)
def pop_connection():
return _conn.pop()
def current_connection():
conn = _conn.top
if conn is None:
raise NoRedisConnectionException('Connect to Redis first.')
return conn

@ -1,3 +1,5 @@
from . import current_connection
def to_queue_key(queue_name): def to_queue_key(queue_name):
return 'rq:%s' % (queue_name,) return 'rq:%s' % (queue_name,)
@ -13,5 +15,9 @@ class Queue(object):
def key(self): def key(self):
return self._key return self._key
@property
def empty(self):
return current_connection().llen(self.key) == 0
def __str__(self): def __str__(self):
return self.name return self.name

@ -5,8 +5,8 @@ import time
import procname import procname
from logbook import Logger from logbook import Logger
from pickle import loads, dumps from pickle import loads, dumps
from rdb import conn
from .queue import Queue from .queue import Queue
from .conn import current_connection
class NoQueueError(Exception): pass class NoQueueError(Exception): pass
@ -45,7 +45,7 @@ class Worker(object):
def work(self): def work(self):
while True: while True:
self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) self.procline('Waiting on %s' % (', '.join(self.queue_names()),))
queue, msg = conn.blpop(self.queue_keys()) queue, msg = current_connection().blpop(self.queue_keys())
self.fork_and_perform_job(queue, msg) self.fork_and_perform_job(queue, msg)
def fork_and_perform_job(self, queue, msg): def fork_and_perform_job(self, queue, msg):
@ -80,7 +80,7 @@ class Worker(object):
else: else:
self.log.info('Job ended normally without result') self.log.info('Job ended normally without result')
if rv is not None: if rv is not None:
p = conn.pipeline() p = current_connection().pipeline()
conn.set(key, dumps(rv)) p.set(key, dumps(rv))
conn.expire(key, self.rv_ttl) p.expire(key, self.rv_ttl)
p.execute() p.execute()

Loading…
Cancel
Save