From c337afde3a7ad77f8598925fdd04027372230dc6 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Sun, 1 Apr 2012 19:50:44 +0200 Subject: [PATCH] Make the connection stack thread safe. For this, I've included Werkzeug's context locals, which offers both thread and greenlet safe local variables. This fixes #47. --- rq/connections.py | 41 +---- rq/local.py | 371 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 378 insertions(+), 34 deletions(-) create mode 100644 rq/local.py diff --git a/rq/connections.py b/rq/connections.py index c714a35..eb5d197 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,41 +1,12 @@ from contextlib import contextmanager from redis import Redis +from .local import LocalStack, release_local class NoRedisConnectionException(Exception): pass -class _RedisConnectionStack(object): - def __init__(self): - self.stack = [] - - def _get_current_object(self): - try: - return self.stack[-1] - except IndexError: - msg = 'No Redis connection configured.' - raise NoRedisConnectionException(msg) - - def pop(self): - return self.stack.pop() - - def push(self, connection): - self.stack.append(connection) - - def empty(self): - del self.stack[:] - - def depth(self): - return len(self.stack) - - def __getattr__(self, name): - return getattr(self._get_current_object(), name) - - -_connection_stack = _RedisConnectionStack() - - @contextmanager def Connection(connection=None): if connection is None: @@ -64,21 +35,23 @@ def use_connection(redis=None): """Clears the stack and uses the given connection. Protects against mixed use of use_connection() and stacked connection contexts. """ - assert _connection_stack.depth() <= 1, \ + assert len(_connection_stack) <= 1, \ 'You should not mix Connection contexts with use_connection().' - _connection_stack.empty() + release_local(_connection_stack) if redis is None: redis = Redis() - push_connection(redis) + _connection_stack.push(redis) def get_current_connection(): """Returns the current Redis connection (i.e. the topmost on the connection stack). """ - return _connection_stack._get_current_object() + return _connection_stack.top + +_connection_stack = LocalStack() __all__ = ['Connection', 'get_current_connection', 'push_connection', 'pop_connection', diff --git a/rq/local.py b/rq/local.py new file mode 100644 index 0000000..a461a9f --- /dev/null +++ b/rq/local.py @@ -0,0 +1,371 @@ +# -*- coding: utf-8 -*- +""" + werkzeug.local + ~~~~~~~~~~~~~~ + + This module implements context-local objects. + + :copyright: (c) 2011 by the Werkzeug Team, see AUTHORS for more details. + :license: BSD, see LICENSE for more details. +""" +# Since each thread has its own greenlet we can just use those as identifiers +# for the context. If greenlets are not available we fall back to the +# current thread ident. +try: + from greenlet import getcurrent as get_ident +except ImportError: # noqa + try: + from thread import get_ident # noqa + except ImportError: # noqa + from dummy_thread import get_ident # noqa + + +def release_local(local): + """Releases the contents of the local for the current context. + This makes it possible to use locals without a manager. + + Example:: + + >>> loc = Local() + >>> loc.foo = 42 + >>> release_local(loc) + >>> hasattr(loc, 'foo') + False + + With this function one can release :class:`Local` objects as well + as :class:`StackLocal` objects. However it is not possible to + release data held by proxies that way, one always has to retain + a reference to the underlying local object in order to be able + to release it. + + .. versionadded:: 0.6.1 + """ + local.__release_local__() + + +class Local(object): + __slots__ = ('__storage__', '__ident_func__') + + def __init__(self): + object.__setattr__(self, '__storage__', {}) + object.__setattr__(self, '__ident_func__', get_ident) + + def __iter__(self): + return iter(self.__storage__.items()) + + def __call__(self, proxy): + """Create a proxy for a name.""" + return LocalProxy(self, proxy) + + def __release_local__(self): + self.__storage__.pop(self.__ident_func__(), None) + + def __getattr__(self, name): + try: + return self.__storage__[self.__ident_func__()][name] + except KeyError: + raise AttributeError(name) + + def __setattr__(self, name, value): + ident = self.__ident_func__() + storage = self.__storage__ + try: + storage[ident][name] = value + except KeyError: + storage[ident] = {name: value} + + def __delattr__(self, name): + try: + del self.__storage__[self.__ident_func__()][name] + except KeyError: + raise AttributeError(name) + + +class LocalStack(object): + """This class works similar to a :class:`Local` but keeps a stack + of objects instead. This is best explained with an example:: + + >>> ls = LocalStack() + >>> ls.push(42) + >>> ls.top + 42 + >>> ls.push(23) + >>> ls.top + 23 + >>> ls.pop() + 23 + >>> ls.top + 42 + + They can be force released by using a :class:`LocalManager` or with + the :func:`release_local` function but the correct way is to pop the + item from the stack after using. When the stack is empty it will + no longer be bound to the current context (and as such released). + + By calling the stack without arguments it returns a proxy that resolves to + the topmost item on the stack. + + .. versionadded:: 0.6.1 + """ + + def __init__(self): + self._local = Local() + + def __release_local__(self): + self._local.__release_local__() + + def _get__ident_func__(self): + return self._local.__ident_func__ + def _set__ident_func__(self, value): # noqa + object.__setattr__(self._local, '__ident_func__', value) + __ident_func__ = property(_get__ident_func__, _set__ident_func__) + del _get__ident_func__, _set__ident_func__ + + def __call__(self): + def _lookup(): + rv = self.top + if rv is None: + raise RuntimeError('object unbound') + return rv + return LocalProxy(_lookup) + + def push(self, obj): + """Pushes a new item to the stack""" + rv = getattr(self._local, 'stack', None) + if rv is None: + self._local.stack = rv = [] + rv.append(obj) + return rv + + def pop(self): + """Removes the topmost item from the stack, will return the + old value or `None` if the stack was already empty. + """ + stack = getattr(self._local, 'stack', None) + if stack is None: + return None + elif len(stack) == 1: + release_local(self._local) + return stack[-1] + else: + return stack.pop() + + @property + def top(self): + """The topmost item on the stack. If the stack is empty, + `None` is returned. + """ + try: + return self._local.stack[-1] + except (AttributeError, IndexError): + return None + + def __len__(self): + return len(self._local.stack) + + +class LocalManager(object): + """Local objects cannot manage themselves. For that you need a local + manager. You can pass a local manager multiple locals or add them later + by appending them to `manager.locals`. Everytime the manager cleans up + it, will clean up all the data left in the locals for this context. + + The `ident_func` parameter can be added to override the default ident + function for the wrapped locals. + + .. versionchanged:: 0.6.1 + Instead of a manager the :func:`release_local` function can be used + as well. + + .. versionchanged:: 0.7 + `ident_func` was added. + """ + + def __init__(self, locals=None, ident_func=None): + if locals is None: + self.locals = [] + elif isinstance(locals, Local): + self.locals = [locals] + else: + self.locals = list(locals) + if ident_func is not None: + self.ident_func = ident_func + for local in self.locals: + object.__setattr__(local, '__ident_func__', ident_func) + else: + self.ident_func = get_ident + + def get_ident(self): + """Return the context identifier the local objects use internally for + this context. You cannot override this method to change the behavior + but use it to link other context local objects (such as SQLAlchemy's + scoped sessions) to the Werkzeug locals. + + .. versionchanged:: 0.7 + Yu can pass a different ident function to the local manager that + will then be propagated to all the locals passed to the + constructor. + """ + return self.ident_func() + + def cleanup(self): + """Manually clean up the data in the locals for this context. Call + this at the end of the request or use `make_middleware()`. + """ + for local in self.locals: + release_local(local) + + def __repr__(self): + return '<%s storages: %d>' % ( + self.__class__.__name__, + len(self.locals) + ) + + +class LocalProxy(object): + """Acts as a proxy for a werkzeug local. Forwards all operations to + a proxied object. The only operations not supported for forwarding + are right handed operands and any kind of assignment. + + Example usage:: + + from werkzeug.local import Local + l = Local() + + # these are proxies + request = l('request') + user = l('user') + + + from werkzeug.local import LocalStack + _response_local = LocalStack() + + # this is a proxy + response = _response_local() + + Whenever something is bound to l.user / l.request the proxy objects + will forward all operations. If no object is bound a :exc:`RuntimeError` + will be raised. + + To create proxies to :class:`Local` or :class:`LocalStack` objects, + call the object as shown above. If you want to have a proxy to an + object looked up by a function, you can (as of Werkzeug 0.6.1) pass + a function to the :class:`LocalProxy` constructor:: + + session = LocalProxy(lambda: get_current_request().session) + + .. versionchanged:: 0.6.1 + The class can be instanciated with a callable as well now. + """ + __slots__ = ('__local', '__dict__', '__name__') + + def __init__(self, local, name=None): + object.__setattr__(self, '_LocalProxy__local', local) + object.__setattr__(self, '__name__', name) + + def _get_current_object(self): + """Return the current object. This is useful if you want the real + object behind the proxy at a time for performance reasons or because + you want to pass the object into a different context. + """ + if not hasattr(self.__local, '__release_local__'): + return self.__local() + try: + return getattr(self.__local, self.__name__) + except AttributeError: + raise RuntimeError('no object bound to %s' % self.__name__) + + @property + def __dict__(self): + try: + return self._get_current_object().__dict__ + except RuntimeError: + raise AttributeError('__dict__') + + def __repr__(self): + try: + obj = self._get_current_object() + except RuntimeError: + return '<%s unbound>' % self.__class__.__name__ + return repr(obj) + + def __nonzero__(self): + try: + return bool(self._get_current_object()) + except RuntimeError: + return False + + def __unicode__(self): + try: + return unicode(self._get_current_object()) + except RuntimeError: + return repr(self) + + def __dir__(self): + try: + return dir(self._get_current_object()) + except RuntimeError: + return [] + + def __getattr__(self, name): + if name == '__members__': + return dir(self._get_current_object()) + return getattr(self._get_current_object(), name) + + def __setitem__(self, key, value): + self._get_current_object()[key] = value + + def __delitem__(self, key): + del self._get_current_object()[key] + + def __setslice__(self, i, j, seq): + self._get_current_object()[i:j] = seq + + def __delslice__(self, i, j): + del self._get_current_object()[i:j] + + __setattr__ = lambda x, n, v: setattr(x._get_current_object(), n, v) + __delattr__ = lambda x, n: delattr(x._get_current_object(), n) + __str__ = lambda x: str(x._get_current_object()) + __lt__ = lambda x, o: x._get_current_object() < o + __le__ = lambda x, o: x._get_current_object() <= o + __eq__ = lambda x, o: x._get_current_object() == o + __ne__ = lambda x, o: x._get_current_object() != o + __gt__ = lambda x, o: x._get_current_object() > o + __ge__ = lambda x, o: x._get_current_object() >= o + __cmp__ = lambda x, o: cmp(x._get_current_object(), o) + __hash__ = lambda x: hash(x._get_current_object()) + __call__ = lambda x, *a, **kw: x._get_current_object()(*a, **kw) + __len__ = lambda x: len(x._get_current_object()) + __getitem__ = lambda x, i: x._get_current_object()[i] + __iter__ = lambda x: iter(x._get_current_object()) + __contains__ = lambda x, i: i in x._get_current_object() + __getslice__ = lambda x, i, j: x._get_current_object()[i:j] + __add__ = lambda x, o: x._get_current_object() + o + __sub__ = lambda x, o: x._get_current_object() - o + __mul__ = lambda x, o: x._get_current_object() * o + __floordiv__ = lambda x, o: x._get_current_object() // o + __mod__ = lambda x, o: x._get_current_object() % o + __divmod__ = lambda x, o: x._get_current_object().__divmod__(o) + __pow__ = lambda x, o: x._get_current_object() ** o + __lshift__ = lambda x, o: x._get_current_object() << o + __rshift__ = lambda x, o: x._get_current_object() >> o + __and__ = lambda x, o: x._get_current_object() & o + __xor__ = lambda x, o: x._get_current_object() ^ o + __or__ = lambda x, o: x._get_current_object() | o + __div__ = lambda x, o: x._get_current_object().__div__(o) + __truediv__ = lambda x, o: x._get_current_object().__truediv__(o) + __neg__ = lambda x: -(x._get_current_object()) + __pos__ = lambda x: +(x._get_current_object()) + __abs__ = lambda x: abs(x._get_current_object()) + __invert__ = lambda x: ~(x._get_current_object()) + __complex__ = lambda x: complex(x._get_current_object()) + __int__ = lambda x: int(x._get_current_object()) + __long__ = lambda x: long(x._get_current_object()) + __float__ = lambda x: float(x._get_current_object()) + __oct__ = lambda x: oct(x._get_current_object()) + __hex__ = lambda x: hex(x._get_current_object()) + __index__ = lambda x: x._get_current_object().__index__() + __coerce__ = lambda x, o: x._get_current_object().__coerce__(x, o) + __enter__ = lambda x: x._get_current_object().__enter__() + __exit__ = lambda x, *a, **kw: x._get_current_object().__exit__(*a, **kw)