diff --git a/rq/job.py b/rq/job.py index b645010..b229e2f 100644 --- a/rq/job.py +++ b/rq/job.py @@ -9,7 +9,8 @@ from .exceptions import UnpickleError, NoSuchJobError JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', 'created_at', 'enqueued_at', 'connection', '_result', - 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance']) + 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', + 'result_ttl']) def unpickle(pickled_string): @@ -50,7 +51,8 @@ class Job(object): # Job construction @classmethod - def create(cls, func, args=None, kwargs=None, connection=None): + def create(cls, func, args=None, kwargs=None, connection=None, + result_ttl=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. """ @@ -71,6 +73,7 @@ class Job(object): job._args = args job._kwargs = kwargs job.description = job.get_call_string() + job.result_ttl = result_ttl return job @property @@ -134,6 +137,7 @@ class Job(object): self._result = None self.exc_info = None self.timeout = None + self.result_ttl = None # Data access @@ -219,6 +223,7 @@ class Job(object): self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa # Overwrite job's additional attrs (those not in JOB_ATTRS), if any additional_attrs = set(obj.keys()).difference(JOB_ATTRS) @@ -248,6 +253,8 @@ class Job(object): obj['exc_info'] = self.exc_info if self.timeout is not None: obj['timeout'] = self.timeout + if self.result_ttl is not None: + obj['result_ttl'] = self.result_ttl """ Store additional attributes from job instance into Redis. This is done so that third party libraries using RQ can store additional data diff --git a/rq/queue.py b/rq/queue.py index b02e799..b790f04 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -104,7 +104,7 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None): + def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): #noqa """Creates a job to represent the delayed function call and enqueues it. @@ -113,7 +113,7 @@ class Queue(object): contain options for RQ itself. """ timeout = timeout or self._default_timeout - job = Job.create(func, args, kwargs, connection=self.connection) + job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl) return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): @@ -138,14 +138,16 @@ class Queue(object): # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) timeout = None + result_ttl = None if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa timeout = kwargs.pop('timeout', None) args = kwargs.pop('args', None) + result_ttl = kwargs.pop('result_ttl', None) kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout) + timeout=timeout, result_ttl=result_ttl) def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. diff --git a/rq/worker.py b/rq/worker.py index 3306be6..b7a158c 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -94,7 +94,8 @@ class Worker(object): return worker - def __init__(self, queues, name=None, rv_ttl=500, connection=None): # noqa + def __init__(self, queues, name=None, default_result_ttl=500, + connection=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -103,7 +104,7 @@ class Worker(object): self._name = name self.queues = queues self.validate_queues() - self.rv_ttl = rv_ttl + self.default_result_ttl = default_result_ttl self._state = 'starting' self._is_horse = False self._horse_pid = 0 @@ -398,10 +399,20 @@ class Worker(object): else: self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) - if rv is not None: + # Expire results + has_result = rv is not None + explicit_ttl_requested = job.result_ttl is not None + should_expire = has_result or explicit_ttl_requested + if should_expire: p = self.connection.pipeline() p.hset(job.key, 'result', pickled_rv) - p.expire(job.key, self.rv_ttl) + + if explicit_ttl_requested: + ttl = job.result_ttl + else: + ttl = self.default_result_ttl + if ttl >= 0: + p.expire(job.key, ttl) p.execute() else: # Cleanup immediately diff --git a/tests/test_job.py b/tests/test_job.py index f35ca76..916e94c 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -189,3 +189,15 @@ class TestJob(RQTestCase): job2 = Job.fetch(job.id) job2.refresh() self.assertEqual(job2.foo, 'bar') + + def test_result_ttl_is_persisted(self): + """Ensure that job's result_ttl is set properly""" + job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10) + job.save() + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.result_ttl, 10) + + job = Job.create(func=say_hello, args=('Lionel',)) + job.save() + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.result_ttl, None) diff --git a/tests/test_queue.py b/tests/test_queue.py index f667bdd..115e8eb 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -252,3 +252,11 @@ class TestFailedQueue(RQTestCase): job = Job.fetch(job.id) self.assertEquals(job.timeout, 200) + + def test_enqueue_preserves_result_ttl(self): + """Enqueueing persists result_ttl.""" + q = Queue() + job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10) + self.assertEqual(job.result_ttl, 10) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(int(job_from_queue.result_ttl), 10) diff --git a/tests/test_worker.py b/tests/test_worker.py index 9ea338b..a4f47e6 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -96,7 +96,6 @@ class TestWorker(RQTestCase): self.assertEquals(job.enqueued_at, enqueued_at_date) self.assertIsNotNone(job.exc_info) # should contain exc_info - def test_cancelled_jobs_arent_executed(self): # noqa """Cancelling jobs.""" @@ -147,7 +146,6 @@ class TestWorker(RQTestCase): assert self.testconn.ttl(job_with_rv.key) > 0 assert not self.testconn.exists(job_without_rv.key) - @slow # noqa def test_timeouts(self): """Worker kills jobs after timeout.""" @@ -175,3 +173,17 @@ class TestWorker(RQTestCase): # TODO: Having to do the manual refresh() here is really ugly! res.refresh() self.assertIn('JobTimeoutException', res.exc_info) + + def test_worker_sets_result_ttl(self): + """Ensure that Worker properly sets result_ttl for individual jobs.""" + q = Queue() + job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) + w = Worker([q]) + w.work(burst=True) + self.assertNotEqual(self.testconn.ttl(job.key), 0) + + # Job with -1 result_ttl don't expire + job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) + w = Worker([q]) + w.work(burst=True) + self.assertEqual(self.testconn.ttl(job.key), None)