diff --git a/rq/worker.py b/rq/worker.py index 23fb1fc..362d719 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -29,6 +29,7 @@ class Worker(object): def validate_queues(self): + """Sanity check for the given queues.""" if not iterable(self.queues): raise ValueError('Argument queues not iterable.') for queue in self.queues: @@ -36,45 +37,110 @@ class Worker(object): raise NoQueueError('Give each worker at least one Queue.') def queue_names(self): + """Returns the queue names of this worker's queues.""" return map(lambda q: q.name, self.queues) def queue_keys(self): + """Returns the Redis keys representing this worker's queues.""" return map(lambda q: q.key, self.queues) @property def name(self): + """Returns the name of the worker, under which it is registered to the + monitoring system. + + By default, the name of the worker is constructed from the current + (short) host name and the current PID. + """ if self._name is None: hostname = socket.gethostname() shortname, _, _ = hostname.partition('.') self._name = '%s.%s' % (shortname, self.pid) return self._name + @property + def key(self): + """Returns the worker's Redis hash key.""" + return self.redis_worker_namespace_prefix + self.name + @property def pid(self): + """The current process ID.""" return os.getpid() def procline(self, message): + """Changes the current procname for the process. + + This can be used to make `ps -ef` output more readable. + """ self.log.debug(message) procname.setprocname('rq: %s' % (message,)) + def register_birth(self): + """Registers its own birth.""" + if conn.exists(self.key) and not conn.hexists(self.key, 'death'): + raise ValueError('There exists an active worker named \'%s\' alread.' % (self.name,)) + with conn.pipeline() as p: + p.delete(self.key) + p.hset(self.key, 'birth', time.time()) + p.execute() + + def register_death(self): + """Registers its own birth.""" + self.log.error('Registering death') + with conn.pipeline() as p: + # We cannot use self.state = 'dead' here, because that would + # rollback the pipeline + p.hset(self.key, 'state', 'dead') + p.hset(self.key, 'death', time.time()) + p.expire(self.key, 60) + p.execute() + + def set_state(self, new_state): + self._state = new_state + conn.hset(self.key, 'state', new_state) + + def get_state(self): + return self._state + + state = property(get_state, set_state) + def _work(self, quit_when_done=False): + """This method starts the work loop. + """ did_work = False - while True: - self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) - wait_for_job = not quit_when_done - job = Queue.dequeue_any(self.queues, wait_for_job) - if job is None: - break - did_work = True - self.fork_and_perform_job(job) + self.register_birth() + self.state = 'starting' + try: + while True: + self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) + wait_for_job = not quit_when_done + job = Queue.dequeue_any(self.queues, wait_for_job) + if job is None: + break + self.state = 'busy' + self.fork_and_perform_job(job) + did_work = True + self.state = 'idle' + finally: + self.register_death() return did_work def work(self): + """Pop and perform all jobs on the current list of queues. When all + queues are empty, block and wait for new jobs to arrive on any of the + queues. + """ self._work(False) def work_burst(self): + """Pop and perform all jobs on the current list of queues. When all + queues are empty, return. + + The return value indicates whether any jobs were processed. + """ return self._work(True) def fork_and_perform_job(self, job):