Merge branch 'timeouts'

This fixes #12.
main
Vincent Driessen 13 years ago
commit df2b2838de

@ -75,6 +75,7 @@ class Job(object):
self.ended_at = None self.ended_at = None
self._result = None self._result = None
self.exc_info = None self.exc_info = None
self.timeout = None
# Data access # Data access
@ -143,10 +144,10 @@ class Job(object):
""" """
key = self.key key = self.key
properties = ['data', 'created_at', 'origin', 'description', properties = ['data', 'created_at', 'origin', 'description',
'enqueued_at', 'ended_at', 'result', 'exc_info'] 'enqueued_at', 'ended_at', 'result', 'exc_info', 'timeout']
data, created_at, origin, description, \ data, created_at, origin, description, \
enqueued_at, ended_at, result, \ enqueued_at, ended_at, result, \
exc_info = conn.hmget(key, properties) exc_info, timeout = conn.hmget(key, properties)
if data is None: if data is None:
raise NoSuchJobError('No such job: %s' % (key,)) raise NoSuchJobError('No such job: %s' % (key,))
@ -164,6 +165,10 @@ class Job(object):
self.ended_at = to_date(ended_at) self.ended_at = to_date(ended_at)
self._result = result self._result = result
self.exc_info = exc_info self.exc_info = exc_info
if timeout is None:
self.timeout = None
else:
self.timeout = int(timeout)
def save(self): def save(self):
"""Persists the current job instance to its corresponding Redis key.""" """Persists the current job instance to its corresponding Redis key."""
@ -186,6 +191,8 @@ class Job(object):
obj['result'] = self._result obj['result'] = self._result
if self.exc_info is not None: if self.exc_info is not None:
obj['exc_info'] = self.exc_info obj['exc_info'] = self.exc_info
if self.timeout is not None:
obj['timeout'] = self.timeout
conn.hmset(key, obj) conn.hmset(key, obj)

@ -32,10 +32,11 @@ class Queue(object):
name = queue_key[len(prefix):] name = queue_key[len(prefix):]
return Queue(name) return Queue(name)
def __init__(self, name='default'): def __init__(self, name='default', default_timeout=None):
prefix = self.redis_queue_namespace_prefix prefix = self.redis_queue_namespace_prefix
self.name = name self.name = name
self._key = '%s%s' % (prefix, name) self._key = '%s%s' % (prefix, name)
self._default_timeout = default_timeout
@property @property
def key(self): def key(self):
@ -99,24 +100,38 @@ class Queue(object):
Expects the function to call, along with the arguments and keyword Expects the function to call, along with the arguments and keyword
arguments. arguments.
The special keyword `timeout` is reserved for `enqueue()` itself and
it won't be passed to the actual job function.
""" """
if f.__module__ == '__main__': if f.__module__ == '__main__':
raise ValueError( raise ValueError(
'Functions from the __main__ module cannot be processed ' 'Functions from the __main__ module cannot be processed '
'by workers.') 'by workers.')
timeout = kwargs.pop('timeout', None)
job = Job.create(f, *args, **kwargs) job = Job.create(f, *args, **kwargs)
return self.enqueue_job(job) return self.enqueue_job(job, timeout=timeout)
def enqueue_job(self, job, set_meta_data=True): def enqueue_job(self, job, timeout=None, set_meta_data=True):
"""Enqueues a job for delayed execution. """Enqueues a job for delayed execution.
When the `timeout` argument is sent, it will overrides the default
timeout value of 180 seconds. `timeout` may either be a string or
integer.
If the `set_meta_data` argument is `True` (default), it will update If the `set_meta_data` argument is `True` (default), it will update
the properties `origin` and `enqueued_at`. the properties `origin` and `enqueued_at`.
""" """
if set_meta_data: if set_meta_data:
job.origin = self.name job.origin = self.name
job.enqueued_at = times.now() job.enqueued_at = times.now()
if timeout:
job.timeout = timeout # _timeout_in_seconds(timeout)
else:
job.timeout = 180 # default
job.save() job.save()
self.push_job_id(job.id) self.push_job_id(job.id)
return job return job

@ -0,0 +1,51 @@
import signal
class JobTimeoutException(Exception):
"""Raised when a job takes longer to complete than the allowed maximum
timeout value.
"""
pass
class death_pentalty_after(object):
def __init__(self, timeout):
self._timeout = timeout
def __enter__(self):
self.setup_death_penalty()
def __exit__(self, type, value, traceback):
# Always cancel immediately, since we're done
try:
self.cancel_death_penalty()
except JobTimeoutException:
# Weird case: we're done with the with body, but now the alarm is
# fired. We may safely ignore this situation and consider the
# body done.
pass
# __exit__ may return True to supress further exception handling. We
# don't want to suppress any exceptions here, since all errors should
# just pass through, JobTimeoutException being handled normally to the
# invoking context.
return False
def handle_death_penalty(self, signum, frame):
raise JobTimeoutException('Job exceeded maximum timeout '
'value (%d seconds).' % self._timeout)
def setup_death_penalty(self):
"""Sets up an alarm signal and a signal handler that raises
a JobTimeoutException after the timeout amount (expressed in
seconds).
"""
signal.signal(signal.SIGALRM, self.handle_death_penalty)
signal.alarm(self._timeout)
def cancel_death_penalty(self):
"""Removes the death penalty alarm and puts back the system into
default signal handling.
"""
signal.alarm(0)
signal.signal(signal.SIGALRM, signal.SIG_DFL)

