diff --git a/rq/compat/__init__.py b/rq/compat/__init__.py deleted file mode 100644 index c2275e2..0000000 --- a/rq/compat/__init__.py +++ /dev/null @@ -1,61 +0,0 @@ -import sys - - -def is_python_version(*versions): - for version in versions: - if (sys.version_info[0] == version[0] and sys.version_info >= version): - return True - return False - - -try: - from functools import total_ordering -except ImportError: - def total_ordering(cls): # noqa - """Class decorator that fills in missing ordering methods""" - convert = { - '__lt__': [('__gt__', lambda self, other: other < self), - ('__le__', lambda self, other: not other < self), - ('__ge__', lambda self, other: not self < other)], - '__le__': [('__ge__', lambda self, other: other <= self), - ('__lt__', lambda self, other: not other <= self), - ('__gt__', lambda self, other: not self <= other)], - '__gt__': [('__lt__', lambda self, other: other > self), - ('__ge__', lambda self, other: not other > self), - ('__le__', lambda self, other: not self > other)], - '__ge__': [('__le__', lambda self, other: other >= self), - ('__gt__', lambda self, other: not other >= self), - ('__lt__', lambda self, other: not self >= other)] - } - roots = set(dir(cls)) & set(convert) - if not roots: - raise ValueError('must define at least one ordering operation: < > <= >=') # noqa - root = max(roots) # prefer __lt__ to __le__ to __gt__ to __ge__ - for opname, opfunc in convert[root]: - if opname not in roots: - opfunc.__name__ = str(opname) - opfunc.__doc__ = getattr(int, opname).__doc__ - setattr(cls, opname, opfunc) - return cls - - -PY2 = sys.version_info[0] == 2 - -# Python 3.x and up -text_type = str -string_types = (str,) - - -def as_text(v): - if v is None: - return None - elif isinstance(v, bytes): - return v.decode('utf-8') - elif isinstance(v, str): - return v - else: - raise ValueError('Unknown type %r' % type(v)) - - -def decode_redis_hash(h): - return dict((as_text(k), h[k]) for k in h) diff --git a/rq/compat/connections.py b/rq/compat/connections.py deleted file mode 100644 index 49b9685..0000000 --- a/rq/compat/connections.py +++ /dev/null @@ -1,9 +0,0 @@ -def fix_return_type(func): - # deliberately no functools.wraps() call here, since the function being - # wrapped is a partial, which has no module - def _inner(*args, **kwargs): - value = func(*args, **kwargs) - if value is None: - value = -1 - return value - return _inner diff --git a/rq/compat/dictconfig.py b/rq/compat/dictconfig.py deleted file mode 100644 index c9876da..0000000 --- a/rq/compat/dictconfig.py +++ /dev/null @@ -1,554 +0,0 @@ -# flake8: noqa -# This is a copy of the Python logging.config.dictconfig module. It is -# provided here for backwards compatibility for Python versions prior to 2.7. -# -# Copyright 2009-2010 by Vinay Sajip. All Rights Reserved. -# -# Permission to use, copy, modify, and distribute this software and its -# documentation for any purpose and without fee is hereby granted, -# provided that the above copyright notice appear in all copies and that -# both that copyright notice and this permission notice appear in -# supporting documentation, and that the name of Vinay Sajip -# not be used in advertising or publicity pertaining to distribution -# of the software without specific, written prior permission. -# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING -# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL -# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR -# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER -# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT -# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - -import logging.handlers -import re -import sys -import types -from rq.compat import string_types - -IDENTIFIER = re.compile('^[a-z_][a-z0-9_]*$', re.I) - -def valid_ident(s): - m = IDENTIFIER.match(s) - if not m: - raise ValueError('Not a valid Python identifier: %r' % s) - return True - -# -# This function is defined in logging only in recent versions of Python -# -try: - from logging import _checkLevel -except ImportError: - def _checkLevel(level): - if isinstance(level, int): - rv = level - elif str(level) == level: - if level not in logging._levelNames: - raise ValueError('Unknown level: %r' % level) - rv = logging._levelNames[level] - else: - raise TypeError('Level not an integer or a ' - 'valid string: %r' % level) - return rv - -# The ConvertingXXX classes are wrappers around standard Python containers, -# and they serve to convert any suitable values in the container. The -# conversion converts base dicts, lists and tuples to their wrapped -# equivalents, whereas strings which match a conversion format are converted -# appropriately. -# -# Each wrapper should have a configurator attribute holding the actual -# configurator to use for conversion. - -class ConvertingDict(dict): - """A converting dictionary wrapper.""" - - def __getitem__(self, key): - value = dict.__getitem__(self, key) - result = self.configurator.convert(value) - #If the converted value is different, save for next time - if value is not result: - self[key] = result - if type(result) in (ConvertingDict, ConvertingList, - ConvertingTuple): - result.parent = self - result.key = key - return result - - def get(self, key, default=None): - value = dict.get(self, key, default) - result = self.configurator.convert(value) - #If the converted value is different, save for next time - if value is not result: - self[key] = result - if type(result) in (ConvertingDict, ConvertingList, - ConvertingTuple): - result.parent = self - result.key = key - return result - - def pop(self, key, default=None): - value = dict.pop(self, key, default) - result = self.configurator.convert(value) - if value is not result: - if type(result) in (ConvertingDict, ConvertingList, - ConvertingTuple): - result.parent = self - result.key = key - return result - -class ConvertingList(list): - """A converting list wrapper.""" - def __getitem__(self, key): - value = list.__getitem__(self, key) - result = self.configurator.convert(value) - #If the converted value is different, save for next time - if value is not result: - self[key] = result - if type(result) in (ConvertingDict, ConvertingList, - ConvertingTuple): - result.parent = self - result.key = key - return result - - def pop(self, idx=-1): - value = list.pop(self, idx) - result = self.configurator.convert(value) - if value is not result: - if type(result) in (ConvertingDict, ConvertingList, - ConvertingTuple): - result.parent = self - return result - -class ConvertingTuple(tuple): - """A converting tuple wrapper.""" - def __getitem__(self, key): - value = tuple.__getitem__(self, key) - result = self.configurator.convert(value) - if value is not result: - if type(result) in (ConvertingDict, ConvertingList, - ConvertingTuple): - result.parent = self - result.key = key - return result - -class BaseConfigurator: - """ - The configurator base class which defines some useful defaults. - """ - - CONVERT_PATTERN = re.compile(r'^(?P[a-z]+)://(?P.*)$') - - WORD_PATTERN = re.compile(r'^\s*(\w+)\s*') - DOT_PATTERN = re.compile(r'^\.\s*(\w+)\s*') - INDEX_PATTERN = re.compile(r'^\[\s*(\w+)\s*\]\s*') - DIGIT_PATTERN = re.compile(r'^\d+$') - - value_converters = { - 'ext' : 'ext_convert', - 'cfg' : 'cfg_convert', - } - - # We might want to use a different one, e.g. importlib - importer = __import__ - - def __init__(self, config): - self.config = ConvertingDict(config) - self.config.configurator = self - - def resolve(self, s): - """ - Resolve strings to objects using standard import and attribute - syntax. - """ - name = s.split('.') - used = name.pop(0) - try: - found = self.importer(used) - for frag in name: - used += '.' + frag - try: - found = getattr(found, frag) - except AttributeError: - self.importer(used) - found = getattr(found, frag) - return found - except ImportError: - e, tb = sys.exc_info()[1:] - v = ValueError('Cannot resolve %r: %s' % (s, e)) - v.__cause__, v.__traceback__ = e, tb - raise v - - def ext_convert(self, value): - """Default converter for the ext:// protocol.""" - return self.resolve(value) - - def cfg_convert(self, value): - """Default converter for the cfg:// protocol.""" - rest = value - m = self.WORD_PATTERN.match(rest) - if m is None: - raise ValueError("Unable to convert %r" % value) - else: - rest = rest[m.end():] - d = self.config[m.groups()[0]] - #print d, rest - while rest: - m = self.DOT_PATTERN.match(rest) - if m: - d = d[m.groups()[0]] - else: - m = self.INDEX_PATTERN.match(rest) - if m: - idx = m.groups()[0] - if not self.DIGIT_PATTERN.match(idx): - d = d[idx] - else: - try: - n = int(idx) # try as number first (most likely) - d = d[n] - except TypeError: - d = d[idx] - if m: - rest = rest[m.end():] - else: - raise ValueError('Unable to convert ' - '%r at %r' % (value, rest)) - #rest should be empty - return d - - def convert(self, value): - """ - Convert values to an appropriate type. dicts, lists and tuples are - replaced by their converting alternatives. Strings are checked to - see if they have a conversion format and are converted if they do. - """ - if not isinstance(value, ConvertingDict) and isinstance(value, dict): - value = ConvertingDict(value) - value.configurator = self - elif not isinstance(value, ConvertingList) and isinstance(value, list): - value = ConvertingList(value) - value.configurator = self - elif not isinstance(value, ConvertingTuple) and\ - isinstance(value, tuple): - value = ConvertingTuple(value) - value.configurator = self - elif isinstance(value, string_types): # str for py3k - m = self.CONVERT_PATTERN.match(value) - if m: - d = m.groupdict() - prefix = d['prefix'] - converter = self.value_converters.get(prefix, None) - if converter: - suffix = d['suffix'] - converter = getattr(self, converter) - value = converter(suffix) - return value - - def configure_custom(self, config): - """Configure an object with a user-supplied factory.""" - c = config.pop('()') - if not hasattr(c, '__call__') and type(c) != type: - c = self.resolve(c) - props = config.pop('.', None) - # Check for valid identifiers - kwargs = dict([(k, config[k]) for k in config if valid_ident(k)]) - result = c(**kwargs) - if props: - for name, value in props.items(): - setattr(result, name, value) - return result - - def as_tuple(self, value): - """Utility function which converts lists to tuples.""" - if isinstance(value, list): - value = tuple(value) - return value - -class DictConfigurator(BaseConfigurator): - """ - Configure logging using a dictionary-like object to describe the - configuration. - """ - - def configure(self): - """Do the configuration.""" - - config = self.config - if 'version' not in config: - raise ValueError("dictionary doesn't specify a version") - if config['version'] != 1: - raise ValueError("Unsupported version: %s" % config['version']) - incremental = config.pop('incremental', False) - EMPTY_DICT = {} - logging._acquireLock() - try: - if incremental: - handlers = config.get('handlers', EMPTY_DICT) - # incremental handler config only if handler name - # ties in to logging._handlers (Python 2.7) - if sys.version_info[:2] == (2, 7): - for name in handlers: - if name not in logging._handlers: - raise ValueError('No handler found with ' - 'name %r' % name) - else: - try: - handler = logging._handlers[name] - handler_config = handlers[name] - level = handler_config.get('level', None) - if level: - handler.setLevel(_checkLevel(level)) - except Exception as e: - raise ValueError('Unable to configure handler ' - '%r: %s' % (name, e)) - loggers = config.get('loggers', EMPTY_DICT) - for name in loggers: - try: - self.configure_logger(name, loggers[name], True) - except Exception as e: - raise ValueError('Unable to configure logger ' - '%r: %s' % (name, e)) - root = config.get('root', None) - if root: - try: - self.configure_root(root, True) - except Exception as e: - raise ValueError('Unable to configure root ' - 'logger: %s' % e) - else: - disable_existing = config.pop('disable_existing_loggers', True) - - logging._handlers.clear() - del logging._handlerList[:] - - # Do formatters first - they don't refer to anything else - formatters = config.get('formatters', EMPTY_DICT) - for name in formatters: - try: - formatters[name] = self.configure_formatter( - formatters[name]) - except Exception as e: - raise ValueError('Unable to configure ' - 'formatter %r: %s' % (name, e)) - # Next, do filters - they don't refer to anything else, either - filters = config.get('filters', EMPTY_DICT) - for name in filters: - try: - filters[name] = self.configure_filter(filters[name]) - except Exception as e: - raise ValueError('Unable to configure ' - 'filter %r: %s' % (name, e)) - - # Next, do handlers - they refer to formatters and filters - # As handlers can refer to other handlers, sort the keys - # to allow a deterministic order of configuration - handlers = config.get('handlers', EMPTY_DICT) - for name in sorted(handlers): - try: - handler = self.configure_handler(handlers[name]) - handler.name = name - handlers[name] = handler - except Exception as e: - raise ValueError('Unable to configure handler ' - '%r: %s' % (name, e)) - # Next, do loggers - they refer to handlers and filters - - #we don't want to lose the existing loggers, - #since other threads may have pointers to them. - #existing is set to contain all existing loggers, - #and as we go through the new configuration we - #remove any which are configured. At the end, - #what's left in existing is the set of loggers - #which were in the previous configuration but - #which are not in the new configuration. - root = logging.root - existing = root.manager.loggerDict.keys() - #The list needs to be sorted so that we can - #avoid disabling child loggers of explicitly - #named loggers. With a sorted list it is easier - #to find the child loggers. - existing.sort() - #We'll keep the list of existing loggers - #which are children of named loggers here... - child_loggers = [] - #now set up the new ones... - loggers = config.get('loggers', EMPTY_DICT) - for name in loggers: - if name in existing: - i = existing.index(name) - prefixed = name + "." - pflen = len(prefixed) - num_existing = len(existing) - i = i + 1 # look at the entry after name - while (i < num_existing) and\ - (existing[i][:pflen] == prefixed): - child_loggers.append(existing[i]) - i = i + 1 - existing.remove(name) - try: - self.configure_logger(name, loggers[name]) - except Exception as e: - raise ValueError('Unable to configure logger ' - '%r: %s' % (name, e)) - - #Disable any old loggers. There's no point deleting - #them as other threads may continue to hold references - #and by disabling them, you stop them doing any logging. - #However, don't disable children of named loggers, as that's - #probably not what was intended by the user. - for log in existing: - logger = root.manager.loggerDict[log] - if log in child_loggers: - logger.level = logging.NOTSET - logger.handlers = [] - logger.propagate = True - elif disable_existing: - logger.disabled = True - - # And finally, do the root logger - root = config.get('root', None) - if root: - try: - self.configure_root(root) - except Exception as e: - raise ValueError('Unable to configure root ' - 'logger: %s' % e) - finally: - logging._releaseLock() - - def configure_formatter(self, config): - """Configure a formatter from a dictionary.""" - if '()' in config: - factory = config['()'] # for use in exception handler - try: - result = self.configure_custom(config) - except TypeError as te: - if "'format'" not in str(te): - raise - #Name of parameter changed from fmt to format. - #Retry with old name. - #This is so that code can be used with older Python versions - #(e.g. by Django) - config['fmt'] = config.pop('format') - config['()'] = factory - result = self.configure_custom(config) - else: - fmt = config.get('format', None) - dfmt = config.get('datefmt', None) - result = logging.Formatter(fmt, dfmt) - return result - - def configure_filter(self, config): - """Configure a filter from a dictionary.""" - if '()' in config: - result = self.configure_custom(config) - else: - name = config.get('name', '') - result = logging.Filter(name) - return result - - def add_filters(self, filterer, filters): - """Add filters to a filterer from a list of names.""" - for f in filters: - try: - filterer.addFilter(self.config['filters'][f]) - except Exception as e: - raise ValueError('Unable to add filter %r: %s' % (f, e)) - - def configure_handler(self, config): - """Configure a handler from a dictionary.""" - formatter = config.pop('formatter', None) - if formatter: - try: - formatter = self.config['formatters'][formatter] - except Exception as e: - raise ValueError('Unable to set formatter ' - '%r: %s' % (formatter, e)) - level = config.pop('level', None) - filters = config.pop('filters', None) - if '()' in config: - c = config.pop('()') - if not hasattr(c, '__call__') and type(c) != type: - c = self.resolve(c) - factory = c - else: - klass = self.resolve(config.pop('class')) - #Special case for handler which refers to another handler - if issubclass(klass, logging.handlers.MemoryHandler) and\ - 'target' in config: - try: - config['target'] = self.config['handlers'][config['target']] - except Exception as e: - raise ValueError('Unable to set target handler ' - '%r: %s' % (config['target'], e)) - elif issubclass(klass, logging.handlers.SMTPHandler) and\ - 'mailhost' in config: - config['mailhost'] = self.as_tuple(config['mailhost']) - elif issubclass(klass, logging.handlers.SysLogHandler) and\ - 'address' in config: - config['address'] = self.as_tuple(config['address']) - factory = klass - kwargs = dict([(str(k), config[k]) for k in config if valid_ident(k)]) - try: - result = factory(**kwargs) - except TypeError as te: - if "'stream'" not in str(te): - raise - #The argument name changed from strm to stream - #Retry with old name. - #This is so that code can be used with older Python versions - #(e.g. by Django) - kwargs['strm'] = kwargs.pop('stream') - result = factory(**kwargs) - if formatter: - result.setFormatter(formatter) - if level is not None: - result.setLevel(_checkLevel(level)) - if filters: - self.add_filters(result, filters) - return result - - def add_handlers(self, logger, handlers): - """Add handlers to a logger from a list of names.""" - for h in handlers: - try: - logger.addHandler(self.config['handlers'][h]) - except Exception as e: - raise ValueError('Unable to add handler %r: %s' % (h, e)) - - def common_logger_config(self, logger, config, incremental=False): - """ - Perform configuration which is common to root and non-root loggers. - """ - level = config.get('level', None) - if level is not None: - logger.setLevel(_checkLevel(level)) - if not incremental: - #Remove any existing handlers - for h in logger.handlers[:]: - logger.removeHandler(h) - handlers = config.get('handlers', None) - if handlers: - self.add_handlers(logger, handlers) - filters = config.get('filters', None) - if filters: - self.add_filters(logger, filters) - - def configure_logger(self, name, config, incremental=False): - """Configure a non-root logger from a dictionary.""" - logger = logging.getLogger(name) - self.common_logger_config(logger, config, incremental) - propagate = config.get('propagate', None) - if propagate is not None: - logger.propagate = propagate - - def configure_root(self, config, incremental=False): - """Configure a root logger from a dictionary.""" - root = logging.getLogger() - self.common_logger_config(root, config, incremental) - -dictConfigClass = DictConfigurator - -def dictConfig(config): - """Configure logging using a dictionary.""" - dictConfigClass(config).configure() diff --git a/rq/decorators.py b/rq/decorators.py index 5398a7c..3c8dc83 100644 --- a/rq/decorators.py +++ b/rq/decorators.py @@ -5,8 +5,6 @@ if t.TYPE_CHECKING: from redis import Redis from .job import Retry -from rq.compat import string_types - from .defaults import DEFAULT_RESULT_TTL from .queue import Queue from .utils import backend_class @@ -53,7 +51,7 @@ class job: # noqa def __call__(self, f): @wraps(f) def delay(*args, **kwargs): - if isinstance(self.queue, string_types): + if isinstance(self.queue, str): queue = self.queue_class(name=self.queue, connection=self.connection) else: diff --git a/rq/defaults.py b/rq/defaults.py index 93f603e..2e99a28 100644 --- a/rq/defaults.py +++ b/rq/defaults.py @@ -8,5 +8,6 @@ DEFAULT_JOB_MONITORING_INTERVAL = 30 DEFAULT_RESULT_TTL = 500 DEFAULT_FAILURE_TTL = 31536000 # 1 year in seconds DEFAULT_LOGGING_DATE_FORMAT = '%H:%M:%S' +DEFAULT_SCHEDULER_FALLBACK_PERIOD = 120 DEFAULT_LOGGING_FORMAT = '%(asctime)s %(message)s' CALLBACK_TIMEOUT = 60 diff --git a/rq/dummy.py b/rq/dummy.py deleted file mode 100644 index 12f360b..0000000 --- a/rq/dummy.py +++ /dev/null @@ -1,38 +0,0 @@ -""" -Some dummy tasks that are well-suited for generating load for testing purposes. -""" - -import random -import time - - -def do_nothing(): - pass - - -def sleep(secs: int): - time.sleep(secs) - - -def endless_loop(): - while True: - time.sleep(1) - - -def div_by_zero(): - 1 / 0 - - -def fib(n: int): - if n <= 1: - return 1 - else: - return fib(n - 2) + fib(n - 1) - - -def random_failure(): - if random.choice([True, False]): - class RandomError(Exception): - pass - raise RandomError('Ouch!') - return 'OK' diff --git a/rq/job.py b/rq/job.py index 84e9345..c41c17d 100644 --- a/rq/job.py +++ b/rq/job.py @@ -20,13 +20,13 @@ if t.TYPE_CHECKING: from redis import Redis from redis.client import Pipeline -from rq.compat import as_text, decode_redis_hash, string_types from .connections import resolve_connection from .exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError from .local import LocalStack from .serializers import resolve_serializer from .utils import (get_version, import_attribute, parse_timeout, str_to_date, - utcformat, utcnow, ensure_list, get_call_string) + utcformat, utcnow, ensure_list, get_call_string, as_text, + decode_redis_hash) # Serialize pickle dumps using the highest pickle protocol (binary, default # uses ascii) @@ -127,7 +127,7 @@ class Job: job._func_name = func.__name__ elif inspect.isfunction(func) or inspect.isbuiltin(func): job._func_name = '{0}.{1}'.format(func.__module__, func.__qualname__) - elif isinstance(func, string_types): + elif isinstance(func, str): job._func_name = as_text(func) elif not inspect.isclass(func) and hasattr(func, '__call__'): # a callable class instance job._instance = func @@ -469,7 +469,7 @@ class Job: def set_id(self, value: str): """Sets a job ID for the given job.""" - if not isinstance(value, string_types): + if not isinstance(value, str): raise TypeError('id must be a string, not {0}'.format(type(value))) self._id = value diff --git a/rq/queue.py b/rq/queue.py index 853cd2a..3cdfcb6 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -2,8 +2,10 @@ import uuid import sys import warnings import typing as t +import logging from collections import namedtuple from datetime import datetime, timezone +from functools import total_ordering from redis import WatchError @@ -11,13 +13,20 @@ if t.TYPE_CHECKING: from redis import Redis from redis.client import Pipeline -from .compat import as_text, string_types, total_ordering +from .utils import as_text from .connections import resolve_connection from .defaults import DEFAULT_RESULT_TTL from .exceptions import DequeueTimeout, NoSuchJobError from .job import Job, JobStatus from .serializers import resolve_serializer -from .utils import backend_class, get_version, import_attribute, parse_timeout, utcnow +from .utils import backend_class, get_version, import_attribute, make_colorizer, parse_timeout, utcnow + + +green = make_colorizer('darkgreen') +yellow = make_colorizer('darkyellow') +blue = make_colorizer('darkblue') + +logger = logging.getLogger("rq.queue") def compact(lst): @@ -78,6 +87,7 @@ class Queue: self._key = '{0}{1}'.format(prefix, name) self._default_timeout = parse_timeout(default_timeout) or self.DEFAULT_TIMEOUT self._is_async = is_async + self.log = logger if 'async' in kwargs: self._is_async = kwargs['async'] @@ -85,7 +95,7 @@ class Queue: # override class attribute job_class if one was passed if job_class is not None: - if isinstance(job_class, string_types): + if isinstance(job_class, str): job_class = import_attribute(job_class) self.job_class = job_class @@ -204,8 +214,13 @@ class Queue: end = offset + (length - 1) else: end = length - return [as_text(job_id) for job_id in - self.connection.lrange(self.key, start, end)] + job_ids = [ + as_text(job_id) + for job_id + in self.connection.lrange(self.key, start, end) + ] + self.log.debug(f"Getting jobs for queue {green(self.name)}: {len(job_ids)} found.") + return job_ids def get_jobs(self, offset: int = 0, length: int = -1): """Returns a slice of jobs in the queue.""" @@ -293,9 +308,10 @@ class Queue: 'at_front' allows you to push the job onto the front instead of the back of the queue""" connection = pipeline if pipeline is not None else self.connection if at_front: - connection.lpush(self.key, job_id) + result = connection.lpush(self.key, job_id) else: - connection.rpush(self.key, job_id) + result = connection.rpush(self.key, job_id) + self.log.debug(f"Pushed job {blue(job_id)} into {green(self.name)}, {result} job(s) are in queue.") def create_job(self, func: t.Callable[..., t.Any], args=None, kwargs=None, timeout=None, result_ttl=None, ttl=None, failure_ttl=None, @@ -472,7 +488,7 @@ class Queue: * A string, representing the location of a function (must be meaningful to the import context of the workers) """ - if not isinstance(f, string_types) and f.__module__ == '__main__': + if not isinstance(f, str) and f.__module__ == '__main__': raise ValueError('Functions from the __main__ module cannot be processed ' 'by workers') @@ -682,7 +698,7 @@ class Queue: return as_text(self.connection.lpop(self.key)) @classmethod - def lpop(cls, queue_keys, timeout, connection: t.Optional['Redis'] = None): + def lpop(cls, queue_keys, timeout: int, connection: t.Optional['Redis'] = None): """Helper method. Intermediate method to abstract away from some Redis API details, where LPOP accepts only a single key, whereas BLPOP accepts multiple. So if we want the non-blocking LPOP, we need to @@ -699,10 +715,13 @@ class Queue: if timeout is not None: # blocking variant if timeout == 0: raise ValueError('RQ does not support indefinite timeouts. Please pick a timeout value > 0') + logger.debug(f"Starting BLPOP operation for queues {green(queue_keys)} with timeout of {timeout}") result = connection.blpop(queue_keys, timeout) if result is None: + logger.debug(f"BLPOP Timeout, no jobs found on queues {green(queue_keys)}") raise DequeueTimeout(timeout, queue_keys) queue_key, job_id = result + logger.debug(f"Dequeued job {blue(job_id)} from queue {green(queue_key)}") return queue_key, job_id else: # non-blocking variant for queue_key in queue_keys: diff --git a/rq/registry.py b/rq/registry.py index 2d484ec..d988981 100644 --- a/rq/registry.py +++ b/rq/registry.py @@ -8,7 +8,7 @@ if t.TYPE_CHECKING: from redis import Redis from redis.client import Pipeline -from .compat import as_text +from .utils import as_text from .connections import resolve_connection from .defaults import DEFAULT_FAILURE_TTL from .exceptions import InvalidJobOperation, NoSuchJobError diff --git a/rq/results.py b/rq/results.py index 931907c..cf85a5b 100644 --- a/rq/results.py +++ b/rq/results.py @@ -1,16 +1,12 @@ -import json from typing import Any, Optional import zlib from base64 import b64decode, b64encode from datetime import datetime, timezone from enum import Enum -from uuid import uuid4 - from redis import Redis -from redis.client import Pipeline -from .compat import decode_redis_hash +from .utils import decode_redis_hash from .job import Job from .serializers import resolve_serializer from .utils import now diff --git a/rq/scheduler.py b/rq/scheduler.py index 577976c..4305cde 100644 --- a/rq/scheduler.py +++ b/rq/scheduler.py @@ -9,7 +9,8 @@ from multiprocessing import Process from redis import SSLConnection, UnixDomainSocketConnection -from .defaults import DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT +from .defaults import (DEFAULT_LOGGING_DATE_FORMAT, DEFAULT_LOGGING_FORMAT, + DEFAULT_SCHEDULER_FALLBACK_PERIOD) from .job import Job from .logutils import setup_loghandlers from .queue import Queue @@ -98,7 +99,7 @@ class RQScheduler: return False if not self.lock_acquisition_time: return True - return (datetime.now() - self.lock_acquisition_time).total_seconds() > 600 + return (datetime.now() - self.lock_acquisition_time).total_seconds() > DEFAULT_SCHEDULER_FALLBACK_PERIOD def acquire_locks(self, auto_start=False): """Returns names of queue it successfully acquires lock on""" diff --git a/rq/serializers.py b/rq/serializers.py index babab69..00fd0a7 100644 --- a/rq/serializers.py +++ b/rq/serializers.py @@ -2,7 +2,6 @@ from functools import partial import pickle import json -from .compat import string_types from .utils import import_attribute @@ -30,7 +29,7 @@ def resolve_serializer(serializer: str): if not serializer: return DefaultSerializer - if isinstance(serializer, string_types): + if isinstance(serializer, str): serializer = import_attribute(serializer) default_serializer_methods = ('dumps', 'loads') diff --git a/rq/utils.py b/rq/utils.py index 5752976..e347ec3 100644 --- a/rq/utils.py +++ b/rq/utils.py @@ -20,7 +20,6 @@ if t.TYPE_CHECKING: from redis.exceptions import ResponseError -from .compat import as_text, string_types from .exceptions import TimeoutFormatError logger = logging.getLogger(__name__) @@ -126,6 +125,21 @@ class ColorizingStreamHandler(logging.StreamHandler): return message +def as_text(v): + if v is None: + return None + elif isinstance(v, bytes): + return v.decode('utf-8') + elif isinstance(v, str): + return v + else: + raise ValueError('Unknown type %r' % type(v)) + + +def decode_redis_hash(h): + return dict((as_text(k), h[k]) for k in h) + + def import_attribute(name: str): """Returns an attribute from a dotted path name. Example: `path.to.func`. @@ -243,7 +257,7 @@ def first(iterable: t.Iterable, default=None, key=None): def is_nonstring_iterable(obj: t.Any) -> bool: """Returns whether the obj is an iterable, but not a string""" - return isinstance(obj, Iterable) and not isinstance(obj, string_types) + return isinstance(obj, Iterable) and not isinstance(obj, str) def ensure_list(obj: t.Any) -> t.List: @@ -263,7 +277,7 @@ def backend_class(holder, default_name, override=None): """Get a backend class using its default attribute name or an override""" if override is None: return getattr(holder, default_name) - elif isinstance(override, string_types): + elif isinstance(override, str): return import_attribute(override) else: return override diff --git a/rq/worker.py b/rq/worker.py index 7841faa..c5ecdf0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -29,7 +29,7 @@ import redis.exceptions from . import worker_registration from .command import parse_payload, PUBSUB_CHANNEL_TEMPLATE, handle_command -from .compat import as_text, string_types, text_type +from .utils import as_text from .connections import get_current_connection, push_connection, pop_connection from .defaults import (CALLBACK_TIMEOUT, DEFAULT_RESULT_TTL, @@ -60,7 +60,7 @@ green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') blue = make_colorizer('darkblue') -logger = logging.getLogger(__name__) +logger = logging.getLogger("rq.worker") class StopRequested(Exception): @@ -181,7 +181,7 @@ class Worker: if connection is None: connection = get_current_connection() self.connection = connection - + self.redis_server_version = None self.job_class = backend_class(self, 'job_class', override=job_class) @@ -193,7 +193,7 @@ class Worker: queues = [self.queue_class(name=q, connection=connection, job_class=self.job_class, serializer=self.serializer) - if isinstance(q, string_types) else q + if isinstance(q, str) else q for q in ensure_list(queues)] self.name: str = name or uuid4().hex @@ -690,10 +690,12 @@ class Worker: if self.should_run_maintenance_tasks: self.run_maintenance_tasks() + self.log.debug(f"Dequeueing jobs on queues {self._ordered_queues} and timeout {timeout}") result = self.queue_class.dequeue_any(self._ordered_queues, timeout, connection=self.connection, job_class=self.job_class, serializer=self.serializer) + self.log.debug(f"Dequeued job {result[1]} from {result[0]}") if result is not None: job, queue = result @@ -946,7 +948,7 @@ class Worker: """Performs misc bookkeeping like updating states prior to job execution. """ - + self.log.debug(f"Preparing for execution of Job ID {job.id}") with self.connection.pipeline() as pipeline: self.set_current_job_id(job.id, pipeline=pipeline) self.set_current_job_working_time(0, pipeline=pipeline) @@ -957,6 +959,7 @@ class Worker: job.prepare_for_execution(self.name, pipeline=pipeline) pipeline.execute() + self.log.debug(f"Job preparation finished.") msg = 'Processing {0} from {1} since {2}' self.procline(msg.format(job.func_name, job.origin, time.time())) @@ -1084,12 +1087,14 @@ class Worker: def execute_success_callback(self, job: 'Job', result): """Executes success_callback with timeout""" + self.log.debug(f"Running success callbacks for {job.id}") job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.success_callback(job, self.connection, result) def execute_failure_callback(self, job): """Executes failure_callback with timeout""" + self.log.debug(f"Running failure callbacks for {job.id}") job.heartbeat(utcnow(), CALLBACK_TIMEOUT) with self.death_penalty_class(CALLBACK_TIMEOUT, JobTimeoutException, job_id=job.id): job.failure_callback(job, self.connection, *sys.exc_info()) @@ -1101,6 +1106,7 @@ class Worker: push_connection(self.connection) started_job_registry = queue.started_job_registry + self.log.debug("Started Job Registry set.") try: self.prepare_job_execution(job) @@ -1108,7 +1114,9 @@ class Worker: job.started_at = utcnow() timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): + self.log.debug("Performing Job...") rv = job.perform() + self.log.debug(f"Finished performing Job ID {job.id}") job.ended_at = utcnow() @@ -1123,6 +1131,7 @@ class Worker: queue=queue, started_job_registry=started_job_registry) except: # NOQA + self.log.debug(f"Job {job.id} raised an exception.") job.ended_at = utcnow() exc_info = sys.exc_info() exc_string = ''.join(traceback.format_exception(*exc_info)) @@ -1148,7 +1157,7 @@ class Worker: self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id) if rv is not None: - log_result = "{0!r}".format(as_text(text_type(rv))) + log_result = "{0!r}".format(as_text(str(rv))) self.log.debug('Result: %s', yellow(log_result)) if self.log_result_lifespan: @@ -1164,7 +1173,7 @@ class Worker: def handle_exception(self, job: 'Job', *exc_info): """Walks the exception handler stack to delegate exception handling.""" - + self.log.debug(f"Handling exception for {job.id}.") exc_string = ''.join(traceback.format_exception(*exc_info)) # If the job cannot be deserialized, it will raise when func_name or diff --git a/rq/worker_registration.py b/rq/worker_registration.py index 787c7a1..c1e1181 100644 --- a/rq/worker_registration.py +++ b/rq/worker_registration.py @@ -6,7 +6,7 @@ if t.TYPE_CHECKING: from .worker import Worker from .queue import Queue -from .compat import as_text +from .utils import as_text from rq.utils import split_list diff --git a/setup.py b/setup.py index 8853f47..19907a5 100644 --- a/setup.py +++ b/setup.py @@ -41,7 +41,7 @@ setup( zip_safe=False, platforms='any', install_requires=get_requirements(), - python_requires='>=3.5', + python_requires='>=3.6', entry_points={ 'console_scripts': [ 'rq = rq.cli:main', diff --git a/tests/fixtures.py b/tests/fixtures.py index 7307091..e75fb68 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -14,7 +14,6 @@ from multiprocessing import Process from redis import Redis from rq import Connection, get_current_job, get_current_connection, Queue from rq.decorators import job -from rq.compat import text_type from rq.worker import HerokuWorker, Worker @@ -36,7 +35,7 @@ async def say_hello_async(name=None): def say_hello_unicode(name=None): """A job with a single argument and a return value.""" - return text_type(say_hello(name)) # noqa + return str(say_hello(name)) # noqa def do_nothing(): diff --git a/tests/test_job.py b/tests/test_job.py index 237c5ef..46fbe73 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -7,7 +7,7 @@ from datetime import datetime, timedelta from redis import WatchError -from rq.compat import as_text +from rq.utils import as_text from rq.exceptions import DeserializationError, InvalidJobOperation, NoSuchJobError from rq.job import Job, JobStatus, Dependency, cancel_job, get_current_job from rq.queue import Queue diff --git a/tests/test_registry.py b/tests/test_registry.py index d7b607f..28a29ca 100644 --- a/tests/test_registry.py +++ b/tests/test_registry.py @@ -1,7 +1,7 @@ from datetime import datetime, timedelta from rq.serializers import JSONSerializer -from rq.compat import as_text +from rq.utils import as_text from rq.defaults import DEFAULT_FAILURE_TTL from rq.exceptions import InvalidJobOperation from rq.job import Job, JobStatus, requeue_job diff --git a/tests/test_worker.py b/tests/test_worker.py index 6c7b405..1977c93 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -28,7 +28,7 @@ from tests.fixtures import ( ) from rq import Queue, SimpleWorker, Worker, get_current_connection -from rq.compat import as_text +from rq.utils import as_text from rq.job import Job, JobStatus, Retry from rq.registry import StartedJobRegistry, FailedJobRegistry, FinishedJobRegistry from rq.results import Result