From 55c541bc5914cc09cb634c815337e533c37180db Mon Sep 17 00:00:00 2001 From: glaslos Date: Mon, 24 Nov 2014 14:12:51 +0100 Subject: [PATCH] added job ttl to queue.enqueue() --- rq/job.py | 17 ++++++++++++++--- rq/queue.py | 9 +++++---- rq/worker.py | 2 +- tests/test_job.py | 20 +++++++++++++++----- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/rq/job.py b/rq/job.py index e8080f8..a026fda 100644 --- a/rq/job.py +++ b/rq/job.py @@ -92,7 +92,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None, depends_on=None, timeout=None, + result_ttl=None, job_ttl=None, status=None, description=None, depends_on=None, timeout=None, id=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -131,6 +131,7 @@ class Job(object): # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl + job.job_ttl = job_ttl job.timeout = timeout job._status = status @@ -311,6 +312,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None + self.job_ttl = None self._status = None self._dependency_id = None self.meta = {} @@ -455,6 +457,8 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict()) + if self.job_ttl: + connection.expire(key, self.job_ttl) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -491,8 +495,15 @@ class Job(object): return self._result def get_ttl(self, default_ttl=None): - """Returns ttl for a job that determines how long a job and its result - will be persisted. In the future, this method will also be responsible + """Returns ttl for a job that determines how long a job will be + persisted. In the future, this method will also be responsible + for determining ttl for repeated jobs. + """ + return default_ttl if self.job_ttl is None else self.job_ttl + + def get_result_ttl(self, default_ttl=None): + """Returns ttl for a job that determines how long a jobs result will + be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. """ return default_ttl if self.result_ttl is None else self.result_ttl diff --git a/rq/queue.py b/rq/queue.py index ff5f860..9a2c9af 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -167,8 +167,8 @@ class Queue(object): connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, description=None, depends_on=None, - job_id=None): + result_ttl=None, job_ttl=None, description=None, + depends_on=None, job_id=None): """Creates a job to represent the delayed function call and enqueues it. @@ -180,7 +180,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=Status.QUEUED, + result_ttl=result_ttl, job_ttl=job_ttl, status=Status.QUEUED, description=description, depends_on=depends_on, timeout=timeout, id=job_id) @@ -229,6 +229,7 @@ class Queue(object): timeout = kwargs.pop('timeout', None) description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) + job_ttl = kwargs.pop('job_ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) @@ -238,7 +239,7 @@ class Queue(object): kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, + timeout=timeout, result_ttl=result_ttl, job_ttl=job_ttl, description=description, depends_on=depends_on, job_id=job_id) diff --git a/rq/worker.py b/rq/worker.py index bf40a65..20cd833 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -508,7 +508,7 @@ class Worker(object): self.set_current_job_id(None, pipeline=pipeline) - result_ttl = job.get_ttl(self.default_result_ttl) + result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: job.ended_at = utcnow() job._status = Status.FINISHED diff --git a/tests/test_job.py b/tests/test_job.py index 28fad40..27cebb0 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -290,17 +290,27 @@ class TestJob(RQTestCase): self.assertEqual(job.id, id) self.assertEqual(job.func, access_self) - def test_get_ttl(self): + def test_get_result_ttl(self): + """Getting job result TTL.""" + job_result_ttl = 1 + default_ttl = 2 + job = Job.create(func=say_hello, result_ttl=job_result_ttl) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl) + self.assertEqual(job.get_result_ttl(), job_result_ttl) + job = Job.create(func=say_hello) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl) + self.assertEqual(job.get_result_ttl(), None) + + def test_get_job_ttl(self): """Getting job TTL.""" job_ttl = 1 - default_ttl = 2 - job = Job.create(func=say_hello, result_ttl=job_ttl) + job = Job.create(func=say_hello, job_ttl=job_ttl) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl) self.assertEqual(job.get_ttl(), job_ttl) job = Job.create(func=say_hello) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl) self.assertEqual(job.get_ttl(), None) def test_cleanup(self):