Added "result_ttl" property on jobs that determines how long job results are persisted in Redis.

main
Selwin Ong 13 years ago
parent 5cbc59205a
commit a5e6765990

@ -9,7 +9,8 @@ from .exceptions import UnpickleError, NoSuchJobError
JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args',
'created_at', 'enqueued_at', 'connection', '_result',
'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance'])
'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance',
'result_ttl'])
def unpickle(pickled_string):
@ -50,7 +51,8 @@ class Job(object):
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None):
def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None):
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
"""
@ -71,6 +73,7 @@ class Job(object):
job._args = args
job._kwargs = kwargs
job.description = job.get_call_string()
job.result_ttl = result_ttl
return job
@property
@ -134,6 +137,7 @@ class Job(object):
self._result = None
self.exc_info = None
self.timeout = None
self.result_ttl = None
# Data access
@ -219,6 +223,7 @@ class Job(object):
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.exc_info = obj.get('exc_info')
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
# Overwrite job's additional attrs (those not in JOB_ATTRS), if any
additional_attrs = set(obj.keys()).difference(JOB_ATTRS)
@ -248,6 +253,8 @@ class Job(object):
obj['exc_info'] = self.exc_info
if self.timeout is not None:
obj['timeout'] = self.timeout
if self.result_ttl is not None:
obj['result_ttl'] = self.result_ttl
"""
Store additional attributes from job instance into Redis. This is done
so that third party libraries using RQ can store additional data

@ -104,7 +104,7 @@ class Queue(object):
"""Pushes a job ID on the corresponding Redis queue."""
self.connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None):
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): #noqa
"""Creates a job to represent the delayed function call and enqueues
it.
@ -113,7 +113,7 @@ class Queue(object):
contain options for RQ itself.
"""
timeout = timeout or self._default_timeout
job = Job.create(func, args, kwargs, connection=self.connection)
job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl)
return self.enqueue_job(job, timeout=timeout)
def enqueue(self, f, *args, **kwargs):
@ -138,14 +138,16 @@ class Queue(object):
# Detect explicit invocations, i.e. of the form:
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30)
timeout = None
result_ttl = None
if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
timeout = kwargs.pop('timeout', None)
args = kwargs.pop('args', None)
result_ttl = kwargs.pop('result_ttl', None)
kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout)
timeout=timeout, result_ttl=result_ttl)
def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution.

@ -400,7 +400,10 @@ class Worker(object):
if rv is not None:
p = self.connection.pipeline()
p.hset(job.key, 'result', pickled_rv)
p.expire(job.key, self.rv_ttl)
if job.result_ttl is None:
p.expire(job.key, self.rv_ttl)
elif job.result_ttl >= 0:
p.expire(job.key, job.result_ttl)
p.execute()
else:
# Cleanup immediately

@ -189,3 +189,15 @@ class TestJob(RQTestCase):
job2 = Job.fetch(job.id)
job2.refresh()
self.assertEqual(job2.foo, 'bar')
def test_result_ttl_is_persisted(self):
"""Ensure that job's result_ttl is set properly"""
job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10)
job.save()
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, 10)
job = Job.create(func=say_hello, args=('Lionel',))
job.save()
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, None)

@ -252,3 +252,11 @@ class TestFailedQueue(RQTestCase):
job = Job.fetch(job.id)
self.assertEquals(job.timeout, 200)
def test_enqueue_preserves_result_ttl(self):
"""Ensure that result_ttl argument are properly persisted to Redis."""
q = Queue()
job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10)
self.assertEqual(job.result_ttl, 10)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(int(job_from_queue.result_ttl), 10)

@ -175,3 +175,19 @@ class TestWorker(RQTestCase):
# TODO: Having to do the manual refresh() here is really ugly!
res.refresh()
self.assertIn('JobTimeoutException', res.exc_info)
def test_worker_sets_result_ttl(self):
"""Ensure that Worker properly sets result_ttl for individual jobs."""
q = Queue()
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w = Worker([q])
w.work(burst=True)
self.assertEqual(self.testconn.ttl(job.key), 10)
# Job with -1 result_ttl don't expire
job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1)
w = Worker([q])
w.work(burst=True)
self.assertEqual(self.testconn.ttl(job.key), None)

Loading…
Cancel
Save