From 361db4aa83757c32e2b94a6daf5460d6a9767bd9 Mon Sep 17 00:00:00 2001 From: Paul Spooren Date: Sat, 11 Jul 2020 16:22:10 -1000 Subject: [PATCH] Add redis_server_version to worker/job/queue (#1286) The variable contains the server version and allows to determine available features. This is relevant for API changes like HSET mappings in version 4.0.0 or LPOS in version 6.0.6. To keep the number of connection.info() calls low, the information is *cached* once determined, as a server version unlikely changes while keeping the connection up. Signed-off-by: Paul Spooren --- rq/compat/__init__.py | 11 ----------- rq/job.py | 21 ++++++++++++++++++--- rq/queue.py | 27 +++++++++++++++++++++++---- rq/worker.py | 26 +++++++++++++++++++++++--- 4 files changed, 64 insertions(+), 21 deletions(-) diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index 84e7aac..0871dec 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -104,14 +104,3 @@ except ImportError: return timedelta(0) utc = UTC() - - -def hmset(pipe_or_connection, name, mapping): - # redis-py versions 3.5.0 and above accept a mapping parameter for hset - # This requires Redis server >= 4.0 so this is temporarily commented out - # and will be re-enabled at a later date - # try: - # return pipe_or_connection.hset(name, mapping=mapping) - # earlier versions require hmset to be used - # except TypeError: - return pipe_or_connection.hmset(name, mapping) diff --git a/rq/job.py b/rq/job.py index ce82784..5ba66a4 100644 --- a/rq/job.py +++ b/rq/job.py @@ -7,11 +7,11 @@ import pickle import warnings import zlib +from distutils.version import StrictVersion from functools import partial from uuid import uuid4 -from rq.compat import (as_text, decode_redis_hash, hmset, string_types, - text_type) +from rq.compat import as_text, decode_redis_hash, string_types, text_type from .connections import resolve_connection from .exceptions import NoSuchJobError from .local import LocalStack @@ -345,6 +345,7 @@ class Job(object): self._dependency_ids = [] self.meta = {} self.serializer = resolve_serializer(serializer) + self.redis_server_version = None def __repr__(self): # noqa # pragma: no cover return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, @@ -565,7 +566,21 @@ class Job(object): key = self.key connection = pipeline if pipeline is not None else self.connection - hmset(connection, key, self.to_dict(include_meta=include_meta)) + mapping = self.to_dict(include_meta=include_meta) + + if self.get_redis_server_version() >= StrictVersion("4.0.0"): + connection.hset(key, mapping=mapping) + else: + connection.hmset(key, mapping) + + def get_redis_server_version(self): + """Return Redis server version of connection""" + if not self.redis_server_version: + self.redis_server_version = StrictVersion( + self.connection.info("server")["redis_version"] + ) + + return self.redis_server_version def save_meta(self): """Stores job meta from the job instance to the corresponding Redis key.""" diff --git a/rq/queue.py b/rq/queue.py index 5f0b38e..25fa95c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -6,6 +6,7 @@ import uuid import warnings from datetime import datetime +from distutils.version import StrictVersion from redis import WatchError from .compat import as_text, string_types, total_ordering, utc @@ -74,6 +75,7 @@ class Queue(object): self.job_class = job_class self.serializer = resolve_serializer(serializer) + self.redis_server_version = None def __len__(self): return self.count @@ -87,6 +89,15 @@ class Queue(object): def __iter__(self): yield self + def get_redis_server_version(self): + """Return Redis server version of connection""" + if not self.redis_server_version: + self.redis_server_version = StrictVersion( + self.connection.info("server")["redis_version"] + ) + + return self.redis_server_version + @property def key(self): """Returns the Redis key for this Queue.""" @@ -156,12 +167,20 @@ class Queue(object): def get_job_position(self, job_or_id): """Returns the position of a job within the queue - WARNING: The current implementation has a complexity of worse than O(N) - and should not be used for very long job queues. Future implementation - may use Redis LPOS command to improve the complexity to O(N) and - running natively in Redis C implementation. + Using Redis before 6.0.6 and redis-py before 3.5.4 has a complexity of + worse than O(N) and should not be used for very long job queues. Redis + and redis-py version afterwards should support the LPOS command + handling job positions within Redis c implementation. """ job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id + + if self.get_redis_server_version() >= StrictVersion("6.0.6"): + try: + return self.connection.lpos(self.key, job_id) + except AttributeError: + # not yet implemented by redis-py + pass + if job_id in self.job_ids: return self.job_ids.index(job_id) return None diff --git a/rq/worker.py b/rq/worker.py index 1f91c1a..7cfa130 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -13,6 +13,7 @@ import time import traceback import warnings from datetime import timedelta +from distutils.version import StrictVersion from uuid import uuid4 try: @@ -23,7 +24,7 @@ except ImportError: from redis import WatchError from . import worker_registration -from .compat import PY2, as_text, hmset, string_types, text_type +from .compat import PY2, as_text, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection from .defaults import (DEFAULT_RESULT_TTL, @@ -167,6 +168,8 @@ class Worker(object): connection = get_current_connection() self.connection = connection + self.redis_server_version = None + if prepare_for_work: self.hostname = socket.gethostname() self.pid = os.getpid() @@ -216,6 +219,15 @@ class Worker(object): elif exception_handlers is not None: self.push_exc_handler(exception_handlers) + def get_redis_server_version(self): + """Return Redis server version of connection""" + if not self.redis_server_version: + self.redis_server_version = StrictVersion( + self.connection.info("server")["redis_version"] + ) + + return self.redis_server_version + def validate_queues(self): """Sanity check for the given queues.""" for queue in self.queues: @@ -268,7 +280,8 @@ class Worker(object): now = utcnow() now_in_string = utcformat(now) self.birth_date = now - hmset(p, key, mapping={ + + mapping={ 'birth': now_in_string, 'last_heartbeat': now_in_string, 'queues': queues, @@ -276,7 +289,13 @@ class Worker(object): 'hostname': self.hostname, 'version': self.version, 'python_version': self.python_version, - }) + } + + if self.get_redis_server_version() >= StrictVersion("4.0.0"): + p.hset(key, mapping=mapping) + else: + p.hmset(key, mapping) + worker_registration.register(self, p) p.expire(key, self.default_worker_ttl + 60) p.execute() @@ -581,6 +600,7 @@ class Worker(object): if result is not None: job, queue = result + job.redis_server_version = self.get_redis_server_version() if self.log_job_description: self.log.info( '%s: %s (%s)', green(queue.name),