Worker key TTLs are set to be a bit longer to account for system hiccups (#1279)

* Worker key TTLs are set to be a bit longer to account for system hiccups

* Fix test_work_horse_force_death
main
Selwin Ong 5 years ago committed by GitHub
parent 6eec0065df
commit 1d8ea8e7a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -606,7 +606,7 @@ class Worker(object):
If no timeout is given, the default_worker_ttl will be used to update If no timeout is given, the default_worker_ttl will be used to update
the expiration time of the worker. the expiration time of the worker.
""" """
timeout = timeout or self.default_worker_ttl timeout = timeout or self.default_worker_ttl + 60
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, timeout) connection.expire(self.key, timeout)
connection.hset(self.key, 'last_heartbeat', utcformat(utcnow())) connection.hset(self.key, 'last_heartbeat', utcformat(utcnow()))
@ -689,10 +689,10 @@ class Worker(object):
except HorseMonitorTimeoutException: except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running. # Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive. # Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 30) self.heartbeat(self.job_monitoring_interval + 60)
# Kill the job from this side if something is really wrong (interpreter lock/etc). # Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 1): if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 60):
self.kill_horse() self.kill_horse()
break break
@ -776,7 +776,7 @@ class Worker(object):
timeout = job.timeout or 180 timeout = job.timeout or 180
if heartbeat_ttl is None: if heartbeat_ttl is None:
heartbeat_ttl = self.job_monitoring_interval + 5 heartbeat_ttl = self.job_monitoring_interval + 60
with self.connection.pipeline() as pipeline: with self.connection.pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline) self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
@ -1000,7 +1000,7 @@ class SimpleWorker(Worker):
def execute_job(self, job, queue): def execute_job(self, job, queue):
"""Execute job in same thread/process, do not fork()""" """Execute job in same thread/process, do not fork()"""
timeout = (job.timeout or DEFAULT_WORKER_TTL) + 5 timeout = (job.timeout or DEFAULT_WORKER_TTL) + 60
return self.perform_job(job, queue, heartbeat_ttl=timeout) return self.perform_job(job, queue, heartbeat_ttl=timeout)

@ -1084,7 +1084,7 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
now = utcnow() now = utcnow()
w.monitor_work_horse(job) w.monitor_work_horse(job)
fudge_factor = 1 fudge_factor = 1
total_time = w.job_monitoring_interval + 5 + fudge_factor total_time = w.job_monitoring_interval + 65 + fudge_factor
self.assertTrue((utcnow() - now).total_seconds() < total_time) self.assertTrue((utcnow() - now).total_seconds() < total_time)
self.assertEqual(job.get_status(), JobStatus.FAILED) self.assertEqual(job.get_status(), JobStatus.FAILED)
failed_job_registry = FailedJobRegistry(queue=fooq) failed_job_registry = FailedJobRegistry(queue=fooq)

Loading…
Cancel
Save