|
|
|
@ -49,6 +49,8 @@ class Worker(object):
|
|
|
|
|
self.queues = queues
|
|
|
|
|
self.validate_queues()
|
|
|
|
|
self.rv_ttl = rv_ttl
|
|
|
|
|
self._state = 'starting'
|
|
|
|
|
self._is_horse = False
|
|
|
|
|
self.log = Logger('worker')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -93,6 +95,11 @@ class Worker(object):
|
|
|
|
|
"""The current process ID."""
|
|
|
|
|
return os.getpid()
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def is_horse(self):
|
|
|
|
|
"""Returns whether or not this is the worker or the work horse."""
|
|
|
|
|
return self._is_horse
|
|
|
|
|
|
|
|
|
|
def procline(self, message):
|
|
|
|
|
"""Changes the current procname for the process.
|
|
|
|
|
|
|
|
|
@ -144,6 +151,7 @@ class Worker(object):
|
|
|
|
|
self.state = 'starting'
|
|
|
|
|
try:
|
|
|
|
|
while True:
|
|
|
|
|
self.state = 'idle'
|
|
|
|
|
self.procline('Waiting on %s' % (', '.join(self.queue_names()),))
|
|
|
|
|
wait_for_job = not quit_when_done
|
|
|
|
|
job = Queue.dequeue_any(self.queues, wait_for_job)
|
|
|
|
@ -152,9 +160,9 @@ class Worker(object):
|
|
|
|
|
self.state = 'busy'
|
|
|
|
|
self.fork_and_perform_job(job)
|
|
|
|
|
did_work = True
|
|
|
|
|
self.state = 'idle'
|
|
|
|
|
finally:
|
|
|
|
|
self.register_death()
|
|
|
|
|
if not self._is_horse:
|
|
|
|
|
self.register_death()
|
|
|
|
|
return did_work
|
|
|
|
|
|
|
|
|
|
def work(self):
|
|
|
|
@ -175,6 +183,7 @@ class Worker(object):
|
|
|
|
|
def fork_and_perform_job(self, job):
|
|
|
|
|
child_pid = os.fork()
|
|
|
|
|
if child_pid == 0:
|
|
|
|
|
self._is_horse = True
|
|
|
|
|
random.seed()
|
|
|
|
|
self.log = Logger('horse')
|
|
|
|
|
try:
|
|
|
|
|