From 5f949f4cefed67de2554ef8dfc08bd62142a52a4 Mon Sep 17 00:00:00 2001 From: mr-trouble <39416610+mr-trouble@users.noreply.github.com> Date: Sun, 19 Jan 2020 03:31:06 -0800 Subject: [PATCH] =?UTF-8?q?Add=20a=20hard=20kill=20from=20the=20parent=20p?= =?UTF-8?q?rocess=20with=20a=2010%=20increased=20timeout=20=E2=80=A6=20(#1?= =?UTF-8?q?169)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add a hard kill from the parent process with a 10% increased timeout in case the forked process gets stuck and cannot stop itself. * Added test for the force kill of the parent process. * Changed 10% to +1 second, and other misc changes based on review comments. --- rq/worker.py | 9 +++++++++ tests/test_worker.py | 27 +++++++++++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/rq/worker.py b/rq/worker.py index 25b98e8..92e63b4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -674,6 +674,9 @@ class Worker(object): either executes successfully or the status of the job is set to failed """ + + ret_val = None + job.started_at = job.started_at or utcnow() while True: try: with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): @@ -683,6 +686,12 @@ class Worker(object): # Horse has not exited yet and is still running. # Send a heartbeat to keep the worker alive. self.heartbeat(self.job_monitoring_interval + 5) + + # Kill the job from this side if something is really wrong (interpreter lock/etc). + if (utcnow() - job.started_at).total_seconds() > (job.timeout + 1): + self.kill_horse() + break + except OSError as e: # In case we encountered an OSError due to EINTR (which is # caused by a SIGINT or SIGTERM signal during diff --git a/tests/test_worker.py b/tests/test_worker.py index e983bf5..7d8ed5a 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1055,6 +1055,33 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertTrue(job in failed_job_registry) self.assertEqual(fooq.count, 0) + @slow + def test_work_horse_force_death(self): + """Simulate a frozen worker that doesn't observe the timeout properly. + Fake it by artificially setting the timeout of the parent process to + something much smaller after the process is already forked. + """ + fooq = Queue('foo') + self.assertEqual(fooq.count, 0) + w = Worker(fooq) + sentinel_file = '/tmp/.rq_sentinel_work_horse_death' + if os.path.exists(sentinel_file): + os.remove(sentinel_file) + fooq.enqueue(create_file_after_timeout, sentinel_file, 100) + job, queue = w.dequeue_job_and_maintain_ttl(5) + w.fork_work_horse(job, queue) + job.timeout = 5 + w.job_monitoring_interval = 1 + now = utcnow() + w.monitor_work_horse(job) + fudge_factor = 1 + total_time = w.job_monitoring_interval + 5 + fudge_factor + self.assertTrue((utcnow() - now).total_seconds() < total_time) + self.assertEqual(job.get_status(), JobStatus.FAILED) + failed_job_registry = FailedJobRegistry(queue=fooq) + self.assertTrue(job in failed_job_registry) + self.assertEqual(fooq.count, 0) + def schedule_access_self(): q = Queue('default', connection=get_current_connection())