diff --git a/queue.py b/queue.py new file mode 100644 index 0000000..f5e015b --- /dev/null +++ b/queue.py @@ -0,0 +1,17 @@ +def to_queue_key(queue_name): + return 'rq:%s' % (queue_name,) + + +class Queue(object): + def __init__(self, friendly_name): + if not friendly_name: + raise ValueError("Please specify a valid queue name (Got '%s')." % friendly_name) + self.name = friendly_name + self._key = to_queue_key(friendly_name) + + @property + def key(self): + return self._key + + def __str__(self): + return self.name diff --git a/worker.py b/worker.py index eee1c0d..92799ad 100644 --- a/worker.py +++ b/worker.py @@ -6,25 +6,27 @@ import procname from logbook import Logger from pickle import loads, dumps from rdb import conn -from . import to_queue_key +from .queue import Queue class NoQueueError(Exception): pass class Worker(object): def __init__(self, queue_names, rv_ttl=500): - self.queue_names = queue_names + self.queues = map(Queue, queue_names) self.rv_ttl = rv_ttl self._working = False self.log = Logger('worker') self.validate_queues() def validate_queues(self): - if not self.queue_names: + if not self.queues: raise NoQueueError('Give each worker at least one queue.') - @property + def queue_names(self): + return map(lambda q: q.name, self.queues) + def queue_keys(self): - return map(to_queue_key, self.queue_names) + return map(lambda q: q.key, self.queues) def is_idle(self): return not self.is_working() @@ -42,8 +44,8 @@ class Worker(object): def work(self): while True: - self.procline('Waiting on %s' % (', '.join(self.queue_names),)) - queue, msg = conn.blpop(self.queue_keys) + self.procline('Waiting on %s' % (', '.join(self.queue_names()),)) + queue, msg = conn.blpop(self.queue_keys()) self.fork_and_perform_job(queue, msg) def fork_and_perform_job(self, queue, msg):