diff --git a/rq/job.py b/rq/job.py index a0367ee..98ddfec 100644 --- a/rq/job.py +++ b/rq/job.py @@ -287,9 +287,10 @@ class Job(object): self._status = obj.get('status') if obj.get('status') else None self.meta = unpickle(obj.get('meta')) if obj.get('meta') else {} - def save(self): + def save(self, pipeline=None): """Persists the current job instance to its corresponding Redis key.""" key = self.key + connection = pipeline if pipeline is not None else self.connection obj = {} obj['created_at'] = times.format(self.created_at or times.now(), 'UTC') @@ -317,7 +318,7 @@ class Job(object): if self.meta: obj['meta'] = dumps(self.meta) - self.connection.hmset(key, obj) + connection.hmset(key, obj) def cancel(self): """Cancels the given job, which will prevent the job from ever being @@ -346,6 +347,13 @@ class Job(object): return self._result + def get_ttl(self, default_ttl=None): + """Returns ttl for a job that determines how long a job and its result + will be persisted. In the future, this method will also be responsible + for determining ttl for repeated jobs. + """ + return default_ttl if self.result_ttl is None else self.result_ttl + # Representation def get_call_string(self): # noqa """Returns a string representation of the call, formatted as a regular @@ -359,6 +367,22 @@ class Job(object): args = ', '.join(arg_list) return '%s(%s)' % (self.func_name, args) + def cleanup(self, ttl=None, pipeline=None): + """Prepare job for eventual deletion (if needed). This method is usually + called after successful execution. How long we persist the job and its + result depends on the value of result_ttl: + - If result_ttl is 0, cleanup the job immediately. + - If it's a positive number, set the job to expire in X seconds. + - If result_ttl is negative, don't set an expiry to it (persist + forever) + """ + if ttl == 0: + self.cancel() + elif ttl > 0: + connection = pipeline if pipeline is not None else self.connection + connection.expire(self.key, ttl) + + def __str__(self): return '' % (self.id, self.description) diff --git a/rq/worker.py b/rq/worker.py index b11bec9..4a66686 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -412,9 +412,17 @@ class Worker(object): # Pickle the result in the same try-except block since we need to # use the same exc handling when pickling fails - pickled_rv = dumps(rv) + job._result = rv job._status = Status.FINISHED job.ended_at = times.now() + + result_ttl = job.get_ttl(self.default_result_ttl) + pipeline = self.connection.pipeline() + if result_ttl != 0: + job.save(pipeline=pipeline) + job.cleanup(result_ttl, pipeline=pipeline) + pipeline.execute() + except: # Use the public setter here, to immediately update Redis job.status = Status.FAILED @@ -426,27 +434,12 @@ class Worker(object): else: self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) - # How long we persist the job result depends on the value of - # result_ttl: - # - If result_ttl is 0, cleanup the job immediately. - # - If it's a positive number, set the job to expire in X seconds. - # - If result_ttl is negative, don't set an expiry to it (persist - # forever) - result_ttl = self.default_result_ttl if job.result_ttl is None else job.result_ttl # noqa if result_ttl == 0: - job.delete() self.log.info('Result discarded immediately.') + elif result_ttl > 0: + self.log.info('Result is kept for %d seconds.' % result_ttl) else: - p = self.connection.pipeline() - p.hset(job.key, 'result', pickled_rv) - p.hset(job.key, 'status', job._status) - p.hset(job.key, 'ended_at', times.format(job.ended_at, 'UTC')) - if result_ttl > 0: - p.expire(job.key, result_ttl) - self.log.info('Result is kept for %d seconds.' % result_ttl) - else: - self.log.warning('Result will never expire, clean up result key manually.') - p.execute() + self.log.warning('Result will never expire, clean up result key manually.') return True diff --git a/tests/test_job.py b/tests/test_job.py index 13af0b5..8b1d137 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -219,3 +219,33 @@ class TestJob(RQTestCase): id = job.perform() self.assertEqual(job.id, id) self.assertEqual(job.func, access_self) + + def test_get_ttl(self): + """Getting job TTL.""" + job_ttl = 1 + default_ttl = 2 + job = Job.create(func=say_hello, result_ttl=job_ttl) + job.save() + self.assertEqual(job.get_ttl(default_ttl=default_ttl), job_ttl) + self.assertEqual(job.get_ttl(), job_ttl) + job = Job.create(func=say_hello) + job.save() + self.assertEqual(job.get_ttl(default_ttl=default_ttl), default_ttl) + self.assertEqual(job.get_ttl(), None) + + def test_cleanup(self): + """Test that jobs and results are expired properly.""" + job = Job.create(func=say_hello) + job.save() + + # Jobs with negative TTLs don't expire + job.cleanup(ttl=-1) + self.assertEqual(self.testconn.ttl(job.key), -1) + + # Jobs with positive TTLs are eventually deleted + job.cleanup(ttl=100) + self.assertEqual(self.testconn.ttl(job.key), 100) + + # Jobs with 0 TTL are immediately deleted + job.cleanup(ttl=0) + self.assertRaises(NoSuchJobError, Job.fetch, job.id, self.testconn)