From 8414a67d65176a131cb13db04e1c246df917b288 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 16 Dec 2013 10:59:42 +0100 Subject: [PATCH 1/3] Fix bug where worker died from monitoring views. Fixes #288. Conflicts: rq/worker.py --- rq/worker.py | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 3ba4250..1f01b00 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -326,9 +326,9 @@ class Worker(object): self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id)) - self.connection.expire(self.key, (job.timeout or 180) + 60) + self.heartbeat((job.timeout or Queue.DEFAULT_TIMEOUT) + 60) self.fork_and_perform_job(job) - self.connection.expire(self.key, self.default_worker_ttl) + self.heartbeat() did_perform_work = True finally: @@ -338,16 +338,34 @@ class Worker(object): def dequeue_job_and_maintain_ttl(self, timeout): + result = None while True: + self.heartbeat() try: - return Queue.dequeue_any(self.queues, timeout, - connection=self.connection) + result = Queue.dequeue_any(self.queues, timeout, + connection=self.connection) + break except DequeueTimeout: pass - self.log.debug('Sending heartbeat to prevent worker timeout.') - self.connection.expire(self.key, self.default_worker_ttl) + self.heartbeat() + return result + + def heartbeat(self, timeout=0): + """Specifies a new worker timeout, typically by extending the + expiration time of the worker, effectively making this a "heartbeat" + to not expire the worker until the timeout passes. + The next heartbeat should come before this time, or the worker will + die (at least from the monitoring dashboards). + + The effective timeout can never be shorter than default_worker_ttl, + only larger. + """ + timeout = max(timeout, self.default_worker_ttl) + self.connection.expire(self.key, timeout) + self.log.debug('Sent heartbeat to prevent worker timeout. ' + 'Next one should arrive within {} seconds.'.format(timeout)) def fork_and_perform_job(self, job): """Spawns a work horse to perform the actual work and passes it a job. From d0fa35dc666296f9ad824f013f0f22ba6517f0e7 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 16 Dec 2013 11:17:22 +0100 Subject: [PATCH 2/3] Python 2.6 fix. --- rq/worker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rq/worker.py b/rq/worker.py index 1f01b00..8cf9d14 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -365,7 +365,7 @@ class Worker(object): timeout = max(timeout, self.default_worker_ttl) self.connection.expire(self.key, timeout) self.log.debug('Sent heartbeat to prevent worker timeout. ' - 'Next one should arrive within {} seconds.'.format(timeout)) + 'Next one should arrive within {0} seconds.'.format(timeout)) def fork_and_perform_job(self, job): """Spawns a work horse to perform the actual work and passes it a job. From b5e1377023a725f9796c1c7d025ca09771a54aa1 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Mon, 16 Dec 2013 11:01:18 +0100 Subject: [PATCH 3/3] Bump version to 0.3.12. Conflicts: CHANGES.md --- CHANGES.md | 7 +++++++ rq/version.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 7c7662d..184dda9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,10 @@ +### 0.3.12 +(December 16th, 2013) + +- Bug fix where a worker could time out before the job was done, removing it + from any monitor overviews (#288). + + ### 0.3.11 (August 23th, 2013) diff --git a/rq/version.py b/rq/version.py index efdcdac..3b59d7f 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.3.11' +VERSION = '0.3.12'