Implement requeue() method on FailedQueue.

main
Vincent Driessen 13 years ago
parent 8e85c7eee3
commit 7e0b843d06

@ -2,6 +2,10 @@ class NoSuchJobError(Exception):
pass pass
class InvalidJobOperationError(Exception):
pass
class NoQueueError(Exception): class NoQueueError(Exception):
pass pass

@ -2,7 +2,7 @@ import times
from functools import total_ordering from functools import total_ordering
from .proxy import conn from .proxy import conn
from .job import Job from .job import Job
from .exceptions import NoSuchJobError, UnpickleError from .exceptions import NoSuchJobError, UnpickleError, InvalidJobOperationError
def compact(lst): def compact(lst):
@ -121,10 +121,6 @@ class Queue(object):
self.push_job_id(job.id) self.push_job_id(job.id)
return job return job
def requeue(self, job):
"""Requeues an existing (typically a failed job) onto the queue."""
raise NotImplementedError('Implement this')
def pop_job_id(self): def pop_job_id(self):
"""Pops a given job ID from this Redis queue.""" """Pops a given job ID from this Redis queue."""
return conn.lpop(self.key) return conn.lpop(self.key)
@ -238,3 +234,15 @@ class FailedQueue(Queue):
job.ended_at = times.now() job.ended_at = times.now()
job.exc_info = exc_info job.exc_info = exc_info
return self.enqueue_job(job, set_meta_data=False) return self.enqueue_job(job, set_meta_data=False)
def requeue(self, job_id):
"""Requeues the given job ID."""
job = Job.fetch(job_id)
# Delete it from the FailedQueue (raise an error if that failed)
if conn.lrem(self.key, job.id) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')
job.exc_info = None
q = Queue(job.origin)
q.enqueue_job(job)

@ -152,3 +152,4 @@ class TestJob(RQTestCase):
self.testconn.hset(job.key, 'data', unimportable_data) self.testconn.hset(job.key, 'data', unimportable_data)
with self.assertRaises(UnpickleError): with self.assertRaises(UnpickleError):
job.refresh() job.refresh()

@ -1,6 +1,7 @@
from tests import RQTestCase from tests import RQTestCase
from tests import testjob from tests import testjob, failing_job
from rq import Queue, FailedQueue, Job from rq import Queue, FailedQueue, Job
from rq.exceptions import InvalidJobOperationError
class TestQueue(RQTestCase): class TestQueue(RQTestCase):
@ -189,3 +190,29 @@ class TestQueue(RQTestCase):
self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False), self.assertEquals(Queue.dequeue_any([Queue(), Queue('low')], False),
None) None)
self.assertEquals(q.count, 0) self.assertEquals(q.count, 0)
class TestFailedQueue(RQTestCase):
def test_requeue_job(self):
"""Requeueing existing jobs."""
job = Job.for_call(failing_job, 1, 2, 3)
job.origin = 'fake'
job.save()
FailedQueue().quarantine(job, Exception('Some fake error'))
self.assertItemsEqual(Queue.all(), [FailedQueue()])
self.assertEquals(FailedQueue().count, 1)
FailedQueue().requeue(job.id)
self.assertEquals(FailedQueue().count, 0)
self.assertEquals(Queue('fake').count, 1)
def test_requeue_nonfailed_job_fails(self):
"""Requeueing non-failed jobs raises error."""
q = Queue()
job = q.enqueue(testjob, 'Nick', foo='bar')
# Assert that we cannot requeue a job that's not on the failed queue
with self.assertRaises(InvalidJobOperationError):
FailedQueue().requeue(job.id)

Loading…
Cancel
Save