Merge branch 'master' of github.com:selwin/rq

main
Selwin Ong 12 years ago
commit e5fa82aa5d

@ -1,9 +1,19 @@
### 0.3.5 ### 0.3.6
(not yet released) (not yet released)
- ...
### 0.3.5
(February 6th, 2013)
- `ended_at` is now recorded for normally finished jobs, too. (Previously only - `ended_at` is now recorded for normally finished jobs, too. (Previously only
for failed jobs.) for failed jobs.)
- Adds support for both `Redis` and `StrictRedis` connection types
- Makes `StrictRedis` the default connection type if none is explicitly provided
### 0.3.4 ### 0.3.4
(January 23rd, 2013) (January 23rd, 2013)
@ -93,11 +103,11 @@
invocations: invocations:
```python ```python
from redis import Redis from redis import StrictRedis
from rq.decorators import job from rq.decorators import job
# Connect to Redis # Connect to Redis
redis = Redis() redis = StrictRedis()
@job('high', timeout=10, connection=redis) @job('high', timeout=10, connection=redis)
def some_work(x, y): def some_work(x, y):

@ -0,0 +1,44 @@
from redis import Redis, StrictRedis
from functools import partial
def fix_return_type(func):
# deliberately no functools.wraps() call here, since the function being
# wrapped is a partial, which has no module
def _inner(*args, **kwargs):
value = func(*args, **kwargs)
if value is None:
value = -1
return value
return _inner
def patch_connection(connection):
if not isinstance(connection, StrictRedis):
raise ValueError('A StrictRedis or Redis connection is required.')
# Don't patch already patches objects
PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl']
if all([hasattr(connection, attr) for attr in PATCHED_METHODS]):
return connection
if isinstance(connection, Redis):
connection._setex = partial(StrictRedis.setex, connection)
connection._lrem = partial(StrictRedis.lrem, connection)
connection._zadd = partial(StrictRedis.zadd, connection)
connection._pipeline = partial(StrictRedis.pipeline, connection)
connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection))
if hasattr(connection, 'pttl'):
connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection))
elif isinstance(connection, StrictRedis):
connection._setex = connection.setex
connection._lrem = connection.lrem
connection._zadd = connection.zadd
connection._pipeline = connection.pipeline
connection._ttl = connection.ttl
if hasattr(connection, 'pttl'):
connection._pttl = connection.pttl
else:
raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection)))
return connection

@ -1,6 +1,7 @@
from contextlib import contextmanager from contextlib import contextmanager
from redis import Redis from redis import StrictRedis
from .local import LocalStack, release_local from .local import LocalStack, release_local
from .compat.connections import patch_connection
class NoRedisConnectionException(Exception): class NoRedisConnectionException(Exception):
@ -10,7 +11,7 @@ class NoRedisConnectionException(Exception):
@contextmanager @contextmanager
def Connection(connection=None): def Connection(connection=None):
if connection is None: if connection is None:
connection = Redis() connection = StrictRedis()
push_connection(connection) push_connection(connection)
try: try:
yield yield
@ -23,7 +24,7 @@ def Connection(connection=None):
def push_connection(redis): def push_connection(redis):
"""Pushes the given connection on the stack.""" """Pushes the given connection on the stack."""
_connection_stack.push(redis) _connection_stack.push(patch_connection(redis))
def pop_connection(): def pop_connection():
@ -40,7 +41,7 @@ def use_connection(redis=None):
release_local(_connection_stack) release_local(_connection_stack)
if redis is None: if redis is None:
redis = Redis() redis = StrictRedis()
push_connection(redis) push_connection(redis)
@ -56,7 +57,7 @@ def resolve_connection(connection=None):
Raises an exception if it cannot resolve a connection now. Raises an exception if it cannot resolve a connection now.
""" """
if connection is not None: if connection is not None:
return connection return patch_connection(connection)
connection = get_current_connection() connection = get_current_connection()
if connection is None: if connection is None:

@ -11,6 +11,6 @@ class NoQueueError(Exception):
class UnpickleError(Exception): class UnpickleError(Exception):
def __init__(self, message, raw_data): def __init__(self, message, raw_data, inner_exception=None):
super(UnpickleError, self).__init__(message) super(UnpickleError, self).__init__(message, inner_exception)
self.raw_data = raw_data self.raw_data = raw_data

@ -27,8 +27,8 @@ def unpickle(pickled_string):
""" """
try: try:
obj = loads(pickled_string) obj = loads(pickled_string)
except (StandardError, UnpicklingError): except (StandardError, UnpicklingError) as e:
raise UnpickleError('Could not unpickle.', pickled_string) raise UnpickleError('Could not unpickle.', pickled_string, e)
return obj return obj

@ -302,11 +302,11 @@ class FailedQueue(Queue):
job = Job.fetch(job_id, connection=self.connection) job = Job.fetch(job_id, connection=self.connection)
except NoSuchJobError: except NoSuchJobError:
# Silently ignore/remove this job and return (i.e. do nothing) # Silently ignore/remove this job and return (i.e. do nothing)
self.connection.lrem(self.key, job_id) self.connection._lrem(self.key, 0, job_id)
return return
# Delete it from the failed queue (raise an error if that failed) # Delete it from the failed queue (raise an error if that failed)
if self.connection.lrem(self.key, job.id) == 0: if self.connection._lrem(self.key, 0, job.id) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.') raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
job.status = Status.QUEUED job.status = Status.QUEUED

@ -48,8 +48,8 @@ def setup_default_arguments(args, settings):
def setup_redis(args): def setup_redis(args):
if args.url is not None: if args.url is not None:
redis_conn = redis.from_url(args.url, db=args.db) redis_conn = redis.StrictRedis.from_url(args.url, db=args.db)
else: else:
redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db, redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db,
password=args.password) password=args.password)
use_connection(redis_conn) use_connection(redis_conn)

@ -1 +1 @@
VERSION = '0.3.5-dev' VERSION = '0.3.6-dev'

@ -5,7 +5,7 @@ if is_python_version((2, 7), (3, 2)):
else: else:
import unittest2 as unittest # noqa import unittest2 as unittest # noqa
from redis import Redis from redis import StrictRedis
from rq import push_connection, pop_connection from rq import push_connection, pop_connection
@ -14,7 +14,7 @@ def find_empty_redis_database():
will use/connect it when no keys are in there. will use/connect it when no keys are in there.
""" """
for dbnum in range(4, 17): for dbnum in range(4, 17):
testconn = Redis(db=dbnum) testconn = StrictRedis(db=dbnum)
empty = len(testconn.keys('*')) == 0 empty = len(testconn.keys('*')) == 0
if empty: if empty:
return testconn return testconn

@ -185,13 +185,13 @@ class TestWorker(RQTestCase):
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w = Worker([q]) w = Worker([q])
w.work(burst=True) w.work(burst=True)
self.assertNotEqual(self.testconn.ttl(job.key), 0) self.assertNotEqual(self.testconn._ttl(job.key), 0)
# Job with -1 result_ttl don't expire # Job with -1 result_ttl don't expire
job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1)
w = Worker([q]) w = Worker([q])
w.work(burst=True) w.work(burst=True)
self.assertEqual(self.testconn.ttl(job.key), None) self.assertEqual(self.testconn._ttl(job.key), -1)
# Job with result_ttl = 0 gets deleted immediately # Job with result_ttl = 0 gets deleted immediately
job = q.enqueue(say_hello, args=('Frank',), result_ttl=0) job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)

Loading…
Cancel
Save