Fixes a bug that causes leftover job keys when result_ttl=0 (#1591)

* Fixes a bug that causes leftover job keys when result_ttl=0

* Fixed a buggy worker.maintain_heartbeats() behavior

* Fixed a bug in worker.maintain_heartbeats().
Authored by Selwin Ong 3 years ago, committed by GitHub
parent 76ba690aaf
commit 0147b30f2b
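
The leftover key comes from Redis hash-write semantics: writing a field to a hash that no longer exists silently recreates the key. With `result_ttl=0` the finished job's hash is deleted immediately, so a heartbeat that is already in flight can land afterwards and leave behind a job key containing only `last_heartbeat`. A minimal sketch of that race using plain redis-py (the key name and timestamp values are illustrative only, not taken from this commit):

```python
import redis

r = redis.Redis()

# A job hash roughly as it might be stored (hypothetical key name).
r.hset('rq:job:example', mapping={'status': 'finished', 'last_heartbeat': 'T0'})

# With result_ttl=0 the finished job hash is deleted right away...
r.delete('rq:job:example')

# ...but a heartbeat that was already in flight lands afterwards.
# HSET recreates the key; it returns 1 because the field is new.
created = r.hset('rq:job:example', 'last_heartbeat', 'T1')
print(created)                      # 1 -> the hash did not exist before this write
print(r.hgetall('rq:job:example'))  # leftover key containing only last_heartbeat
```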

rq/worker.py

@@ -810,11 +810,7 @@ class Worker:
                     self.wait_for_horse()
                     break
 
-                with self.connection.pipeline() as pipeline:
-                    self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
-                    ttl = self.get_heartbeat_ttl(job)
-                    job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True)
-                    pipeline.execute()
+                self.maintain_heartbeats(job)
 
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
@@ -832,7 +828,9 @@ class Worker:
         self._horse_pid = 0  # Set horse PID to 0, horse has finished working
         if ret_val == os.EX_OK:  # The process exited normally.
             return
+
         job_status = job.get_status()
+
         if job_status is None:  # Job completed and its ttl has expired
             return
         elif job_status == JobStatus.STOPPED:
@@ -869,6 +867,27 @@ class Worker:
             self.monitor_work_horse(job, queue)
             self.set_state(WorkerStatus.IDLE)
 
+    def maintain_heartbeats(self, job):
+        """Updates worker and job's last heartbeat field. If job was
+        enqueued with `result_ttl=0`, a race condition could happen where this heartbeat
+        arrives after job has been deleted, leaving a job key that contains only
+        `last_heartbeat` field.
+
+        hset() is used when updating job's timestamp. This command returns 1 if a new
+        Redis key is created, 0 otherwise. So in this case we check the return of job's
+        heartbeat() command. If a new key was created, this means the job was already
+        deleted. In this case, we simply send another delete command to remove the key.
+
+        https://github.com/rq/rq/issues/1450
+        """
+        with self.connection.pipeline() as pipeline:
+            self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
+            ttl = self.get_heartbeat_ttl(job)
+            job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True)
+            results = pipeline.execute()
+            if results[2] == 1:
+                self.connection.delete(job.key)
+
     def main_work_horse(self, job, queue):
         """This is the entry point of the newly spawned work horse."""
         # After fork()'ing, always assure we are generating random sequences
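
The new maintain_heartbeats() relies on the HSET return value read back from the pipeline: per the docstring, `results[2]` is the return of the job's `heartbeat()` write on the job hash, and a value of 1 means the field was newly created, i.e. the hash had already been deleted, so the worker deletes the freshly recreated key again. A standalone sketch of this detect-and-clean-up pattern with redis-py (key and field names are illustrative; this is not the rq internals verbatim):

```python
import redis

r = redis.Redis()
job_key = 'rq:job:example'  # hypothetical key name, for illustration only

with r.pipeline() as pipe:
    # Queue the heartbeat write. HSET returns 1 when the field did not
    # exist before the write -- which, for a hash that always carries
    # last_heartbeat, means the whole key had been deleted.
    pipe.hset(job_key, 'last_heartbeat', 'T1')
    results = pipe.execute()

if results[0] == 1:
    # The heartbeat recreated a key that should stay deleted; remove it.
    r.delete(job_key)
```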

tests/test_worker.py

@@ -317,6 +317,20 @@ class TestWorker(RQTestCase):
         self.testconn.hdel(w.key, 'birth')
         w.refresh()
 
+    def test_maintain_heartbeats(self):
+        """worker.maintain_heartbeats() shouldn't create new job keys"""
+        queue = Queue(connection=self.testconn)
+        worker = Worker([queue], connection=self.testconn)
+        job = queue.enqueue(say_hello)
+        worker.maintain_heartbeats(job)
+        self.assertTrue(self.testconn.exists(worker.key))
+        self.assertTrue(self.testconn.exists(job.key))
+
+        self.testconn.delete(job.key)
+
+        worker.maintain_heartbeats(job)
+        self.assertFalse(self.testconn.exists(job.key))
+
     @slow
     def test_heartbeat_survives_lost_connection(self):
         with mock.patch.object(Worker, 'heartbeat') as mocked:
@@ -1202,23 +1216,29 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
         """
         fooq = Queue('foo')
         self.assertEqual(fooq.count, 0)
-        w = Worker(fooq)
+        w = Worker([fooq], job_monitoring_interval=1)
         sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
         if os.path.exists(sentinel_file):
             os.remove(sentinel_file)
-        fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100)
-        job, queue = w.dequeue_job_and_maintain_ttl(5)
+        job = fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100)
+        _, queue = w.dequeue_job_and_maintain_ttl(5)
+        w.prepare_job_execution(job)
         w.fork_work_horse(job, queue)
         job.timeout = 5
-        w.job_monitoring_interval = 1
-        now = utcnow()
         time.sleep(1)
         with open(sentinel_file) as f:
             subprocess_pid = int(f.read().strip())
         self.assertTrue(psutil.pid_exists(subprocess_pid))
         w.monitor_work_horse(job, queue)
         fudge_factor = 1
         total_time = w.job_monitoring_interval + 65 + fudge_factor
+        now = utcnow()
         self.assertTrue((utcnow() - now).total_seconds() < total_time)
         self.assertEqual(job.get_status(), JobStatus.FAILED)
         failed_job_registry = FailedJobRegistry(queue=fooq)
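
For deployments that were already hit by this bug, one way to check for orphaned job hashes of the kind described above is to scan for job keys whose only field is `last_heartbeat`. A rough diagnostic sketch, assuming rq's default `rq:job:` key prefix (adjust if a custom prefix is configured):

```python
import redis

r = redis.Redis()

# Assumes the default 'rq:job:' prefix used for job hashes.
for key in r.scan_iter(match='rq:job:*'):
    # Skip non-hash keys (e.g. dependency sets that share the prefix).
    if r.type(key) != b'hash':
        continue
    if r.hkeys(key) == [b'last_heartbeat']:
        print('orphaned job key:', key.decode())
```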
