Make "queue" argument in job decorator required.

job decorator now uses Queue's "enqueue_call" method.
main
Selwin Ong 13 years ago
parent ae97f862dc
commit 8c3292d35b

@ -1,20 +1,15 @@
from .connections import use_connection, get_current_connection
from .queue import Queue from .queue import Queue
class job(object): class job(object):
def __init__(self, queue=None): def __init__(self, queue, connection=None, timeout=None):
""" """
A decorator that adds a ``delay`` method to the decorated function, A decorator that adds a ``delay`` method to the decorated function,
which in turn creates a RQ job when called. Accepts a ``queue`` instance which in turn creates a RQ job when called. Accepts a ``queue`` instance
as an optional argument. For example: as an optional argument. For example:
from rq import Queue, use_connection @job(queue='default')
use_connection()
q = Queue()
@job(queue=q)
def simple_add(x, y): def simple_add(x, y):
return x + y return x + y
@ -22,14 +17,15 @@ class job(object):
""" """
self.queue = queue self.queue = queue
self.connection = connection
self.timeout = timeout
def __call__(self, f): def __call__(self, f):
def delay(*args, **kwargs): def delay(*args, **kwargs):
if self.queue is None: if isinstance(self.queue, basestring):
use_connection(get_current_connection()) queue = Queue(name=self.queue, connection=self.connection)
self.queue = Queue() else:
return self.queue.enqueue(f, *args, **kwargs) queue = self.queue
return queue.enqueue_call(f, args=args, kwargs=kwargs, timeout=self.timeout)
f.delay = delay f.delay = delay
return f return f

@ -51,6 +51,6 @@ class Calculator(object):
def calculate(x, y): def calculate(x, y):
return x * y / self.denominator return x * y / self.denominator
@job() @job(queue='default')
def decorated_job(x, y): def decorated_job(x, y):
return x + y return x + y

@ -27,3 +27,13 @@ class TestDecorator(RQTestCase):
# Ensure that job returns the right result when performed # Ensure that job returns the right result when performed
self.assertEqual(result.perform(), 3) self.assertEqual(result.perform(), 3)
def test_decorator_accepts_queue_name_as_argument(self):
"""
Ensure that passing in queue name to the decorator puts the job in the
right queue.
"""
@job(queue='queue_name')
def hello():
return 'Hi'
result = hello.delay()
self.assertEqual(result.origin, 'queue_name')

Loading…
Cancel
Save