Add tests for custom job class in worker

main
Antoine Leclair 9 years ago
parent 5c72417cda
commit 7275f62737

@ -347,6 +347,22 @@ class TestWorker(RQTestCase):
worker = Worker([q], job_class=CustomJob) worker = Worker([q], job_class=CustomJob)
self.assertEqual(worker.job_class, CustomJob) self.assertEqual(worker.job_class, CustomJob)
def test_custom_job_class_by_string(self):
"""Ensure Worker accepts custom job class using dotted notation."""
q = Queue()
worker = Worker([q], job_class='test_worker.CustomJob')
self.assertEqual(worker.job_class, CustomJob)
def test_custom_job_class_is_not_global(self):
"""Ensure Worker custom job class is not global."""
q = Queue()
worker_custom = Worker([q], job_class=CustomJob)
q_generic = Queue()
worker_generic = Worker([q_generic])
self.assertEqual(worker_custom.job_class, CustomJob)
self.assertEqual(worker_generic.job_class, Job)
self.assertEqual(Worker.job_class, Job)
def test_work_via_simpleworker(self): def test_work_via_simpleworker(self):
"""Worker processes work, with forking disabled, """Worker processes work, with forking disabled,
then returns.""" then returns."""

Loading…
Cancel
Save