Test the timing out of jobs.

Really looking for a way to speed up this test.  It takes up a whole
second doing nothing now, really.
main
Vincent Driessen 13 years ago
parent 9ac9c23412
commit e807748ee6

@ -2,6 +2,7 @@
This file contains all jobs that are used in tests. Each of these test This file contains all jobs that are used in tests. Each of these test
fixtures has a slighty different characteristics. fixtures has a slighty different characteristics.
""" """
import time
def say_hello(name=None): def say_hello(name=None):
@ -33,3 +34,8 @@ def create_file(path):
job ran.""" job ran."""
with open(path, 'w') as f: with open(path, 'w') as f:
f.write('Just a sentinel.') f.write('Just a sentinel.')
def create_file_after_timeout(path, timeout):
time.sleep(timeout)
create_file(path)

@ -110,6 +110,7 @@ class TestJob(RQTestCase):
['created_at', 'data', 'description']) ['created_at', 'data', 'description'])
def test_store_then_fetch(self): def test_store_then_fetch(self):
"""Store, then fetch."""
job = Job.create(some_calculation, 3, 4, z=2) job = Job.create(some_calculation, 3, 4, z=2)
job.save() job.save()
@ -149,4 +150,3 @@ class TestJob(RQTestCase):
self.testconn.hset(job.key, 'data', unimportable_data) self.testconn.hset(job.key, 'data', unimportable_data)
with self.assertRaises(UnpickleError): with self.assertRaises(UnpickleError):
job.refresh() job.refresh()

@ -1,6 +1,7 @@
import os import os
from tests import RQTestCase from tests import RQTestCase
from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \
create_file_after_timeout
from tests.helpers import strip_milliseconds from tests.helpers import strip_milliseconds
from rq import Queue, Worker, Job from rq import Queue, Worker, Job
@ -129,3 +130,30 @@ class TestWorker(RQTestCase):
# results are immediately removed # results are immediately removed
assert self.testconn.ttl(job_with_rv.key) > 0 assert self.testconn.ttl(job_with_rv.key) > 0
assert self.testconn.exists(job_without_rv.key) == False assert self.testconn.exists(job_without_rv.key) == False
def test_timeouts(self):
"""Worker kills jobs after timeout."""
sentinel_file = '/tmp/.rq_sentinel'
q = Queue()
w = Worker([q])
# Put it on the queue with a timeout value
res = q.enqueue(
create_file_after_timeout, sentinel_file, 4,
timeout=1)
try:
os.unlink(sentinel_file)
except OSError as e:
if e.errno == 2:
pass
self.assertEquals(os.path.exists(sentinel_file), False)
w.work(burst=True)
self.assertEquals(os.path.exists(sentinel_file), False)
# TODO: Having to do the manual refresh() here is really ugly!
res.refresh()
self.assertIn('JobTimeoutException', res.exc_info)

Loading…
Cancel
Save