diff --git a/rq/worker.py b/rq/worker.py index 91ffc23..341ac8f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -606,7 +606,7 @@ class Worker(object): If no timeout is given, the default_worker_ttl will be used to update the expiration time of the worker. """ - timeout = timeout or self.default_worker_ttl + timeout = timeout or self.default_worker_ttl + 60 connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, timeout) connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) @@ -689,10 +689,10 @@ class Worker(object): except HorseMonitorTimeoutException: # Horse has not exited yet and is still running. # Send a heartbeat to keep the worker alive. - self.heartbeat(self.job_monitoring_interval + 30) + self.heartbeat(self.job_monitoring_interval + 60) # Kill the job from this side if something is really wrong (interpreter lock/etc). - if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 1): + if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 60): self.kill_horse() break @@ -776,7 +776,7 @@ class Worker(object): timeout = job.timeout or 180 if heartbeat_ttl is None: - heartbeat_ttl = self.job_monitoring_interval + 5 + heartbeat_ttl = self.job_monitoring_interval + 60 with self.connection.pipeline() as pipeline: self.set_state(WorkerStatus.BUSY, pipeline=pipeline) @@ -1000,7 +1000,7 @@ class SimpleWorker(Worker): def execute_job(self, job, queue): """Execute job in same thread/process, do not fork()""" - timeout = (job.timeout or DEFAULT_WORKER_TTL) + 5 + timeout = (job.timeout or DEFAULT_WORKER_TTL) + 60 return self.perform_job(job, queue, heartbeat_ttl=timeout) diff --git a/tests/test_worker.py b/tests/test_worker.py index 088213d..8e8564f 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1084,7 +1084,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): now = utcnow() w.monitor_work_horse(job) fudge_factor = 1 - total_time = w.job_monitoring_interval + 5 + fudge_factor + total_time = w.job_monitoring_interval + 65 + fudge_factor self.assertTrue((utcnow() - now).total_seconds() < total_time) self.assertEqual(job.get_status(), JobStatus.FAILED) failed_job_registry = FailedJobRegistry(queue=fooq)