Implemented WorkingQueue.cleanup().

main
Selwin Ong 11 years ago
parent e61d1505bc
commit 90c7eeb111

@ -158,9 +158,10 @@ class Queue(object):
if self.job_class.exists(job_id, self.connection): if self.job_class.exists(job_id, self.connection):
self.connection.rpush(self.key, job_id) self.connection.rpush(self.key, job_id)
def push_job_id(self, job_id): def push_job_id(self, job_id, pipeline=None):
"""Pushes a job ID on the corresponding Redis queue.""" """Pushes a job ID on the corresponding Redis queue."""
self.connection.rpush(self.key, job_id) connection = pipeline if pipeline is not None else self.connection
connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None, depends_on=None): result_ttl=None, description=None, depends_on=None):

@ -36,3 +36,15 @@ class WorkingQueue:
"""Returns list of all job ids.""" """Returns list of all job ids."""
return self.connection.zrange(self.key, start, end) return self.connection.zrange(self.key, start, end)
def cleanup(self):
"""Removes expired job ids to FailedQueue."""
job_ids = self.get_expired_job_ids()
if job_ids:
failed_queue = FailedQueue(connection=self.connection)
with self.connection.pipeline() as pipeline:
for job_id in job_ids:
failed_queue.push_job_id(job_id, pipeline=pipeline)
pipeline.execute()
return job_ids

@ -2,6 +2,7 @@
from __future__ import absolute_import from __future__ import absolute_import
from rq.job import Job from rq.job import Job
from rq.queue import FailedQueue
from rq.utils import current_timestamp from rq.utils import current_timestamp
from rq.working_queue import WorkingQueue from rq.working_queue import WorkingQueue
@ -42,3 +43,13 @@ class TestQueue(RQTestCase):
self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar')
self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo'])
def test_cleanup(self):
"""Moving expired jobs to FailedQueue."""
failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty())
self.testconn.zadd(self.working_queue.key, 1, 'foo')
self.working_queue.cleanup()
self.assertIn('foo', failed_queue.job_ids)
Loading…
Cancel
Save