From a5e67659908a4f7d741abb984aec3a8f6cb57f45 Mon Sep 17 00:00:00 2001 From: Selwin Ong Date: Tue, 7 Aug 2012 14:34:51 +0700 Subject: [PATCH 1/5] Added "result_ttl" property on jobs that determines how long job results are persisted in Redis. --- rq/job.py | 11 +++++++++-- rq/queue.py | 8 +++++--- rq/worker.py | 5 ++++- tests/test_job.py | 12 ++++++++++++ tests/test_queue.py | 8 ++++++++ tests/test_worker.py | 16 ++++++++++++++++ 6 files changed, 54 insertions(+), 6 deletions(-) diff --git a/rq/job.py b/rq/job.py index b645010..b229e2f 100644 --- a/rq/job.py +++ b/rq/job.py @@ -9,7 +9,8 @@ from .exceptions import UnpickleError, NoSuchJobError JOB_ATTRS = set(['origin', '_func_name', 'ended_at', 'description', '_args', 'created_at', 'enqueued_at', 'connection', '_result', - 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance']) + 'timeout', '_kwargs', 'exc_info', '_id', 'data', '_instance', + 'result_ttl']) def unpickle(pickled_string): @@ -50,7 +51,8 @@ class Job(object): # Job construction @classmethod - def create(cls, func, args=None, kwargs=None, connection=None): + def create(cls, func, args=None, kwargs=None, connection=None, + result_ttl=None): """Creates a new Job instance for the given function, arguments, and keyword arguments. 
""" @@ -71,6 +73,7 @@ class Job(object): job._args = args job._kwargs = kwargs job.description = job.get_call_string() + job.result_ttl = result_ttl return job @property @@ -134,6 +137,7 @@ class Job(object): self._result = None self.exc_info = None self.timeout = None + self.result_ttl = None # Data access @@ -219,6 +223,7 @@ class Job(object): self._result = unpickle(obj.get('result')) if obj.get('result') else None # noqa self.exc_info = obj.get('exc_info') self.timeout = int(obj.get('timeout')) if obj.get('timeout') else None + self.result_ttl = int(obj.get('result_ttl')) if obj.get('result_ttl') else None # noqa # Overwrite job's additional attrs (those not in JOB_ATTRS), if any additional_attrs = set(obj.keys()).difference(JOB_ATTRS) @@ -248,6 +253,8 @@ class Job(object): obj['exc_info'] = self.exc_info if self.timeout is not None: obj['timeout'] = self.timeout + if self.result_ttl is not None: + obj['result_ttl'] = self.result_ttl """ Store additional attributes from job instance into Redis. This is done so that third party libraries using RQ can store additional data diff --git a/rq/queue.py b/rq/queue.py index b02e799..b790f04 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -104,7 +104,7 @@ class Queue(object): """Pushes a job ID on the corresponding Redis queue.""" self.connection.rpush(self.key, job_id) - def enqueue_call(self, func, args=None, kwargs=None, timeout=None): + def enqueue_call(self, func, args=None, kwargs=None, timeout=None, result_ttl=None): #noqa """Creates a job to represent the delayed function call and enqueues it. @@ -113,7 +113,7 @@ class Queue(object): contain options for RQ itself. 
""" timeout = timeout or self._default_timeout - job = Job.create(func, args, kwargs, connection=self.connection) + job = Job.create(func, args, kwargs, connection=self.connection, result_ttl=result_ttl) return self.enqueue_job(job, timeout=timeout) def enqueue(self, f, *args, **kwargs): @@ -138,14 +138,16 @@ class Queue(object): # Detect explicit invocations, i.e. of the form: # q.enqueue(foo, args=(1, 2), kwargs={'a': 1}, timeout=30) timeout = None + result_ttl = None if 'args' in kwargs or 'kwargs' in kwargs: assert args == (), 'Extra positional arguments cannot be used when using explicit args and kwargs.' # noqa timeout = kwargs.pop('timeout', None) args = kwargs.pop('args', None) + result_ttl = kwargs.pop('result_ttl', None) kwargs = kwargs.pop('kwargs', None) return self.enqueue_call(func=f, args=args, kwargs=kwargs, - timeout=timeout) + timeout=timeout, result_ttl=result_ttl) def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. diff --git a/rq/worker.py b/rq/worker.py index 897c66c..9d5fa58 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -400,7 +400,10 @@ class Worker(object): if rv is not None: p = self.connection.pipeline() p.hset(job.key, 'result', pickled_rv) - p.expire(job.key, self.rv_ttl) + if job.result_ttl is None: + p.expire(job.key, self.rv_ttl) + elif job.result_ttl >= 0: + p.expire(job.key, job.result_ttl) p.execute() else: # Cleanup immediately diff --git a/tests/test_job.py b/tests/test_job.py index f35ca76..916e94c 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -189,3 +189,15 @@ class TestJob(RQTestCase): job2 = Job.fetch(job.id) job2.refresh() self.assertEqual(job2.foo, 'bar') + + def test_result_ttl_is_persisted(self): + """Ensure that job's result_ttl is set properly""" + job = Job.create(func=say_hello, args=('Lionel',), result_ttl=10) + job.save() + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.result_ttl, 10) + + job = 
Job.create(func=say_hello, args=('Lionel',)) + job.save() + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job.result_ttl, None) diff --git a/tests/test_queue.py b/tests/test_queue.py index f667bdd..0bfabc3 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -252,3 +252,11 @@ class TestFailedQueue(RQTestCase): job = Job.fetch(job.id) self.assertEquals(job.timeout, 200) + + def test_enqueue_preserves_result_ttl(self): + """Ensure that result_ttl argument are properly persisted to Redis.""" + q = Queue() + job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10) + self.assertEqual(job.result_ttl, 10) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(int(job_from_queue.result_ttl), 10) diff --git a/tests/test_worker.py b/tests/test_worker.py index 9ea338b..5083a14 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -175,3 +175,19 @@ class TestWorker(RQTestCase): # TODO: Having to do the manual refresh() here is really ugly! res.refresh() self.assertIn('JobTimeoutException', res.exc_info) + + def test_worker_sets_result_ttl(self): + """Ensure that Worker properly sets result_ttl for individual jobs.""" + q = Queue() + job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) + w = Worker([q]) + w.work(burst=True) + self.assertEqual(self.testconn.ttl(job.key), 10) + + # Job with -1 result_ttl don't expire + job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) + w = Worker([q]) + w.work(burst=True) + self.assertEqual(self.testconn.ttl(job.key), None) + + From 5fcedbcdad51c82b5f3112bc3dc24e8e0a6f2f69 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 7 Aug 2012 11:05:50 +0200 Subject: [PATCH 2/5] Change assertEqual to assertNotEqual, to make the test a little less brittle (on slow machines). 
--- tests/test_queue.py | 2 +- tests/test_worker.py | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/tests/test_queue.py b/tests/test_queue.py index 0bfabc3..115e8eb 100644 --- a/tests/test_queue.py +++ b/tests/test_queue.py @@ -254,7 +254,7 @@ class TestFailedQueue(RQTestCase): self.assertEquals(job.timeout, 200) def test_enqueue_preserves_result_ttl(self): - """Ensure that result_ttl argument are properly persisted to Redis.""" + """Enqueueing persists result_ttl.""" q = Queue() job = q.enqueue(div_by_zero, args=(1, 2, 3), result_ttl=10) self.assertEqual(job.result_ttl, 10) diff --git a/tests/test_worker.py b/tests/test_worker.py index 5083a14..a4f47e6 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -96,7 +96,6 @@ class TestWorker(RQTestCase): self.assertEquals(job.enqueued_at, enqueued_at_date) self.assertIsNotNone(job.exc_info) # should contain exc_info - def test_cancelled_jobs_arent_executed(self): # noqa """Cancelling jobs.""" @@ -147,7 +146,6 @@ class TestWorker(RQTestCase): assert self.testconn.ttl(job_with_rv.key) > 0 assert not self.testconn.exists(job_without_rv.key) - @slow # noqa def test_timeouts(self): """Worker kills jobs after timeout.""" @@ -182,12 +180,10 @@ class TestWorker(RQTestCase): job = q.enqueue(say_hello, args=('Frank',), result_ttl=10) w = Worker([q]) w.work(burst=True) - self.assertEqual(self.testconn.ttl(job.key), 10) + self.assertNotEqual(self.testconn.ttl(job.key), 0) # Job with -1 result_ttl don't expire job = q.enqueue(say_hello, args=('Frank',), result_ttl=-1) w = Worker([q]) w.work(burst=True) self.assertEqual(self.testconn.ttl(job.key), None) - - From 4b2838943be0a867b0ffab00ce3143f276012981 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 7 Aug 2012 11:06:20 +0200 Subject: [PATCH 3/5] Rename rv_ttl to default_result_ttl. 
--- rq/worker.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 9d5fa58..9eeb9a6 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -94,7 +94,8 @@ class Worker(object): return worker - def __init__(self, queues, name=None, rv_ttl=500, connection=None): # noqa + def __init__(self, queues, name=None, default_result_ttl=500, + connection=None): # noqa if connection is None: connection = get_current_connection() self.connection = connection @@ -103,7 +104,7 @@ class Worker(object): self._name = name self.queues = queues self.validate_queues() - self.rv_ttl = rv_ttl + self.default_result_ttl = default_result_ttl self._state = 'starting' self._is_horse = False self._horse_pid = 0 @@ -401,7 +402,7 @@ class Worker(object): p = self.connection.pipeline() p.hset(job.key, 'result', pickled_rv) if job.result_ttl is None: - p.expire(job.key, self.rv_ttl) + p.expire(job.key, self.default_result_ttl) elif job.result_ttl >= 0: p.expire(job.key, job.result_ttl) p.execute() From 5963ec6d771c2e3f5a523d45b22352b858cea4be Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 7 Aug 2012 11:06:57 +0200 Subject: [PATCH 4/5] Try to make the expiration code a bit more readable. 
--- rq/worker.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 9eeb9a6..67a2bed 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -398,13 +398,20 @@ class Worker(object): else: self.log.info('Job OK, result = %s' % (yellow(unicode(rv)),)) - if rv is not None: + # Expire results + has_result = rv is not None + explicit_ttl_requested = job.result_ttl is not None + should_expire = has_result or explicit_ttl_requested + if should_expire: p = self.connection.pipeline() p.hset(job.key, 'result', pickled_rv) - if job.result_ttl is None: - p.expire(job.key, self.default_result_ttl) - elif job.result_ttl >= 0: - p.expire(job.key, job.result_ttl) + + if explicit_ttl_requested: + ttl = job.result_ttl + else: + ttl = self.default_result_ttl + if ttl >= 0: + p.expire(job.key, ttl) p.execute() else: # Cleanup immediately From a9cb201d395ac64613fd06a028b61796a3f3075c Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Tue, 7 Aug 2012 11:07:03 +0200 Subject: [PATCH 5/5] PEP8ify. --- rq/worker.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rq/worker.py b/rq/worker.py index 67a2bed..d527957 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -250,8 +250,9 @@ class Worker(object): msg = 'Warm shut down requested.' self.log.warning(msg) - # If shutdown is requested in the middle of a job, wait until finish - # before shutting down + + # If shutdown is requested in the middle of a job, wait until + # finish before shutting down if self.state == 'busy': self._stopped = True self.log.debug('Stopping after current horse is finished. '