Merge branch 'hotfix-0.3.12'

Conflicts:
	CHANGES.md
	rq/worker.py
main
Vincent Driessen 11 years ago
commit cf98591ca5

@ -19,6 +19,13 @@
- Fix: `Queue.empty()` now correctly deletes job hashes from Redis. - Fix: `Queue.empty()` now correctly deletes job hashes from Redis.
### 0.3.12
(December 16th, 2013)
- Bug fix where a worker could time out before the job was done, removing it
from any monitor overviews (#288).
### 0.3.11 ### 0.3.11
(August 23th, 2013) (August 23th, 2013)

@ -1 +1 @@
VERSION = '0.3.11' VERSION = '0.3.12'

@ -322,9 +322,9 @@ class Worker(object):
self.log.info('%s: %s (%s)' % (green(queue.name), self.log.info('%s: %s (%s)' % (green(queue.name),
blue(job.description), job.id)) blue(job.description), job.id))
self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60) self.heartbeat((job.timeout or Queue.DEFAULT_TIMEOUT) + 60)
self.fork_and_perform_job(job) self.fork_and_perform_job(job)
self.connection.expire(self.key, self.default_worker_ttl) self.heartbeat()
if job.status == Status.FINISHED: if job.status == Status.FINISHED:
queue.enqueue_dependents(job) queue.enqueue_dependents(job)
@ -335,15 +335,34 @@ class Worker(object):
return did_perform_work return did_perform_work
def dequeue_job_and_maintain_ttl(self, timeout): def dequeue_job_and_maintain_ttl(self, timeout):
result = None
while True: while True:
self.heartbeat()
try: try:
return Queue.dequeue_any(self.queues, timeout, result = Queue.dequeue_any(self.queues, timeout,
connection=self.connection) connection=self.connection)
break
except DequeueTimeout: except DequeueTimeout:
pass pass
self.log.debug('Sending heartbeat to prevent worker timeout.') self.heartbeat()
self.connection.expire(self.key, self.default_worker_ttl) return result
def heartbeat(self, timeout=0):
"""Specifies a new worker timeout, typically by extending the
expiration time of the worker, effectively making this a "heartbeat"
to not expire the worker until the timeout passes.
The next heartbeat should come before this time, or the worker will
die (at least from the monitoring dashboards).
The effective timeout can never be shorter than default_worker_ttl,
only larger.
"""
timeout = max(timeout, self.default_worker_ttl)
self.connection.expire(self.key, timeout)
self.log.debug('Sent heartbeat to prevent worker timeout. '
'Next one should arrive within {0} seconds.'.format(timeout))
def fork_and_perform_job(self, job): def fork_and_perform_job(self, job):
"""Spawns a work horse to perform the actual work and passes it a job. """Spawns a work horse to perform the actual work and passes it a job.

Loading…
Cancel
Save