Merge pull request #453 from glaslos/job_ttl

TTL for jobs
main
Selwin Ong 10 years ago
commit e1801c5c44

@ -92,7 +92,7 @@ class Job(object):
# Job construction # Job construction
@classmethod @classmethod
def create(cls, func, args=None, kwargs=None, connection=None, def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, description=None, depends_on=None, timeout=None, result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None,
id=None): id=None):
"""Creates a new Job instance for the given function, arguments, and """Creates a new Job instance for the given function, arguments, and
keyword arguments. keyword arguments.
@ -131,6 +131,7 @@ class Job(object):
# Extra meta data # Extra meta data
job.description = description or job.get_call_string() job.description = description or job.get_call_string()
job.result_ttl = result_ttl job.result_ttl = result_ttl
job.ttl = ttl
job.timeout = timeout job.timeout = timeout
job._status = status job._status = status
@ -311,6 +312,7 @@ class Job(object):
self.exc_info = None self.exc_info = None
self.timeout = None self.timeout = None
self.result_ttl = None self.result_ttl = None
self.ttl = None
self._status = None self._status = None
self._dependency_id = None self._dependency_id = None
self.meta = {} self.meta = {}
@ -455,6 +457,7 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.to_dict()) connection.hmset(key, self.to_dict())
self.cleanup(self.ttl)
def cancel(self): def cancel(self):
"""Cancels the given job, which will prevent the job from ever being """Cancels the given job, which will prevent the job from ever being
@ -491,8 +494,15 @@ class Job(object):
return self._result return self._result
def get_ttl(self, default_ttl=None): def get_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a job and its result """Returns ttl for a job that determines how long a job will be
will be persisted. In the future, this method will also be responsible persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
"""
return default_ttl if self.ttl is None else self.ttl
def get_result_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a jobs result will
be persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs. for determining ttl for repeated jobs.
""" """
return default_ttl if self.result_ttl is None else self.result_ttl return default_ttl if self.result_ttl is None else self.result_ttl
@ -513,14 +523,16 @@ class Job(object):
def cleanup(self, ttl=None, pipeline=None): def cleanup(self, ttl=None, pipeline=None):
"""Prepare job for eventual deletion (if needed). This method is usually """Prepare job for eventual deletion (if needed). This method is usually
called after successful execution. How long we persist the job and its called after successful execution. How long we persist the job and its
result depends on the value of result_ttl: result depends on the value of ttl:
- If result_ttl is 0, cleanup the job immediately. - If ttl is 0, cleanup the job immediately.
- If it's a positive number, set the job to expire in X seconds. - If it's a positive number, set the job to expire in X seconds.
- If result_ttl is negative, don't set an expiry to it (persist - If ttl is negative, don't set an expiry to it (persist
forever) forever)
""" """
if ttl == 0: if ttl == 0:
self.cancel() self.cancel()
elif not ttl:
return
elif ttl > 0: elif ttl > 0:
connection = pipeline if pipeline is not None else self.connection connection = pipeline if pipeline is not None else self.connection
connection.expire(self.key, ttl) connection.expire(self.key, ttl)

@ -167,8 +167,8 @@ class Queue(object):
connection.rpush(self.key, job_id) connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None, def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None, depends_on=None, result_ttl=None, ttl=None, description=None,
job_id=None): depends_on=None, job_id=None):
"""Creates a job to represent the delayed function call and enqueues """Creates a job to represent the delayed function call and enqueues
it. it.
@ -180,7 +180,7 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status # TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection, job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED, result_ttl=result_ttl, ttl=ttl, status=Status.QUEUED,
description=description, depends_on=depends_on, timeout=timeout, description=description, depends_on=depends_on, timeout=timeout,
id=job_id) id=job_id)
@ -229,6 +229,7 @@ class Queue(object):
timeout = kwargs.pop('timeout', None) timeout = kwargs.pop('timeout', None)
description = kwargs.pop('description', None) description = kwargs.pop('description', None)
result_ttl = kwargs.pop('result_ttl', None) result_ttl = kwargs.pop('result_ttl', None)
ttl = kwargs.pop('ttl', None)
depends_on = kwargs.pop('depends_on', None) depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None) job_id = kwargs.pop('job_id', None)
@ -238,7 +239,7 @@ class Queue(object):
kwargs = kwargs.pop('kwargs', None) kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs, return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl, timeout=timeout, result_ttl=result_ttl, ttl=ttl,
description=description, depends_on=depends_on, description=description, depends_on=depends_on,
job_id=job_id) job_id=job_id)

@ -508,7 +508,7 @@ class Worker(object):
self.set_current_job_id(None, pipeline=pipeline) self.set_current_job_id(None, pipeline=pipeline)
result_ttl = job.get_ttl(self.default_result_ttl) result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0: if result_ttl != 0:
job.ended_at = utcnow() job.ended_at = utcnow()
job._status = Status.FINISHED job._status = Status.FINISHED

@ -290,17 +290,27 @@ class TestJob(RQTestCase):
self.assertEqual(job.id, id) self.assertEqual(job.id, id)
self.assertEqual(job.func, access_self) self.assertEqual(job.func, access_self)
def test_get_ttl(self): def test_get_result_ttl(self):
"""Getting job TTL.""" """Getting job result TTL."""
job_ttl = 1 job_result_ttl = 1
default_ttl = 2 default_ttl = 2
job = Job.create(func=say_hello, result_ttl=job_ttl) job = Job.create(func=say_hello, result_ttl=job_result_ttl)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl)
self.assertEqual(job.get_result_ttl(), job_result_ttl)
job = Job.create(func=say_hello)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_result_ttl(), None)
def test_get_job_ttl(self):
"""Getting job TTL."""
ttl = 1
job = Job.create(func=say_hello, ttl=ttl)
job.save() job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl) self.assertEqual(job.get_ttl(), ttl)
self.assertEqual(job.get_ttl(), job_ttl)
job = Job.create(func=say_hello) job = Job.create(func=say_hello)
job.save() job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_ttl(), None) self.assertEqual(job.get_ttl(), None)
def test_cleanup(self): def test_cleanup(self):

Loading…
Cancel
Save