diff --git a/rq/job.py b/rq/job.py index b645010..b229e2f 100644 --- a/rq/job.py +++ b/rq/job.py @@ -9,7 +9,8 @@ from .exceptions import UnpickleError, NoSuchJobError JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', 'created_at', 'enqueued_at', 'connection', '_result', - 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance']) + 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', + 'result_ttl']) def unpickle(pickled_string): @@ -50,7 +51,8 @@ class Job(object): # Job construction @classmethod - def create(cls, func, args=None, kwargs=None, connection=None): + def create(cls, func, args=None, kwargs=None, connection=None, + result_ttl=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -71,6 +73,7 @@ class Job(object): job._args = args job._kwargs = kwargs job.description = job.get_call_string() + job.result_ttl = result_ttl return job @property @@ -134,6 +137,7 @@ class Job(object): self._result = None self.exc_info = None self.timeout = None + self.result_ttl = None # Data access @@ -219,6 +223,7 @@ class Job(object): self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa # Overwrite job's additional attrs (those not in JOB_ATTRS), if any additional_attrs = set(obj.keys()).difference(JOB_ATTRS) @@ -248,6 +253,8 @@ class Job(object): obj['exc_info'] = self.exc_info if self.timeout is not None: obj['timeout'] = self.timeout + if self.result_ttl is not None: + obj['result_ttl'] = self.result_ttl """ Store additional attributes from job instance into Redis. This is done so that third party libraries using RQ can store additional data diff --git a/rq/queue.py b/rq/queue.py index b02e799..b790f04 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -104,7 +104,7 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None): + def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): #noqa """Creates a job to represent the delayed function call and enqueues it. @@ -113,7 +113,7 @@ class Queue(object): contain options for RQ itself. """ timeout = timeout or self._default_timeout - job = Job.create(func, args, kwargs, connection=self.connection) + job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl) return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): @@ -138,14 +138,16 @@ class Queue(object): # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) timeout = None + result_ttl = None if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa timeout = kwargs.pop('timeout', None) args = kwargs.pop('args', None) + result_ttl = kwargs.pop('result_ttl', None) kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout) + timeout=timeout, result_ttl=result_ttl) def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. diff --git a/rq/worker.py b/rq/worker.py index 897c66c..9d5fa58 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -400,7 +400,10 @@ class Worker(object): if rv is not None: p = self.connection.pipeline() p.hset(job.key, 'result', pickled_rv) - p.expire(job.key, self.rv_ttl) + if job.result_ttl is None: + p.expire(job.key, self.rv_ttl) + elif job.result_ttl >= 0: + p.expire(job.key, job.result_ttl) p.execute() else: # Cleanup immediately diff --git a/tests/test_job.py b/tests/test_job.py index f35ca76..916e94c 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -189,3 +189,15 @@ class TestJob(RQTestCase): job2 = Job.fetch(job.id) job2.refresh() self.assertEqual(job2.foo, 'bar') + + def test_result_ttl_is_persisted(self): + """Ensure that job's result_ttl is set properly""" + job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10) + job.save() + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.result_ttl, 10) + + job = Job.create(func=say_hello, args=('Lionel',)) + job.save() + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.result_ttl, None) diff --git a/tests/test_queue.py b/tests/test_queue.py index f667bdd..0bfabc3 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -252,3 +252,11 @@ class TestFailedQueue(RQTestCase): job = Job.fetch(job.id) self.assertEquals(job.timeout, 200) + + def test_enqueue_preserves_result_ttl(self): + """Ensure that result_ttl argument are properly persisted to Redis.""" + q = Queue() + job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10) + self.assertEqual(job.result_ttl, 10) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(int(job_from_queue.result_ttl), 10) diff --git a/tests/test_worker.py b/tests/test_worker.py index 9ea338b..5083a14 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -175,3 +175,19 @@ class TestWorker(RQTestCase): # TODO: Having to do the manual refresh() here is really ugly! res.refresh() self.assertIn('JobTimeoutException', res.exc_info) + + def test_worker_sets_result_ttl(self): + """Ensure that Worker properly sets result_ttl for individual jobs.""" + q = Queue() + job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) + w = Worker([q]) + w.work(burst=True) + self.assertEqual(self.testconn.ttl(job.key), 10) + + # Job with -1 result_ttl don't expire + job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) + w = Worker([q]) + w.work(burst=True) + self.assertEqual(self.testconn.ttl(job.key), None) + +