From ada2ad03ca959e993a86ce7259d6d96ef91c8bfa Mon Sep 17 00:00:00 2001
From: Darshan Rai
Date: Mon, 3 Dec 2018 05:58:36 +0530
Subject: [PATCH] modify zadd calls for redis-py 3.0 (#1016)

* modify zadd calls for redis-py 3.0

redis-py 3.0 changes the zadd interface to accept a single mapping
argument that is expected to be a dict.
https://github.com/andymccurdy/redis-py#mset-msetnx-and-zadd

* change FailedQueue.push_job_id to always push a str

redis-py 3.0 no longer attempts to cast values to str; that is now left
to the user.

* remove Redis connection patching

Since in redis-py 3.0 the Redis class == StrictRedis, we no longer need
to patch _zadd and the other methods.
Ref: https://github.com/rq/rq/pull/1016#issuecomment-441010847
---
 README.md                 |  2 +-
 docs/index.md             |  2 +-
 requirements.txt          |  2 +-
 rq/cli/helpers.py         |  4 ++--
 rq/compat/connections.py  | 41 +--------------------------------------
 rq/connections.py         | 11 +++++------
 rq/contrib/legacy.py      |  2 +-
 rq/defaults.py            |  2 +-
 rq/job.py                 |  5 +++--
 rq/queue.py               | 14 ++++++-------
 rq/registry.py            |  4 ++--
 rq/worker.py              | 10 +++++-----
 rq/worker_registration.py |  2 +-
 setup.cfg                 |  2 +-
 setup.py                  |  2 +-
 tests/__init__.py         |  4 ++--
 tests/test_decorator.py   |  4 ++--
 tests/test_registry.py    | 26 ++++++++++++-------------
 tests/test_worker.py      | 10 +++++-----
 19 files changed, 55 insertions(+), 94 deletions(-)

diff --git a/README.md b/README.md
index 2ba90e7..7a26ef8 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,7 @@ them in the background with workers. It is backed by Redis and it is designed
 to have a low barrier to entry. It should be integrated in your web stack
 easily.
 
-RQ requires Redis >= 2.7.0.
+RQ requires Redis >= 3.0.0.
 
 [![Build status](https://travis-ci.org/rq/rq.svg?branch=master)](https://secure.travis-ci.org/rq/rq)
 [![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq)
diff --git a/docs/index.md b/docs/index.md
index 15e77ed..64fddcf 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -7,7 +7,7 @@ RQ (_Redis Queue_) is a simple Python library for queueing jobs and processing
 them in the background with workers. It is backed by Redis and it is designed
 to have a low barrier to entry. It can be integrated in your web stack easily.
 
-RQ requires Redis >= 2.7.0.
+RQ requires Redis >= 3.0.0.
 
 ## Getting started
 
diff --git a/requirements.txt b/requirements.txt
index 291ee5b..7c061a7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-redis>=2.7
+redis>=3.0
 click>=3.0.0
diff --git a/rq/cli/helpers.py b/rq/cli/helpers.py
index 33a267b..0e964f9 100644
--- a/rq/cli/helpers.py
+++ b/rq/cli/helpers.py
@@ -9,7 +9,7 @@ from functools import partial
 
 import click
 import redis
-from redis import StrictRedis
+from redis import Redis
 from redis.sentinel import Sentinel
 from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
                          DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
@@ -30,7 +30,7 @@ def read_config_file(module):
                  if k.upper() == k])
 
 
-def get_redis_from_config(settings, connection_class=StrictRedis):
+def get_redis_from_config(settings, connection_class=Redis):
     """Returns a StrictRedis instance from a dictionary of settings.
        To use redis sentinel, you must specify a dictionary in the configuration file.
        Example of a dictionary with keys without values:
diff --git a/rq/compat/connections.py b/rq/compat/connections.py
index 7d8798b..8e2b511 100644
--- a/rq/compat/connections.py
+++ b/rq/compat/connections.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
 
 from functools import partial
 
-from redis import Redis, StrictRedis
+from redis import Redis
 
 
 def fix_return_type(func):
@@ -16,42 +16,3 @@ def fix_return_type(func):
             value = -1
         return value
     return _inner
-
-
-PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl']
-
-
-def _hset(self, key, field_name, value, pipeline=None):
-    connection = pipeline if pipeline is not None else self
-    connection.hset(key, field_name, value)
-
-
-def patch_connection(connection):
-    # Don't patch already patches objects
-    if all([hasattr(connection, attr) for attr in PATCHED_METHODS]):
-        return connection
-
-    connection._hset = partial(_hset, connection)
-
-    if isinstance(connection, Redis):
-        connection._setex = partial(StrictRedis.setex, connection)
-        connection._lrem = partial(StrictRedis.lrem, connection)
-        connection._zadd = partial(StrictRedis.zadd, connection)
-        connection._pipeline = partial(StrictRedis.pipeline, connection)
-        connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection))
-        if hasattr(connection, 'pttl'):
-            connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection))
-
-    # add support for mock redis objects
-    elif hasattr(connection, 'setex'):
-        connection._setex = connection.setex
-        connection._lrem = connection.lrem
-        connection._zadd = connection.zadd
-        connection._pipeline = connection.pipeline
-        connection._ttl = connection.ttl
-        if hasattr(connection, 'pttl'):
-            connection._pttl = connection.pttl
-    else:
-        raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection)))
-
-    return connection
diff --git a/rq/connections.py b/rq/connections.py
index d7f6a61..7186056 100644
--- a/rq/connections.py
+++ b/rq/connections.py
@@ -4,9 +4,8 @@ from __future__ import (absolute_import, division, print_function,
 
 from contextlib import contextmanager
 
-from redis import StrictRedis
+from redis import Redis
 
-from .compat.connections import patch_connection
 from .local import LocalStack, release_local
 
 
@@ -17,7 +16,7 @@ class NoRedisConnectionException(Exception):
 @contextmanager
 def Connection(connection=None):  # noqa
     if connection is None:
-        connection = StrictRedis()
+        connection = Redis()
     push_connection(connection)
     try:
         yield
@@ -30,7 +29,7 @@ def Connection(connection=None):  # noqa
 
 def push_connection(redis):
     """Pushes the given connection on the stack."""
-    _connection_stack.push(patch_connection(redis))
+    _connection_stack.push(redis)
 
 
 def pop_connection():
@@ -47,7 +46,7 @@ def use_connection(redis=None):
     release_local(_connection_stack)
 
     if redis is None:
-        redis = StrictRedis()
+        redis = Redis()
     push_connection(redis)
 
 
@@ -63,7 +62,7 @@ def resolve_connection(connection=None):
     Raises an exception if it cannot resolve a connection now.
     """
     if connection is not None:
-        return patch_connection(connection)
+        return connection
 
     connection = get_current_connection()
     if connection is None:
diff --git a/rq/contrib/legacy.py b/rq/contrib/legacy.py
index 710c2e3..880d865 100644
--- a/rq/contrib/legacy.py
+++ b/rq/contrib/legacy.py
@@ -23,7 +23,7 @@ def cleanup_ghosts(conn=None):
     """
     conn = conn if conn else get_current_connection()
     for worker in Worker.all(connection=conn):
-        if conn._ttl(worker.key) == -1:
+        if conn.ttl(worker.key) == -1:
             ttl = worker.default_worker_ttl
             conn.expire(worker.key, ttl)
             logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl))
diff --git a/rq/defaults.py b/rq/defaults.py
index 517144c..89792c0 100644
--- a/rq/defaults.py
+++ b/rq/defaults.py
@@ -1,7 +1,7 @@
 DEFAULT_JOB_CLASS = 'rq.job.Job'
 DEFAULT_QUEUE_CLASS = 'rq.Queue'
 DEFAULT_WORKER_CLASS = 'rq.Worker'
-DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis'
+DEFAULT_CONNECTION_CLASS = 'redis.Redis'
 DEFAULT_WORKER_TTL = 420
 DEFAULT_JOB_MONITORING_INTERVAL = 30
 DEFAULT_RESULT_TTL = 500
diff --git a/rq/job.py b/rq/job.py
index 9432813..e77b085 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -155,7 +155,8 @@ class Job(object):
 
     def set_status(self, status, pipeline=None):
         self._status = status
-        self.connection._hset(self.key, 'status', self._status, pipeline)
+        connection = pipeline or self.connection
+        connection.hset(self.key, 'status', self._status)
 
     def _set_status(self, status):
         warnings.warn(
@@ -529,7 +530,7 @@ class Job(object):
         cancellation.
         """
         from .queue import Queue
-        pipeline = pipeline or self.connection._pipeline()
+        pipeline = pipeline or self.connection.pipeline()
         if self.origin:
             q = Queue(name=self.origin, connection=self.connection)
             q.remove(self, pipeline=pipeline)
diff --git a/rq/queue.py b/rq/queue.py
index b8b5b47..2116e50 100644
--- a/rq/queue.py
+++ b/rq/queue.py
@@ -121,7 +121,7 @@ class Queue(object):
         if delete_jobs:
             self.empty()
 
-        with self.connection._pipeline() as pipeline:
+        with self.connection.pipeline() as pipeline:
             pipeline.srem(self.redis_queues_keys, self._key)
             pipeline.delete(self._key)
             pipeline.execute()
@@ -183,7 +183,7 @@ class Queue(object):
             pipeline.lrem(self.key, 1, job_id)
             return
 
-        return self.connection._lrem(self.key, 1, job_id)
+        return self.connection.lrem(self.key, 1, job_id)
 
     def compact(self):
         """Removes all "dead" jobs from the queue by cycling through it, while
@@ -237,7 +237,7 @@ class Queue(object):
             if not isinstance(depends_on, self.job_class):
                 depends_on = self.job_class(id=depends_on,
                                             connection=self.connection)
-            with self.connection._pipeline() as pipe:
+            with self.connection.pipeline() as pipe:
                 while True:
                     try:
                         pipe.watch(depends_on.key)
@@ -320,7 +320,7 @@ class Queue(object):
 
         If Queue is instantiated with is_async=False, job is executed immediately.
         """
-        pipe = pipeline if pipeline is not None else self.connection._pipeline()
+        pipe = pipeline if pipeline is not None else self.connection.pipeline()
 
         # Add Queue key set
         pipe.sadd(self.redis_queues_keys, self.key)
@@ -354,7 +354,7 @@ class Queue(object):
         """
         from .registry import DeferredJobRegistry
 
-        pipe = pipeline if pipeline is not None else self.connection._pipeline()
+        pipe = pipeline if pipeline is not None else self.connection.pipeline()
         dependents_key = job.dependents_key
 
         while True:
@@ -522,7 +522,7 @@ class FailedQueue(Queue):
         queue).
         """
 
-        with self.connection._pipeline() as pipeline:
+        with self.connection.pipeline() as pipeline:
             # Add Queue key set
             self.connection.sadd(self.redis_queues_keys, self.key)
 
@@ -530,7 +530,7 @@ class FailedQueue(Queue):
             job.save(pipeline=pipeline, include_meta=False)
             job.cleanup(ttl=-1, pipeline=pipeline)  # failed job won't expire
 
-            self.push_job_id(job.id, pipeline=pipeline)
+            self.push_job_id(str(job.id), pipeline=pipeline)
             pipeline.execute()
 
         return job
diff --git a/rq/registry.py b/rq/registry.py
index 2b8b992..b482c30 100644
--- a/rq/registry.py
+++ b/rq/registry.py
@@ -44,9 +44,9 @@ class BaseRegistry(object):
         if score == -1:
             score = '+inf'
         if pipeline is not None:
-            return pipeline.zadd(self.key, score, job.id)
+            return pipeline.zadd(self.key, {job.id: score})
 
-        return self.connection._zadd(self.key, score, job.id)
+        return self.connection.zadd(self.key, {job.id: score})
 
     def remove(self, job, pipeline=None):
         connection = pipeline if pipeline is not None else self.connection
diff --git a/rq/worker.py b/rq/worker.py
index cdfa33f..3e21cf4 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -277,7 +277,7 @@ class Worker(object):
             raise ValueError(msg.format(self.name))
         key = self.key
         queues = ','.join(self.queue_names())
-        with self.connection._pipeline() as p:
+        with self.connection.pipeline() as p:
             p.delete(key)
             now = utcnow()
             now_in_string = utcformat(utcnow())
@@ -292,7 +292,7 @@ class Worker(object):
     def register_death(self):
         """Registers its own death."""
         self.log.debug('Registering death')
-        with self.connection._pipeline() as p:
+        with self.connection.pipeline() as p:
             # We cannot use self.state = 'dead' here, because that would
             # rollback the pipeline
             worker_registration.unregister(self, p)
@@ -703,7 +703,7 @@ class Worker(object):
         if heartbeat_ttl is None:
             heartbeat_ttl = self.job_monitoring_interval + 5
 
-        with self.connection._pipeline() as pipeline:
+        with self.connection.pipeline() as pipeline:
             self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
             self.set_current_job_id(job.id, pipeline=pipeline)
             self.heartbeat(heartbeat_ttl, pipeline=pipeline)
@@ -723,7 +723,7 @@ class Worker(object):
             2. Removing the job from the started_job_registry
             3. Setting the workers current job to None
         """
-        with self.connection._pipeline() as pipeline:
+        with self.connection.pipeline() as pipeline:
             if started_job_registry is None:
                 started_job_registry = StartedJobRegistry(job.origin,
                                                           self.connection,
@@ -745,7 +745,7 @@ class Worker(object):
 
 
     def handle_job_success(self, job, queue, started_job_registry):
-        with self.connection._pipeline() as pipeline:
+        with self.connection.pipeline() as pipeline:
             while True:
                 try:
                     # if dependencies are inserted after enqueue_dependents
diff --git a/rq/worker_registration.py b/rq/worker_registration.py
index 73cb0ef..d24fac3 100644
--- a/rq/worker_registration.py
+++ b/rq/worker_registration.py
@@ -17,7 +17,7 @@ def register(worker, pipeline=None):
 def unregister(worker, pipeline=None):
     """Remove worker key from Redis."""
     if pipeline is None:
-        connection = worker.connection._pipeline()
+        connection = worker.connection.pipeline()
     else:
         connection = pipeline
 
diff --git a/setup.cfg b/setup.cfg
index 5752f88..912a1b4 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,5 +1,5 @@
 [bdist_rpm]
-requires = redis >= 2.7.0
+requires = redis >= 3.0.0
     click >= 3.0
 
 [wheel]
diff --git a/setup.py b/setup.py
index 3c425c7..2d9756f 100644
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ setup(
     zip_safe=False,
     platforms='any',
     install_requires=[
-        'redis >= 2.7.0',
+        'redis >= 3.0.0',
         'click >= 5.0'
     ],
     python_requires='>=2.7',
diff --git a/tests/__init__.py b/tests/__init__.py
index fea12b5..c916a30 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
 
 import logging
 
-from redis import StrictRedis
+from redis import Redis
 from rq import pop_connection, push_connection
 from rq.compat import is_python_version
 
@@ -19,7 +19,7 @@ def find_empty_redis_database():
     will use/connect it when no keys are in there.
     """
     for dbnum in range(4, 17):
-        testconn = StrictRedis(db=dbnum)
+        testconn = Redis(db=dbnum)
         empty = testconn.dbsize() == 0
         if empty:
             return testconn
diff --git a/tests/test_decorator.py b/tests/test_decorator.py
index 1b9e974..5f74935 100644
--- a/tests/test_decorator.py
+++ b/tests/test_decorator.py
@@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
 import mock
-from redis import StrictRedis
+from redis import Redis
 from rq.decorators import job
 from rq.job import Job
 from rq.worker import DEFAULT_RESULT_TTL
@@ -152,7 +152,7 @@ class TestDecorator(RQTestCase):
 
     def test_decorator_connection_laziness(self, resolve_connection):
         """Ensure that job decorator resolve connection in `lazy` way """
-        resolve_connection.return_value = StrictRedis()
+        resolve_connection.return_value = Redis()
 
         @job(queue='queue_name')
         def foo():
diff --git a/tests/test_registry.py b/tests/test_registry.py
index 41d0c1c..ed2dbd2 100644
--- a/tests/test_registry.py
+++ b/tests/test_registry.py
@@ -62,17 +62,17 @@ class TestRegistry(RQTestCase):
 
     def test_get_job_ids(self):
         """Getting job ids from StartedJobRegistry."""
         timestamp = current_timestamp()
-        self.testconn.zadd(self.registry.key, timestamp + 10, 'foo')
-        self.testconn.zadd(self.registry.key, timestamp + 20, 'bar')
+        self.testconn.zadd(self.registry.key, {'foo': timestamp + 10})
+        self.testconn.zadd(self.registry.key, {'bar': timestamp + 20})
         self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar'])
 
     def test_get_expired_job_ids(self):
         """Getting expired job ids form StartedJobRegistry."""
         timestamp = current_timestamp()
-        self.testconn.zadd(self.registry.key, 1, 'foo')
-        self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
-        self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
+        self.testconn.zadd(self.registry.key, {'foo': 1})
+        self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
+        self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
 
         self.assertEqual(self.registry.get_expired_job_ids(), ['foo'])
         self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
@@ -86,7 +86,7 @@ class TestRegistry(RQTestCase):
         queue = Queue(connection=self.testconn)
         job = queue.enqueue(say_hello)
 
-        self.testconn.zadd(self.registry.key, 2, job.id)
+        self.testconn.zadd(self.registry.key, {job.id: 2})
 
         self.registry.cleanup(1)
         self.assertNotIn(job.id, failed_queue.job_ids)
@@ -142,8 +142,8 @@ class TestRegistry(RQTestCase):
 
     def test_get_job_count(self):
         """StartedJobRegistry returns the right number of job count."""
         timestamp = current_timestamp() + 10
-        self.testconn.zadd(self.registry.key, timestamp, 'foo')
-        self.testconn.zadd(self.registry.key, timestamp, 'bar')
+        self.testconn.zadd(self.registry.key, {'foo': timestamp})
+        self.testconn.zadd(self.registry.key, {'bar': timestamp})
         self.assertEqual(self.registry.count, 2)
         self.assertEqual(len(self.registry), 2)
@@ -153,10 +153,10 @@ class TestRegistry(RQTestCase):
         queue = Queue(connection=self.testconn)
 
         finished_job_registry = FinishedJobRegistry(connection=self.testconn)
-        self.testconn.zadd(finished_job_registry.key, 1, 'foo')
+        self.testconn.zadd(finished_job_registry.key, {'foo': 1})
 
         started_job_registry = StartedJobRegistry(connection=self.testconn)
-        self.testconn.zadd(started_job_registry.key, 1, 'foo')
+        self.testconn.zadd(started_job_registry.key, {'foo': 1})
 
         clean_registries(queue)
         self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
@@ -175,9 +175,9 @@ class TestFinishedJobRegistry(RQTestCase):
     def test_cleanup(self):
         """Finished job registry removes expired jobs."""
         timestamp = current_timestamp()
-        self.testconn.zadd(self.registry.key, 1, 'foo')
-        self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
-        self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
+        self.testconn.zadd(self.registry.key, {'foo': 1})
+        self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
+        self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
 
         self.registry.cleanup()
         self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz'])
diff --git a/tests/test_worker.py b/tests/test_worker.py
index 5d2370e..d56b5cb 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -415,7 +415,7 @@ class TestWorker(RQTestCase):
         w = Worker([q])
         self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
         w.work(burst=True)
-        self.assertNotEqual(self.testconn._ttl(job.key), 0)
+        self.assertNotEqual(self.testconn.ttl(job.key), 0)
         self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
 
         # Job with -1 result_ttl don't expire
@@ -423,7 +423,7 @@ class TestWorker(RQTestCase):
         w = Worker([q])
         self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
         w.work(burst=True)
-        self.assertEqual(self.testconn._ttl(job.key), -1)
+        self.assertEqual(self.testconn.ttl(job.key), -1)
         self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
 
         # Job with result_ttl = 0 gets deleted immediately
@@ -676,12 +676,12 @@ class TestWorker(RQTestCase):
         """worker.clean_registries sets last_cleaned_at and cleans registries."""
         foo_queue = Queue('foo', connection=self.testconn)
         foo_registry = StartedJobRegistry('foo', connection=self.testconn)
-        self.testconn.zadd(foo_registry.key, 1, 'foo')
+        self.testconn.zadd(foo_registry.key, {'foo': 1})
         self.assertEqual(self.testconn.zcard(foo_registry.key), 1)
 
         bar_queue = Queue('bar', connection=self.testconn)
         bar_registry = StartedJobRegistry('bar', connection=self.testconn)
-        self.testconn.zadd(bar_registry.key, 1, 'bar')
+        self.testconn.zadd(bar_registry.key, {'bar': 1})
         self.assertEqual(self.testconn.zcard(bar_registry.key), 1)
 
         worker = Worker([foo_queue, bar_queue])
@@ -706,7 +706,7 @@ class TestWorker(RQTestCase):
         """Worker calls clean_registries when run."""
         queue = Queue(connection=self.testconn)
         registry = StartedJobRegistry(connection=self.testconn)
-        self.testconn.zadd(registry.key, 1, 'foo')
+        self.testconn.zadd(registry.key, {'foo': 1})
 
         worker = Worker(queue, connection=self.testconn)
         worker.work(burst=True)
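For readers skimming the diff, here is a minimal sketch (not part of the patch itself) of the redis-py 2.x vs. 3.0 call styles this change adapts to. The key names, job ids, and score value are made up for illustration, and the snippet assumes a Redis server reachable on localhost:6379.

```python
from redis import Redis  # in redis-py 3.0, StrictRedis is just an alias of Redis

conn = Redis()  # assumes a local Redis server on the default port

# redis-py 2.x accepted score and member as separate positional arguments:
#     conn.zadd('rq:registry:example', 1543795716, 'some-job-id')
# redis-py 3.0 takes a single mapping of {member: score} instead:
conn.zadd('rq:registry:example', {'some-job-id': 1543795716})

# The same mapping form is used when the call goes through a pipeline:
with conn.pipeline() as pipe:
    pipe.zadd('rq:registry:example', {'another-job-id': 1543795716})
    pipe.execute()

# redis-py 3.0 also stops coercing arbitrary values to str, so callers cast
# explicitly before pushing, as FailedQueue.push_job_id now does:
job_id = 'some-job-id'
conn.lpush('rq:queue:example', str(job_id))
```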