|
|
@ -326,9 +326,9 @@ class Worker(object):
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
self.log.info('%s: %s (%s)' % (green(queue.name),
|
|
|
|
blue(job.description), job.id))
|
|
|
|
blue(job.description), job.id))
|
|
|
|
|
|
|
|
|
|
|
|
self.connection.expire(self.key, (job.timeout or 180) + 60)
|
|
|
|
self.heartbeat((job.timeout or Queue.DEFAULT_TIMEOUT) + 60)
|
|
|
|
self.fork_and_perform_job(job)
|
|
|
|
self.fork_and_perform_job(job)
|
|
|
|
self.connection.expire(self.key, self.default_worker_ttl)
|
|
|
|
self.heartbeat()
|
|
|
|
|
|
|
|
|
|
|
|
did_perform_work = True
|
|
|
|
did_perform_work = True
|
|
|
|
finally:
|
|
|
|
finally:
|
|
|
@ -338,16 +338,34 @@ class Worker(object):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def dequeue_job_and_maintain_ttl(self, timeout):
|
|
|
|
def dequeue_job_and_maintain_ttl(self, timeout):
|
|
|
|
|
|
|
|
result = None
|
|
|
|
while True:
|
|
|
|
while True:
|
|
|
|
|
|
|
|
self.heartbeat()
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
return Queue.dequeue_any(self.queues, timeout,
|
|
|
|
result = Queue.dequeue_any(self.queues, timeout,
|
|
|
|
connection=self.connection)
|
|
|
|
connection=self.connection)
|
|
|
|
|
|
|
|
break
|
|
|
|
except DequeueTimeout:
|
|
|
|
except DequeueTimeout:
|
|
|
|
pass
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
self.log.debug('Sending heartbeat to prevent worker timeout.')
|
|
|
|
self.heartbeat()
|
|
|
|
self.connection.expire(self.key, self.default_worker_ttl)
|
|
|
|
return result
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def heartbeat(self, timeout=0):
|
|
|
|
|
|
|
|
"""Specifies a new worker timeout, typically by extending the
|
|
|
|
|
|
|
|
expiration time of the worker, effectively making this a "heartbeat"
|
|
|
|
|
|
|
|
to not expire the worker until the timeout passes.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
The next heartbeat should come before this time, or the worker will
|
|
|
|
|
|
|
|
die (at least from the monitoring dashboards).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
The effective timeout can never be shorter than default_worker_ttl,
|
|
|
|
|
|
|
|
only larger.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
timeout = max(timeout, self.default_worker_ttl)
|
|
|
|
|
|
|
|
self.connection.expire(self.key, timeout)
|
|
|
|
|
|
|
|
self.log.debug('Sent heartbeat to prevent worker timeout. '
|
|
|
|
|
|
|
|
'Next one should arrive within {} seconds.'.format(timeout))
|
|
|
|
|
|
|
|
|
|
|
|
def fork_and_perform_job(self, job):
|
|
|
|
def fork_and_perform_job(self, job):
|
|
|
|
"""Spawns a work horse to perform the actual work and passes it a job.
|
|
|
|
"""Spawns a work horse to perform the actual work and passes it a job.
|
|
|
|