From ed264f08bb53581bf4b2eb3f1e013b9615c37c52 Mon Sep 17 00:00:00 2001 From: Ruslan Mullakhmetov Date: Mon, 26 Oct 2020 14:42:02 +0100 Subject: [PATCH] feat: added job heartbeat to track whether job is actually executing (#1349) * feat: added job heartbeat to track whether job is actually executing heartbeat might be needed in cases when worker was hardkilled or the whole VM/docker was forcibly rebooted. * fixed tests * fixed test coverage issue * chore: renamed job.heartbeat stuff according to review feedback * chore: pipelined worker heartbeat and job heartbeat * docs: documented job.heartbeat property * fixes after review * docs: updated last_heartbeat description * chore: review Co-authored-by: Ruslan Mullakhmetov --- docs/docs/jobs.md | 1 + rq/job.py | 8 ++++++++ rq/worker.py | 8 +++++++- tests/test_job.py | 13 ++++++++++--- 4 files changed, 26 insertions(+), 4 deletions(-) diff --git a/docs/docs/jobs.md b/docs/docs/jobs.md index 4720b63..7ede815 100644 --- a/docs/docs/jobs.md +++ b/docs/docs/jobs.md @@ -121,6 +121,7 @@ Some interesting job attributes include: * `job.started_at` * `job.ended_at` * `job.exc_info` stores exception information if job doesn't finish successfully. +* `job.last_heartbeat` the latest timestamp that's periodically updated when the job is executing. Can be used to determine if the job is still active. If you want to efficiently fetch a large number of jobs, use `Job.fetch_many()`. diff --git a/rq/job.py b/rq/job.py index 6084d92..868a44b 100644 --- a/rq/job.py +++ b/rq/job.py @@ -351,6 +351,7 @@ class Job(object): # retry_intervals is a list of int e.g [60, 120, 240] self.retry_intervals = None self.redis_server_version = None + self.last_heartbeat = None def __repr__(self): # noqa # pragma: no cover return '{0}({1!r}, enqueued_at={2!r})'.format(self.__class__.__name__, @@ -384,6 +385,11 @@ class Job(object): raise TypeError('id must be a string, not {0}'.format(type(value))) self._id = value + def heartbeat(self, heartbeat, pipeline=None): + self.last_heartbeat = heartbeat + connection = pipeline if pipeline is not None else self.connection + connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat)) + id = property(get_id, set_id) @classmethod @@ -477,6 +483,7 @@ class Job(object): self.enqueued_at = str_to_date(obj.get('enqueued_at')) self.started_at = str_to_date(obj.get('started_at')) self.ended_at = str_to_date(obj.get('ended_at')) + self.last_heartbeat = str_to_date(obj.get('last_heartbeat')) result = obj.get('result') if result: try: @@ -530,6 +537,7 @@ class Job(object): 'data': zlib.compress(self.data), 'started_at': utcformat(self.started_at) if self.started_at else '', 'ended_at': utcformat(self.ended_at) if self.ended_at else '', + 'last_heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '', } if self.retries_left is not None: diff --git a/rq/worker.py b/rq/worker.py index 13ed298..cfbc675 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -751,14 +751,19 @@ class Worker(object): except HorseMonitorTimeoutException: # Horse has not exited yet and is still running. # Send a heartbeat to keep the worker alive. - self.heartbeat(self.job_monitoring_interval + 60) # Kill the job from this side if something is really wrong (interpreter lock/etc). if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 60): + self.heartbeat(self.job_monitoring_interval + 60) self.kill_horse() self.wait_for_horse() break + with self.connection.pipeline() as pipeline: + self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline) + job.heartbeat(utcnow(), pipeline=pipeline) + pipeline.execute() + except OSError as e: # In case we encountered an OSError due to EINTR (which is # caused by a SIGINT or SIGTERM signal during @@ -853,6 +858,7 @@ class Worker(object): job_class=self.job_class) registry.add(job, timeout, pipeline=pipeline) job.set_status(JobStatus.STARTED, pipeline=pipeline) + job.heartbeat(utcnow(), pipeline=pipeline) pipeline.hset(job.key, 'started_at', utcformat(utcnow())) pipeline.execute() diff --git a/tests/test_job.py b/tests/test_job.py index a993c60..2987c5b 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -222,8 +222,15 @@ class TestJob(RQTestCase): # ... and no other keys are stored self.assertEqual( sorted(self.testconn.hkeys(job.key)), - [b'created_at', b'data', b'description', b'ended_at', b'started_at']) - + [b'created_at', b'data', b'description', b'ended_at', b'last_heartbeat', b'started_at']) + + self.assertEqual(job.last_heartbeat, None) + self.assertEqual(job.last_heartbeat, None) + + ts = utcnow() + job.heartbeat(ts) + self.assertEqual(job.last_heartbeat, ts) + def test_persistence_of_retry_data(self): """Retry related data is stored and restored properly""" job = Job.create(func=fixtures.some_calculation) @@ -979,4 +986,4 @@ class TestJob(RQTestCase): self.assertEqual(job.get_retry_interval(), 2) job.retries_left = 1 self.assertEqual(job.get_retry_interval(), 3) - \ No newline at end of file +