Merge pull request #334 from nvie/emptying-failed-queue

Emptying failed queue
main
Vincent Driessen 11 years ago
commit aef7af9c77

@ -1,6 +1,9 @@
### 0.4.0 ### 0.4.0
(not released yet) (not released yet)
- Emptying the failed queue from the command line is now as simple as running
`rqinfo -X` or `rqinfo --empty-failed-queue`.
- Job data is unpickled lazily. Thanks, Malthe! - Job data is unpickled lazily. Thanks, Malthe!
- Removed dependency on the `times` library. Thanks, Malthe! - Removed dependency on the `times` library. Thanks, Malthe!

@ -65,10 +65,25 @@ class Queue(object):
def empty(self): def empty(self):
"""Removes all messages on the queue.""" """Removes all messages on the queue."""
job_list = self.get_jobs() script = b"""
self.connection.delete(self.key) local prefix = "rq:job:"
for job in job_list: local q = KEYS[1]
job.cancel() local count = 0
while true do
local job_id = redis.call("lpop", q)
if job_id == false then
break
end
-- Delete the relevant keys
redis.call("del", prefix..job_id)
redis.call("del", prefix..job_id..":dependents")
count = count + 1
end
return count
"""
script = self.connection.register_script(script)
return script(keys=[self.key])
def is_empty(self): def is_empty(self):
"""Returns whether the current queue is empty.""" """Returns whether the current queue is empty."""

@ -1,16 +1,16 @@
#!/usr/bin/env python #!/usr/bin/env python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import sys import argparse
import os import os
import sys
import time import time
import argparse
from redis.exceptions import ConnectionError from redis.exceptions import ConnectionError
from rq import Queue, Worker
from rq import get_failed_queue, Queue, Worker
from rq.scripts import (add_standard_arguments, read_config_file,
setup_default_arguments, setup_redis)
from rq.utils import gettermsize, make_colorizer from rq.utils import gettermsize, make_colorizer
from rq.scripts import add_standard_arguments
from rq.scripts import setup_redis
from rq.scripts import read_config_file
from rq.scripts import setup_default_arguments
red = make_colorizer('darkred') red = make_colorizer('darkred')
green = make_colorizer('darkgreen') green = make_colorizer('darkgreen')
@ -116,7 +116,7 @@ def show_workers(args):
max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0 max_qname = max(map(lambda q: len(q.name), queues.keys())) if queues else 0
for q in queues: for q in queues:
if queues[q]: if queues[q]:
queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa
else: else:
queues_str = '' queues_str = ''
print('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str)) print('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str))
@ -140,11 +140,12 @@ def parse_args():
parser = argparse.ArgumentParser(description='RQ command-line monitor.') parser = argparse.ArgumentParser(description='RQ command-line monitor.')
add_standard_arguments(parser) add_standard_arguments(parser)
parser.add_argument('--path', '-P', default='.', help='Specify the import path.') parser.add_argument('--path', '-P', default='.', help='Specify the import path.')
parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa
parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') # noqa
parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') # noqa
parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') # noqa
parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') # noqa
parser.add_argument('--empty-failed-queue', '-X', dest='empty_failed_queue', default=False, action='store_true', help='Empties the failed queue, then quits') # noqa
parser.add_argument('queues', nargs='*', help='The queues to poll') parser.add_argument('queues', nargs='*', help='The queues to poll')
return parser.parse_args() return parser.parse_args()
@ -173,15 +174,20 @@ def main():
setup_default_arguments(args, settings) setup_default_arguments(args, settings)
setup_redis(args) setup_redis(args)
try: try:
if args.only_queues: if args.empty_failed_queue:
func = show_queues num_jobs = get_failed_queue().empty()
elif args.only_workers: print('{} jobs removed from failed queue'.format(num_jobs))
func = show_workers
else: else:
func = show_both if args.only_queues:
func = show_queues
elif args.only_workers:
func = show_workers
else:
func = show_both
interval(args.interval, func, args) interval(args.interval, func, args)
except ConnectionError as e: except ConnectionError as e:
print(e) print(e)
sys.exit(1) sys.exit(1)

Loading…
Cancel
Save