diff --git a/CHANGES.md b/CHANGES.md index 4bf554a..104d503 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,9 +1,19 @@ -### 0.3.5 +### 0.3.6 (not yet released) +- ... + + +### 0.3.5 +(February 6th, 2013) + - `ended_at` is now recorded for normally finished jobs, too. (Previously only for failed jobs.) +- Adds support for both `Redis` and `StrictRedis` connection types + +- Makes `StrictRedis` the default connection type if none is explicitly provided + ### 0.3.4 (January 23rd, 2013) @@ -93,11 +103,11 @@ invocations: ```python - from redis import Redis + from redis import StrictRedis from rq.decorators import job # Connect to Redis - redis = Redis() + redis = StrictRedis() @job('high', timeout=10, connection=redis) def some_work(x, y): diff --git a/rq/compat/connections.py b/rq/compat/connections.py new file mode 100644 index 0000000..0374b5b --- /dev/null +++ b/rq/compat/connections.py @@ -0,0 +1,44 @@ +from redis import Redis, StrictRedis +from functools import partial + + +def fix_return_type(func): + # deliberately no functools.wraps() call here, since the function being + # wrapped is a partial, which has no module + def _inner(*args, **kwargs): + value = func(*args, **kwargs) + if value is None: + value = -1 + return value + return _inner + + +def patch_connection(connection): + if not isinstance(connection, StrictRedis): + raise ValueError('A StrictRedis or Redis connection is required.') + + # Don't patch already patches objects + PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl'] + if all([hasattr(connection, attr) for attr in PATCHED_METHODS]): + return connection + + if isinstance(connection, Redis): + connection._setex = partial(StrictRedis.setex, connection) + connection._lrem = partial(StrictRedis.lrem, connection) + connection._zadd = partial(StrictRedis.zadd, connection) + connection._pipeline = partial(StrictRedis.pipeline, connection) + connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection)) + if hasattr(connection, 'pttl'): + connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection)) + elif isinstance(connection, StrictRedis): + connection._setex = connection.setex + connection._lrem = connection.lrem + connection._zadd = connection.zadd + connection._pipeline = connection.pipeline + connection._ttl = connection.ttl + if hasattr(connection, 'pttl'): + connection._pttl = connection.pttl + else: + raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection))) + + return connection diff --git a/rq/connections.py b/rq/connections.py index 05de2f1..f4a72e4 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,6 +1,7 @@ from contextlib import contextmanager -from redis import Redis +from redis import StrictRedis from .local import LocalStack, release_local +from .compat.connections import patch_connection class NoRedisConnectionException(Exception): @@ -10,7 +11,7 @@ class NoRedisConnectionException(Exception): @contextmanager def Connection(connection=None): if connection is None: - connection = Redis() + connection = StrictRedis() push_connection(connection) try: yield @@ -23,7 +24,7 @@ def Connection(connection=None): def push_connection(redis): """Pushes the given connection on the stack.""" - _connection_stack.push(redis) + _connection_stack.push(patch_connection(redis)) def pop_connection(): @@ -40,7 +41,7 @@ def use_connection(redis=None): release_local(_connection_stack) if redis is None: - redis = Redis() + redis = StrictRedis() push_connection(redis) @@ -56,7 +57,7 @@ def resolve_connection(connection=None): Raises an exception if it cannot resolve a connection now. """ if connection is not None: - return connection + return patch_connection(connection) connection = get_current_connection() if connection is None: diff --git a/rq/exceptions.py b/rq/exceptions.py index 4d2cb6a..59c1429 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -11,6 +11,6 @@ class NoQueueError(Exception): class UnpickleError(Exception): - def __init__(self, message, raw_data): - super(UnpickleError, self).__init__(message) + def __init__(self, message, raw_data, inner_exception=None): + super(UnpickleError, self).__init__(message, inner_exception) self.raw_data = raw_data diff --git a/rq/job.py b/rq/job.py index e94f512..2000806 100644 --- a/rq/job.py +++ b/rq/job.py @@ -27,8 +27,8 @@ def unpickle(pickled_string): """ try: obj = loads(pickled_string) - except (StandardError, UnpicklingError): - raise UnpickleError('Could not unpickle.', pickled_string) + except (StandardError, UnpicklingError) as e: + raise UnpickleError('Could not unpickle.', pickled_string, e) return obj diff --git a/rq/queue.py b/rq/queue.py index 2103620..ea65559 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -302,11 +302,11 @@ class FailedQueue(Queue): job = Job.fetch(job_id, connection=self.connection) except NoSuchJobError: # Silently ignore/remove this job and return (i.e. do nothing) - self.connection.lrem(self.key, job_id) + self.connection._lrem(self.key, 0, job_id) return # Delete it from the failed queue (raise an error if that failed) - if self.connection.lrem(self.key, job.id) == 0: + if self.connection._lrem(self.key, 0, job.id) == 0: raise InvalidJobOperationError('Cannot requeue non-failed jobs.') job.status = Status.QUEUED diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 94441ab..119f4f6 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -48,8 +48,8 @@ def setup_default_arguments(args, settings): def setup_redis(args): if args.url is not None: - redis_conn = redis.from_url(args.url, db=args.db) + redis_conn = redis.StrictRedis.from_url(args.url, db=args.db) else: - redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db, + redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db, password=args.password) use_connection(redis_conn) diff --git a/rq/version.py b/rq/version.py index 8e18e5b..d4ab04a 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.3.5-dev' +VERSION = '0.3.6-dev' diff --git a/tests/__init__.py b/tests/__init__.py index cba1035..688c11f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -5,7 +5,7 @@ if is_python_version((2, 7), (3, 2)): else: import unittest2 as unittest # noqa -from redis import Redis +from redis import StrictRedis from rq import push_connection, pop_connection @@ -14,7 +14,7 @@ def find_empty_redis_database(): will use/connect it when no keys are in there. """ for dbnum in range(4, 17): - testconn = Redis(db=dbnum) + testconn = StrictRedis(db=dbnum) empty = len(testconn.keys('*')) == 0 if empty: return testconn diff --git a/tests/test_worker.py b/tests/test_worker.py index e3e706e..b662b09 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -185,13 +185,13 @@ class TestWorker(RQTestCase): job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w = Worker([q]) w.work(burst=True) - self.assertNotEqual(self.testconn.ttl(job.key), 0) + self.assertNotEqual(self.testconn._ttl(job.key), 0) # Job with -1 result_ttl don't expire job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) w = Worker([q]) w.work(burst=True) - self.assertEqual(self.testconn.ttl(job.key), None) + self.assertEqual(self.testconn._ttl(job.key), -1) # Job with result_ttl = 0 gets deleted immediately job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)