diff --git a/rq/queue.py b/rq/queue.py index 86f0406..621942a 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -158,9 +158,10 @@ class Queue(object): if self.job_class.exists(job_id, self.connection): self.connection.rpush(self.key, job_id) - def push_job_id(self, job_id): + def push_job_id(self, job_id, pipeline=None): """Pushes a job ID on the corresponding Redis queue.""" - self.connection.rpush(self.key, job_id) + connection = pipeline if pipeline is not None else self.connection + connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None, description=None, depends_on=None): diff --git a/rq/working_queue.py b/rq/working_queue.py index 3e5da45..64cf1e9 100644 --- a/rq/working_queue.py +++ b/rq/working_queue.py @@ -36,3 +36,15 @@ class WorkingQueue: """Returns list of all job ids.""" return self.connection.zrange(self.key, start, end) + def cleanup(self): + """Removes expired job ids to FailedQueue.""" + job_ids = self.get_expired_job_ids() + + if job_ids: + failed_queue = FailedQueue(connection=self.connection) + with self.connection.pipeline() as pipeline: + for job_id in job_ids: + failed_queue.push_job_id(job_id, pipeline=pipeline) + pipeline.execute() + + return job_ids diff --git a/tests/test_working_queue.py b/tests/test_working_queue.py index 9c1a1e5..af6eeb4 100644 --- a/tests/test_working_queue.py +++ b/tests/test_working_queue.py @@ -2,6 +2,7 @@ from __future__ import absolute_import from rq.job import Job +from rq.queue import FailedQueue from rq.utils import current_timestamp from rq.working_queue import WorkingQueue @@ -41,4 +42,14 @@ class TestQueue(RQTestCase): self.testconn.zadd(self.working_queue.key, 1, 'foo') self.testconn.zadd(self.working_queue.key, timestamp + 10, 'bar') - self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) \ No newline at end of file + self.assertEqual(self.working_queue.get_expired_job_ids(), ['foo']) + + def test_cleanup(self): + """Moving expired jobs to FailedQueue.""" + failed_queue = FailedQueue(connection=self.testconn) + self.assertTrue(failed_queue.is_empty()) + self.testconn.zadd(self.working_queue.key, 1, 'foo') + self.working_queue.cleanup() + self.assertIn('foo', failed_queue.job_ids) + + \ No newline at end of file