diff --git a/rq/job.py b/rq/job.py index 782c3c7..3c3016d 100644 --- a/rq/job.py +++ b/rq/job.py @@ -485,6 +485,8 @@ class Job(object): # Job execution def perform(self): # noqa """Invokes the job function with the job arguments.""" + self.connection.persist(self.key) + self.ttl = -1 _job_stack.push(self.id) try: self._result = self.func(*self.args, **self.kwargs) diff --git a/rq/queue.py b/rq/queue.py index baf16d6..2187c7e 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -183,7 +183,7 @@ class Queue(object): job = self.job_class.create( func, args, kwargs, connection=self.connection, - result_ttl=result_ttl, status=JobStatus.QUEUED, + result_ttl=result_ttl, ttl=ttl, status=JobStatus.QUEUED, description=description, depends_on=depends_on, timeout=timeout, id=job_id, origin=self.name) diff --git a/tests/fixtures.py b/tests/fixtures.py index a0e8eba..4271373 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -85,5 +85,6 @@ with Connection(): return x + y -def long_running_job(): - time.sleep(10) +def long_running_job(timeout=10): + time.sleep(timeout) + return 'Done sleeping...' diff --git a/tests/test_job.py b/tests/test_job.py index 1a7d231..0f6ab30 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -4,11 +4,6 @@ from __future__ import (absolute_import, division, print_function, from datetime import datetime -from tests import RQTestCase -from tests.fixtures import (access_self, CallableObject, Number, say_hello, - some_calculation) -from tests.helpers import strip_microseconds - from rq.compat import as_text, PY2 from rq.exceptions import NoSuchJobError, UnpickleError from rq.job import get_current_job, Job @@ -16,6 +11,11 @@ from rq.queue import Queue from rq.registry import DeferredJobRegistry from rq.utils import utcformat +from tests import RQTestCase +from tests.fixtures import (access_self, CallableObject, Number, say_hello, + some_calculation, long_running_job) +from tests.helpers import strip_microseconds + try: from cPickle import loads, dumps except ImportError: @@ -334,6 +334,24 @@ class TestJob(RQTestCase): job.save() self.assertEqual(job.get_ttl(), None) + def test_ttl_via_enqueue(self): + ttl = 1 + queue = Queue(connection=self.testconn) + job = queue.enqueue(say_hello, ttl=ttl) + self.assertEqual(job.get_ttl(), ttl) + + def test_never_expire_during_execution(self): + """Test what happens when job expires during execution""" + ttl = 1 + queue = Queue(connection=self.testconn) + job = queue.enqueue(long_running_job, args=(2,), ttl=ttl) + self.assertEqual(job.get_ttl(), ttl) + job.save() + job.perform() + self.assertEqual(job.get_ttl(), -1) + self.assertTrue(job.exists(job.id)) + self.assertEqual(job.result, 'Done sleeping...') + def test_cleanup(self): """Test that jobs and results are expired properly.""" job = Job.create(func=say_hello)