test_job.py: Add cancel test on failed queue

main
Javier Lopez 9 years ago
parent 8e24d3a92d
commit e92b57d128

@ -11,7 +11,7 @@ from tests.helpers import strip_microseconds
from rq.compat import PY2, as_text from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, get_current_job, JobStatus, cancel_job from rq.job import Job, get_current_job, JobStatus, cancel_job
from rq.queue import Queue from rq.queue import Queue, get_failed_queue
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
from rq.utils import utcformat from rq.utils import utcformat
from rq.worker import Worker from rq.worker import Worker
@ -435,3 +435,12 @@ class TestJob(RQTestCase):
self.assertEqual(1, len(queue.get_jobs())) self.assertEqual(1, len(queue.get_jobs()))
cancel_job(job.id) cancel_job(job.id)
self.assertEqual(0, len(queue.get_jobs())) self.assertEqual(0, len(queue.get_jobs()))
def test_create_failed_and_cancel_job(self):
"""test creating and using cancel_job deletes job properly"""
failed = get_failed_queue(connection=self.testconn)
job = failed.enqueue(fixtures.say_hello)
job.set_status(JobStatus.FAILED)
self.assertEqual(1, len(failed.get_jobs()))
cancel_job(job.id)
self.assertEqual(0, len(failed.get_jobs()))

Loading…
Cancel
Save