@ -1,4 +1,3 @@
import sys
import os import os
import errno import errno
import random import random
@ -17,6 +16,7 @@ from .queue import Queue, FailedQueue
from .proxy import conn from .proxy import conn
from .utils import make_colorizer from .utils import make_colorizer
from .exceptions import NoQueueError, UnpickleError from .exceptions import NoQueueError, UnpickleError
from .timeouts import death_pentalty_after
green = make_colorizer('darkgreen') green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow') yellow = make_colorizer('darkyellow')
@ -295,14 +295,14 @@ class Worker(object):
return did_perform_work return did_perform_work
def fork_and_perform_job(self, job): def fork_and_perform_job(self, job):
"""Spawns a work horse to perform the actual work and passes it a job.
The worker will wait for the work horse and make sure it executes
within the given timeout bounds, or will end the work horse with
SIGALRM.
"""
child_pid = os.fork() child_pid = os.fork()
if child_pid == 0: if child_pid == 0:
self._is_horse = True self.main_work_horse(job)
random.seed()
self.log = Logger('horse')
success = self.perform_job(job)
sys.exit(int(not success))
else: else:
self._horse_pid = child_pid self._horse_pid = child_pid
self.procline('Forked %d at %d' % (child_pid, time.time())) self.procline('Forked %d at %d' % (child_pid, time.time()))
@ -320,11 +320,30 @@ class Worker(object):
if e.errno != errno.EINTR: if e.errno != errno.EINTR:
raise raise
def main_work_horse(self, job):
"""This is the entry point of the newly spawned work horse."""
# After fork()'ing, always assure we are generating random sequences
# that are different from the worker.
random.seed()
self._is_horse = True
self.log = Logger('horse')
success = self.perform_job(job)
# os._exit() is the way to exit from childs after a fork(), in
# constrast to the regular sys.exit()
os._exit(int(not success))
def perform_job(self, job): def perform_job(self, job):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
self.procline('Processing %s from %s since %s' % ( self.procline('Processing %s from %s since %s' % (
job.func.__name__, job.func.__name__,
job.origin, time.time())) job.origin, time.time()))
try: try:
with death_pentalty_after(job.timeout or 180):
rv = job.perform() rv = job.perform()
except Exception as e: except Exception as e:
fq = self.failed_queue fq = self.failed_queue

@ -11,6 +11,11 @@ else
safe_rg=cat safe_rg=cat
fi fi
export ONLY_RUN_FAST_TESTS=1
if [ "$1" == '-f' ]; then # Poor man's argparse
unset ONLY_RUN_FAST_TESTS
fi
if check_redis_running; then if check_redis_running; then
/usr/bin/env python -m unittest discover -v -s tests $@ 2>&1 | egrep -v '^test_' | $safe_rg /usr/bin/env python -m unittest discover -v -s tests $@ 2>&1 | egrep -v '^test_' | $safe_rg
else else

@ -3,6 +3,7 @@ from redis import Redis
from logbook import NullHandler from logbook import NullHandler
from rq import conn from rq import conn
def find_empty_redis_database(): def find_empty_redis_database():
"""Tries to connect to a random Redis database (starting from 4), and """Tries to connect to a random Redis database (starting from 4), and
will use/connect it when no keys are in there. will use/connect it when no keys are in there.
@ -15,6 +16,18 @@ def find_empty_redis_database():
assert False, 'No empty Redis database found to run tests in.' assert False, 'No empty Redis database found to run tests in.'
def slow(f):
import os
from functools import wraps
@wraps(f)
def _inner(*args, **kwargs):
if os.environ.get('ONLY_RUN_FAST_TESTS'):
f(*args, **kwargs)
return _inner
class RQTestCase(unittest.TestCase): class RQTestCase(unittest.TestCase):
"""Base class to inherit test cases from for RQ. """Base class to inherit test cases from for RQ.
@ -52,5 +65,5 @@ class RQTestCase(unittest.TestCase):
# Pop the connection to Redis # Pop the connection to Redis
testconn = conn.pop() testconn = conn.pop()
assert testconn == cls.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.' assert testconn == cls.testconn, 'Wow, something really nasty ' \
'happened to the Redis connection stack. Check your setup.'

