Factor out a Queue object.

It might be useful to add some methods to that object, later.
main
Vincent Driessen 13 years ago
parent 834a79814e
commit 5eadd5ef52

@ -0,0 +1,17 @@
def to_queue_key(queue_name):
return 'rq:%s' % (queue_name,)
class Queue(object):
def __init__(self, friendly_name):
if not friendly_name:
raise ValueError("Please specify a valid queue name (Got '%s')." % friendly_name)
self.name = friendly_name
self._key = to_queue_key(friendly_name)
@property
def key(self):
return self._key
def __str__(self):
return self.name

@ -6,25 +6,27 @@ import procname
from logbook import Logger from logbook import Logger
from pickle import loads, dumps from pickle import loads, dumps
from rdb import conn from rdb import conn
from . import to_queue_key from .queue import Queue
class NoQueueError(Exception): pass class NoQueueError(Exception): pass
class Worker(object): class Worker(object):
def __init__(self, queue_names, rv_ttl=500): def __init__(self, queue_names, rv_ttl=500):
self.queue_names = queue_names self.queues = map(Queue, queue_names)
self.rv_ttl = rv_ttl self.rv_ttl = rv_ttl
self._working = False self._working = False
self.log = Logger('worker') self.log = Logger('worker')
self.validate_queues() self.validate_queues()
def validate_queues(self): def validate_queues(self):
if not self.queue_names: if not self.queues:
raise NoQueueError('Give each worker at least one queue.') raise NoQueueError('Give each worker at least one queue.')
@property def queue_names(self):
return map(lambda q: q.name, self.queues)
def queue_keys(self): def queue_keys(self):
return map(to_queue_key, self.queue_names) return map(lambda q: q.key, self.queues)
def is_idle(self): def is_idle(self):
return not self.is_working() return not self.is_working()
@ -42,8 +44,8 @@ class Worker(object):
def work(self): def work(self):
while True: while True:
self.procline('Waiting on %s' % (', '.join(self.queue_names),)) self.procline('Waiting on %s' % (', '.join(self.queue_names()),))
queue, msg = conn.blpop(self.queue_keys) queue, msg = conn.blpop(self.queue_keys())
self.fork_and_perform_job(queue, msg) self.fork_and_perform_job(queue, msg)
def fork_and_perform_job(self, queue, msg): def fork_and_perform_job(self, queue, msg):

Loading…
Cancel
Save