Merge branch 'master' into yaniv-aknin-worker_ttl

Vincent Driessen 12 years ago
commit 640195d5e4

@@ -1,9 +1,19 @@
-### 0.3.5
-(not yet released)
+### 0.3.6
+(not yet released)
+
+- ...
+
+### 0.3.5
+(February 6th, 2013)
 
 - `ended_at` is now recorded for normally finished jobs, too. (Previously only
   for failed jobs.)
 
+- Adds support for both `Redis` and `StrictRedis` connection types
+- Makes `StrictRedis` the default connection type if none is explicitly provided
 
 ### 0.3.4
 (January 23rd, 2013)
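The two new changelog entries are the core of this merge: RQ now accepts either client class from redis-py and creates a `StrictRedis` itself when none is given. A minimal sketch of what that means for callers (assumes a Redis server on localhost; not part of the diff):

```python
from redis import Redis, StrictRedis
from rq import Queue, use_connection

use_connection(StrictRedis())   # an explicit StrictRedis connection
q = Queue()

use_connection(Redis())         # the legacy Redis client keeps working
q = Queue()

use_connection()                # no argument: RQ now builds a StrictRedis itself
q = Queue()
```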
@@ -93,11 +103,11 @@
 invocations:
 
 ```python
-from redis import Redis
+from redis import StrictRedis
 from rq.decorators import job
 
 # Connect to Redis
-redis = Redis()
+redis = StrictRedis()
 
 @job('high', timeout=10, connection=redis)
 def some_work(x, y):
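The sample above only swaps the connection class; usage of the decorated function is unchanged. As a follow-up (not part of the diff), enqueueing still goes through the generated `.delay()` helper:

```python
# Enqueues some_work(2, 3) on the 'high' queue via the StrictRedis connection
# bound in the decorator above; returns a Job handle.
job = some_work.delay(2, 3)
print(job.id)
```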

@@ -0,0 +1,44 @@
+from redis import Redis, StrictRedis
+from functools import partial
+
+
+def fix_return_type(func):
+    # deliberately no functools.wraps() call here, since the function being
+    # wrapped is a partial, which has no module
+    def _inner(*args, **kwargs):
+        value = func(*args, **kwargs)
+        if value is None:
+            value = -1
+        return value
+    return _inner
+
+
+def patch_connection(connection):
+    if not isinstance(connection, StrictRedis):
+        raise ValueError('A StrictRedis or Redis connection is required.')
+
+    # Don't patch already patched objects
+    PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl']
+    if all([hasattr(connection, attr) for attr in PATCHED_METHODS]):
+        return connection
+
+    if isinstance(connection, Redis):
+        connection._setex = partial(StrictRedis.setex, connection)
+        connection._lrem = partial(StrictRedis.lrem, connection)
+        connection._zadd = partial(StrictRedis.zadd, connection)
+        connection._pipeline = partial(StrictRedis.pipeline, connection)
+        connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection))
+        if hasattr(connection, 'pttl'):
+            connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection))
+    elif isinstance(connection, StrictRedis):
+        connection._setex = connection.setex
+        connection._lrem = connection.lrem
+        connection._zadd = connection.zadd
+        connection._pipeline = connection.pipeline
+        connection._ttl = connection.ttl
+        if hasattr(connection, 'pttl'):
+            connection._pttl = connection.pttl
+    else:
+        raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection)))
+
+    return connection
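This shim exists because redis-py 2.x's legacy `Redis` class differs from `StrictRedis` in argument order for `setex`/`lrem`/`zadd` and in how `ttl` reports a missing expiry (`None` vs `-1`). After patching, the rest of RQ can call the underscore-prefixed methods with `StrictRedis` semantics regardless of which client the user supplied. A rough usage sketch (assumes a local Redis server):

```python
from redis import Redis, StrictRedis
from rq.compat.connections import patch_connection

legacy = patch_connection(Redis())
strict = patch_connection(StrictRedis())

# Both patched objects take StrictRedis-style argument order (name, time, value):
legacy._setex('rq:compat:demo', 10, 'value')
strict._setex('rq:compat:demo', 10, 'value')

# And _ttl() reports -1 rather than None for a key without an expiry:
legacy.set('rq:compat:persistent', 'x')
print(legacy._ttl('rq:compat:persistent'))  # -1
```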

@@ -1,6 +1,7 @@
 from contextlib import contextmanager
-from redis import Redis
+from redis import StrictRedis
 from .local import LocalStack, release_local
+from .compat.connections import patch_connection


 class NoRedisConnectionException(Exception):
@@ -10,7 +11,7 @@ class NoRedisConnectionException(Exception):
 @contextmanager
 def Connection(connection=None):
     if connection is None:
-        connection = Redis()
+        connection = StrictRedis()
     push_connection(connection)
     try:
         yield
@@ -23,7 +24,7 @@ def Connection(connection=None):

 def push_connection(redis):
     """Pushes the given connection on the stack."""
-    _connection_stack.push(redis)
+    _connection_stack.push(patch_connection(redis))


 def pop_connection():
@@ -40,7 +41,7 @@ def use_connection(redis=None):
     release_local(_connection_stack)

     if redis is None:
-        redis = Redis()
+        redis = StrictRedis()
     push_connection(redis)
@@ -56,7 +57,7 @@ def resolve_connection(connection=None):
     Raises an exception if it cannot resolve a connection now.
     """
     if connection is not None:
-        return connection
+        return patch_connection(connection)

     connection = get_current_connection()
     if connection is None:
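Net effect of this hunk: every connection entering RQ's connection stack is normalised through `patch_connection()`, and `StrictRedis` is what gets created when nothing is supplied. A minimal sketch (assumes a local Redis server):

```python
from redis import Redis, StrictRedis
from rq import Connection, Queue, use_connection

with Connection(StrictRedis()):   # explicit StrictRedis
    q = Queue('high')

use_connection(Redis())           # a legacy Redis client is patched on push
q = Queue('default')

with Connection():                # no argument: a StrictRedis() is created
    q = Queue()
```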

@@ -11,8 +11,8 @@ class NoQueueError(Exception):

 class UnpickleError(Exception):
-    def __init__(self, message, raw_data):
-        super(UnpickleError, self).__init__(message)
+    def __init__(self, message, raw_data, inner_exception=None):
+        super(UnpickleError, self).__init__(message, inner_exception)
         self.raw_data = raw_data


 class DequeueTimeout(Exception):
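The new `inner_exception` argument keeps the original pickling error around instead of discarding it. A small sketch of how that surfaces, using the module-level `unpickle()` helper changed in the next hunk:

```python
from rq.job import unpickle
from rq.exceptions import UnpickleError

try:
    unpickle('definitely not a pickle')
except UnpickleError as e:
    message, inner = e.args               # inner_exception now rides along in args
    print('unpickling failed: %r' % (inner,))
    print('raw payload: %r' % (e.raw_data,))
```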

@@ -1,7 +1,6 @@
 import importlib
 import inspect
 import times
-from collections import namedtuple
 from uuid import uuid4
 from cPickle import loads, dumps, UnpicklingError
 from .local import LocalStack
@@ -27,8 +26,8 @@ def unpickle(pickled_string):
     """
     try:
         obj = loads(pickled_string)
-    except (StandardError, UnpicklingError):
-        raise UnpickleError('Could not unpickle.', pickled_string)
+    except (StandardError, UnpicklingError) as e:
+        raise UnpickleError('Could not unpickle.', pickled_string, e)
     return obj
@@ -289,7 +288,7 @@ class Job(object):
         key = self.key

         obj = {}
-        obj['created_at'] = times.format(self.created_at, 'UTC')
+        obj['created_at'] = times.format(self.created_at or times.now(), 'UTC')

         if self.func_name is not None:
             obj['data'] = dumps(self.job_tuple)

@@ -232,6 +232,7 @@ class Queue(object):
         except UnpickleError as e:
             # Attach queue information on the exception for improved error
             # reporting
+            e.job_id = job_id
             e.queue = self
             raise e
         return job
@@ -314,13 +315,14 @@ class FailedQueue(Queue):
             job = Job.fetch(job_id, connection=self.connection)
         except NoSuchJobError:
             # Silently ignore/remove this job and return (i.e. do nothing)
-            self.connection.lrem(self.key, job_id)
+            self.connection._lrem(self.key, 0, job_id)
             return

         # Delete it from the failed queue (raise an error if that failed)
-        if self.connection.lrem(self.key, job.id) == 0:
+        if self.connection._lrem(self.key, 0, job.id) == 0:
             raise InvalidJobOperationError('Cannot requeue non-failed jobs.')

+        job.status = Status.QUEUED
         job.exc_info = None
         q = Queue(job.origin, connection=self.connection)
         q.enqueue_job(job, timeout=job.timeout)
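Two behaviours change here: dequeue failures now carry the offending `job_id` alongside the queue, and requeueing a failed job resets its status. A rough sketch of the requeue side, where `failed_job_id` is a placeholder for the id of a job already on the failed queue:

```python
from rq import get_failed_queue
from rq.job import Job, Status

fq = get_failed_queue()
fq.requeue(failed_job_id)            # failed_job_id: placeholder job id

job = Job.fetch(failed_job_id)
print(job.status == Status.QUEUED)   # True: status is reset on requeue
```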

@@ -48,8 +48,8 @@ def setup_default_arguments(args, settings):

 def setup_redis(args):
     if args.url is not None:
-        redis_conn = redis.from_url(args.url, db=args.db)
+        redis_conn = redis.StrictRedis.from_url(args.url, db=args.db)
     else:
-        redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db,
-                                 password=args.password)
+        redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db,
+                                       password=args.password)
     use_connection(redis_conn)
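For reference, the URL branch now goes through `StrictRedis.from_url()`; a roughly equivalent standalone call (URL and db values are placeholders):

```python
import redis

conn = redis.StrictRedis.from_url('redis://localhost:6379', db=0)
```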

@@ -9,7 +9,7 @@ import os
 import sys
 import logging

-from compat import is_python_version
+from .compat import is_python_version


 def gettermsize():

@@ -1 +1 @@
-VERSION = '0.3.5-dev'
+VERSION = '0.3.6-dev'

@@ -5,7 +5,7 @@ if is_python_version((2, 7), (3, 2)):
 else:
     import unittest2 as unittest  # noqa

-from redis import Redis
+from redis import StrictRedis
 from rq import push_connection, pop_connection
@@ -14,7 +14,7 @@ def find_empty_redis_database():
     will use/connect it when no keys are in there.
     """
     for dbnum in range(4, 17):
-        testconn = Redis(db=dbnum)
+        testconn = StrictRedis(db=dbnum)
         empty = len(testconn.keys('*')) == 0
         if empty:
             return testconn

@@ -259,6 +259,16 @@ class TestFailedQueue(RQTestCase):
         job = Job.fetch(job.id)
         self.assertEquals(job.timeout, 200)

+    def test_requeue_sets_status_to_queued(self):
+        """Requeueing a job should set its status back to QUEUED."""
+        job = Job.create(func=div_by_zero, args=(1, 2, 3))
+        job.save()
+        get_failed_queue().quarantine(job, Exception('Some fake error'))
+        get_failed_queue().requeue(job.id)
+
+        job = Job.fetch(job.id)
+        self.assertEqual(job.status, Status.QUEUED)
+
     def test_enqueue_preserves_result_ttl(self):
         """Enqueueing persists result_ttl."""
         q = Queue()

@@ -194,13 +194,13 @@ class TestWorker(RQTestCase):
         job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
         w = Worker([q])
         w.work(burst=True)
-        self.assertNotEqual(self.testconn.ttl(job.key), 0)
+        self.assertNotEqual(self.testconn._ttl(job.key), 0)

         # Job with -1 result_ttl don't expire
         job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1)
         w = Worker([q])
         w.work(burst=True)
-        self.assertEqual(self.testconn.ttl(job.key), None)
+        self.assertEqual(self.testconn._ttl(job.key), -1)

         # Job with result_ttl = 0 gets deleted immediately
         job = q.enqueue(say_hello, args=('Frank',), result_ttl=0)
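The changed expectations reflect a real difference in redis-py 2.x: the legacy `Redis` client maps a missing expiry to `None`, while `StrictRedis` (and the `_ttl` shim above) returns `-1`, matching the Redis protocol. A quick illustration, assuming a local server:

```python
from redis import Redis, StrictRedis

Redis().set('rq:demo:key', 'x')           # key without an expiry
print(Redis().ttl('rq:demo:key'))         # None (legacy client)
print(StrictRedis().ttl('rq:demo:key'))   # -1 (StrictRedis / raw protocol)
```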
