Refactor the .enqueue() API to not gobble the timeout kwargs.

This fixes #98.
main
Vincent Driessen 13 years ago
parent 8436d9b2c8
commit f6e67431d7

@ -50,11 +50,16 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, *args, **kwargs): def create(cls, func, args=None, kwargs=None, connection=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
connection = kwargs.pop('connection', None) if args is None:
args = ()
if kwargs is None:
kwargs = {}
assert isinstance(args, tuple), '%r is not a valid args list.' % (args,)
assert isinstance(kwargs, dict), '%r is not a valid kwargs dict.' % (kwargs,)
job = cls(connection=connection) job = cls(connection=connection)
if inspect.ismethod(func): if inspect.ismethod(func):
job._instance = func.im_self job._instance = func.im_self

@ -107,25 +107,43 @@ class Queue(object):
"""Pushes a job ID on the corresponding Redis queue.""" """Pushes a job ID on the corresponding Redis queue."""
self.connection.rpush(self.key, job_id) self.connection.rpush(self.key, job_id)
def enqueue_call(self, func, args, kwargs, **options):
"""Creates a job to represent the delayed function call and enqueues
it.
It is much like `.enqueue()`, except that it takes the function's args
and kwargs as explicit arguments. Any kwargs passed to this function
contain options for RQ itself.
"""
timeout = options.get('timeout', self._default_timeout)
job = Job.create(func, args, kwargs, connection=self.connection)
return self.enqueue_job(job, timeout=timeout)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
Expects the function to call, along with the arguments and keyword Expects the function to call, along with the arguments and keyword
arguments. May be a fully qualified string of the function instance, arguments.
in which case the function must be meaningful to the worker.
The function argument `f` may be any of the following:
The special keyword `timeout` is reserved for `enqueue()` itself and * A reference to a function
it won't be passed to the actual job function. * A reference to an object's instance method
* A string, representing the location of a function (must be
meaningful to the import context of the workers)
""" """
if not isinstance(f, basestring) and f.__module__ == '__main__': if not isinstance(f, basestring) and f.__module__ == '__main__':
raise ValueError( raise ValueError(
'Functions from the __main__ module cannot be processed ' 'Functions from the __main__ module cannot be processed '
'by workers.') 'by workers.')
timeout = kwargs.pop('timeout', self._default_timeout) options = {}
job = Job.create(f, *args, connection=self.connection, **kwargs) try:
return self.enqueue_job(job, timeout=timeout) options['timeout'] = kwargs.pop('timeout')
except KeyError:
pass
return self.enqueue_call(func=f, args=args, kwargs=kwargs, **options)
def enqueue_job(self, job, timeout=None, set_meta_data=True): def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.

@ -30,7 +30,7 @@ class TestJob(RQTestCase):
def test_create_typical_job(self): def test_create_typical_job(self):
"""Creation of jobs for function calls.""" """Creation of jobs for function calls."""
job = Job.create(some_calculation, 3, 4, z=2) job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
# Jobs have a random UUID # Jobs have a random UUID
self.assertIsNotNone(job.id) self.assertIsNotNone(job.id)
@ -51,7 +51,7 @@ class TestJob(RQTestCase):
def test_create_instance_method_job(self): def test_create_instance_method_job(self):
"""Creation of jobs for instance methods.""" """Creation of jobs for instance methods."""
c = Calculator(2) c = Calculator(2)
job = Job.create(c.calculate, 3, 4) job = Job.create(func=c.calculate, args=(3, 4))
# Job data is set # Job data is set
self.assertEquals(job.func, c.calculate) self.assertEquals(job.func, c.calculate)
@ -60,7 +60,7 @@ class TestJob(RQTestCase):
def test_create_job_from_string_function(self): def test_create_job_from_string_function(self):
"""Creation of jobs using string specifier.""" """Creation of jobs using string specifier."""
job = Job.create('tests.fixtures.say_hello', 'World') job = Job.create(func='tests.fixtures.say_hello', args=('World',))
# Job data is set # Job data is set
self.assertEquals(job.func, say_hello) self.assertEquals(job.func, say_hello)
@ -69,7 +69,7 @@ class TestJob(RQTestCase):
def test_save(self): # noqa def test_save(self): # noqa
"""Storing jobs.""" """Storing jobs."""
job = Job.create(some_calculation, 3, 4, z=2) job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
# Saving creates a Redis hash # Saving creates a Redis hash
self.assertEquals(self.testconn.exists(job.key), False) self.assertEquals(self.testconn.exists(job.key), False)
@ -116,7 +116,7 @@ class TestJob(RQTestCase):
def test_persistence_of_typical_jobs(self): def test_persistence_of_typical_jobs(self):
"""Storing typical jobs.""" """Storing typical jobs."""
job = Job.create(some_calculation, 3, 4, z=2) job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
job.save() job.save()
expected_date = strip_milliseconds(job.created_at) expected_date = strip_milliseconds(job.created_at)
@ -132,7 +132,7 @@ class TestJob(RQTestCase):
def test_store_then_fetch(self): def test_store_then_fetch(self):
"""Store, then fetch.""" """Store, then fetch."""
job = Job.create(some_calculation, 3, 4, z=2) job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
job.save() job.save()
job2 = Job.fetch(job.id) job2 = Job.fetch(job.id)
@ -151,7 +151,7 @@ class TestJob(RQTestCase):
def test_fetching_unreadable_data(self): def test_fetching_unreadable_data(self):
"""Fetching fails on unreadable data.""" """Fetching fails on unreadable data."""
# Set up # Set up
job = Job.create(some_calculation, 3, 4, z=2) job = Job.create(func=some_calculation, args=(3, 4), kwargs=dict(z=2))
job.save() job.save()
# Just replace the data hkey with some random noise # Just replace the data hkey with some random noise
@ -161,7 +161,7 @@ class TestJob(RQTestCase):
def test_job_is_unimportable(self): def test_job_is_unimportable(self):
"""Jobs that cannot be imported throw exception on access.""" """Jobs that cannot be imported throw exception on access."""
job = Job.create(say_hello, 'Lionel') job = Job.create(func=say_hello, args=('Lionel',))
job.save() job.save()
# Now slightly modify the job to make it unimportable (this is # Now slightly modify the job to make it unimportable (this is
@ -181,7 +181,7 @@ class TestJob(RQTestCase):
- Saved in Redis when job.save() is called - Saved in Redis when job.save() is called
- Attached back to job instance when job.refresh() is called - Attached back to job instance when job.refresh() is called
""" """
job = Job.create(say_hello, 'Lionel') job = Job.create(func=say_hello, args=('Lionel',))
job.foo = 'bar' job.foo = 'bar'
job.save() job.save()
self.assertEqual(self.testconn.hget(job.key, 'foo'), 'bar') self.assertEqual(self.testconn.hget(job.key, 'foo'), 'bar')

@ -86,7 +86,7 @@ class TestQueue(RQTestCase):
def test_enqueue_sets_metadata(self): def test_enqueue_sets_metadata(self):
"""Enqueueing job onto queues modifies meta data.""" """Enqueueing job onto queues modifies meta data."""
q = Queue() q = Queue()
job = Job.create(say_hello, 'Nick', foo='bar') job = Job.create(func=say_hello, args=('Nick',), kwargs=dict(foo='bar'))
# Preconditions # Preconditions
self.assertIsNone(job.origin) self.assertIsNone(job.origin)
@ -209,7 +209,7 @@ class TestQueue(RQTestCase):
class TestFailedQueue(RQTestCase): class TestFailedQueue(RQTestCase):
def test_requeue_job(self): def test_requeue_job(self):
"""Requeueing existing jobs.""" """Requeueing existing jobs."""
job = Job.create(div_by_zero, 1, 2, 3) job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake' job.origin = 'fake'
job.save() job.save()
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa
@ -233,7 +233,7 @@ class TestFailedQueue(RQTestCase):
def test_quarantine_preserves_timeout(self): def test_quarantine_preserves_timeout(self):
"""Quarantine preserves job timeout.""" """Quarantine preserves job timeout."""
job = Job.create(div_by_zero, 1, 2, 3) job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake' job.origin = 'fake'
job.timeout = 200 job.timeout = 200
job.save() job.save()
@ -243,7 +243,7 @@ class TestFailedQueue(RQTestCase):
def test_requeueing_preserves_timeout(self): def test_requeueing_preserves_timeout(self):
"""Requeueing preserves job timeout.""" """Requeueing preserves job timeout."""
job = Job.create(div_by_zero, 1, 2, 3) job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake' job.origin = 'fake'
job.timeout = 200 job.timeout = 200
job.save() job.save()

@ -45,7 +45,7 @@ class TestWorker(RQTestCase):
# NOTE: We have to fake this enqueueing for this test case. # NOTE: We have to fake this enqueueing for this test case.
# What we're simulating here is a call to a function that is not # What we're simulating here is a call to a function that is not
# importable from the worker process. # importable from the worker process.
job = Job.create(div_by_zero, 3) job = Job.create(func=div_by_zero, args=(3,))
job.save() job.save()
data = self.testconn.hget(job.key, 'data') data = self.testconn.hget(job.key, 'data')
invalid_data = data.replace('div_by_zero', 'nonexisting_job') invalid_data = data.replace('div_by_zero', 'nonexisting_job')

Loading…
Cancel
Save