added job ttl to queue.enqueue()

main
glaslos 10 years ago
parent 786d3c5887
commit 55c541bc59

@ -92,7 +92,7 @@ class Job(object):
# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
result_ttl=None, status=None, description=None, depends_on=None, timeout=None,
result_ttl=None, job_ttl=None, status=None, description=None, depends_on=None, timeout=None,
id=None):
"""Creates a new Job instance for the given function, arguments, and
keyword arguments.
@ -131,6 +131,7 @@ class Job(object):
# Extra meta data
job.description = description or job.get_call_string()
job.result_ttl = result_ttl
job.job_ttl = job_ttl
job.timeout = timeout
job._status = status
@ -311,6 +312,7 @@ class Job(object):
self.exc_info = None
self.timeout = None
self.result_ttl = None
self.job_ttl = None
self._status = None
self._dependency_id = None
self.meta = {}
@ -455,6 +457,8 @@ class Job(object):
connection = pipeline if pipeline is not None else self.connection
connection.hmset(key, self.to_dict())
if self.job_ttl:
connection.expire(key, self.job_ttl)
def cancel(self):
"""Cancels the given job, which will prevent the job from ever being
@ -491,8 +495,15 @@ class Job(object):
return self._result
def get_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a job and its result
will be persisted. In the future, this method will also be responsible
"""Returns ttl for a job that determines how long a job will be
persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
"""
return default_ttl if self.job_ttl is None else self.job_ttl
def get_result_ttl(self, default_ttl=None):
"""Returns ttl for a job that determines how long a jobs result will
be persisted. In the future, this method will also be responsible
for determining ttl for repeated jobs.
"""
return default_ttl if self.result_ttl is None else self.result_ttl

@ -167,8 +167,8 @@ class Queue(object):
connection.rpush(self.key, job_id)
def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
result_ttl=None, description=None, depends_on=None,
job_id=None):
result_ttl=None, job_ttl=None, description=None,
depends_on=None, job_id=None):
"""Creates a job to represent the delayed function call and enqueues
it.
@ -180,7 +180,7 @@ class Queue(object):
# TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED,
result_ttl=result_ttl, job_ttl=job_ttl, status=Status.QUEUED,
description=description, depends_on=depends_on, timeout=timeout,
id=job_id)
@ -229,6 +229,7 @@ class Queue(object):
timeout = kwargs.pop('timeout', None)
description = kwargs.pop('description', None)
result_ttl = kwargs.pop('result_ttl', None)
job_ttl = kwargs.pop('job_ttl', None)
depends_on = kwargs.pop('depends_on', None)
job_id = kwargs.pop('job_id', None)
@ -238,7 +239,7 @@ class Queue(object):
kwargs = kwargs.pop('kwargs', None)
return self.enqueue_call(func=f, args=args, kwargs=kwargs,
timeout=timeout, result_ttl=result_ttl,
timeout=timeout, result_ttl=result_ttl, job_ttl=job_ttl,
description=description, depends_on=depends_on,
job_id=job_id)

@ -508,7 +508,7 @@ class Worker(object):
self.set_current_job_id(None, pipeline=pipeline)
result_ttl = job.get_ttl(self.default_result_ttl)
result_ttl = job.get_result_ttl(self.default_result_ttl)
if result_ttl != 0:
job.ended_at = utcnow()
job._status = Status.FINISHED

@ -290,17 +290,27 @@ class TestJob(RQTestCase):
self.assertEqual(job.id, id)
self.assertEqual(job.func, access_self)
def test_get_ttl(self):
def test_get_result_ttl(self):
"""Getting job result TTL."""
job_result_ttl = 1
default_ttl = 2
job = Job.create(func=say_hello, result_ttl=job_result_ttl)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl)
self.assertEqual(job.get_result_ttl(), job_result_ttl)
job = Job.create(func=say_hello)
job.save()
self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_result_ttl(), None)
def test_get_job_ttl(self):
"""Getting job TTL."""
job_ttl = 1
default_ttl = 2
job = Job.create(func=say_hello, result_ttl=job_ttl)
job = Job.create(func=say_hello, job_ttl=job_ttl)
job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl)
self.assertEqual(job.get_ttl(), job_ttl)
job = Job.create(func=say_hello)
job.save()
self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl)
self.assertEqual(job.get_ttl(), None)
def test_cleanup(self):

Loading…
Cancel
Save