@ -38,7 +38,6 @@ def state_symbol(state):
def show_queues(args):
def show_queues(args):
while True:
if len(args.queues):
if len(args.queues):
qs = map(Queue, args.queues)
qs = map(Queue, args.queues)
else:
else:
@ -57,16 +56,13 @@ def show_queues(args):
scale = get_scale(max_count)
scale = get_scale(max_count)
ratio = chartwidth * 1.0 / scale
ratio = chartwidth * 1.0 / scale
if args.interval:
os.system('clear')
for q in qs:
for q in qs:
count = counts[q]
count = counts[q]
if not args.raw:
if not args.raw:
chart = green('|' + '█' * int(ratio * count))
chart = green('|' + '█' * int(ratio * count))
line = '%-12s %s %d' % (q.name, chart, count)
line = '%-12s %s %d' % (q.name, chart, count)
else:
else:
line = '%-12 s %d' % (q.name, count)
line = 'queue %s %d' % (q.name, count)
print(line)
print(line)
num_jobs += count
num_jobs += count
@ -75,27 +71,40 @@ def show_queues(args):
if not args.raw:
if not args.raw:
print('%d queues, %d jobs total' % (len(qs), num_jobs))
print('%d queues, %d jobs total' % (len(qs), num_jobs))
if args.interval:
time.sleep(args.interval)
else:
break
def show_workers(args):
def show_workers(args):
while True:
if len(args.queues):
qs = map(Queue, args.queues)
def any_matching_queue(worker):
def queue_matches(q):
return q in qs
return any(map(queue_matches, worker.queues))
# Filter out workers that don't match the queue filter
ws = [w for w in Worker.all() if any_matching_queue(w)]
else:
qs = Queue.all()
qs = Queue.all()
ws = Worker.all()
ws = Worker.all()
if args.interval:
if not args.by_queue:
os.system('clear')
def filter_queues(queue_names):
return [qname for qname in queue_names if Queue(qname) in qs]
queues = {qname: [] for qname in qs}
for w in ws:
if not args.raw:
print '%s %s: %s' % (w.name, state_symbol(w.state), ', '.join(filter_queues(w.queue_names())))
else:
print 'worker %s %s %s' % (w.name, w.state, ','.join(filter_queues(w.queue_names())))
else:
# Create reverse lookup table
queues = {q: [] for q in qs}
for w in ws:
for w in ws:
for q in w.queues:
for q in w.queues:
if not q in queues:
if not q in queues:
queues[q] = []
continue
queues[q].append(w)
queues[q].append(w)
if args.by_queue:
max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0
max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0
for q in queues:
for q in queues:
if queues[q]:
if queues[q]:
@ -103,15 +112,20 @@ def show_workers(args):
else:
else:
queues_str = '– '
queues_str = '– '
print '%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)
print '%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)
else:
for w in ws:
print '%s %s: %s' % (w.name, state_symbol(w.state), ', '.join(w.queue_names()))
print '%d workers, %d queues' % (len(ws), len(queues))
if args.interval:
if not args.raw:
time.sleep(args.interval)
print '%d workers, %d queues' % (len(ws), len(qs))
else:
break
def show_both(args):
show_queues(args)
if not args.raw:
print ''
show_workers(args)
if not args.raw:
print ''
import datetime
print 'Updated: %s' % datetime.datetime.now()
def parse_args():
def parse_args():
@ -120,22 +134,24 @@ def parse_args():
parser.add_argument('--port', '-p', type=int, default=6379, help='The Redis portnumber (default: 6379)')
parser.add_argument('--port', '-p', type=int, default=6379, help='The Redis portnumber (default: 6379)')
parser.add_argument('--db', '-d', type=int, default=0, help='The Redis database (default: 0)')
parser.add_argument('--db', '-d', type=int, default=0, help='The Redis database (default: 0)')
parser.add_argument('--path', '-P', default='.', help='Specify the import path.')
parser.add_argument('--path', '-P', default='.', help='Specify the import path.')
parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)')
parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts')
parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info')
parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info')
parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue')
parser.add_argument('queues', nargs='*', help='The queues to poll')
return parser.parse_args()
parent_parser = argparse.ArgumentParser(add_help=False)
parent_parser.add_argument('--interval', '-i', metavar='N', type=float, default=0, help='Updates stats every N seconds (default: don\'t poll)')
subparsers = parser.add_subparsers()
queues_p = subparsers.add_parser('queues', parents=[parent_parser], help='Show queue info')
queues_p.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts')
queues_p.add_argument('queues', nargs='*', help='The queues to poll')
queues_p.set_defaults(func=show_queues)
workers_p = subparsers.add_parser('workers', parents=[parent_parser], help='Show worker activity')
workers_p.add_argument('--by-queue', '-Q', dest='by_queue', default=False, action='store_true', help='Shows workers by queue')
workers_p.set_defaults(func=show_workers)
return parser.parse_args()
def interval(val, func, args):
while True:
if val:
os.system('clear')
func(args)
if val:
time.sleep(val)
else:
break
def main():
def main():
@ -148,7 +164,14 @@ def main():
redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db)
redis_conn = redis.Redis(host=args.host, port=args.port, db=args.db)
use_connection(redis_conn)
use_connection(redis_conn)
try:
try:
args.func(args)
if args.only_queues:
func = show_queues
elif args.only_workers:
func = show_workers
else:
func = show_both
interval(args.interval, func, args)
except ConnectionError as e:
except ConnectionError as e:
print(e)
print(e)