Add a hard kill from the parent process with a 10% increased timeout … (#1169)

* Add a hard kill from the parent process with a 10% increased timeout in case the forked process gets stuck and cannot stop itself.

* Added a test for the parent process force-killing the work horse.

* Changed 10% to +1 second, and other misc changes based on review comments.
main
mr-trouble 5 years ago committed by Selwin Ong
parent 37a6304a4f
commit 5f949f4cef

@ -674,6 +674,9 @@ class Worker(object):
either executes successfully or the status of the job is set to either executes successfully or the status of the job is set to
failed failed
""" """
ret_val = None
job.started_at = job.started_at or utcnow()
while True: while True:
try: try:
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
@ -683,6 +686,12 @@ class Worker(object):
# Horse has not exited yet and is still running. # Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive. # Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 5) self.heartbeat(self.job_monitoring_interval + 5)
# Kill the job from this side if something is really wrong (interpreter lock/etc).
if (utcnow() - job.started_at).total_seconds() > (job.timeout + 1):
self.kill_horse()
break
except OSError as e: except OSError as e:
# In case we encountered an OSError due to EINTR (which is # In case we encountered an OSError due to EINTR (which is
# caused by a SIGINT or SIGTERM signal during # caused by a SIGINT or SIGTERM signal during

@ -1055,6 +1055,33 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertTrue(job in failed_job_registry) self.assertTrue(job in failed_job_registry)
self.assertEqual(fooq.count, 0) self.assertEqual(fooq.count, 0)
@slow
def test_work_horse_force_death(self):
    """Check that the parent force-kills a work horse that ignores its timeout.

    A frozen horse is faked by lowering ``job.timeout`` on the parent's side
    only after the horse has already been forked. ``monitor_work_horse`` is
    then expected to return within the allowed time, leaving the job failed
    and registered in the failed job registry.
    """
    sentinel_path = '/tmp/.rq_sentinel_work_horse_death'

    foo_queue = Queue('foo')
    self.assertEqual(foo_queue.count, 0)
    worker = Worker(foo_queue)

    # Start from a clean slate: drop any sentinel left by an earlier run.
    if os.path.exists(sentinel_path):
        os.remove(sentinel_path)

    foo_queue.enqueue(create_file_after_timeout, sentinel_path, 100)
    job, queue = worker.dequeue_job_and_maintain_ttl(5)
    worker.fork_work_horse(job, queue)

    # Shrink the timeout only after the fork so that just the parent's copy
    # of the job sees the smaller value.
    job.timeout = 5
    worker.job_monitoring_interval = 1

    monitoring_started = utcnow()
    worker.monitor_work_horse(job)

    # Allowance: one monitoring interval + the 5 second timeout + 1 second
    # of fudge.
    fudge_factor = 1
    allowed_seconds = worker.job_monitoring_interval + 5 + fudge_factor
    elapsed_seconds = (utcnow() - monitoring_started).total_seconds()
    self.assertLess(elapsed_seconds, allowed_seconds)

    self.assertEqual(job.get_status(), JobStatus.FAILED)
    self.assertIn(job, FailedJobRegistry(queue=foo_queue))
    self.assertEqual(foo_queue.count, 0)
def schedule_access_self(): def schedule_access_self():
q = Queue('default', connection=get_current_connection()) q = Queue('default', connection=get_current_connection())

Loading…
Cancel
Save