diff --git a/rq/job.py b/rq/job.py index e8080f8..8067a01 100644 --- a/rq/job.py +++ b/rq/job.py @@ -92,7 +92,7 @@ class Job(object): # Job construction @classmethod def create(cls, func, args=None, kwargs=None, connection=None, - result_ttl=None, status=None, description=None, depends_on=None, timeout=None, + result_ttl=None, ttl=None, status=None, description=None, depends_on=None, timeout=None, id=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. @@ -131,6 +131,7 @@ class Job(object): # Extra meta data job.description = description or job.get_call_string() job.result_ttl = result_ttl + job.ttl = ttl job.timeout = timeout job._status = status @@ -311,6 +312,7 @@ class Job(object): self.exc_info = None self.timeout = None self.result_ttl = None + self.ttl = None self._status = None self._dependency_id = None self.meta = {} @@ -455,6 +457,7 @@ class Job(object): connection = pipeline if pipeline is not None else self.connection connection.hmset(key, self.to_dict()) + self.cleanup(self.ttl) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -491,8 +494,15 @@ class Job(object): return self._result def get_ttl(self, default_ttl=None): - """Returns ttl for a job that determines how long a job and its result - will be persisted. In the future, this method will also be responsible + """Returns ttl for a job that determines how long a job will be + persisted. In the future, this method will also be responsible + for determining ttl for repeated jobs. + """ + return default_ttl if self.ttl is None else self.ttl + + def get_result_ttl(self, default_ttl=None): + """Returns ttl for a job that determines how long a jobs result will + be persisted. In the future, this method will also be responsible for determining ttl for repeated jobs. """ return default_ttl if self.result_ttl is None else self.result_ttl @@ -513,14 +523,16 @@ class Job(object): def cleanup(self, ttl=None, pipeline=None): """Prepare job for eventual deletion (if needed). This method is usually called after successful execution. How long we persist the job and its - result depends on the value of result_ttl: - - If result_ttl is 0, cleanup the job immediately. + result depends on the value of ttl: + - If ttl is 0, cleanup the job immediately. - If it's a positive number, set the job to expire in X seconds. - - If result_ttl is negative, don't set an expiry to it (persist + - If ttl is negative, don't set an expiry to it (persist forever) """ if ttl == 0: self.cancel() + elif not ttl: + return elif ttl > 0: connection = pipeline if pipeline is not None else self.connection connection.expire(self.key, ttl) diff --git a/rq/queue.py b/rq/queue.py index 7dfb0f8..a91b4a2 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -167,8 +167,8 @@ class Queue(object): connection.rpush(self.key, job_id) def enqueue_call(self, func, args=None, kwargs=None, timeout=None, - result_ttl=None, description=None, depends_on=None, - job_id=None): + result_ttl=None, ttl=None, description=None, + depends_on=None, job_id=None): """Creates a job to represent the delayed function call and enqueues it. @@ -180,7 +180,7 @@ class Queue(object): # TODO: job with dependency shouldn't have "queued" as status job = self.job_class.create(func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=Status.QUEUED, + result_ttl=result_ttl, ttl=ttl, status=Status.QUEUED, description=description, depends_on=depends_on, timeout=timeout, id=job_id) @@ -229,6 +229,7 @@ class Queue(object): timeout = kwargs.pop('timeout', None) description = kwargs.pop('description', None) result_ttl = kwargs.pop('result_ttl', None) + ttl = kwargs.pop('ttl', None) depends_on = kwargs.pop('depends_on', None) job_id = kwargs.pop('job_id', None) @@ -238,7 +239,7 @@ class Queue(object): kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout, result_ttl=result_ttl, + timeout=timeout, result_ttl=result_ttl, ttl=ttl, description=description, depends_on=depends_on, job_id=job_id) diff --git a/rq/worker.py b/rq/worker.py index bf40a65..20cd833 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -508,7 +508,7 @@ class Worker(object): self.set_current_job_id(None, pipeline=pipeline) - result_ttl = job.get_ttl(self.default_result_ttl) + result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl != 0: job.ended_at = utcnow() job._status = Status.FINISHED diff --git a/tests/test_job.py b/tests/test_job.py index 28fad40..34859a7 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -290,17 +290,27 @@ class TestJob(RQTestCase): self.assertEqual(job.id, id) self.assertEqual(job.func, access_self) - def test_get_ttl(self): - """Getting job TTL.""" - job_ttl = 1 + def test_get_result_ttl(self): + """Getting job result TTL.""" + job_result_ttl = 1 default_ttl = 2 - job = Job.create(func=say_hello, result_ttl=job_ttl) + job = Job.create(func=say_hello, result_ttl=job_result_ttl) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), job_result_ttl) + self.assertEqual(job.get_result_ttl(), job_result_ttl) + job = Job.create(func=say_hello) + job.save() + self.assertEqual(job.get_result_ttl(default_ttl=default_ttl), default_ttl) + self.assertEqual(job.get_result_ttl(), None) + + def test_get_job_ttl(self): + """Getting job TTL.""" + ttl = 1 + job = Job.create(func=say_hello, ttl=ttl) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl) - self.assertEqual(job.get_ttl(), job_ttl) + self.assertEqual(job.get_ttl(), ttl) job = Job.create(func=say_hello) job.save() - self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl) self.assertEqual(job.get_ttl(), None) def test_cleanup(self):