modify zadd calls for redis-py 3.0 (#1016)

* modify zadd calls for redis-py 3.0

redis-py 3.0 changes the zadd interface that accepts a single
mapping argument that is expected to be a dict.
https://github.com/andymccurdy/redis-py#mset-msetnx-and-zadd

* change FailedQueue.push_job_id to always push a str

redis-py 3.0 does not attempt to cast values to str and is left
to the user.

* remove Redis connection patching

Since in redis-py 3.0, Redis == StrictRedis class, we no longer
need to patch _zadd and other methods.
Ref: https://github.com/rq/rq/pull/1016#issuecomment-441010847
main
Darshan Rai 6 years ago committed by Selwin Ong
parent 6559b0ffd7
commit ada2ad03ca

@ -3,7 +3,7 @@ them in the background with workers. It is backed by Redis and it is designed
to have a low barrier to entry. It should be integrated in your web stack
easily.
RQ requires Redis >= 2.7.0.
RQ requires Redis >= 3.0.0.
[![Build status](https://travis-ci.org/rq/rq.svg?branch=master)](https://secure.travis-ci.org/rq/rq)
[![PyPI](https://img.shields.io/pypi/pyversions/rq.svg)](https://pypi.python.org/pypi/rq)

@ -7,7 +7,7 @@ RQ (_Redis Queue_) is a simple Python library for queueing jobs and processing
them in the background with workers. It is backed by Redis and it is designed
to have a low barrier to entry. It can be integrated in your web stack easily.
RQ requires Redis >= 2.7.0.
RQ requires Redis >= 3.0.0.
## Getting started

@ -1,2 +1,2 @@
redis>=2.7
redis>=3.0
click>=3.0.0

@ -9,7 +9,7 @@ from functools import partial
import click
import redis
from redis import StrictRedis
from redis import Redis
from redis.sentinel import Sentinel
from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
@ -30,7 +30,7 @@ def read_config_file(module):
if k.upper() == k])
def get_redis_from_config(settings, connection_class=StrictRedis):
def get_redis_from_config(settings, connection_class=Redis):
"""Returns a StrictRedis instance from a dictionary of settings.
To use redis sentinel, you must specify a dictionary in the configuration file.
Example of a dictionary with keys without values:

@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
from functools import partial
from redis import Redis, StrictRedis
from redis import Redis
def fix_return_type(func):
@ -16,42 +16,3 @@ def fix_return_type(func):
value = -1
return value
return _inner
PATCHED_METHODS = ['_setex', '_lrem', '_zadd', '_pipeline', '_ttl']
def _hset(self, key, field_name, value, pipeline=None):
connection = pipeline if pipeline is not None else self
connection.hset(key, field_name, value)
def patch_connection(connection):
# Don't patch already patches objects
if all([hasattr(connection, attr) for attr in PATCHED_METHODS]):
return connection
connection._hset = partial(_hset, connection)
if isinstance(connection, Redis):
connection._setex = partial(StrictRedis.setex, connection)
connection._lrem = partial(StrictRedis.lrem, connection)
connection._zadd = partial(StrictRedis.zadd, connection)
connection._pipeline = partial(StrictRedis.pipeline, connection)
connection._ttl = fix_return_type(partial(StrictRedis.ttl, connection))
if hasattr(connection, 'pttl'):
connection._pttl = fix_return_type(partial(StrictRedis.pttl, connection))
# add support for mock redis objects
elif hasattr(connection, 'setex'):
connection._setex = connection.setex
connection._lrem = connection.lrem
connection._zadd = connection.zadd
connection._pipeline = connection.pipeline
connection._ttl = connection.ttl
if hasattr(connection, 'pttl'):
connection._pttl = connection.pttl
else:
raise ValueError('Unanticipated connection type: {}. Please report this.'.format(type(connection)))
return connection

@ -4,9 +4,8 @@ from __future__ import (absolute_import, division, print_function,
from contextlib import contextmanager
from redis import StrictRedis
from redis import Redis
from .compat.connections import patch_connection
from .local import LocalStack, release_local
@ -17,7 +16,7 @@ class NoRedisConnectionException(Exception):
@contextmanager
def Connection(connection=None): # noqa
if connection is None:
connection = StrictRedis()
connection = Redis()
push_connection(connection)
try:
yield
@ -30,7 +29,7 @@ def Connection(connection=None): # noqa
def push_connection(redis):
"""Pushes the given connection on the stack."""
_connection_stack.push(patch_connection(redis))
_connection_stack.push(redis)
def pop_connection():
@ -47,7 +46,7 @@ def use_connection(redis=None):
release_local(_connection_stack)
if redis is None:
redis = StrictRedis()
redis = Redis()
push_connection(redis)
@ -63,7 +62,7 @@ def resolve_connection(connection=None):
Raises an exception if it cannot resolve a connection now.
"""
if connection is not None:
return patch_connection(connection)
return connection
connection = get_current_connection()
if connection is None:

@ -23,7 +23,7 @@ def cleanup_ghosts(conn=None):
"""
conn = conn if conn else get_current_connection()
for worker in Worker.all(connection=conn):
if conn._ttl(worker.key) == -1:
if conn.ttl(worker.key) == -1:
ttl = worker.default_worker_ttl
conn.expire(worker.key, ttl)
logger.info('Marked ghosted worker {0} to expire in {1} seconds.'.format(worker.name, ttl))

@ -1,7 +1,7 @@
DEFAULT_JOB_CLASS = 'rq.job.Job'
DEFAULT_QUEUE_CLASS = 'rq.Queue'
DEFAULT_WORKER_CLASS = 'rq.Worker'
DEFAULT_CONNECTION_CLASS = 'redis.StrictRedis'
DEFAULT_CONNECTION_CLASS = 'redis.Redis'
DEFAULT_WORKER_TTL = 420
DEFAULT_JOB_MONITORING_INTERVAL = 30
DEFAULT_RESULT_TTL = 500

@ -155,7 +155,8 @@ class Job(object):
def set_status(self, status, pipeline=None):
self._status = status
self.connection._hset(self.key, 'status', self._status, pipeline)
connection = pipeline or self.connection
connection.hset(self.key, 'status', self._status)
def _set_status(self, status):
warnings.warn(
@ -529,7 +530,7 @@ class Job(object):
cancellation.
"""
from .queue import Queue
pipeline = pipeline or self.connection._pipeline()
pipeline = pipeline or self.connection.pipeline()
if self.origin:
q = Queue(name=self.origin, connection=self.connection)
q.remove(self, pipeline=pipeline)

@ -121,7 +121,7 @@ class Queue(object):
if delete_jobs:
self.empty()
with self.connection._pipeline() as pipeline:
with self.connection.pipeline() as pipeline:
pipeline.srem(self.redis_queues_keys, self._key)
pipeline.delete(self._key)
pipeline.execute()
@ -183,7 +183,7 @@ class Queue(object):
pipeline.lrem(self.key, 1, job_id)
return
return self.connection._lrem(self.key, 1, job_id)
return self.connection.lrem(self.key, 1, job_id)
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
@ -237,7 +237,7 @@ class Queue(object):
if not isinstance(depends_on, self.job_class):
depends_on = self.job_class(id=depends_on,
connection=self.connection)
with self.connection._pipeline() as pipe:
with self.connection.pipeline() as pipe:
while True:
try:
pipe.watch(depends_on.key)
@ -320,7 +320,7 @@ class Queue(object):
If Queue is instantiated with is_async=False, job is executed immediately.
"""
pipe = pipeline if pipeline is not None else self.connection._pipeline()
pipe = pipeline if pipeline is not None else self.connection.pipeline()
# Add Queue key set
pipe.sadd(self.redis_queues_keys, self.key)
@ -354,7 +354,7 @@ class Queue(object):
"""
from .registry import DeferredJobRegistry
pipe = pipeline if pipeline is not None else self.connection._pipeline()
pipe = pipeline if pipeline is not None else self.connection.pipeline()
dependents_key = job.dependents_key
while True:
@ -522,7 +522,7 @@ class FailedQueue(Queue):
queue).
"""
with self.connection._pipeline() as pipeline:
with self.connection.pipeline() as pipeline:
# Add Queue key set
self.connection.sadd(self.redis_queues_keys, self.key)
@ -530,7 +530,7 @@ class FailedQueue(Queue):
job.save(pipeline=pipeline, include_meta=False)
job.cleanup(ttl=-1, pipeline=pipeline) # failed job won't expire
self.push_job_id(job.id, pipeline=pipeline)
self.push_job_id(str(job.id), pipeline=pipeline)
pipeline.execute()
return job

@ -44,9 +44,9 @@ class BaseRegistry(object):
if score == -1:
score = '+inf'
if pipeline is not None:
return pipeline.zadd(self.key, score, job.id)
return pipeline.zadd(self.key, {job.id: score})
return self.connection._zadd(self.key, score, job.id)
return self.connection.zadd(self.key, {job.id: score})
def remove(self, job, pipeline=None):
connection = pipeline if pipeline is not None else self.connection

@ -277,7 +277,7 @@ class Worker(object):
raise ValueError(msg.format(self.name))
key = self.key
queues = ','.join(self.queue_names())
with self.connection._pipeline() as p:
with self.connection.pipeline() as p:
p.delete(key)
now = utcnow()
now_in_string = utcformat(utcnow())
@ -292,7 +292,7 @@ class Worker(object):
def register_death(self):
"""Registers its own death."""
self.log.debug('Registering death')
with self.connection._pipeline() as p:
with self.connection.pipeline() as p:
# We cannot use self.state = 'dead' here, because that would
# rollback the pipeline
worker_registration.unregister(self, p)
@ -703,7 +703,7 @@ class Worker(object):
if heartbeat_ttl is None:
heartbeat_ttl = self.job_monitoring_interval + 5
with self.connection._pipeline() as pipeline:
with self.connection.pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
@ -723,7 +723,7 @@ class Worker(object):
2. Removing the job from the started_job_registry
3. Setting the workers current job to None
"""
with self.connection._pipeline() as pipeline:
with self.connection.pipeline() as pipeline:
if started_job_registry is None:
started_job_registry = StartedJobRegistry(job.origin,
self.connection,
@ -745,7 +745,7 @@ class Worker(object):
def handle_job_success(self, job, queue, started_job_registry):
with self.connection._pipeline() as pipeline:
with self.connection.pipeline() as pipeline:
while True:
try:
# if dependencies are inserted after enqueue_dependents

@ -17,7 +17,7 @@ def register(worker, pipeline=None):
def unregister(worker, pipeline=None):
"""Remove worker key from Redis."""
if pipeline is None:
connection = worker.connection._pipeline()
connection = worker.connection.pipeline()
else:
connection = pipeline

@ -1,5 +1,5 @@
[bdist_rpm]
requires = redis >= 2.7.0
requires = redis >= 3.0.0
click >= 3.0
[wheel]

@ -32,7 +32,7 @@ setup(
zip_safe=False,
platforms='any',
install_requires=[
'redis >= 2.7.0',
'redis >= 3.0.0',
'click >= 5.0'
],
python_requires='>=2.7',

@ -4,7 +4,7 @@ from __future__ import (absolute_import, division, print_function,
import logging
from redis import StrictRedis
from redis import Redis
from rq import pop_connection, push_connection
from rq.compat import is_python_version
@ -19,7 +19,7 @@ def find_empty_redis_database():
will use/connect it when no keys are in there.
"""
for dbnum in range(4, 17):
testconn = StrictRedis(db=dbnum)
testconn = Redis(db=dbnum)
empty = testconn.dbsize() == 0
if empty:
return testconn

@ -3,7 +3,7 @@ from __future__ import (absolute_import, division, print_function,
unicode_literals)
import mock
from redis import StrictRedis
from redis import Redis
from rq.decorators import job
from rq.job import Job
from rq.worker import DEFAULT_RESULT_TTL
@ -152,7 +152,7 @@ class TestDecorator(RQTestCase):
def test_decorator_connection_laziness(self, resolve_connection):
"""Ensure that job decorator resolve connection in `lazy` way """
resolve_connection.return_value = StrictRedis()
resolve_connection.return_value = Redis()
@job(queue='queue_name')
def foo():

@ -62,17 +62,17 @@ class TestRegistry(RQTestCase):
def test_get_job_ids(self):
"""Getting job ids from StartedJobRegistry."""
timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, timestamp + 10, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 20, 'bar')
self.testconn.zadd(self.registry.key, {'foo': timestamp + 10})
self.testconn.zadd(self.registry.key, {'bar': timestamp + 20})
self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar'])
def test_get_expired_job_ids(self):
"""Getting expired job ids form StartedJobRegistry."""
timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
self.testconn.zadd(self.registry.key, {'foo': 1})
self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
self.assertEqual(self.registry.get_expired_job_ids(), ['foo'])
self.assertEqual(self.registry.get_expired_job_ids(timestamp + 20),
@ -86,7 +86,7 @@ class TestRegistry(RQTestCase):
queue = Queue(connection=self.testconn)
job = queue.enqueue(say_hello)
self.testconn.zadd(self.registry.key, 2, job.id)
self.testconn.zadd(self.registry.key, {job.id: 2})
self.registry.cleanup(1)
self.assertNotIn(job.id, failed_queue.job_ids)
@ -142,8 +142,8 @@ class TestRegistry(RQTestCase):
def test_get_job_count(self):
"""StartedJobRegistry returns the right number of job count."""
timestamp = current_timestamp() + 10
self.testconn.zadd(self.registry.key, timestamp, 'foo')
self.testconn.zadd(self.registry.key, timestamp, 'bar')
self.testconn.zadd(self.registry.key, {'foo': timestamp})
self.testconn.zadd(self.registry.key, {'bar': timestamp})
self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2)
@ -153,10 +153,10 @@ class TestRegistry(RQTestCase):
queue = Queue(connection=self.testconn)
finished_job_registry = FinishedJobRegistry(connection=self.testconn)
self.testconn.zadd(finished_job_registry.key, 1, 'foo')
self.testconn.zadd(finished_job_registry.key, {'foo': 1})
started_job_registry = StartedJobRegistry(connection=self.testconn)
self.testconn.zadd(started_job_registry.key, 1, 'foo')
self.testconn.zadd(started_job_registry.key, {'foo': 1})
clean_registries(queue)
self.assertEqual(self.testconn.zcard(finished_job_registry.key), 0)
@ -175,9 +175,9 @@ class TestFinishedJobRegistry(RQTestCase):
def test_cleanup(self):
"""Finished job registry removes expired jobs."""
timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')
self.testconn.zadd(self.registry.key, timestamp + 30, 'baz')
self.testconn.zadd(self.registry.key, {'foo': 1})
self.testconn.zadd(self.registry.key, {'bar': timestamp + 10})
self.testconn.zadd(self.registry.key, {'baz': timestamp + 30})
self.registry.cleanup()
self.assertEqual(self.registry.get_job_ids(), ['bar', 'baz'])

@ -415,7 +415,7 @@ class TestWorker(RQTestCase):
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertNotEqual(self.testconn._ttl(job.key), 0)
self.assertNotEqual(self.testconn.ttl(job.key), 0)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
# Job with -1 result_ttl don't expire
@ -423,7 +423,7 @@ class TestWorker(RQTestCase):
w = Worker([q])
self.assertIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
w.work(burst=True)
self.assertEqual(self.testconn._ttl(job.key), -1)
self.assertEqual(self.testconn.ttl(job.key), -1)
self.assertNotIn(job.get_id().encode('utf-8'), self.testconn.lrange(q.key, 0, -1))
# Job with result_ttl = 0 gets deleted immediately
@ -676,12 +676,12 @@ class TestWorker(RQTestCase):
"""worker.clean_registries sets last_cleaned_at and cleans registries."""
foo_queue = Queue('foo', connection=self.testconn)
foo_registry = StartedJobRegistry('foo', connection=self.testconn)
self.testconn.zadd(foo_registry.key, 1, 'foo')
self.testconn.zadd(foo_registry.key, {'foo': 1})
self.assertEqual(self.testconn.zcard(foo_registry.key), 1)
bar_queue = Queue('bar', connection=self.testconn)
bar_registry = StartedJobRegistry('bar', connection=self.testconn)
self.testconn.zadd(bar_registry.key, 1, 'bar')
self.testconn.zadd(bar_registry.key, {'bar': 1})
self.assertEqual(self.testconn.zcard(bar_registry.key), 1)
worker = Worker([foo_queue, bar_queue])
@ -706,7 +706,7 @@ class TestWorker(RQTestCase):
"""Worker calls clean_registries when run."""
queue = Queue(connection=self.testconn)
registry = StartedJobRegistry(connection=self.testconn)
self.testconn.zadd(registry.key, 1, 'foo')
self.testconn.zadd(registry.key, {'foo': 1})
worker = Worker(queue, connection=self.testconn)
worker.work(burst=True)

Loading…
Cancel
Save