Merge branch 'selwin-result_ttl'

main
Vincent Driessen 13 years ago
commit 720011c33e

@ -9,7 +9,8 @@ from .exceptions import UnpickleError, NoSuchJobError
JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args',
'created_at', 'enqueued_at', 'connection', '_result', 'created_at', 'enqueued_at', 'connection', '_result',
'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance']) 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance',
'result_ttl'])
def unpickle(pickled_string): def unpickle(pickled_string):
@ -50,7 +51,8 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None): def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
""" """
@ -71,6 +73,7 @@ class Job(object):
job._args = args job._args = args
job._kwargs = kwargs job._kwargs = kwargs
job.description = job.get_call_string() job.description = job.get_call_string()
job.result_ttl = result_ttl
return job return job
@property @property
@ -134,6 +137,7 @@ class Job(object):
self._result = None self._result = None
self.exc_info = None self.exc_info = None
self.timeout = None self.timeout = None
self.result_ttl = None
# Data access # Data access
@ -219,6 +223,7 @@ class Job(object):
self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa
self.exc_info = obj.get('exc_info') self.exc_info = obj.get('exc_info')
self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None
self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa
# Overwrite job's additional attrs (those not in JOB_ATTRS), if any # Overwrite job's additional attrs (those not in JOB_ATTRS), if any
additional_attrs = set(obj.keys()).difference(JOB_ATTRS) additional_attrs = set(obj.keys()).difference(JOB_ATTRS)
@ -248,6 +253,8 @@ class Job(object):
obj['exc_info'] = self.exc_info obj['exc_info'] = self.exc_info
if self.timeout is not None: if self.timeout is not None:
obj['timeout'] = self.timeout obj['timeout'] = self.timeout
if self.result_ttl is not None:
obj['result_ttl'] = self.result_ttl
""" """
Store additional attributes from job instance into Redis. This is done Store additional attributes from job instance into Redis. This is done
so that third party libraries using RQ can store additional data so that third party libraries using RQ can store additional data

@ -104,7 +104,7 @@ class Queue(object):
"""Pushes a job ID on the corresponding Redis queue.""" """Pushes a job ID on the corresponding Redis queue."""
self.connection.rpush(self.key, job_id) self.connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None): def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): #noqa
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
@ -113,7 +113,7 @@ class Queue(object):
contain options for RQ itself. contain options for RQ itself.
""" """
timeout = timeout or self._default_timeout timeout = timeout or self._default_timeout
job = Job.create(func, args, kwargs, connection=self.connection) job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl)
return self.enqueue_job(job, timeout=timeout) return self.enqueue_job(job, timeout=timeout)
def enqueue(self, f, *args, **kwargs): def enqueue(self, f, *args, **kwargs):
@ -138,14 +138,16 @@ class Queue(object):
# Detect explicit invocations, i.e. of the form: # Detect explicit invocations, i.e. of the form:
# q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30)
timeout = None timeout = None
result_ttl = None
if 'args' in kwargs or 'kwargs' in kwargs: if 'args' in kwargs or 'kwargs' in kwargs:
assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa
timeout = kwargs.pop('timeout', None) timeout = kwargs.pop('timeout', None)
args = kwargs.pop('args', None) args = kwargs.pop('args', None)
result_ttl = kwargs.pop('result_ttl', None)
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs, return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout) timeout=timeout, result_ttl=result_ttl)
def enqueue_job(self, job, timeout=None, set_meta_data=True): def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.

