diff --git a/rq/queue.py b/rq/queue.py index eda26ed..9c33be6 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -74,6 +74,20 @@ class Queue(object): """Returns a count of all messages in the queue.""" return conn.llen(self.key) + def compact(self): + """Removes all "dead" jobs from the queue by cycling through it, while + guarantueeing FIFO semantics. + """ + COMPACT_QUEUE = 'rq:queue:_compact' + + conn.rename(self.key, COMPACT_QUEUE) + while True: + job_id = conn.lpop(COMPACT_QUEUE) + if job_id is None: + break + if Job.exists(job_id): + conn.rpush(self.key, job_id) + def push_job_id(self, job_id): # noqa """Pushes a job ID on the corresponding Redis queue.""" diff --git a/tests/test_queue.py b/tests/test_queue.py index 270a101..92790b9 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -49,6 +49,24 @@ class TestQueue(RQTestCase): self.testconn.rpush('rq:queue:example', 'sentinel message') self.assertEquals(q.is_empty(), False) + def test_compact(self): + """Compacting queueus.""" + q = Queue() + + q.enqueue(testjob, 'Alice') + bob = q.enqueue(testjob, 'Bob') + q.enqueue(testjob, 'Charlie') + debrah = q.enqueue(testjob, 'Debrah') + + bob.cancel() + debrah.cancel() + + self.assertEquals(q.count, 4) + + q.compact() + + self.assertEquals(q.count, 2) + def test_enqueue(self): # noqa """Enqueueing job onto queues."""