diff --git a/rq/worker.py b/rq/worker.py index 25b98e8..92e63b4 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -674,6 +674,9 @@ class Worker(object): either executes successfully or the status of the job is set to failed """ + + ret_val = None + job.started_at = job.started_at or utcnow() while True: try: with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException): @@ -683,6 +686,12 @@ class Worker(object): # Horse has not exited yet and is still running. # Send a heartbeat to keep the worker alive. self.heartbeat(self.job_monitoring_interval + 5) + + # Kill the job from this side if something is really wrong (interpreter lock/etc). + if (utcnow() - job.started_at).total_seconds() > (job.timeout + 1): + self.kill_horse() + break + except OSError as e: # In case we encountered an OSError due to EINTR (which is # caused by a SIGINT or SIGTERM signal during diff --git a/tests/test_worker.py b/tests/test_worker.py index e983bf5..7d8ed5a 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1055,6 +1055,33 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertTrue(job in failed_job_registry) self.assertEqual(fooq.count, 0) + @slow + def test_work_horse_force_death(self): + """Simulate a frozen worker that doesn't observe the timeout properly. + Fake it by artificially setting the timeout of the parent process to + something much smaller after the process is already forked. + """ + fooq = Queue('foo') + self.assertEqual(fooq.count, 0) + w = Worker(fooq) + sentinel_file = '/tmp/.rq_sentinel_work_horse_death' + if os.path.exists(sentinel_file): + os.remove(sentinel_file) + fooq.enqueue(create_file_after_timeout, sentinel_file, 100) + job, queue = w.dequeue_job_and_maintain_ttl(5) + w.fork_work_horse(job, queue) + job.timeout = 5 + w.job_monitoring_interval = 1 + now = utcnow() + w.monitor_work_horse(job) + fudge_factor = 1 + total_time = w.job_monitoring_interval + 5 + fudge_factor + self.assertTrue((utcnow() - now).total_seconds() < total_time) + self.assertEqual(job.get_status(), JobStatus.FAILED) + failed_job_registry = FailedJobRegistry(queue=fooq) + self.assertTrue(job in failed_job_registry) + self.assertEqual(fooq.count, 0) + def schedule_access_self(): q = Queue('default', connection=get_current_connection())