@ -2,6 +2,8 @@
This file contains all jobs that are used in tests. Each of these test This file contains all jobs that are used in tests. Each of these test
fixtures has a slighty different characteristics. fixtures has a slighty different characteristics.
""" """
import time
def say_hello(name=None): def say_hello(name=None):
"""A job with a single argument and a return value.""" """A job with a single argument and a return value."""
@ -9,20 +11,24 @@ def say_hello(name=None):
name = 'Stranger' name = 'Stranger'
return 'Hi there, %s!' % (name,) return 'Hi there, %s!' % (name,)
def do_nothing(): def do_nothing():
"""The best job in the world.""" """The best job in the world."""
pass pass
def div_by_zero(x): def div_by_zero(x):
"""Prepare for a division-by-zero exception.""" """Prepare for a division-by-zero exception."""
return x / 0 return x / 0
def some_calculation(x, y, z=1): def some_calculation(x, y, z=1):
"""Some arbitrary calculation with three numbers. Choose z smartly if you """Some arbitrary calculation with three numbers. Choose z smartly if you
want a division by zero exception. want a division by zero exception.
""" """
return x * y / z return x * y / z
def create_file(path): def create_file(path):
"""Creates a file at the given path. Actually, leaves evidence that the """Creates a file at the given path. Actually, leaves evidence that the
job ran.""" job ran."""
@ -30,3 +36,6 @@ def create_file(path):
f.write('Just a sentinel.') f.write('Just a sentinel.')
def create_file_after_timeout(path, timeout):
time.sleep(timeout)
create_file(path)

@ -110,6 +110,7 @@ class TestJob(RQTestCase):
['created_at', 'data', 'description']) ['created_at', 'data', 'description'])
def test_store_then_fetch(self): def test_store_then_fetch(self):
"""Store, then fetch."""
job = Job.create(some_calculation, 3, 4, z=2) job = Job.create(some_calculation, 3, 4, z=2)
job.save() job.save()
@ -149,4 +150,3 @@ class TestJob(RQTestCase):
self.testconn.hset(job.key, 'data', unimportable_data) self.testconn.hset(job.key, 'data', unimportable_data)
with self.assertRaises(UnpickleError): with self.assertRaises(UnpickleError):
job.refresh() job.refresh()

@ -1,6 +1,7 @@
import os import os
from tests import RQTestCase from tests import RQTestCase, slow
from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \
create_file_after_timeout
from tests.helpers import strip_milliseconds from tests.helpers import strip_milliseconds
from rq import Queue, Worker, Job from rq import Queue, Worker, Job
@ -16,10 +17,12 @@ class TestWorker(RQTestCase):
"""Worker processes work, then quits.""" """Worker processes work, then quits."""
fooq, barq = Queue('foo'), Queue('bar') fooq, barq = Queue('foo'), Queue('bar')
w = Worker([fooq, barq]) w = Worker([fooq, barq])
self.assertEquals(w.work(burst=True), False, 'Did not expect any work on the queue.') self.assertEquals(w.work(burst=True), False,
'Did not expect any work on the queue.')
fooq.enqueue(say_hello, name='Frank') fooq.enqueue(say_hello, name='Frank')
self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') self.assertEquals(w.work(burst=True), True,
'Expected at least some work done.')
def test_work_is_unreadable(self): def test_work_is_unreadable(self):
"""Unreadable jobs are put on the failed queue.""" """Unreadable jobs are put on the failed queue."""
@ -78,13 +81,13 @@ class TestWorker(RQTestCase):
job = Job.fetch(job.id) job = Job.fetch(job.id)
self.assertEquals(job.origin, q.name) self.assertEquals(job.origin, q.name)
# should be the original enqueued_at date, not the date of enqueueing to # Should be the original enqueued_at date, not the date of enqueueing
# the failed queue # to the failed queue
self.assertEquals(job.enqueued_at, enqueued_at_date) self.assertEquals(job.enqueued_at, enqueued_at_date)
self.assertIsNotNone(job.exc_info) # should contain exc_info self.assertIsNotNone(job.exc_info) # should contain exc_info
def test_cancelled_jobs_arent_executed(self): def test_cancelled_jobs_arent_executed(self): # noqa
"""Cancelling jobs.""" """Cancelling jobs."""
SENTINEL_FILE = '/tmp/rq-tests.txt' SENTINEL_FILE = '/tmp/rq-tests.txt'
@ -129,3 +132,31 @@ class TestWorker(RQTestCase):
# results are immediately removed # results are immediately removed
assert self.testconn.ttl(job_with_rv.key) > 0 assert self.testconn.ttl(job_with_rv.key) > 0
assert self.testconn.exists(job_without_rv.key) == False assert self.testconn.exists(job_without_rv.key) == False
@slow # noqa
def test_timeouts(self):
"""Worker kills jobs after timeout."""
sentinel_file = '/tmp/.rq_sentinel'
q = Queue()
w = Worker([q])
# Put it on the queue with a timeout value
res = q.enqueue(
create_file_after_timeout, sentinel_file, 4,
timeout=1)
try:
os.unlink(sentinel_file)
except OSError as e:
if e.errno == 2:
pass
self.assertEquals(os.path.exists(sentinel_file), False)
w.work(burst=True)
self.assertEquals(os.path.exists(sentinel_file), False)
# TODO: Having to do the manual refresh() here is really ugly!
res.refresh()
self.assertIn('JobTimeoutException', res.exc_info)

Loading…
Cancel
Save