Document methods.

main
Vincent Driessen 13 years ago
parent 6013227f4c
commit 9e8a4d15be

@ -29,6 +29,7 @@ class Worker(object):
def validate_queues(self): def validate_queues(self):
"""Sanity check for the given queues."""
if not iterable(self.queues): if not iterable(self.queues):
raise ValueError('Argument queues not iterable.') raise ValueError('Argument queues not iterable.')
for queue in self.queues: for queue in self.queues:
@ -36,45 +37,110 @@ class Worker(object):
raise NoQueueError('Give each worker at least one Queue.') raise NoQueueError('Give each worker at least one Queue.')
def queue_names(self): def queue_names(self):
"""Returns the queue names of this worker's queues."""
return map(lambda q: q.name, self.queues) return map(lambda q: q.name, self.queues)
def queue_keys(self): def queue_keys(self):
"""Returns the Redis keys representing this worker's queues."""
return map(lambda q: q.key, self.queues) return map(lambda q: q.key, self.queues)
@property @property
def name(self): def name(self):
"""Returns the name of the worker, under which it is registered to the
monitoring system.
By default, the name of the worker is constructed from the current
(short) host name and the current PID.
"""
if self._name is None: if self._name is None:
hostname = socket.gethostname() hostname = socket.gethostname()
shortname, _, _ = hostname.partition('.') shortname, _, _ = hostname.partition('.')
self._name = '%s.%s' % (shortname, self.pid) self._name = '%s.%s' % (shortname, self.pid)
return self._name return self._name
@property
def key(self):
"""Returns the worker's Redis hash key."""
return self.redis_worker_namespace_prefix + self.name
@property @property
def pid(self): def pid(self):
"""The current process ID."""
return os.getpid() return os.getpid()
def procline(self, message): def procline(self, message):
"""Changes the current procname for the process.
This can be used to make `ps -ef` output more readable.
"""
self.log.debug(message) self.log.debug(message)
procname.setprocname('rq: %s' % (message,)) procname.setprocname('rq: %s' % (message,))
def register_birth(self):
"""Registers its own birth."""
if conn.exists(self.key) and not conn.hexists(self.key, 'death'):
raise ValueError('There exists an active worker named \'%s\' alread.' % (self.name,))
with conn.pipeline() as p:
p.delete(self.key)
p.hset(self.key, 'birth', time.time())
p.execute()
def register_death(self):
"""Registers its own birth."""
self.log.error('Registering death')
with conn.pipeline() as p:
# We cannot use self.state = 'dead' here, because that would
# rollback the pipeline
p.hset(self.key, 'state', 'dead')
p.hset(self.key, 'death', time.time())
p.expire(self.key, 60)
p.execute()
def set_state(self, new_state):
self._state = new_state
conn.hset(self.key, 'state', new_state)
def get_state(self):
return self._state
state = property(get_state, set_state)
def _work(self, quit_when_done=False): def _work(self, quit_when_done=False):
"""This method starts the work loop.
"""
did_work = False did_work = False
self.register_birth()
self.state = 'starting'
try:
while True: while True:
self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) self.procline('Waiting on %s' % (', '.join(self.queue_names()),))
wait_for_job = not quit_when_done wait_for_job = not quit_when_done
job = Queue.dequeue_any(self.queues, wait_for_job) job = Queue.dequeue_any(self.queues, wait_for_job)
if job is None: if job is None:
break break
did_work = True self.state = 'busy'
self.fork_and_perform_job(job) self.fork_and_perform_job(job)
did_work = True
self.state = 'idle'
finally:
self.register_death()
return did_work return did_work
def work(self): def work(self):
"""Pop and perform all jobs on the current list of queues. When all
queues are empty, block and wait for new jobs to arrive on any of the
queues.
"""
self._work(False) self._work(False)
def work_burst(self): def work_burst(self):
"""Pop and perform all jobs on the current list of queues. When all
queues are empty, return.
The return value indicates whether any jobs were processed.
"""
return self._work(True) return self._work(True)
def fork_and_perform_job(self, job): def fork_and_perform_job(self, job):

Loading…
Cancel
Save