Failing test to demonstrate issue #702

Test that demonstrates that if a work-horse process is terminated unexpectedly
the job being processed could be stuck at the "Started" state
(https://github.com/nvie/rq/issues/702)
main
Yannis Spiliopoulos 9 years ago
parent 07adca6ca3
commit c00d3681f9

@ -14,7 +14,8 @@ import subprocess
from tests import RQTestCase, slow from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout, from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, do_nothing, say_hello, say_pid, div_by_zero, do_nothing, say_hello, say_pid,
run_dummy_heroku_worker, access_self) run_dummy_heroku_worker, access_self,
long_running_job)
from tests.helpers import strip_microseconds from tests.helpers import strip_microseconds
from rq import (get_failed_queue, Queue, SimpleWorker, Worker, from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
@ -577,6 +578,9 @@ def kill_worker(pid, double_kill):
time.sleep(0.5) time.sleep(0.5)
os.kill(pid, signal.SIGTERM) os.kill(pid, signal.SIGTERM)
def kill_work_horse(pid):
os.kill(pid, signal.SIGKILL)
class TimeoutTestCase: class TimeoutTestCase:
def setUp(self): def setUp(self):
@ -649,6 +653,35 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertIsNotNone(shutdown_requested_date) self.assertIsNotNone(shutdown_requested_date)
self.assertEqual(type(shutdown_requested_date).__name__, 'datetime') self.assertEqual(type(shutdown_requested_date).__name__, 'datetime')
@slow
def test_work_horse_death_sets_job_failed(self):
"""worker with an ongoing job whose work horse dies unexpectadly should
set the job's status either to FINISHED or FAILED
"""
fooq = Queue('foo')
failed_q = get_failed_queue()
self.assertEqual(failed_q.count, 0)
self.assertEqual(fooq.count, 0)
w = Worker(fooq)
registry = StartedJobRegistry(connection=self.testconn)
sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
if os.path.exists(sentinel_file):
os.remove(sentinel_file)
fooq.enqueue(create_file_after_timeout, sentinel_file, 100)
job, queue = w.dequeue_job_and_maintain_ttl(5)
w.fork_work_horse(job, queue)
p = Process(target=kill_work_horse, args=(w._horse_pid,))
p.start()
p.join(1)
w.monitor_work_horse(job)
job_status = job.get_status()
if os.path.exists(sentinel_file):
self.assertEqual(job_status, JobStatus.FINISHED)
os.remove(sentinel_file)
else:
self.assertEqual(job_status, JobStatus.FAILED)
self.assertEqual(failed_q.count, 1)
self.assertEqual(fooq.count, 0)
def schedule_access_self(): def schedule_access_self():
q = Queue('default', connection=get_current_connection()) q = Queue('default', connection=get_current_connection())

Loading…
Cancel
Save