Simplify calling .work() or .work(burst=True).

main
Vincent Driessen 13 years ago
parent 636b6690d6
commit aecb0a1bf0

@ -64,10 +64,7 @@ def main():
queues = map(Queue, args) queues = map(Queue, args)
w = Worker(queues, name=opts.name) w = Worker(queues, name=opts.name)
if opts.burst: w.work(burst=opts.burst)
w.work_burst()
else:
w.work()
if __name__ == '__main__': if __name__ == '__main__':
main() main()

@ -219,8 +219,14 @@ class Worker(object):
signal.signal(signal.SIGTERM, request_stop) signal.signal(signal.SIGTERM, request_stop)
def _work(self, quit_when_done=False): def work(self, burst=False):
"""This method starts the work loop. """Starts the work loop.
Pops and performs all jobs on the current list of queues. When all
queues are empty, block and wait for new jobs to arrive on any of the
queues, unless `burst` is True.
The return value indicates whether any jobs were processed.
""" """
self._install_signal_handlers() self._install_signal_handlers()
@ -236,7 +242,7 @@ class Worker(object):
qnames = self.queue_names() qnames = self.queue_names()
self.procline('Listening on %s' % (','.join(qnames))) self.procline('Listening on %s' % (','.join(qnames)))
self.log.info('*** Listening for work on %s' % (', '.join(qnames))) self.log.info('*** Listening for work on %s' % (', '.join(qnames)))
wait_for_job = not quit_when_done wait_for_job = not burst
job = Queue.dequeue_any(self.queues, wait_for_job) job = Queue.dequeue_any(self.queues, wait_for_job)
if job is None: if job is None:
break break
@ -250,21 +256,6 @@ class Worker(object):
self.register_death() self.register_death()
return did_work return did_work
def work(self):
"""Pop and perform all jobs on the current list of queues. When all
queues are empty, block and wait for new jobs to arrive on any of the
queues.
"""
self._work(False)
def work_burst(self):
"""Pop and perform all jobs on the current list of queues. When all
queues are empty, return.
The return value indicates whether any jobs were processed.
"""
return self._work(True)
def fork_and_perform_job(self, job): def fork_and_perform_job(self, job):
child_pid = os.fork() child_pid = os.fork()
if child_pid == 0: if child_pid == 0:

@ -139,10 +139,10 @@ class TestWorker(RQTestCase):
"""Worker processes work, then quits.""" """Worker processes work, then quits."""
fooq, barq = Queue('foo'), Queue('bar') fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq]) w = Worker([fooq, barq])
self.assertEquals(w.work_burst(), False, 'Did not expect any work on the queue.') self.assertEquals(w.work(burst=True), False, 'Did not expect any work on the queue.')
fooq.enqueue(testjob, name='Frank') fooq.enqueue(testjob, name='Frank')
self.assertEquals(w.work_burst(), True, 'Expected at least some work done.') self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.')
if __name__ == '__main__': if __name__ == '__main__':

Loading…
Cancel
Save