From 2baa2e370fea30003ea85961d78bf6738f34de4e Mon Sep 17 00:00:00 2001 From: Idan Kamara Date: Wed, 30 Jan 2013 14:28:26 +0200 Subject: [PATCH 01/11] job: save the real exception when unpickling fails We raise our own exception which hides the real error (often an ImportError), making it difficult to see what happend. Instead, save the original exception too. --- rq/exceptions.py | 4 ++-- rq/job.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 4d2cb6a..4fb983d 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -11,6 +11,6 @@ class NoQueueError(Exception): class UnpickleError(Exception): - def __init__(self, message, raw_data): - super(UnpickleError, self).__init__(message) + def __init__(self, message, raw_data, inner_exception): + super(UnpickleError, self).__init__(message, inner_exception) self.raw_data = raw_data diff --git a/rq/job.py b/rq/job.py index e94f512..aebf9eb 100644 --- a/rq/job.py +++ b/rq/job.py @@ -27,8 +27,8 @@ def unpickle(pickled_string): """ try: obj = loads(pickled_string) - except (StandardError, UnpicklingError): - raise UnpickleError('Could not unpickle.', pickled_string) + except (StandardError, UnpicklingError), e: + raise UnpickleError('Could not unpickle.', pickled_string, e) return obj From 67880343f1e3e8a0127db445ea07afba4e1a97b9 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Thu, 31 Jan 2013 16:28:16 +0100 Subject: [PATCH 02/11] Make inner_exception optional, and use new except syntax. --- rq/exceptions.py | 2 +- rq/job.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rq/exceptions.py b/rq/exceptions.py index 4fb983d..59c1429 100644 --- a/rq/exceptions.py +++ b/rq/exceptions.py @@ -11,6 +11,6 @@ class NoQueueError(Exception): class UnpickleError(Exception): - def __init__(self, message, raw_data, inner_exception): + def __init__(self, message, raw_data, inner_exception=None): super(UnpickleError, self).__init__(message, inner_exception) self.raw_data = raw_data diff --git a/rq/job.py b/rq/job.py index aebf9eb..2000806 100644 --- a/rq/job.py +++ b/rq/job.py @@ -27,7 +27,7 @@ def unpickle(pickled_string): """ try: obj = loads(pickled_string) - except (StandardError, UnpicklingError), e: + except (StandardError, UnpicklingError) as e: raise UnpickleError('Could not unpickle.', pickled_string, e) return obj From 54254f227112f6cd32cc47ee1a2494ef9923a10e Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 6 Feb 2013 22:36:23 +0100 Subject: [PATCH 03/11] Patch the connection instances. This patches the connection object (which is either a StrictRedis instance or a Redis instance), to have alternative class methods that behave exactly like their StrictRedis counterparts, no matter whether which type the object is. Only the ambiguous methods are patched. The exhaustive list: - _zadd (fixes argument order) - _lrem (fixes argument order) - _setex (fixes argument order) - _pipeline (always returns a StrictPipeline) - _ttl (fixes return value) - _pttl (fixes return value) This makes it possible to call the methods reliably without polluting the RQ code any further. --- CHANGES.md | 8 ++++++-- rq/compat/connections.py | 44 ++++++++++++++++++++++++++++++++++++++++ rq/connections.py | 11 +++++----- rq/queue.py | 4 ++-- rq/scripts/__init__.py | 4 ++-- tests/__init__.py | 4 ++-- tests/test_worker.py | 4 ++-- 7 files changed, 64 insertions(+), 15 deletions(-) create mode 100644 rq/compat/connections.py diff --git a/CHANGES.md b/CHANGES.md index 4bf554a..7cee984 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -4,6 +4,10 @@ - `ended_at` is now recorded for normally finished jobs, too. (Previously only for failed jobs.) +- Adds support for both `Redis` and `StrictRedis` connection types + +- Makes `StrictRedis` the default connection type if none is explicitly provided + ### 0.3.4 (January 23rd, 2013) @@ -93,11 +97,11 @@ invocations: ```python - from redis import Redis + from redis import StrictRedis from rq.decorators import job # Connect to Redis - redis = Redis() + redis = StrictRedis() @job('high', timeout=10, connection=redis) def some_work(x, y): diff --git a/rq/compat/connections.py b/rq/compat/connections.py new file mode 100644 index 0000000..0374b5b --- /dev/null +++ b/rq/compat/connections.py @@ -0,0 +1,44 @@ +from redis import Redis, StrictRedis +from functools import partial + + +def fix_return_type(func): + # deliberately no functools.wraps() call here, since the function being + # wrapped is a partial, which has no module + def _inner(*args, **kwargs): + value = func(*args, **kwargs) + if value is None: + value = -1 + return value + return _inner + + +def patch_connection(connection): + if not isinstance(connection, StrictRedis): + raise ValueError('A StrictRedis or Redis connection is required.') + + # Don't patch already patches objects + PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl'] + if all([hasattr(connection, attr) for attr in PATCHED_METHODS]): + return connection + + if isinstance(connection, Redis): + connection._setex = partial(StrictRedis.setex, connection) + connection._lrem = partial(StrictRedis.lrem, connection) + connection._zadd = partial(StrictRedis.zadd, connection) + connection._pipeline = partial(StrictRedis.pipeline, connection) + connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection)) + if hasattr(connection, 'pttl'): + connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection)) + elif isinstance(connection, StrictRedis): + connection._setex = connection.setex + connection._lrem = connection.lrem + connection._zadd = connection.zadd + connection._pipeline = connection.pipeline + connection._ttl = connection.ttl + if hasattr(connection, 'pttl'): + connection._pttl = connection.pttl + else: + raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection))) + + return connection diff --git a/rq/connections.py b/rq/connections.py index 05de2f1..f4a72e4 100644 --- a/rq/connections.py +++ b/rq/connections.py @@ -1,6 +1,7 @@ from contextlib import contextmanager -from redis import Redis +from redis import StrictRedis from .local import LocalStack, release_local +from .compat.connections import patch_connection class NoRedisConnectionException(Exception): @@ -10,7 +11,7 @@ class NoRedisConnectionException(Exception): @contextmanager def Connection(connection=None): if connection is None: - connection = Redis() + connection = StrictRedis() push_connection(connection) try: yield @@ -23,7 +24,7 @@ def Connection(connection=None): def push_connection(redis): """Pushes the given connection on the stack.""" - _connection_stack.push(redis) + _connection_stack.push(patch_connection(redis)) def pop_connection(): @@ -40,7 +41,7 @@ def use_connection(redis=None): release_local(_connection_stack) if redis is None: - redis = Redis() + redis = StrictRedis() push_connection(redis) @@ -56,7 +57,7 @@ def resolve_connection(connection=None): Raises an exception if it cannot resolve a connection now. """ if connection is not None: - return connection + return patch_connection(connection) connection = get_current_connection() if connection is None: diff --git a/rq/queue.py b/rq/queue.py index b962b09..0e4c11a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -302,11 +302,11 @@ class FailedQueue(Queue): job = Job.fetch(job_id, connection=self.connection) except NoSuchJobError: # Silently ignore/remove this job and return (i.e. do nothing) - self.connection.lrem(self.key, job_id) + self.connection._lrem(self.key, 0, job_id) return # Delete it from the failed queue (raise an error if that failed) - if self.connection.lrem(self.key, job.id) == 0: + if self.connection._lrem(self.key, 0, job.id) == 0: raise InvalidJobOperationError('Cannot requeue non-failed jobs.') job.exc_info = None diff --git a/rq/scripts/__init__.py b/rq/scripts/__init__.py index 94441ab..119f4f6 100644 --- a/rq/scripts/__init__.py +++ b/rq/scripts/__init__.py @@ -48,8 +48,8 @@ def setup_default_arguments(args, settings): def setup_redis(args): if args.url is not None: - redis_conn = redis.from_url(args.url, db=args.db) + redis_conn = redis.StrictRedis.from_url(args.url, db=args.db) else: - redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db, + redis_conn = redis.StrictRedis(host=args.host, port=args.port, db=args.db, password=args.password) use_connection(redis_conn) diff --git a/tests/__init__.py b/tests/__init__.py index cba1035..688c11f 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -5,7 +5,7 @@ if is_python_version((2, 7), (3, 2)): else: import unittest2 as unittest # noqa -from redis import Redis +from redis import StrictRedis from rq import push_connection, pop_connection @@ -14,7 +14,7 @@ def find_empty_redis_database(): will use/connect it when no keys are in there. """ for dbnum in range(4, 17): - testconn = Redis(db=dbnum) + testconn = StrictRedis(db=dbnum) empty = len(testconn.keys('*')) == 0 if empty: return testconn diff --git a/tests/test_worker.py b/tests/test_worker.py index e3e706e..b662b09 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -185,13 +185,13 @@ class TestWorker(RQTestCase): job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w = Worker([q]) w.work(burst=True) - self.assertNotEqual(self.testconn.ttl(job.key), 0) + self.assertNotEqual(self.testconn._ttl(job.key), 0) # Job with -1 result_ttl don't expire job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) w = Worker([q]) w.work(burst=True) - self.assertEqual(self.testconn.ttl(job.key), None) + self.assertEqual(self.testconn._ttl(job.key), -1) # Job with result_ttl = 0 gets deleted immediately job = q.enqueue(say_hello, args=('Frank',), result_ttl=0) From c427eda36cc6d729d8078d04539a1707b637aeba Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 6 Feb 2013 23:25:50 +0100 Subject: [PATCH 04/11] Bump version to 0.3.5. --- rq/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/version.py b/rq/version.py index 8e18e5b..c3e0bec 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.3.5-dev' +VERSION = '0.3.5' From cd2a759be8dacc149f4b8f7cd8158d6079e5a8cb Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 6 Feb 2013 23:26:52 +0100 Subject: [PATCH 05/11] Add release date. --- CHANGES.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 7cee984..9341b53 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,5 @@ ### 0.3.5 -(not yet released) +(February 6th, 2013) - `ended_at` is now recorded for normally finished jobs, too. (Previously only for failed jobs.) From 7d37c257aef88cdc2fb28d18bfed54adaf267b06 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 6 Feb 2013 23:28:03 +0100 Subject: [PATCH 06/11] Prepare for 0.3.6. --- CHANGES.md | 6 ++++++ rq/version.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 9341b53..104d503 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +### 0.3.6 +(not yet released) + +- ... + + ### 0.3.5 (February 6th, 2013) diff --git a/rq/version.py b/rq/version.py index c3e0bec..d4ab04a 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.3.5' +VERSION = '0.3.6-dev' From 83525c42c749a6908ce7f623594d7bd3d3f1084e Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 11 Feb 2013 14:59:31 +0100 Subject: [PATCH 07/11] Prevent against 'created_at' field being None. --- rq/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/job.py b/rq/job.py index 2000806..746e625 100644 --- a/rq/job.py +++ b/rq/job.py @@ -289,7 +289,7 @@ class Job(object): key = self.key obj = {} - obj['created_at'] = times.format(self.created_at, 'UTC') + obj['created_at'] = times.format(self.created_at or times.now(), 'UTC') if self.func_name is not None: obj['data'] = dumps(self.job_tuple) From 4688498e2d9bc07a5e3d3fac4224eac234d8ecf4 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 11 Feb 2013 15:06:24 +0100 Subject: [PATCH 08/11] Attach job ID when unpickling fails in .dequeue(). This makes the behaviour consistent with .dequeue_any(). --- rq/queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rq/queue.py b/rq/queue.py index 0e4c11a..e84d0a5 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -223,6 +223,7 @@ class Queue(object): except UnpickleError as e: # Attach queue information on the exception for improved error # reporting + e.job_id = job_id e.queue = self raise e return job From 05d744c9afce4d6328092107dc2aa5c4700fdf92 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Wed, 13 Feb 2013 11:50:40 +0700 Subject: [PATCH 09/11] Requeuing a job should set its status back to Queued. --- rq/queue.py | 1 + tests/test_queue.py | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/rq/queue.py b/rq/queue.py index b962b09..2103620 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -309,6 +309,7 @@ class FailedQueue(Queue): if self.connection.lrem(self.key, job.id) == 0: raise InvalidJobOperationError('Cannot requeue non-failed jobs.') + job.status = Status.QUEUED job.exc_info = None q = Queue(job.origin, connection=self.connection) q.enqueue_job(job, timeout=job.timeout) diff --git a/tests/test_queue.py b/tests/test_queue.py index 4a88f65..bd0d378 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -259,6 +259,16 @@ class TestFailedQueue(RQTestCase): job = Job.fetch(job.id) self.assertEquals(job.timeout, 200) + def test_requeue_sets_status_to_queued(self): + """Requeueing a job should set its status back to QUEUED.""" + job = Job.create(func=div_by_zero, args=(1, 2, 3)) + job.save() + get_failed_queue().quarantine(job, Exception('Some fake error')) + get_failed_queue().requeue(job.id) + + job = Job.fetch(job.id) + self.assertEqual(job.status, Status.QUEUED) + def test_enqueue_preserves_result_ttl(self): """Enqueueing persists result_ttl.""" q = Queue() From 192770773a0c756bd2401b49eccd426cbdbf9e83 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 13 Feb 2013 12:04:34 +0100 Subject: [PATCH 10/11] Remove unused import. --- rq/job.py | 1 - 1 file changed, 1 deletion(-) diff --git a/rq/job.py b/rq/job.py index 746e625..4d625bf 100644 --- a/rq/job.py +++ b/rq/job.py @@ -1,7 +1,6 @@ import importlib import inspect import times -from collections import namedtuple from uuid import uuid4 from cPickle import loads, dumps, UnpicklingError from .local import LocalStack From a7f927f845152c2eb03494dc359c2f491037a28d Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 13 Feb 2013 12:04:53 +0100 Subject: [PATCH 11/11] Fix non-relative import. --- rq/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/utils.py b/rq/utils.py index 14241a6..5fa4940 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -9,7 +9,7 @@ import os import sys import logging -from compat import is_python_version +from .compat import is_python_version def gettermsize():