Add redis_server_version to worker/job/queue (#1286)

The variable contains the server version and allows to determine
available features. This is relevant for API changes like HSET mappings
in version 4.0.0 or LPOS in version 6.0.6.

To keep the number of connection.info() calls low, the information is
*cached* once determined, as a server version unlikely changes while
keeping the connection up.

Signed-off-by: Paul Spooren <mail@aparcar.org>
main
Paul Spooren 5 years ago committed by GitHub
parent 3902962aac
commit 361db4aa83
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -104,14 +104,3 @@ except ImportError:
return timedelta(0) return timedelta(0)
utc = UTC() utc = UTC()
def hmset(pipe_or_connection, name, mapping):
# redis-py versions 3.5.0 and above accept a mapping parameter for hset
# This requires Redis server >= 4.0 so this is temporarily commented out
# and will be re-enabled at a later date
# try:
# return pipe_or_connection.hset(name, mapping=mapping)
# earlier versions require hmset to be used
# except TypeError:
return pipe_or_connection.hmset(name, mapping)

@ -7,11 +7,11 @@ import pickle
import warnings import warnings
import zlib import zlib
from distutils.version import StrictVersion
from functools import partial from functools import partial
from uuid import uuid4 from uuid import uuid4
from rq.compat import (as_text, decode_redis_hash, hmset, string_types, from rq.compat import as_text, decode_redis_hash, string_types, text_type
text_type)
from .connections import resolve_connection from .connections import resolve_connection
from .exceptions import NoSuchJobError from .exceptions import NoSuchJobError
from .local import LocalStack from .local import LocalStack
@ -345,6 +345,7 @@ class Job(object):
self._dependency_ids = [] self._dependency_ids = []
self.meta = {} self.meta = {}
self.serializer = resolve_serializer(serializer) self.serializer = resolve_serializer(serializer)
self.redis_server_version = None
def __repr__(self): # noqa # pragma: no cover def __repr__(self): # noqa # pragma: no cover
return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@ -565,7 +566,21 @@ class Job(object):
key = self.key key = self.key
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
hmset(connection, key, self.to_dict(include_meta=include_meta)) mapping = self.to_dict(include_meta=include_meta)
if self.get_redis_server_version() >= StrictVersion("4.0.0"):
connection.hset(key, mapping=mapping)
else:
connection.hmset(key, mapping)
def get_redis_server_version(self):
"""Return Redis server version of connection"""
if not self.redis_server_version:
self.redis_server_version = StrictVersion(
self.connection.info("server")["redis_version"]
)
return self.redis_server_version
def save_meta(self): def save_meta(self):
"""Stores job meta from the job instance to the corresponding Redis key.""" """Stores job meta from the job instance to the corresponding Redis key."""

@ -6,6 +6,7 @@ import uuid
import warnings import warnings
from datetime import datetime from datetime import datetime
from distutils.version import StrictVersion
from redis import WatchError from redis import WatchError
from .compat import as_text, string_types, total_ordering, utc from .compat import as_text, string_types, total_ordering, utc
@ -74,6 +75,7 @@ class Queue(object):
self.job_class = job_class self.job_class = job_class
self.serializer = resolve_serializer(serializer) self.serializer = resolve_serializer(serializer)
self.redis_server_version = None
def __len__(self): def __len__(self):
return self.count return self.count
@ -87,6 +89,15 @@ class Queue(object):
def __iter__(self): def __iter__(self):
yield self yield self
def get_redis_server_version(self):
"""Return Redis server version of connection"""
if not self.redis_server_version:
self.redis_server_version = StrictVersion(
self.connection.info("server")["redis_version"]
)
return self.redis_server_version
@property @property
def key(self): def key(self):
"""Returns the Redis key for this Queue.""" """Returns the Redis key for this Queue."""
@ -156,12 +167,20 @@ class Queue(object):
def get_job_position(self, job_or_id): def get_job_position(self, job_or_id):
"""Returns the position of a job within the queue """Returns the position of a job within the queue
WARNING: The current implementation has a complexity of worse than O(N) Using Redis before 6.0.6 and redis-py before 3.5.4 has a complexity of
and should not be used for very long job queues. Future implementation worse than O(N) and should not be used for very long job queues. Redis
may use Redis LPOS command to improve the complexity to O(N) and and redis-py version afterwards should support the LPOS command
running natively in Redis C implementation. handling job positions within Redis c implementation.
""" """
job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id job_id = job_or_id.id if isinstance(job_or_id, self.job_class) else job_or_id
if self.get_redis_server_version() >= StrictVersion("6.0.6"):
try:
return self.connection.lpos(self.key, job_id)
except AttributeError:
# not yet implemented by redis-py
pass
if job_id in self.job_ids: if job_id in self.job_ids:
return self.job_ids.index(job_id) return self.job_ids.index(job_id)
return None return None

@ -13,6 +13,7 @@ import time
import traceback import traceback
import warnings import warnings
from datetime import timedelta from datetime import timedelta
from distutils.version import StrictVersion
from uuid import uuid4 from uuid import uuid4
try: try:
@ -23,7 +24,7 @@ except ImportError:
from redis import WatchError from redis import WatchError
from . import worker_registration from . import worker_registration
from .compat import PY2, as_text, hmset, string_types, text_type from .compat import PY2, as_text, string_types, text_type
from .connections import get_current_connection, push_connection, pop_connection from .connections import get_current_connection, push_connection, pop_connection
from .defaults import (DEFAULT_RESULT_TTL, from .defaults import (DEFAULT_RESULT_TTL,
@ -167,6 +168,8 @@ class Worker(object):
connection = get_current_connection() connection = get_current_connection()
self.connection = connection self.connection = connection
self.redis_server_version = None
if prepare_for_work: if prepare_for_work:
self.hostname = socket.gethostname() self.hostname = socket.gethostname()
self.pid = os.getpid() self.pid = os.getpid()
@ -216,6 +219,15 @@ class Worker(object):
elif exception_handlers is not None: elif exception_handlers is not None:
self.push_exc_handler(exception_handlers) self.push_exc_handler(exception_handlers)
def get_redis_server_version(self):
"""Return Redis server version of connection"""
if not self.redis_server_version:
self.redis_server_version = StrictVersion(
self.connection.info("server")["redis_version"]
)
return self.redis_server_version
def validate_queues(self): def validate_queues(self):
"""Sanity check for the given queues.""" """Sanity check for the given queues."""
for queue in self.queues: for queue in self.queues:
@ -268,7 +280,8 @@ class Worker(object):
now = utcnow() now = utcnow()
now_in_string = utcformat(now) now_in_string = utcformat(now)
self.birth_date = now self.birth_date = now
hmset(p, key, mapping={
mapping={
'birth': now_in_string, 'birth': now_in_string,
'last_heartbeat': now_in_string, 'last_heartbeat': now_in_string,
'queues': queues, 'queues': queues,
@ -276,7 +289,13 @@ class Worker(object):
'hostname': self.hostname, 'hostname': self.hostname,
'version': self.version, 'version': self.version,
'python_version': self.python_version, 'python_version': self.python_version,
}) }
if self.get_redis_server_version() >= StrictVersion("4.0.0"):
p.hset(key, mapping=mapping)
else:
p.hmset(key, mapping)
worker_registration.register(self, p) worker_registration.register(self, p)
p.expire(key, self.default_worker_ttl + 60) p.expire(key, self.default_worker_ttl + 60)
p.execute() p.execute()
@ -581,6 +600,7 @@ class Worker(object):
if result is not None: if result is not None:
job, queue = result job, queue = result
job.redis_server_version = self.get_redis_server_version()
if self.log_job_description: if self.log_job_description:
self.log.info( self.log.info(
'%s: %s (%s)', green(queue.name), '%s: %s (%s)', green(queue.name),

Loading…
Cancel
Save