Add compact() method on Queues, to remove dead messages.

main
Vincent Driessen 13 years ago
parent 80a615a61c
commit 06ce9622ea

@ -74,6 +74,20 @@ class Queue(object):
"""Returns a count of all messages in the queue.""" """Returns a count of all messages in the queue."""
return conn.llen(self.key) return conn.llen(self.key)
def compact(self):
"""Removes all "dead" jobs from the queue by cycling through it, while
guarantueeing FIFO semantics.
"""
COMPACT_QUEUE = 'rq:queue:_compact'
conn.rename(self.key, COMPACT_QUEUE)
while True:
job_id = conn.lpop(COMPACT_QUEUE)
if job_id is None:
break
if Job.exists(job_id):
conn.rpush(self.key, job_id)
def push_job_id(self, job_id): # noqa def push_job_id(self, job_id): # noqa
"""Pushes a job ID on the corresponding Redis queue.""" """Pushes a job ID on the corresponding Redis queue."""

@ -49,6 +49,24 @@ class TestQueue(RQTestCase):
self.testconn.rpush('rq:queue:example', 'sentinel message') self.testconn.rpush('rq:queue:example', 'sentinel message')
self.assertEquals(q.is_empty(), False) self.assertEquals(q.is_empty(), False)
def test_compact(self):
"""Compacting queueus."""
q = Queue()
q.enqueue(testjob, 'Alice')
bob = q.enqueue(testjob, 'Bob')
q.enqueue(testjob, 'Charlie')
debrah = q.enqueue(testjob, 'Debrah')
bob.cancel()
debrah.cancel()
self.assertEquals(q.count, 4)
q.compact()
self.assertEquals(q.count, 2)
def test_enqueue(self): # noqa def test_enqueue(self): # noqa
"""Enqueueing job onto queues.""" """Enqueueing job onto queues."""

Loading…
Cancel
Save