Merge branch 'selwin-async-2'

This fixes #114.
main
Vincent Driessen 13 years ago
commit f06ef9c975

@ -292,7 +292,8 @@ class Job(object):
def perform(self): # noqa def perform(self): # noqa
"""Invokes the job function with the job arguments. """Invokes the job function with the job arguments.
""" """
return self.func(*self.args, **self.kwargs) self._result = self.func(*self.args, **self.kwargs)
return self._result
# Representation # Representation

@ -41,12 +41,14 @@ class Queue(object):
name = queue_key[len(prefix):] name = queue_key[len(prefix):]
return cls(name, connection=connection) return cls(name, connection=connection)
def __init__(self, name='default', default_timeout=None, connection=None): def __init__(self, name='default', default_timeout=None, connection=None,
async=True):
self.connection = resolve_connection(connection) self.connection = resolve_connection(connection)
prefix = self.redis_queue_namespace_prefix prefix = self.redis_queue_namespace_prefix
self.name = name self.name = name
self._key = '%s%s' % (prefix, name) self._key = '%s%s' % (prefix, name)
self._default_timeout = default_timeout self._default_timeout = default_timeout
self._async = async
@property @property
def key(self): def key(self):
@ -158,6 +160,8 @@ class Queue(object):
If the `set_meta_data` argument is `True` (default), it will update If the `set_meta_data` argument is `True` (default), it will update
the properties `origin` and `enqueued_at`. the properties `origin` and `enqueued_at`.
If Queue is instantiated with async=False, job is executed immediately.
""" """
if set_meta_data: if set_meta_data:
job.origin = self.name job.origin = self.name
@ -168,8 +172,12 @@ class Queue(object):
else: else:
job.timeout = 180 # default job.timeout = 180 # default
if self._async:
job.save() job.save()
self.push_job_id(job.id) self.push_job_id(job.id)
else:
job.perform()
job.save()
return job return job
def pop_job_id(self): def pop_job_id(self):

@ -1,5 +1,5 @@
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import Calculator, say_hello, div_by_zero from tests.fixtures import Calculator, div_by_zero, say_hello, some_calculation
from rq import Queue, get_failed_queue from rq import Queue, get_failed_queue
from rq.job import Job from rq.job import Job
from rq.exceptions import InvalidJobOperationError from rq.exceptions import InvalidJobOperationError
@ -260,3 +260,9 @@ class TestFailedQueue(RQTestCase):
self.assertEqual(job.result_ttl, 10) self.assertEqual(job.result_ttl, 10)
job_from_queue = Job.fetch(job.id, connection=self.testconn) job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(int(job_from_queue.result_ttl), 10) self.assertEqual(int(job_from_queue.result_ttl), 10)
def test_async_false(self):
"""Executes a job immediately if async=False."""
q = Queue(async=False)
job = q.enqueue(some_calculation, args=(2, 3))
self.assertEqual(job.return_value, 6)
Loading…
Cancel
Save