feat: added job heartbeat to track whether job is actually executing (#1349)

* feat: added job heartbeat to track whether job is actually executing

A heartbeat may be needed in cases where the worker was hard-killed or the whole VM/Docker container was forcibly rebooted.

* fixed tests

* fixed test coverage issue

* chore: renamed job.heartbeat stuff according to review feedback

* chore: pipelined worker heartbeat and job heartbeat

* docs: documented job.heartbeat property

* fixes after review

* docs: updated last_heartbeat description

* chore: review

Co-authored-by: Ruslan Mullakhmetov <ruslan@twentythree.net>
Branch: main
Ruslan Mullakhmetov authored 4 years ago, committed by GitHub
parent a721db34b1
commit ed264f08bb

@@ -121,6 +121,7 @@ Some interesting job attributes include:
* `job.started_at`
* `job.ended_at`
* `job.exc_info` stores exception information if job doesn't finish successfully.
* `job.last_heartbeat` holds the latest timestamp, which is periodically updated while the job is executing; it can be used to determine whether the job is still active.
If you want to efficiently fetch a large number of jobs, use `Job.fetch_many()`.
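
For illustration, a minimal sketch (not part of this change) of reading these attributes through the public API; the job id and the five-minute staleness threshold are placeholders:

    from datetime import timedelta

    from redis import Redis
    from rq.job import Job
    from rq.utils import utcnow

    redis = Redis()
    job = Job.fetch('my-job-id', connection=redis)  # placeholder job id

    print(job.enqueued_at, job.started_at, job.ended_at, job.exc_info)

    # last_heartbeat stops advancing if the worker is hard-killed, so a value
    # older than some threshold hints that the job is no longer executing.
    if job.last_heartbeat and utcnow() - job.last_heartbeat > timedelta(minutes=5):
        print('job looks stale')

    # Fetching a large number of jobs efficiently:
    jobs = Job.fetch_many(['id-1', 'id-2'], connection=redis)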

@@ -351,6 +351,7 @@ class Job(object):
# retry_intervals is a list of int e.g [60, 120, 240]
self.retry_intervals = None
self.redis_server_version = None
self.last_heartbeat = None
def __repr__(self): # noqa # pragma: no cover
return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__,
@@ -384,6 +385,11 @@ class Job(object):
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value
def heartbeat(self, heartbeat, pipeline=None):
self.last_heartbeat = heartbeat
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
id = property(get_id, set_id)
@classmethod
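
A small usage sketch of the new method, assuming an already-fetched job; when a pipeline is passed, the HSET is only sent on execute(), which the worker relies on further down:

    from redis import Redis
    from rq.job import Job
    from rq.utils import utcnow

    redis = Redis()
    job = Job.fetch('my-job-id', connection=redis)  # placeholder job id

    # Immediate write to the job's Redis hash:
    job.heartbeat(utcnow())

    # Deferred write, batched with other commands and sent on execute():
    with redis.pipeline() as pipe:
        job.heartbeat(utcnow(), pipeline=pipe)
        pipe.execute()
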
@@ -477,6 +483,7 @@ class Job(object):
self.enqueued_at = str_to_date(obj.get('enqueued_at'))
self.started_at = str_to_date(obj.get('started_at'))
self.ended_at = str_to_date(obj.get('ended_at'))
self.last_heartbeat = str_to_date(obj.get('last_heartbeat'))
result = obj.get('result')
if result:
try:
@@ -530,6 +537,7 @@ class Job(object):
'data': zlib.compress(self.data),
'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '',
'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '',
}
if self.retries_left is not None:
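
Together with the restore() hunk above, this makes last_heartbeat round-trip through the job hash: to_dict() writes it as a UTC string and str_to_date() parses it back. A quick sketch of inspecting that, with a placeholder job id:

    from redis import Redis
    from rq.job import Job
    from rq.utils import utcnow

    redis = Redis()
    job = Job.fetch('my-job-id', connection=redis)  # placeholder job id
    job.heartbeat(utcnow())

    # The raw job hash now carries last_heartbeat alongside the other timestamps:
    raw = redis.hgetall(job.key)
    print(raw.get(b'started_at'), raw.get(b'ended_at'), raw.get(b'last_heartbeat'))

    # Re-fetching parses the stored string back into a datetime:
    print(Job.fetch(job.id, connection=redis).last_heartbeat)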

@@ -751,14 +751,19 @@ class Worker(object):
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 60)
# Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 60):
self.heartbeat(self.job_monitoring_interval + 60)
self.kill_horse()
self.wait_for_horse()
break
with self.connection.pipeline() as pipeline:
self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
job.heartbeat(utcnow(), pipeline=pipeline)
pipeline.execute()
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
# caused by a SIGINT or SIGTERM signal during
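
The point of the pipeline is that the worker heartbeat (a key-TTL refresh) and the job heartbeat (an HSET on the job hash) reach Redis in a single round trip. A standalone sketch of the same pattern with plain redis-py, using hypothetical key names:

    from datetime import datetime, timezone

    import redis

    r = redis.Redis()
    worker_key = 'rq:worker:example'   # hypothetical key names, for illustration only
    job_key = 'rq:job:some-job-id'

    now = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    with r.pipeline() as pipe:
        pipe.expire(worker_key, 90)                # keep the worker registration alive
        pipe.hset(job_key, 'last_heartbeat', now)  # record that the job is still running
        pipe.execute()                             # both commands go out in one round trip
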
@@ -853,6 +858,7 @@ class Worker(object):
job_class=self.job_class)
registry.add(job, timeout, pipeline=pipeline)
job.set_status(JobStatus.STARTED, pipeline=pipeline)
job.heartbeat(utcnow(), pipeline=pipeline)
pipeline.hset(job.key, 'started_at', utcformat(utcnow()))
pipeline.execute()
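
With this change the job also gets an initial heartbeat at the moment execution starts, so last_heartbeat is populated for as long as the job is in the started state. A short sketch of observing that, with a placeholder job id:

    from redis import Redis
    from rq.job import Job

    redis = Redis()
    job = Job.fetch('my-job-id', connection=redis)  # placeholder job id

    # Both fields are populated by the same pipeline that marks the job as started:
    print(job.get_status(), job.started_at, job.last_heartbeat)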

@@ -222,8 +222,15 @@ class TestJob(RQTestCase):
# ... and no other keys are stored
self.assertEqual(
sorted(self.testconn.hkeys(job.key)),
[b'created_at', b'data', b'description', b'ended_at', b'started_at'])
[b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at'])
self.assertEqual(job.last_heartbeat, None)
self.assertEqual(job.last_heartbeat, None)
ts = utcnow()
job.heartbeat(ts)
self.assertEqual(job.last_heartbeat, ts)
def test_persistence_of_retry_data(self):
"""Retry related data is stored and restored properly"""
job = Job.create(func=fixtures.some_calculation)
@@ -979,4 +986,4 @@ class TestJob(RQTestCase):
self.assertEqual(job.get_retry_interval(), 2)
job.retries_left = 1
self.assertEqual(job.get_retry_interval(), 3)
