From c00d3681f95ce04af5342a4b68cb6f9239fe701d Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Wed, 25 May 2016 15:17:37 -0400
Subject: [PATCH] Failing test to demonstrate issue #702

Test that demonstrates that if a work-horse process is terminated
unexpectedly the job being processed could be stuck at the "Started"
state (https://github.com/nvie/rq/issues/702)
---
Reviewer notes (this section is not part of the commit message):
 - kill_work_horse(pid) sends SIGKILL to the given pid.
 - The new test forks a work horse for the dequeued job, kills it from a
   separate Process, then calls monitor_work_horse() and expects the job
   to be FINISHED (sentinel file present) or FAILED (sentinel absent).
 - Line structure and indentation were reconstructed from a
   whitespace-collapsed copy; the failed-queue count assertion is placed
   in the FAILED branch -- confirm against the original commit.

 tests/test_worker.py | 35 ++++++++++++++++++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/tests/test_worker.py b/tests/test_worker.py
index 7df27f0..8d0368f 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -14,7 +14,8 @@ import subprocess
 from tests import RQTestCase, slow
 from tests.fixtures import (create_file, create_file_after_timeout,
                             div_by_zero, do_nothing, say_hello, say_pid,
-                            run_dummy_heroku_worker, access_self)
+                            run_dummy_heroku_worker, access_self,
+                            long_running_job)
 from tests.helpers import strip_microseconds
 
 from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
@@ -577,6 +578,9 @@ def kill_worker(pid, double_kill):
         time.sleep(0.5)
         os.kill(pid, signal.SIGTERM)
 
+def kill_work_horse(pid):
+    os.kill(pid, signal.SIGKILL)
+
 
 class TimeoutTestCase:
     def setUp(self):
@@ -649,6 +653,35 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
         self.assertIsNotNone(shutdown_requested_date)
         self.assertEqual(type(shutdown_requested_date).__name__, 'datetime')
 
+    @slow
+    def test_work_horse_death_sets_job_failed(self):
+        """worker with an ongoing job whose work horse dies unexpectedly should
+        set the job's status either to FINISHED or FAILED
+        """
+        fooq = Queue('foo')
+        failed_q = get_failed_queue()
+        self.assertEqual(failed_q.count, 0)
+        self.assertEqual(fooq.count, 0)
+        w = Worker(fooq)
+        registry = StartedJobRegistry(connection=self.testconn)
+        sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
+        if os.path.exists(sentinel_file):
+            os.remove(sentinel_file)
+        fooq.enqueue(create_file_after_timeout, sentinel_file, 100)
+        job, queue = w.dequeue_job_and_maintain_ttl(5)
+        w.fork_work_horse(job, queue)
+        p = Process(target=kill_work_horse, args=(w._horse_pid,))
+        p.start()
+        p.join(1)
+        w.monitor_work_horse(job)
+        job_status = job.get_status()
+        if os.path.exists(sentinel_file):
+            self.assertEqual(job_status, JobStatus.FINISHED)
+            os.remove(sentinel_file)
+        else:
+            self.assertEqual(job_status, JobStatus.FAILED)
+            self.assertEqual(failed_q.count, 1)
+        self.assertEqual(fooq.count, 0)
 
 def schedule_access_self():
     q = Queue('default', connection=get_current_connection())