From a905961f940c912cce66de4878f0a72a4712fc6d Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 18 Nov 2011 01:44:28 +0100 Subject: [PATCH] Add some sample scripts. --- bin/rqgenload | 59 +++++++++++++++++++++++++++++++++++++++++++++++++ bin/rqinfo | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++ bin/rqworker | 1 + rq/dummy.py | 28 +++++++++++++++++++++++ rq/queue.py | 11 ++++++++-- setup.py | 1 + 6 files changed, 159 insertions(+), 2 deletions(-) create mode 100755 bin/rqgenload create mode 100755 bin/rqinfo create mode 100644 rq/dummy.py diff --git a/bin/rqgenload b/bin/rqgenload new file mode 100755 index 0000000..12bf33e --- /dev/null +++ b/bin/rqgenload @@ -0,0 +1,59 @@ +#!/usr/bin/env python +import optparse +from rq import use_redis, Queue, Worker +from rq import dummy + + +def parse_args(): + parser = optparse.OptionParser() + parser.add_option('-n', '--count', type='int', dest='count', default=1) + opts, args = parser.parse_args() + return (opts, args, parser) + +def main(): + opts, args, parser = parse_args() + + use_redis() + + #funcs = filter(lambda s: not s.startswith('_'), dir(rq.dummy)) + #print(funcs) + + queues = ('default', 'foo', 'bar', 'qux', 'high', 'normal', 'low') + + sample_calls = [ + (dummy.do_nothing, [], {}), + (dummy.sleep, [1], {}), + (dummy.sleep, [2], {}), + ] + + for i in range(opts.count): + import random + f, args, kwargs = random.choice(sample_calls) + + q = Queue(random.choice(queues)) + q.enqueue(f, *args, **kwargs) + + #q = Queue('foo') + #q.enqueue(do_nothing) + #q.enqueue(sleep, 3) + #q = Queue('bar') + #q.enqueue(yield_stuff) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + +if __name__ == '__main__': + main() diff --git a/bin/rqinfo b/bin/rqinfo new file mode 100755 index 0000000..663b5e5 --- /dev/null +++ b/bin/rqinfo @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import optparse +from rq import use_redis, Queue, Worker +from rq.dummy import * + + +def parse_args(): + parser = optparse.OptionParser() + parser.add_option('-q', '--queues', dest='subcmd', + action='store_const', const='queues', + help='Shows stats for queues.') + parser.add_option('-w', '--workers', dest='subcmd', + action='store_const', const='workers', + help='Shows stats for workers.') + opts, args = parser.parse_args() + return (opts, args, parser) + +def main(): + opts, args, parser = parse_args() + + if not opts.subcmd: + parser.error('Specify either --queues or --workers.') + + use_redis() + + if opts.subcmd == 'workers': + raise NotImplementedError() + elif opts.subcmd == 'queues': + #q = Queue('foo') + #q.enqueue(do_nothing) + #q.enqueue(sleep, 3) + #q = Queue('bar') + #q.enqueue(yield_stuff) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + #q.enqueue(do_nothing) + qs = Queue.all() + num_jobs = 0 + for q in qs: + count = q.count + chart = '█' * count + print('%-12s %3d |%s' % (q.name, count, chart)) + num_jobs += count + print('%d queues, %d jobs total' % (len(qs), num_jobs)) + +if __name__ == '__main__': + main() diff --git a/bin/rqworker b/bin/rqworker index 2379f0f..b3b190c 100755 --- a/bin/rqworker +++ b/bin/rqworker @@ -2,6 +2,7 @@ import optparse from rq import use_redis, Queue, Worker + def parse_args(): parser = optparse.OptionParser() parser.add_option('-b', '--burst', dest='burst', diff --git a/rq/dummy.py b/rq/dummy.py new file mode 100644 index 0000000..f4b8dc2 --- /dev/null +++ b/rq/dummy.py @@ -0,0 +1,28 @@ +""" +Some dummy tasks that are well-suited for generating load for testing purposes. +""" +import time + +def do_nothing(): + pass + +def sleep(secs): + time.sleep(secs) + +def endless_loop(): + x = 7 + while True: + x *= 28 + if x % 3 == 0: + x //= 21 + if x == 0: + x = 82 + +def div_by_zero(): + 1/0 + +def yield_stuff(): + yield 7 + yield 'foo' + yield (3.14, 2.18) + yield yield_stuff() diff --git a/rq/queue.py b/rq/queue.py index e427800..9e961cb 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -40,7 +40,14 @@ class Job(object): @total_ordering class Queue(object): - redis_queue_namespace_prefix = 'rq:' + redis_queue_namespace_prefix = 'rq:queue:' + + @classmethod + def all(cls): + """Returns an iterable of all Queues. + """ + prefix = cls.redis_queue_namespace_prefix + return map(cls.from_queue_key, conn.keys('%s*' % prefix)) @classmethod def from_queue_key(cls, queue_key): @@ -76,7 +83,7 @@ class Queue(object): return conn.llen(self.key) def enqueue(self, f, *args, **kwargs): - rv_key = '%s:result:%s' % (self.key, str(uuid.uuid4())) + rv_key = 'rq:result:%s:%s' % (self.name, str(uuid.uuid4())) if f.__module__ == '__main__': raise ValueError('Functions from the __main__ module cannot be processed by workers.') message = dumps((f, args, kwargs, rv_key)) diff --git a/setup.py b/setup.py index 3012b95..3c3aeea 100644 --- a/setup.py +++ b/setup.py @@ -19,6 +19,7 @@ setup( zip_safe=False, platforms='any', install_requires=['redis', 'logbook', 'blinker'], + scripts=['bin/rqinfo', 'bin/rqworker'], classifiers=[ # As from http://pypi.python.org/pypi?%3Aaction=list_classifiers 'Development Status :: 2 - Pre-Alpha',