diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py index 84e7aac..0871dec 100644 --- a/rq/compat/__init__.py +++ b/rq/compat/__init__.py @@ -104,14 +104,3 @@ except ImportError: return timedelta(0) utc = UTC() - - -def hmset(pipe_or_connection, name, mapping): - # redis-py versions 3.5.0 and above accept a mapping parameter for hset - # This requires Redis server >= 4.0 so this is temporarily commented out - # and will be re-enabled at a later date - # try: - # return pipe_or_connection.hset(name, mapping=mapping) - # earlier versions require hmset to be used - # except TypeError: - return pipe_or_connection.hmset(name, mapping) diff --git a/rq/job.py b/rq/job.py index ce82784..5ba66a4 100644 --- a/rq/job.py +++ b/rq/job.py @@ -7,11 +7,11 @@ import pickle import warnings import zlib +from distutils.version import StrictVersion from functools import partial from uuid import uuid4 -from rq.compat import (as_text, decode_redis_hash, hmset, string_types, - text_type) +from rq.compat import as_text, decode_redis_hash, string_types, text_type from .connections import resolve_connection from .exceptions import NoSuchJobError from .local import LocalStack @@ -345,6 +345,7 @@ class Job(object): self._dependency_ids = [] self.meta = {} self.serializer = resolve_serializer(serializer) + self.redis_server_version = None def __repr__(self): # noqa # pragma: no cover return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, @@ -565,7 +566,21 @@ class Job(object): key = self.key connection = pipeline if pipeline is not None else self.connection - hmset(connection, key, self.to_dict(include_meta=include_meta)) + mapping = self.to_dict(include_meta=include_meta) + + if self.get_redis_server_version() >= StrictVersion("4.0.0"): + connection.hset(key, mapping=mapping) + else: + connection.hmset(key, mapping) + + def get_redis_server_version(self): + """Return Redis server version of connection""" + if not self.redis_server_version: + self.redis_server_version = StrictVersion( + self.connection.info("server")["redis_version"] + ) + + return self.redis_server_version def save_meta(self): """Stores job meta from the job instance to the corresponding Redis key.""" diff --git a/rq/queue.py b/rq/queue.py index 5f0b38e..25fa95c 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -6,6 +6,7 @@ import uuid import warnings from datetime import datetime +from distutils.version import StrictVersion from redis import WatchError from .compat import as_text, string_types, total_ordering, utc @@ -74,6 +75,7 @@ class Queue(object): self.job_class = job_class self.serializer = resolve_serializer(serializer) + self.redis_server_version = None def __len__(self): return self.count @@ -87,6 +89,15 @@ class Queue(object): def __iter__(self): yield self + def get_redis_server_version(self): + """Return Redis server version of connection""" + if not self.redis_server_version: + self.redis_server_version = StrictVersion( + self.connection.info("server")["redis_version"] + ) + + return self.redis_server_version + @property def key(self): """Returns the Redis key for this Queue.""" @@ -156,12 +167,20 @@ class Queue(object): def get_job_position(self, job_or_id): """Returns the position of a job within the queue - WARNING: The current implementation has a complexity of worse than O(N) - and should not be used for very long job queues. Future implementation - may use Redis LPOS command to improve the complexity to O(N) and - running natively in Redis C implementation. + Using Redis before 6.0.6 and redis-py before 3.5.4 has a complexity of + worse than O(N) and should not be used for very long job queues. Redis + and redis-py version afterwards should support the LPOS command + handling job positions within Redis c implementation. """ job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id + + if self.get_redis_server_version() >= StrictVersion("6.0.6"): + try: + return self.connection.lpos(self.key, job_id) + except AttributeError: + # not yet implemented by redis-py + pass + if job_id in self.job_ids: return self.job_ids.index(job_id) return None diff --git a/rq/worker.py b/rq/worker.py index 1f91c1a..7cfa130 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -13,6 +13,7 @@ import time import traceback import warnings from datetime import timedelta +from distutils.version import StrictVersion from uuid import uuid4 try: @@ -23,7 +24,7 @@ except ImportError: from redis import WatchError from . import worker_registration -from .compat import PY2, as_text, hmset, string_types, text_type +from .compat import PY2, as_text, string_types, text_type from .connections import get_current_connection, push_connection, pop_connection from .defaults import (DEFAULT_RESULT_TTL, @@ -167,6 +168,8 @@ class Worker(object): connection = get_current_connection() self.connection = connection + self.redis_server_version = None + if prepare_for_work: self.hostname = socket.gethostname() self.pid = os.getpid() @@ -216,6 +219,15 @@ class Worker(object): elif exception_handlers is not None: self.push_exc_handler(exception_handlers) + def get_redis_server_version(self): + """Return Redis server version of connection""" + if not self.redis_server_version: + self.redis_server_version = StrictVersion( + self.connection.info("server")["redis_version"] + ) + + return self.redis_server_version + def validate_queues(self): """Sanity check for the given queues.""" for queue in self.queues: @@ -268,7 +280,8 @@ class Worker(object): now = utcnow() now_in_string = utcformat(now) self.birth_date = now - hmset(p, key, mapping={ + + mapping={ 'birth': now_in_string, 'last_heartbeat': now_in_string, 'queues': queues, @@ -276,7 +289,13 @@ class Worker(object): 'hostname': self.hostname, 'version': self.version, 'python_version': self.python_version, - }) + } + + if self.get_redis_server_version() >= StrictVersion("4.0.0"): + p.hset(key, mapping=mapping) + else: + p.hmset(key, mapping) + worker_registration.register(self, p) p.expire(key, self.default_worker_ttl + 60) p.execute() @@ -581,6 +600,7 @@ class Worker(object): if result is not None: job, queue = result + job.redis_server_version = self.get_redis_server_version() if self.log_job_description: self.log.info( '%s: %s (%s)', green(queue.name),