test_job.py: Add test for cancel_job function

main
Javier Lopez 9 years ago
parent c5e62c70bb
commit 091c2568c9

@ -10,7 +10,7 @@ from tests.helpers import strip_microseconds
from rq.compat import PY2, as_text from rq.compat import PY2, as_text
from rq.exceptions import NoSuchJobError, UnpickleError from rq.exceptions import NoSuchJobError, UnpickleError
from rq.job import Job, get_current_job, JobStatus from rq.job import Job, get_current_job, JobStatus, cancel_job
from rq.queue import Queue from rq.queue import Queue
from rq.registry import DeferredJobRegistry from rq.registry import DeferredJobRegistry
from rq.utils import utcformat from rq.utils import utcformat
@ -427,3 +427,11 @@ class TestJob(RQTestCase):
queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1) queue.enqueue(fixtures.say_hello, job_id="1234", ttl=1)
time.sleep(1) time.sleep(1)
self.assertEqual(0, len(queue.get_jobs())) self.assertEqual(0, len(queue.get_jobs()))
def test_create_and_cancel_job(self):
"""test creating and using cancel_job deletes job properly"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(fixtures.say_hello)
self.assertEqual(1, len(queue.get_jobs()))
cancel_job(job.id)
self.assertEqual(0, len(queue.get_jobs()))

Loading…
Cancel
Save