diff --git a/CHANGES.md b/CHANGES.md index 32b1fc5..f4943a9 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -19,6 +19,13 @@ - Fix: `Queue.empty()` now correctly deletes job hashes from Redis. +### 0.3.12 +(December 16th, 2013) + +- Bug fix where a worker could time out before the job was done, removing it + from any monitor overviews (#288). + + ### 0.3.11 (August 23th, 2013) diff --git a/rq/version.py b/rq/version.py index efdcdac..3b59d7f 100644 --- a/rq/version.py +++ b/rq/version.py @@ -1 +1 @@ -VERSION = '0.3.11' +VERSION = '0.3.12' diff --git a/rq/worker.py b/rq/worker.py index 6a32f8a..6a56da0 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -322,9 +322,9 @@ class Worker(object): self.log.info('%s: %s (%s)' % (green(queue.name), blue(job.description), job.id)) - self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60) + self.heartbeat((job.timeout or Queue.DEFAULT_TIMEOUT) + 60) self.fork_and_perform_job(job) - self.connection.expire(self.key, self.default_worker_ttl) + self.heartbeat() if job.status == Status.FINISHED: queue.enqueue_dependents(job) @@ -335,15 +335,34 @@ class Worker(object): return did_perform_work def dequeue_job_and_maintain_ttl(self, timeout): + result = None while True: + self.heartbeat() try: - return Queue.dequeue_any(self.queues, timeout, - connection=self.connection) + result = Queue.dequeue_any(self.queues, timeout, + connection=self.connection) + break except DequeueTimeout: pass - self.log.debug('Sending heartbeat to prevent worker timeout.') - self.connection.expire(self.key, self.default_worker_ttl) + self.heartbeat() + return result + + def heartbeat(self, timeout=0): + """Specifies a new worker timeout, typically by extending the + expiration time of the worker, effectively making this a "heartbeat" + to not expire the worker until the timeout passes. + + The next heartbeat should come before this time, or the worker will + die (at least from the monitoring dashboards). + + The effective timeout can never be shorter than default_worker_ttl, + only larger. + """ + timeout = max(timeout, self.default_worker_ttl) + self.connection.expire(self.key, timeout) + self.log.debug('Sent heartbeat to prevent worker timeout. ' + 'Next one should arrive within {0} seconds.'.format(timeout)) def fork_and_perform_job(self, job): """Spawns a work horse to perform the actual work and passes it a job.