mirror of https://github.com/peter4431/rq.git
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
184 lines
5.1 KiB
Python
184 lines
5.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Miscellaneous helper functions.
|
|
|
|
The formatter for ANSI colored console output is heavily based on Pygments
|
|
terminal colorizing code, originally by Georg Brandl.
|
|
"""
|
|
import importlib
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
from .compat import is_python_version
|
|
|
|
|
|
def gettermsize():
    """Return the terminal size as a ``(columns, lines)`` tuple of ints.

    The size is looked up, in order, from:

    1. a ``TIOCGWINSZ`` ioctl on file descriptors 0, 1 and 2;
    2. the same ioctl on the terminal named by ``os.ctermid()``;
    3. the ``LINES`` and ``COLUMNS`` environment variables;
    4. a hard-coded default of 25 lines by 80 columns.

    Every step is best effort: a failing step falls through to the next
    one instead of raising.
    """
    def ioctl_GWINSZ(fd):
        """Return ``(lines, columns)`` for ``fd``, or None if unavailable."""
        try:
            import fcntl
            import struct
            import termios
        except ImportError:
            # The required modules cannot be imported here.
            return None
        try:
            # The ioctl result is unpacked as two shorts.
            return struct.unpack(
                'hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except Exception:
            # Best effort (e.g. fd is not a terminal). Unlike a bare
            # ``except:`` this lets KeyboardInterrupt/SystemExit propagate.
            return None

    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)

    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            try:
                cr = ioctl_GWINSZ(fd)
            finally:
                # Always release the descriptor, whatever the ioctl did.
                os.close(fd)
        except (AttributeError, OSError):
            # os.ctermid() is missing or the terminal cannot be opened.
            pass

    if not cr:
        try:
            # Convert here so malformed values also trigger the default.
            cr = (int(os.environ['LINES']), int(os.environ['COLUMNS']))
        except (KeyError, ValueError):
            cr = (25, 80)

    # ``cr`` is (lines, columns); callers get (columns, lines).
    return int(cr[1]), int(cr[0])
|
|
|
|
|
|
class _Colorizer(object):
|
|
def __init__(self):
|
|
esc = "\x1b["
|
|
|
|
self.codes = {}
|
|
self.codes[""] = ""
|
|
self.codes["reset"] = esc + "39;49;00m"
|
|
|
|
self.codes["bold"] = esc + "01m"
|
|
self.codes["faint"] = esc + "02m"
|
|
self.codes["standout"] = esc + "03m"
|
|
self.codes["underline"] = esc + "04m"
|
|
self.codes["blink"] = esc + "05m"
|
|
self.codes["overline"] = esc + "06m"
|
|
|
|
dark_colors = ["black", "darkred", "darkgreen", "brown", "darkblue",
|
|
"purple", "teal", "lightgray"]
|
|
light_colors = ["darkgray", "red", "green", "yellow", "blue",
|
|
"fuchsia", "turquoise", "white"]
|
|
|
|
x = 30
|
|
for d, l in zip(dark_colors, light_colors):
|
|
self.codes[d] = esc + "%im" % x
|
|
self.codes[l] = esc + "%i;01m" % x
|
|
x += 1
|
|
|
|
del d, l, x
|
|
|
|
self.codes["darkteal"] = self.codes["turquoise"]
|
|
self.codes["darkyellow"] = self.codes["brown"]
|
|
self.codes["fuscia"] = self.codes["fuchsia"]
|
|
self.codes["white"] = self.codes["bold"]
|
|
|
|
if hasattr(sys.stdout, "isatty"):
|
|
self.notty = not sys.stdout.isatty()
|
|
else:
|
|
self.notty = True
|
|
|
|
def reset_color(self):
|
|
return self.codes["reset"]
|
|
|
|
def colorize(self, color_key, text):
|
|
if not sys.stdout.isatty():
|
|
return text
|
|
else:
|
|
return self.codes[color_key] + text + self.codes["reset"]
|
|
|
|
def ansiformat(self, attr, text):
|
|
"""
|
|
Format ``text`` with a color and/or some attributes::
|
|
|
|
color normal color
|
|
*color* bold color
|
|
_color_ underlined color
|
|
+color+ blinking color
|
|
"""
|
|
result = []
|
|
if attr[:1] == attr[-1:] == '+':
|
|
result.append(self.codes['blink'])
|
|
attr = attr[1:-1]
|
|
if attr[:1] == attr[-1:] == '*':
|
|
result.append(self.codes['bold'])
|
|
attr = attr[1:-1]
|
|
if attr[:1] == attr[-1:] == '_':
|
|
result.append(self.codes['underline'])
|
|
attr = attr[1:-1]
|
|
result.append(self.codes[attr])
|
|
result.append(text)
|
|
result.append(self.codes['reset'])
|
|
return ''.join(result)
|
|
|
|
|
|
# Shared module-level instance; make_colorizer() delegates to its colorize().
colorizer = _Colorizer()
|
|
|
|
|
|
def make_colorizer(color):
    """Return a one-argument function that colorizes text with ``color``.

    The returned function passes its argument, together with ``color``, to
    the module-level ``colorizer.colorize``.

    For example:

        green = make_colorizer('darkgreen')
        red = make_colorizer('red')

    Then, you can use:

        print("It's either " + green('OK') + ' or ' + red('Oops'))
    """
    def paint(text):
        painted = colorizer.colorize(color, text)
        return painted

    return paint
|
|
|
|
|
|
class ColorizingStreamHandler(logging.StreamHandler):
    """Stream handler that colors the first line of a record on a tty.

    Only records whose level appears in ``levels`` are colored, and only
    when the handler's stream reports itself as a terminal.
    """

    # Maps a log level to the function that colors its message.
    levels = {
        logging.WARNING: make_colorizer('darkyellow'),
        logging.ERROR: make_colorizer('darkred'),
        logging.CRITICAL: make_colorizer('darkred'),
    }

    def __init__(self, exclude=None, *args, **kwargs):
        """Initialise the handler.

        ``exclude`` is stored on the instance and is not otherwise used by
        this class. The remaining arguments go to ``logging.StreamHandler``.
        """
        self.exclude = exclude
        if is_python_version((2, 6)):
            # NOTE(review): presumably because StreamHandler is an old-style
            # class on Python 2.6, where super() fails; confirm.
            logging.StreamHandler.__init__(self, *args, **kwargs)
        else:
            super(ColorizingStreamHandler, self).__init__(*args, **kwargs)

    @property
    def is_tty(self):
        """Truthy if the stream has an ``isatty`` method that returns true."""
        isatty = getattr(self.stream, 'isatty', None)
        return isatty and isatty()

    def format(self, record):
        """Format ``record``, coloring its first line if writing to a tty.

        The first space-separated token of the first line is left as is
        and the rest of that line is colored. A first line with no space
        is colored whole. Later lines are never colored.
        """
        message = logging.StreamHandler.format(self, record)
        if self.is_tty:
            colorize = self.levels.get(record.levelno, lambda x: x)

            # Don't colorize any traceback
            parts = message.split('\n', 1)

            # partition() never fails, whereas indexing the result of
            # split(" ", 1) raised IndexError when there was no space.
            head, separator, rest = parts[0].partition(" ")
            if separator:
                parts[0] = " ".join([head, colorize(rest)])
            else:
                parts[0] = colorize(head)

            message = '\n'.join(parts)

        return message
|
|
|
|
|
|
def import_attribute(name):
    """Return an attribute from a dotted path name (e.g. "path.to.func").

    Everything before the last dot is imported as a module; the part after
    it is looked up on that module.

    Raises:
        ValueError: if ``name`` contains no dot.
        ImportError: if the module part cannot be imported.
        AttributeError: if the module has no such attribute.
    """
    module_name, separator, attribute = name.rpartition('.')
    if not separator:
        # Same exception type as the old tuple-unpacking failure, but the
        # message now names the offending input.
        raise ValueError(
            'Invalid attribute path %r: expected "module.attribute"' % (name,))
    module = importlib.import_module(module_name)
    return getattr(module, attribute)
|
|
|
|
|
|
def utcnow():
    """Return the current UTC time from ``datetime.datetime.utcnow()``."""
    now = datetime.datetime.utcnow()
    return now
|
|
|
|
|
|
def utcformat(dt):
    """Return ``dt`` formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
    timestamp_format = u"%Y-%m-%dT%H:%M:%SZ"
    return dt.strftime(timestamp_format)
|
|
|
|
|
|
def utcparse(string):
    """Parse a ``YYYY-MM-DDTHH:MM:SSZ`` string into a ``datetime.datetime``."""
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"
    parsed = datetime.datetime.strptime(string, timestamp_format)
    return parsed
|