Updated Worker API

main
RyanMTB 10 years ago
parent d51f0200d7
commit 9a00b0eca6

@ -70,6 +70,9 @@ class Queue(object):
def __len__(self): def __len__(self):
return self.count return self.count
def __iter__(self):
yield self
@property @property
def key(self): def key(self):
"""Returns the Redis key for this Queue.""" """Returns the Redis key for this Queue."""

@ -125,8 +125,7 @@ class Worker(object):
if connection is None: if connection is None:
connection = get_current_connection() connection = get_current_connection()
self.connection = connection self.connection = connection
if isinstance(queues, self.queue_class): queues = self.process_queue_args(queues)
queues = [queues]
self._name = name self._name = name
self.queues = queues self.queues = queues
self.validate_queues() self.validate_queues()
@ -160,11 +159,18 @@ class Worker(object):
def validate_queues(self): def validate_queues(self):
"""Sanity check for the given queues.""" """Sanity check for the given queues."""
if not iterable(self.queues):
raise ValueError('Argument queues not iterable.')
for queue in self.queues: for queue in self.queues:
if not isinstance(queue, self.queue_class): if not isinstance(queue, self.queue_class):
raise NoQueueError('Give each worker at least one Queue.') raise NoQueueError('{0} is not a queue'.format(queue))
def process_queue_args(self, queue_args):
""" allow for a string, a queue an iterable of strings
or an iterable of queues"""
if isinstance(queue_args, text_type):
return self.queue_class(name = queue_args)
else:
return [self.queue_class(name=queue_arg) if isinstance(queue_arg, text_type)
else queue_arg for queue_arg in queue_args]
def queue_names(self): def queue_names(self):
"""Returns the queue names of this worker's queues.""" """Returns the queue names of this worker's queues."""

@ -12,6 +12,7 @@ from tests.helpers import strip_microseconds
from rq import get_failed_queue, Queue, SimpleWorker, Worker from rq import get_failed_queue, Queue, SimpleWorker, Worker
from rq.compat import as_text from rq.compat import as_text
from rq.exceptions import NoQueueError
from rq.job import Job, JobStatus from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend from rq.suspension import resume, suspend
@ -28,6 +29,40 @@ class TestWorker(RQTestCase):
w = Worker([fooq, barq]) w = Worker([fooq, barq])
self.assertEquals(w.queues, [fooq, barq]) self.assertEquals(w.queues, [fooq, barq])
def test_create_worker_args_single_queue(self):
"""Test Worker creation with single queue instance arg"""
fooq = Queue('foo')
w = Worker(fooq)
self.assertEquals(w.queue_keys(), ['rq:queue:foo'])
def test_create_worker_args_single_string(self):
""" Test Worker creation with single string arg"""
w = Worker('foo')
self.assertEquals(w.queue_keys(),['rq:queue:foo'])
def test_create_worker_args_iterable_strings(self):
""" Test Worker creation with iterable of strings"""
w = Worker(['foo', 'bar'])
self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar'])
def test_create_worker_args_iterable_queues(self):
""" Test Worker test worker creation
with an iterable of queue instance args"""
w = Worker(map(Queue, ['foo', 'bar']))
self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar'])
def test_create_worker_args_list_map(self):
""" Test Worker test worker creation
with a list of queue from map"""
w = Worker(list(map(Queue, ['foo', 'bar'])))
self.assertEquals(w.queue_keys(),['rq:queue:foo', 'rq:queue:bar'])
def test_create_worker_raises_noqueue_error(self):
""" make sure raises noqueue error if a
a non string or queue is passed"""
with self.assertRaises(NoQueueError):
w = Worker([1])
def test_work_and_quit(self): def test_work_and_quit(self):
"""Worker processes work, then quits.""" """Worker processes work, then quits."""
fooq, barq = Queue('foo'), Queue('bar') fooq, barq = Queue('foo'), Queue('bar')

Loading…
Cancel
Save