From 76ac0afbcd07c2a13b17ba92ddb63ca44e3b25aa Mon Sep 17 00:00:00 2001
From: Omer Lachish
Date: Tue, 20 Apr 2021 12:05:26 +0300
Subject: [PATCH] Cleanup zombie worker leftovers as part of
 StartedJobRegistry's cleanup() (#1372)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* cleanup jobs that are not really running due to zombie workers

* remove registry entries for zombie jobs

* return only the job ids on cleanup

* test zombie job cleanup

* format code

* rename variable to explain that second element in tuple is expiry, not score

* remove worker_key

* detect zombie jobs using old heartbeats

* reuse get_expired_job_ids

* set score using current_timestamp

* test idle jobs using stale heartbeats

* extract timeout into variable

* move heartbeats into StartedJobRegistry

* use registry.heartbeat in tests

* remove heartbeats when job removed from StartedJobRegistry

* remove idle and expired jobs from both wip and heartbeats set

* send heartbeat_ttl to registry.add

* typo

* revert everything 😶

* only keep job heartbeats as score (and get rid of job timeouts as scores)

* calculate heartbeat_ttl in an overridable function + override it in SimpleWorker + move storing StartedJobRegistry scores to job.heartbeat()

* set heartbeat to monitoring interval for infinite timeouts

* track elapsed_execution_time as part of worker

* reset current job working time when work on a job is done

* persisting the job working time as part of monitoring
---
 rq/job.py            | 17 ++++++++++----
 rq/worker.py         | 55 ++++++++++++++++++++++++++++----------------
 tests/test_job.py    |  2 +-
 tests/test_worker.py | 16 -------------
 4 files changed, 48 insertions(+), 42 deletions(-)
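Note: the heart of this patch is that StartedJobRegistry scores now store
heartbeat expiry timestamps instead of job timeouts, so the registry's
existing cleanup() pass can also sweep jobs whose work horse died without
deregistering (zombie workers). A minimal sketch of that detection, assuming
a plain redis-py client and an illustrative registry key (not rq's verbatim
code):

    import time

    import redis

    r = redis.Redis()

    def expired_job_ids(registry_key, now=None):
        # Entries whose score (heartbeat expiry) is in the past belong to
        # jobs whose worker stopped heartbeating.
        now = now if now is not None else time.time()
        return [job_id.decode() for job_id in
                r.zrangebyscore(registry_key, 0, now)]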
diff --git a/rq/job.py b/rq/job.py
index 2c0f596..2296b9a 100644
--- a/rq/job.py
+++ b/rq/job.py
@@ -353,7 +353,7 @@ class Job(object):
         self.ttl = None
         self.worker_name = None
         self._status = None
-        self._dependency_ids = []
+        self._dependency_ids = []
         self.meta = {}
         self.serializer = resolve_serializer(serializer)
         self.retries_left = None
@@ -394,10 +394,11 @@ class Job(object):
             raise TypeError('id must be a string, not {0}'.format(type(value)))
         self._id = value
 
-    def heartbeat(self, heartbeat, pipeline=None):
+    def heartbeat(self, heartbeat, ttl, pipeline=None):
         self.last_heartbeat = heartbeat
         connection = pipeline if pipeline is not None else self.connection
         connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
+        self.started_job_registry.add(self, ttl, pipeline=pipeline)
 
     id = property(get_id, set_id)
@@ -783,12 +784,18 @@ class Job(object):
             connection.expire(self.dependents_key, ttl)
             connection.expire(self.dependencies_key, ttl)
 
+    @property
+    def started_job_registry(self):
+        from .registry import StartedJobRegistry
+        return StartedJobRegistry(self.origin, connection=self.connection,
+                                  job_class=self.__class__)
+
     @property
     def failed_job_registry(self):
         from .registry import FailedJobRegistry
         return FailedJobRegistry(self.origin, connection=self.connection,
                                  job_class=self.__class__)
-
+
     def get_retry_interval(self):
         """Returns the desired retry interval.
         If number of retries is bigger than length of intervals, the first
@@ -875,7 +882,7 @@ class Retry(object):
         super().__init__()
         if max < 1:
             raise ValueError('max: please enter a value greater than 0')
-
+
         if isinstance(interval, int):
             if interval < 0:
                 raise ValueError('interval: negative numbers are not allowed')
@@ -885,6 +892,6 @@ class Retry(object):
             if i < 0:
                 raise ValueError('interval: negative numbers are not allowed')
             intervals = interval
-
+
         self.max = max
         self.intervals = intervals
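Note: Job.heartbeat() is now the single write path for StartedJobRegistry
scores: every heartbeat refreshes the job hash and pushes the registry
entry's expiry forward by the given TTL, in one pipeline. Roughly, in
redis-py terms (illustrative key names, not rq's verbatim code):

    import time

    import redis

    r = redis.Redis()

    def job_heartbeat(job_key, registry_key, job_id, ttl):
        # A job only ages out of StartedJobRegistry when heartbeats stop.
        now = time.time()
        pipe = r.pipeline()
        pipe.hset(job_key, 'last_heartbeat', now)      # mirrors Job.heartbeat()
        pipe.zadd(registry_key, {job_id: now + ttl})   # mirrors registry.add()
        pipe.execute()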
diff --git a/rq/worker.py b/rq/worker.py
index cb1ebf4..e211766 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -218,6 +218,7 @@ class Worker(object):
         self.successful_job_count = 0
         self.failed_job_count = 0
         self.total_working_time = 0
+        self.current_job_working_time = 0
         self.birth_date = None
         self.scheduler = None
         self.pubsub = None
@@ -376,6 +377,11 @@ class Worker(object):
 
     state = property(_get_state, _set_state)
 
+    def set_current_job_working_time(self, current_job_working_time, pipeline=None):
+        self.current_job_working_time = current_job_working_time
+        connection = pipeline if pipeline is not None else self.connection
+        connection.hset(self.key, 'current_job_working_time', current_job_working_time)
+
     def set_current_job_id(self, job_id, pipeline=None):
         connection = pipeline if pipeline is not None else self.connection
 
@@ -700,10 +706,10 @@ class Worker(object):
         data = self.connection.hmget(
             self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
             'birth', 'failed_job_count', 'successful_job_count',
-            'total_working_time', 'hostname', 'pid', 'version', 'python_version',
+            'total_working_time', 'current_job_working_time', 'hostname', 'pid', 'version', 'python_version',
         )
         (queues, state, job_id, last_heartbeat, birth, failed_job_count,
-         successful_job_count, total_working_time, hostname, pid, version, python_version) = data
+         successful_job_count, total_working_time, current_job_working_time, hostname, pid, version, python_version) = data
         queues = as_text(queues)
         self.hostname = as_text(hostname)
         self.pid = int(pid) if pid else None
@@ -725,6 +731,8 @@ class Worker(object):
             self.successful_job_count = int(as_text(successful_job_count))
         if total_working_time:
             self.total_working_time = float(as_text(total_working_time))
+        if current_job_working_time:
+            self.current_job_working_time = float(as_text(current_job_working_time))
 
         if queues:
             self.queues = [self.queue_class(queue,
@@ -758,6 +766,13 @@ class Worker(object):
             self._horse_pid = child_pid
             self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
 
+    def get_heartbeat_ttl(self, job):
+        if job.timeout and job.timeout > 0:
+            remaining_execution_time = job.timeout - self.current_job_working_time
+            return min(remaining_execution_time, self.job_monitoring_interval) + 60
+        else:
+            return self.job_monitoring_interval + 60
+
     def monitor_work_horse(self, job, queue):
         """The worker will monitor the work horse and make sure that it
         either executes successfully or the status of the job is set to
@@ -774,9 +789,10 @@ class Worker(object):
             except HorseMonitorTimeoutException:
                 # Horse has not exited yet and is still running.
                 # Send a heartbeat to keep the worker alive.
+                self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())
 
                 # Kill the job from this side if something is really wrong (interpreter lock/etc).
-                if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 60):
+                if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60):
                     self.heartbeat(self.job_monitoring_interval + 60)
                     self.kill_horse()
                     self.wait_for_horse()
@@ -784,7 +800,8 @@ class Worker(object):
 
                 with self.connection.pipeline() as pipeline:
                     self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
-                    job.heartbeat(utcnow(), pipeline=pipeline)
+                    ttl = self.get_heartbeat_ttl(job)
+                    job.heartbeat(utcnow(), ttl, pipeline=pipeline)
                     pipeline.execute()
 
             except OSError as e:
@@ -799,6 +816,7 @@ class Worker(object):
             # Send a heartbeat to keep the worker alive.
             self.heartbeat()
 
+        self.set_current_job_working_time(0)
         self._horse_pid = 0  # Set horse PID to 0, horse has finished working
         if ret_val == os.EX_OK:  # The process exited normally.
             return
@@ -867,25 +885,20 @@ class Worker(object):
         signal.signal(signal.SIGINT, signal.SIG_IGN)
         signal.signal(signal.SIGTERM, signal.SIG_DFL)
 
-    def prepare_job_execution(self, job, heartbeat_ttl=None):
+    def prepare_job_execution(self, job):
         """Performs misc bookkeeping like updating states prior to
         job execution.
         """
-        if job.timeout == -1:
-            timeout = -1
-        else:
-            timeout = job.timeout or 180
-
-        if heartbeat_ttl is None:
-            heartbeat_ttl = self.job_monitoring_interval + 60
 
         with self.connection.pipeline() as pipeline:
             self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
             self.set_current_job_id(job.id, pipeline=pipeline)
+            self.set_current_job_working_time(0, pipeline=pipeline)
+
+            heartbeat_ttl = self.get_heartbeat_ttl(job)
             self.heartbeat(heartbeat_ttl, pipeline=pipeline)
-            registry = StartedJobRegistry(job.origin, self.connection,
-                                          job_class=self.job_class)
-            registry.add(job, timeout, pipeline=pipeline)
+            job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
+
             job.prepare_for_execution(self.name, pipeline=pipeline)
             pipeline.execute()
 
@@ -991,7 +1004,7 @@ class Worker(object):
             except redis.exceptions.WatchError:
                 continue
 
-    def perform_job(self, job, queue, heartbeat_ttl=None):
+    def perform_job(self, job, queue):
         """Performs the actual work of a job.  Will/should only be called
         inside the work horse's process.
         """
@@ -1000,7 +1013,7 @@ class Worker(object):
         started_job_registry = queue.started_job_registry
 
         try:
-            self.prepare_job_execution(job, heartbeat_ttl)
+            self.prepare_job_execution(job)
             job.started_at = utcnow()
             timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
@@ -1115,12 +1128,14 @@ class SimpleWorker(Worker):
 
     def execute_job(self, job, queue):
         """Execute job in same thread/process, do not fork()"""
+        return self.perform_job(job, queue)
+
+    def get_heartbeat_ttl(self, job):
         # "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. We should just stick to DEFAULT_WORKER_TTL.
         if job.timeout == -1:
-            timeout = DEFAULT_WORKER_TTL
+            return DEFAULT_WORKER_TTL
         else:
-            timeout = (job.timeout or DEFAULT_WORKER_TTL) + 60
-        return self.perform_job(job, queue, heartbeat_ttl=timeout)
+            return (job.timeout or DEFAULT_WORKER_TTL) + 60
 
 
 class HerokuWorker(Worker):
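Note: Worker.get_heartbeat_ttl() caps each TTL at one monitoring interval
plus a 60-second grace period and tightens it as the job approaches its
timeout; infinite (-1) timeouts are paced purely by the interval. A worked
example, assuming rq's default job_monitoring_interval of 30 seconds:

    def get_heartbeat_ttl(timeout, worked, monitoring_interval=30):
        # Standalone restatement of Worker.get_heartbeat_ttl() for illustration.
        if timeout and timeout > 0:
            return min(timeout - worked, monitoring_interval) + 60
        return monitoring_interval + 60

    print(get_heartbeat_ttl(180, 0))    # 90: min(180, 30) + 60
    print(get_heartbeat_ttl(180, 170))  # 70: only 10s of budget remains
    print(get_heartbeat_ttl(-1, 500))   # 90: infinite timeout, interval-paced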
diff --git a/tests/test_job.py b/tests/test_job.py
index fa7eff1..a65ca86 100644
--- a/tests/test_job.py
+++ b/tests/test_job.py
@@ -219,7 +219,7 @@ class TestJob(RQTestCase):
         self.assertEqual(job.last_heartbeat, None)
 
         ts = utcnow()
-        job.heartbeat(ts)
+        job.heartbeat(ts, 0)
         self.assertEqual(job.last_heartbeat, ts)
 
     def test_persistence_of_retry_data(self):
diff --git a/tests/test_worker.py b/tests/test_worker.py
index c7d9ae6..1f11bfb 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -772,22 +772,6 @@ class TestWorker(RQTestCase):
         self.assertEqual(job._status, JobStatus.STARTED)
         self.assertEqual(job.worker_name, worker.name)
 
-    def test_prepare_job_execution_inf_timeout(self):
-        """Prepare job execution handles infinite job timeout"""
-        queue = Queue(connection=self.testconn)
-        job = queue.enqueue(long_running_job,
-                            args=(1,),
-                            job_timeout=-1)
-        worker = Worker([queue])
-        worker.prepare_job_execution(job)
-
-        # Updates working queue
-        registry = StartedJobRegistry(connection=self.testconn)
-        self.assertEqual(registry.get_job_ids(), [job.id])
-
-        # Score in queue is +inf
-        self.assertEqual(self.testconn.zscore(registry.key, job.id), float('Inf'))
-
     def test_work_unicode_friendly(self):
         """Worker processes work with unicode description, then quits."""
         q = Queue('foo')
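Note: test_prepare_job_execution_inf_timeout is deleted because a
job_timeout of -1 no longer yields a +inf registry score;
prepare_job_execution() now writes a finite heartbeat expiry in every case.
A hypothetical replacement assertion (a sketch, not part of this patch)
could check that instead:

    import time

    def assert_finite_heartbeat_score(testconn, registry, job, worker):
        # For job_timeout=-1 the score should sit near
        # now + job_monitoring_interval + 60 rather than +inf.
        score = testconn.zscore(registry.key, job.id)
        assert score != float('inf')
        expected = time.time() + worker.job_monitoring_interval + 60
        assert abs(score - expected) < 5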