worker shutdown tests

main
Samuel Colvin 9 years ago
parent e750134e8a
commit aada162a4d

@ -5,10 +5,13 @@ from __future__ import (absolute_import, division, print_function,
import os import os
from datetime import timedelta from datetime import timedelta
from time import sleep from time import sleep
import signal
import time
from multiprocessing import Process
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout, from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, do_nothing, say_hello, say_pid) div_by_zero, do_nothing, say_hello, say_pid, long_running_job)
from tests.helpers import strip_microseconds from tests.helpers import strip_microseconds
from rq import get_failed_queue, Queue, SimpleWorker, Worker from rq import get_failed_queue, Queue, SimpleWorker, Worker
@ -468,3 +471,66 @@ class TestWorker(RQTestCase):
worker = Worker(queue, connection=self.testconn) worker = Worker(queue, connection=self.testconn)
worker.work(burst=True) worker.work(burst=True)
self.assertEqual(self.testconn.zcard(registry.key), 0) self.assertEqual(self.testconn.zcard(registry.key), 0)
def kill_worker(pid, double_kill):
# wait for the worker to be started over on the main process
time.sleep(0.5)
os.kill(pid, signal.SIGTERM)
if double_kill:
# give the worker time to switch signal handler
time.sleep(0.5)
os.kill(pid, signal.SIGTERM)
class TestWorkerShutdown(RQTestCase):
def setUp(self):
# we want tests to fail if signal are ignored and the work remain running,
# so set a signal to kill them after 5 seconds
signal.signal(signal.SIGALRM, self._timeout)
signal.alarm(5)
def _timeout(self, signal, frame):
raise AssertionError("test still running after 5 seconds, "
"likely the worker wasn't shutdown correctly")
@slow
def test_idle_worker_warm_shutdown(self):
w = Worker('foo')
self.assertFalse(w._stop_requested)
p = Process(target=kill_worker, args=(os.getpid(), False))
p.start()
w.work()
p.join(1)
self.assertFalse(w._stop_requested)
@slow
def test_working_worker_warm_shutdown(self):
fooq = Queue('foo')
w = Worker(fooq)
fooq.enqueue(long_running_job, 2)
self.assertFalse(w._stop_requested)
p = Process(target=kill_worker, args=(os.getpid(), False))
p.start()
w.work()
p.join(1)
self.assertTrue(w._stop_requested)
@slow
def test_working_worker_cold_shutdown(self):
fooq = Queue('foo')
w = Worker(fooq)
fooq.enqueue(long_running_job, 10)
self.assertFalse(w._stop_requested)
p = Process(target=kill_worker, args=(os.getpid(), True))
p.start()
self.assertRaises(SystemExit, w.work)
p.join(1)
self.assertTrue(w._stop_requested)

Loading…
Cancel
Save