From 985a2664a4099b192de877c9dfc824d36b43f48e Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Mon, 16 Mar 2015 23:30:41 -0700 Subject: [PATCH] Prevent `Queue#dequeue` from blowing the stack In the case of many sequential jobs having been deleted, a recursive implementation of `Queue#dequeue` is prone to blowing the stack in the absence of tail-recursion support. Change the implementation from recursive to iterative to work around this issue in CPython. --- rq/queue.py | 32 ++++++++++++++++---------------- tests/test_queue.py | 8 ++++++++ 2 files changed, 24 insertions(+), 16 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index ea39d81..baf16d6 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -329,22 +329,22 @@ class Queue(object): Returns a job_class instance, which can be executed or inspected. """ - job_id = self.pop_job_id() - if job_id is None: - return None - try: - job = self.job_class.fetch(job_id, connection=self.connection) - except NoSuchJobError as e: - # Silently pass on jobs that don't exist (anymore), - # and continue by reinvoking itself recursively - return self.dequeue() - except UnpickleError as e: - # Attach queue information on the exception for improved error - # reporting - e.job_id = job_id - e.queue = self - raise e - return job + while True: + job_id = self.pop_job_id() + if job_id is None: + return None + try: + job = self.job_class.fetch(job_id, connection=self.connection) + except NoSuchJobError as e: + # Silently pass on jobs that don't exist (anymore), + continue + except UnpickleError as e: + # Attach queue information on the exception for improved error + # reporting + e.job_id = job_id + e.queue = self + raise e + return job @classmethod def dequeue_any(cls, queues, timeout, connection=None): diff --git a/tests/test_queue.py b/tests/test_queue.py index e4e9253..6a631d7 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -173,6 +173,14 @@ class TestQueue(RQTestCase): # ...and assert the queue count when down self.assertEquals(q.count, 0) + def test_dequeue_deleted_jobs(self): + """Dequeueing deleted jobs from queues don't blow the stack.""" + q = Queue() + for _ in range(1,1000): + job = q.enqueue(say_hello) + job.delete() + q.dequeue() + def test_dequeue_instance_method(self): """Dequeueing instance method jobs from queues.""" q = Queue()