Merge branch 'igungor-job-description'

main
Vincent Driessen 11 years ago
commit 135b1ff10c

@ -68,7 +68,7 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None): result_ttl=None, status=None, description=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -88,7 +88,7 @@ class Job(object):
job._func_name = func job._func_name = func
job._args = args job._args = args
job._kwargs = kwargs job._kwargs = kwargs
job.description = job.get_call_string() job.description = description or job.get_call_string()
job.result_ttl = result_ttl job.result_ttl = result_ttl
job._status = status job._status = status
return job return job

@ -129,7 +129,8 @@ class Queue(object):
"""Pushes a job ID on the corresponding Redis queue.""" """Pushes a job ID on the corresponding Redis queue."""
self.connection.rpush(self.key, job_id) self.connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): # noqa def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
@ -138,7 +139,7 @@ class Queue(object):
contain options for RQ itself. contain options for RQ itself.
""" """
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
job = Job.create(func, args, kwargs, connection=self.connection, job = Job.create(func, args, kwargs, description=description, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED) result_ttl=result_ttl, status=Status.QUEUED)
return self.enqueue_job(job, timeout=timeout) return self.enqueue_job(job, timeout=timeout)
@ -157,22 +158,23 @@ class Queue(object):
meaningful to the import context of the workers) meaningful to the import context of the workers)
""" """
if not isinstance(f, string_types) and f.__module__ == '__main__': if not isinstance(f, string_types) and f.__module__ == '__main__':
raise ValueError( raise ValueError('Functions from the __main__ module cannot be processed '
'Functions from the __main__ module cannot be processed '
'by workers.') 'by workers.')
# Detect explicit invocations, i.e. of the form: # Detect explicit invocations, i.e. of the form:
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30)
timeout = None timeout = None
description = None
result_ttl = None result_ttl = None
if 'args' in kwargs or 'kwargs' in kwargs: if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
timeout = kwargs.pop('timeout', None) timeout = kwargs.pop('timeout', None)
description = kwargs.pop('description', None)
args = kwargs.pop('args', None) args = kwargs.pop('args', None)
result_ttl = kwargs.pop('result_ttl', None) result_ttl = kwargs.pop('result_ttl', None)
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs, return self.enqueue_call(func=f, args=args, kwargs=kwargs, description=description,
timeout=timeout, result_ttl=result_ttl) timeout=timeout, result_ttl=result_ttl)
def enqueue_job(self, job, timeout=None, set_meta_data=True): def enqueue_job(self, job, timeout=None, set_meta_data=True):

@ -88,9 +88,9 @@ class TestJob(RQTestCase):
"""Fetching jobs.""" """Fetching jobs."""
# Prepare test # Prepare test
self.testconn.hset('rq:job:some_id', 'data', self.testconn.hset('rq:job:some_id', 'data',
"(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n.") # noqa "(S'tests.fixtures.some_calculation'\nN(I3\nI4\nt(dp1\nS'z'\nI2\nstp2\n.")
self.testconn.hset('rq:job:some_id', 'created_at', self.testconn.hset('rq:job:some_id', 'created_at',
"2012-02-07 22:13:24+0000") '2012-02-07 22:13:24+0000')
# Fetch returns a job # Fetch returns a job
job = Job.fetch('some_id') job = Job.fetch('some_id')
@ -195,14 +195,27 @@ class TestJob(RQTestCase):
"""Ensure that job's result_ttl is set properly""" """Ensure that job's result_ttl is set properly"""
job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10) job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10)
job.save() job.save()
job_from_queue = Job.fetch(job.id, connection=self.testconn) Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, 10) self.assertEqual(job.result_ttl, 10)
job = Job.create(func=say_hello, args=('Lionel',)) job = Job.create(func=say_hello, args=('Lionel',))
job.save() job.save()
job_from_queue = Job.fetch(job.id, connection=self.testconn) Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, None) self.assertEqual(job.result_ttl, None)
def test_description_is_persisted(self):
"""Ensure that job's custom description is set properly"""
job = Job.create(func=say_hello, args=('Lionel',), description=u'Say hello!')
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.description, u'Say hello!')
# Ensure job description is constructed from function call string
job = Job.create(func=say_hello, args=('Lionel',))
job.save()
Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.description, "tests.fixtures.say_hello('Lionel')")
def test_job_access_within_job_function(self): def test_job_access_within_job_function(self):
"""The current job is accessible within the job function.""" """The current job is accessible within the job function."""
# Executing the job function from outside of RQ throws an exception # Executing the job function from outside of RQ throws an exception

Loading…
Cancel
Save