@ -94,7 +94,8 @@ class Worker(object):
return worker return worker
def __init__(self, queues, name=None, rv_ttl=500, connection=None): # noqa def __init__(self, queues, name=None, default_result_ttl=500,
connection=None): # noqa
if connection is None: if connection is None:
connection = get_current_connection() connection = get_current_connection()
self.connection = connection self.connection = connection
@ -103,7 +104,7 @@ class Worker(object):
self._name = name self._name = name
self.queues = queues self.queues = queues
self.validate_queues() self.validate_queues()
self.rv_ttl = rv_ttl self.default_result_ttl = default_result_ttl
self._state = 'starting' self._state = 'starting'
self._is_horse = False self._is_horse = False
self._horse_pid = 0 self._horse_pid = 0
@ -398,10 +399,20 @@ class Worker(object):
else: else:
self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),))
if rv is not None: # Expire results
has_result = rv is not None
explicit_ttl_requested = job.result_ttl is not None
should_expire = has_result or explicit_ttl_requested
if should_expire:
p = self.connection.pipeline() p = self.connection.pipeline()
p.hset(job.key, 'result', pickled_rv) p.hset(job.key, 'result', pickled_rv)
p.expire(job.key, self.rv_ttl)
if explicit_ttl_requested:
ttl = job.result_ttl
else:
ttl = self.default_result_ttl
if ttl >= 0:
p.expire(job.key, ttl)
p.execute() p.execute()
else: else:
# Cleanup immediately # Cleanup immediately

@ -189,3 +189,15 @@ class TestJob(RQTestCase):
job2 = Job.fetch(job.id) job2 = Job.fetch(job.id)
job2.refresh() job2.refresh()
self.assertEqual(job2.foo, 'bar') self.assertEqual(job2.foo, 'bar')
def test_result_ttl_is_persisted(self):
"""Ensure that job's result_ttl is set properly"""
job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10)
job.save()
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, 10)
job = Job.create(func=say_hello, args=('Lionel',))
job.save()
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(job.result_ttl, None)

@ -252,3 +252,11 @@ class TestFailedQueue(RQTestCase):
job = Job.fetch(job.id) job = Job.fetch(job.id)
self.assertEquals(job.timeout, 200) self.assertEquals(job.timeout, 200)
def test_enqueue_preserves_result_ttl(self):
"""Enqueueing persists result_ttl."""
q = Queue()
job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10)
self.assertEqual(job.result_ttl, 10)
job_from_queue = Job.fetch(job.id, connection=self.testconn)
self.assertEqual(int(job_from_queue.result_ttl), 10)

@ -96,7 +96,6 @@ class TestWorker(RQTestCase):
self.assertEquals(job.enqueued_at, enqueued_at_date) self.assertEquals(job.enqueued_at, enqueued_at_date)
self.assertIsNotNone(job.exc_info) # should contain exc_info self.assertIsNotNone(job.exc_info) # should contain exc_info
def test_cancelled_jobs_arent_executed(self): # noqa def test_cancelled_jobs_arent_executed(self): # noqa
"""Cancelling jobs.""" """Cancelling jobs."""
@ -147,7 +146,6 @@ class TestWorker(RQTestCase):
assert self.testconn.ttl(job_with_rv.key) > 0 assert self.testconn.ttl(job_with_rv.key) > 0
assert not self.testconn.exists(job_without_rv.key) assert not self.testconn.exists(job_without_rv.key)
@slow # noqa @slow # noqa
def test_timeouts(self): def test_timeouts(self):
"""Worker kills jobs after timeout.""" """Worker kills jobs after timeout."""
@ -175,3 +173,17 @@ class TestWorker(RQTestCase):
# TODO: Having to do the manual refresh() here is really ugly! # TODO: Having to do the manual refresh() here is really ugly!
res.refresh() res.refresh()
self.assertIn('JobTimeoutException', res.exc_info) self.assertIn('JobTimeoutException', res.exc_info)
def test_worker_sets_result_ttl(self):
"""Ensure that Worker properly sets result_ttl for individual jobs."""
q = Queue()
job = q.enqueue(say_hello, args=('Frank',), result_ttl=10)
w = Worker([q])
w.work(burst=True)
self.assertNotEqual(self.testconn.ttl(job.key), 0)
# Job with -1 result_ttl don't expire
job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1)
w = Worker([q])
w.work(burst=True)
self.assertEqual(self.testconn.ttl(job.key), None)

Loading…
Cancel
Save