diff --git a/rq/queue.py b/rq/queue.py index e0e686e..fc1ea08 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -65,6 +65,8 @@ class Queue(object): def empty(self): """Removes all messages on the queue.""" + for job in self.get_jobs(): + job.cancel() self.connection.delete(self.key) def is_empty(self): @@ -151,7 +153,7 @@ class Queue(object): job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl, status=Status.QUEUED, description=description, dependency=after) - + # If job depends on an unfinished job, register itself on it's # parent's waitlist instead of enqueueing it. # If WatchError is raised in the process, that means something else is @@ -168,7 +170,7 @@ class Queue(object): break except WatchError: continue - + return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): @@ -219,7 +221,7 @@ class Queue(object): the properties `origin` and `enqueued_at`. If Queue is instantiated with async=False, job is executed immediately. - """ + """ # Add Queue key set self.connection.sadd(self.redis_queues_keys, self.key) diff --git a/tests/test_queue.py b/tests/test_queue.py index 37c2610..e236f0c 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -43,6 +43,14 @@ class TestQueue(RQTestCase): self.assertEquals(q.is_empty(), True) self.assertIsNone(self.testconn.lpop('rq:queue:example')) + def test_empty_removes_jobs(self): + """Emptying a queue deletes the associated job objects""" + q = Queue('example') + job = q.enqueue(say_hello) + self.assertTrue(Job.exists(job.id)) + q.empty() + self.assertFalse(Job.exists(job.id)) + def test_queue_is_empty(self): """Detecting empty queues.""" q = Queue('example')