Cleanup zombie worker leftovers as part of StartedJobRegistry's cleanup() (#1372)

* cleanup jobs that are not really running due to zombie workers

* remove registry entries for zombie jobs

* return only the job ids on cleanup

* test zombie job cleanup

* format code

* rename variable to explain that second element in tuple is expiry, not score

* remove worker_key

* detect zombie jobs using old heartbeats

* reuse get_expired_job_ids

* set score using current_timestamp

* test idle jobs using stale heartbeats

* extract timeout into variable

* move heartbeats into StartedJobRegistry

* use registry.heartbeat in tests

* remove heartbeats when job removed from StartedJobRegistry

* remove idle and expired jobs from both wip and heartbeats set

* send heartbeat_ttl to registry.add

* typo

* revert everything 😶

* only keep job heartbeats as score (and get rid of job timeouts as scores

* calculate heartbeat_ttl in an overrideable function + override it in SimpleWorker + move storing StartedJobRegistry scores to job.heartbeat()

* set heartbeat to monitoring interval for infinite timeouts

* track elapsed_execution_time as part of worker

* reset current job working time when work on a job is done

* persisting the job working time as part of monitoring
main
Omer Lachish 4 years ago committed by GitHub
parent a3ed2db4ea
commit 76ac0afbcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -353,7 +353,7 @@ class Job(object):
self.ttl = None
self.worker_name = None
self._status = None
self._dependency_ids = []
self._dependency_ids = []
self.meta = {}
self.serializer = resolve_serializer(serializer)
self.retries_left = None
@ -394,10 +394,11 @@ class Job(object):
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value
def heartbeat(self, heartbeat, pipeline=None):
def heartbeat(self, heartbeat, ttl, pipeline=None):
self.last_heartbeat = heartbeat
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
self.started_job_registry.add(self, ttl, pipeline=pipeline)
id = property(get_id, set_id)
@ -783,12 +784,18 @@ class Job(object):
connection.expire(self.dependents_key, ttl)
connection.expire(self.dependencies_key, ttl)
@property
def started_job_registry(self):
from .registry import StartedJobRegistry
return StartedJobRegistry(self.origin, connection=self.connection,
job_class=self.__class__)
@property
def failed_job_registry(self):
from .registry import FailedJobRegistry
return FailedJobRegistry(self.origin, connection=self.connection,
job_class=self.__class__)
def get_retry_interval(self):
"""Returns the desired retry interval.
If number of retries is bigger than length of intervals, the first
@ -875,7 +882,7 @@ class Retry(object):
super().__init__()
if max < 1:
raise ValueError('max: please enter a value greater than 0')
if isinstance(interval, int):
if interval < 0:
raise ValueError('interval: negative numbers are not allowed')
@ -885,6 +892,6 @@ class Retry(object):
if i < 0:
raise ValueError('interval: negative numbers are not allowed')
intervals = interval
self.max = max
self.intervals = intervals

@ -218,6 +218,7 @@ class Worker(object):
self.successful_job_count = 0
self.failed_job_count = 0
self.total_working_time = 0
self.current_job_working_time = 0
self.birth_date = None
self.scheduler = None
self.pubsub = None
@ -376,6 +377,11 @@ class Worker(object):
state = property(_get_state, _set_state)
def set_current_job_working_time(self, current_job_working_time, pipeline=None):
self.current_job_working_time = current_job_working_time
connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'current_job_working_time', current_job_working_time)
def set_current_job_id(self, job_id, pipeline=None):
connection = pipeline if pipeline is not None else self.connection
@ -700,10 +706,10 @@ class Worker(object):
data = self.connection.hmget(
self.key, 'queues', 'state', 'current_job', 'last_heartbeat',
'birth', 'failed_job_count', 'successful_job_count',
'total_working_time', 'hostname', 'pid', 'version', 'python_version',
'total_working_time', 'current_job_working_time', 'hostname', 'pid', 'version', 'python_version',
)
(queues, state, job_id, last_heartbeat, birth, failed_job_count,
successful_job_count, total_working_time, hostname, pid, version, python_version) = data
successful_job_count, total_working_time, current_job_working_time, hostname, pid, version, python_version) = data
queues = as_text(queues)
self.hostname = as_text(hostname)
self.pid = int(pid) if pid else None
@ -725,6 +731,8 @@ class Worker(object):
self.successful_job_count = int(as_text(successful_job_count))
if total_working_time:
self.total_working_time = float(as_text(total_working_time))
if current_job_working_time:
self.current_job_working_time = float(as_text(current_job_working_time))
if queues:
self.queues = [self.queue_class(queue,
@ -758,6 +766,13 @@ class Worker(object):
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
def get_heartbeat_ttl(self, job):
if job.timeout and job.timeout > 0:
remaining_execution_time = job.timeout - self.current_job_working_time
return min(remaining_execution_time, self.job_monitoring_interval) + 60
else:
return self.job_monitoring_interval + 60
def monitor_work_horse(self, job, queue):
"""The worker will monitor the work horse and make sure that it
either executes successfully or the status of the job is set to
@ -774,9 +789,10 @@ class Worker(object):
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())
# Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 60):
if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60):
self.heartbeat(self.job_monitoring_interval + 60)
self.kill_horse()
self.wait_for_horse()
@ -784,7 +800,8 @@ class Worker(object):
with self.connection.pipeline() as pipeline:
self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
job.heartbeat(utcnow(), pipeline=pipeline)
ttl = self.get_heartbeat_ttl(job)
job.heartbeat(utcnow(), ttl, pipeline=pipeline)
pipeline.execute()
except OSError as e:
@ -799,6 +816,7 @@ class Worker(object):
# Send a heartbeat to keep the worker alive.
self.heartbeat()
self.set_current_job_working_time(0)
self._horse_pid = 0 # Set horse PID to 0, horse has finished working
if ret_val == os.EX_OK: # The process exited normally.
return
@ -867,25 +885,20 @@ class Worker(object):
signal.signal(signal.SIGINT, signal.SIG_IGN)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
def prepare_job_execution(self, job, heartbeat_ttl=None):
def prepare_job_execution(self, job):
"""Performs misc bookkeeping like updating states prior to
job execution.
"""
if job.timeout == -1:
timeout = -1
else:
timeout = job.timeout or 180
if heartbeat_ttl is None:
heartbeat_ttl = self.job_monitoring_interval + 60
with self.connection.pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.set_current_job_working_time(0, pipeline=pipeline)
heartbeat_ttl = self.get_heartbeat_ttl(job)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
registry = StartedJobRegistry(job.origin, self.connection,
job_class=self.job_class)
registry.add(job, timeout, pipeline=pipeline)
job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
job.prepare_for_execution(self.name, pipeline=pipeline)
pipeline.execute()
@ -991,7 +1004,7 @@ class Worker(object):
except redis.exceptions.WatchError:
continue
def perform_job(self, job, queue, heartbeat_ttl=None):
def perform_job(self, job, queue):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
@ -1000,7 +1013,7 @@ class Worker(object):
started_job_registry = queue.started_job_registry
try:
self.prepare_job_execution(job, heartbeat_ttl)
self.prepare_job_execution(job)
job.started_at = utcnow()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
@ -1115,12 +1128,14 @@ class SimpleWorker(Worker):
def execute_job(self, job, queue):
"""Execute job in same thread/process, do not fork()"""
return self.perform_job(job, queue)
def get_heartbeat_ttl(self, job):
# "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. We should just stick to DEFAULT_WORKER_TTL.
if job.timeout == -1:
timeout = DEFAULT_WORKER_TTL
return DEFAULT_WORKER_TTL
else:
timeout = (job.timeout or DEFAULT_WORKER_TTL) + 60
return self.perform_job(job, queue, heartbeat_ttl=timeout)
return (job.timeout or DEFAULT_WORKER_TTL) + 60
class HerokuWorker(Worker):

@ -219,7 +219,7 @@ class TestJob(RQTestCase):
self.assertEqual(job.last_heartbeat, None)
ts = utcnow()
job.heartbeat(ts)
job.heartbeat(ts, 0)
self.assertEqual(job.last_heartbeat, ts)
def test_persistence_of_retry_data(self):

@ -772,22 +772,6 @@ class TestWorker(RQTestCase):
self.assertEqual(job._status, JobStatus.STARTED)
self.assertEqual(job.worker_name, worker.name)
def test_prepare_job_execution_inf_timeout(self):
"""Prepare job execution handles infinite job timeout"""
queue = Queue(connection=self.testconn)
job = queue.enqueue(long_running_job,
args=(1,),
job_timeout=-1)
worker = Worker([queue])
worker.prepare_job_execution(job)
# Updates working queue
registry = StartedJobRegistry(connection=self.testconn)
self.assertEqual(registry.get_job_ids(), [job.id])
# Score in queue is +inf
self.assertEqual(self.testconn.zscore(registry.key, job.id), float('Inf'))
def test_work_unicode_friendly(self):
"""Worker processes work with unicode description, then quits."""
q = Queue('foo')

Loading…
Cancel
Save