diff --git a/rq/worker.py b/rq/worker.py index 11ea290..cac6bdb 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -810,11 +810,7 @@ class Worker: self.wait_for_horse() break - with self.connection.pipeline() as pipeline: - self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline) - ttl = self.get_heartbeat_ttl(job) - job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True) - pipeline.execute() + self.maintain_heartbeats(job) except OSError as e: # In case we encountered an OSError due to EINTR (which is @@ -832,7 +828,9 @@ class Worker: self._horse_pid = 0 # Set horse PID to 0, horse has finished working if ret_val == os.EX_OK: # The process exited normally. return + job_status = job.get_status() + if job_status is None: # Job completed and its ttl has expired return elif job_status == JobStatus.STOPPED: @@ -869,6 +867,27 @@ class Worker: self.monitor_work_horse(job, queue) self.set_state(WorkerStatus.IDLE) + def maintain_heartbeats(self, job): + """Updates worker and job's last heartbeat field. If job was + enqueued with `result_ttl=0`, a race condition could happen where this heartbeat + arrives after job has been deleted, leaving a job key that contains only + `last_heartbeat` field. + + hset() is used when updating job's timestamp. This command returns 1 if a new + Redis key is created, 0 otherwise. So in this case we check the return of job's + heartbeat() command. If a new key was created, this means the job was already + deleted. In this case, we simply send another delete command to remove the key. + + https://github.com/rq/rq/issues/1450 + """ + with self.connection.pipeline() as pipeline: + self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline) + ttl = self.get_heartbeat_ttl(job) + job.heartbeat(utcnow(), ttl, pipeline=pipeline, xx=True) + results = pipeline.execute() + if results[2] == 1: + self.connection.delete(job.key) + def main_work_horse(self, job, queue): """This is the entry point of the newly spawned work horse.""" # After fork()'ing, always assure we are generating random sequences diff --git a/tests/test_worker.py b/tests/test_worker.py index 327c617..0e944c2 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -317,6 +317,20 @@ class TestWorker(RQTestCase): self.testconn.hdel(w.key, 'birth') w.refresh() + def test_maintain_heartbeats(self): + """worker.maintain_heartbeats() shouldn't create new job keys""" + queue = Queue(connection=self.testconn) + worker = Worker([queue], connection=self.testconn) + job = queue.enqueue(say_hello) + worker.maintain_heartbeats(job) + self.assertTrue(self.testconn.exists(worker.key)) + self.assertTrue(self.testconn.exists(job.key)) + + self.testconn.delete(job.key) + + worker.maintain_heartbeats(job) + self.assertFalse(self.testconn.exists(job.key)) + @slow def test_heartbeat_survives_lost_connection(self): with mock.patch.object(Worker, 'heartbeat') as mocked: @@ -1202,23 +1216,29 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase): """ fooq = Queue('foo') self.assertEqual(fooq.count, 0) - w = Worker(fooq) + w = Worker([fooq], job_monitoring_interval=1) + sentinel_file = '/tmp/.rq_sentinel_work_horse_death' if os.path.exists(sentinel_file): os.remove(sentinel_file) - fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100) - job, queue = w.dequeue_job_and_maintain_ttl(5) + + job = fooq.enqueue(launch_process_within_worker_and_store_pid, sentinel_file, 100) + + _, queue = w.dequeue_job_and_maintain_ttl(5) + w.prepare_job_execution(job) w.fork_work_horse(job, queue) job.timeout = 5 - w.job_monitoring_interval = 1 - now = utcnow() + time.sleep(1) with open(sentinel_file) as f: subprocess_pid = int(f.read().strip()) self.assertTrue(psutil.pid_exists(subprocess_pid)) + w.monitor_work_horse(job, queue) fudge_factor = 1 total_time = w.job_monitoring_interval + 65 + fudge_factor + + now = utcnow() self.assertTrue((utcnow() - now).total_seconds() < total_time) self.assertEqual(job.get_status(), JobStatus.FAILED) failed_job_registry = FailedJobRegistry(queue=fooq)