From c9875696505c79a265a8e1a723ed775cedd6a8a9 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Sat, 16 Feb 2013 18:50:37 +0700 Subject: [PATCH] Safe fetching a deleted job removes the deleted job from queue. --- rq/queue.py | 10 ++++++++-- tests/test_queue.py | 26 ++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/rq/queue.py b/rq/queue.py index f3c47c2..3bf5d00 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -76,6 +76,7 @@ class Queue(object): try: job = Job.safe_fetch(job_id, connection=self.connection) except NoSuchJobError: + self.remove(job_id) return None except UnpickleError: return None @@ -88,6 +89,11 @@ class Queue(object): """Returns a count of all messages in the queue.""" return self.connection.llen(self.key) + def remove(self, job_or_id): + """Removes Job from queue, accepts either a Job instance or ID.""" + job_id = job_or_id.id if isinstance(job_or_id, Job) else job_or_id + return self.connection._lrem(self.key, 0, job_id) + def compact(self): """Removes all "dead" jobs from the queue by cycling through it, while guarantueeing FIFO semantics. @@ -316,11 +322,11 @@ class FailedQueue(Queue): job = Job.fetch(job_id, connection=self.connection) except NoSuchJobError: # Silently ignore/remove this job and return (i.e. do nothing) - self.connection._lrem(self.key, 0, job_id) + self.remove(job_id) return # Delete it from the failed queue (raise an error if that failed) - if self.connection._lrem(self.key, 0, job.id) == 0: + if self.remove(job) == 0: raise InvalidJobOperationError('Cannot requeue non-failed jobs.') job.status = Status.QUEUED diff --git a/tests/test_queue.py b/tests/test_queue.py index b2bffba..061c984 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -50,6 +50,32 @@ class TestQueue(RQTestCase): self.testconn.rpush('rq:queue:example', 'sentinel message') self.assertEquals(q.is_empty(), False) + def test_remove(self): + """Ensure queue.remove properly removes Job from queue.""" + q = Queue('example') + job = q.enqueue(say_hello) + self.assertIn(job.id, q.job_ids) + q.remove(job) + self.assertNotIn(job.id, q.job_ids) + + job = q.enqueue(say_hello) + self.assertIn(job.id, q.job_ids) + q.remove(job.id) + self.assertNotIn(job.id, q.job_ids) + + def test_jobs(self): + """Getting jobs out of a queue.""" + q = Queue('example') + self.assertEqual(q.jobs, []) + job = q.enqueue(say_hello) + self.assertEqual(q.jobs, [job]) + + # Fetching a deleted removes it from queue + job.delete() + self.assertEqual(q.job_ids, [job.id]) + q.jobs + self.assertEqual(q.job_ids, []) + def test_compact(self): """Compacting queueus.""" q = Queue()