def __init__(self, name='default', default_timeout=None):
    """Initializes the queue with the given name.

    `name` is stored as-is and is also appended to
    `self.redis_queue_namespace_prefix` to build the queue's `_key`.

    `default_timeout` is stored on the instance as `_default_timeout`.
    """
    prefix = self.redis_queue_namespace_prefix
    self.name = name
    self._key = '%s%s' % (prefix, name)
    # NOTE(review): the enqueue_job() visible alongside this change falls
    # back to a hard-coded 180 seconds and never reads this attribute --
    # confirm whether it is supposed to be consulted there.
    self._default_timeout = default_timeout
import signal


class JobTimeoutException(Exception):
    """Raised when a job takes longer to complete than the allowed maximum
    timeout value.
    """
    pass


class death_pentalty_after(object):
    """Context manager that interrupts its body with a JobTimeoutException
    when the body runs for longer than `timeout` seconds.

    The timeout is implemented with SIGALRM: entering the context installs
    a SIGALRM handler and arms the alarm, leaving the context disarms the
    alarm and resets SIGALRM to the default handler.

    `timeout` may be an integer or anything `int()` accepts (for example
    the string '30'); it is converted to an integer number of seconds on
    construction.
    """

    def __init__(self, timeout):
        # signal.alarm() and the '%d' in the timeout message both need a
        # real integer.  Coerce once, up front, so a string timeout fails
        # (or works) here rather than when the alarm is being armed.
        self._timeout = int(timeout)

    def __enter__(self):
        self.setup_death_penalty()

    def __exit__(self, type, value, traceback):
        # Always cancel immediately, since we're done
        try:
            self.cancel_death_penalty()
        except JobTimeoutException:
            # Weird case: we're done with the with body, but now the alarm is
            # fired.  We may safely ignore this situation and consider the
            # body done.
            pass

        # __exit__ may return True to suppress further exception handling.
        # We don't want to suppress any exceptions here, since all errors
        # should just pass through, JobTimeoutException being handled
        # normally to the invoking context.
        return False

    def handle_death_penalty(self, signum, frame):
        """Signal handler for SIGALRM; raises JobTimeoutException."""
        raise JobTimeoutException('Job exceeded maximum timeout '
                                  'value (%d seconds).' % self._timeout)

    def setup_death_penalty(self):
        """Sets up an alarm signal and a signal handler that raises
        a JobTimeoutException after the timeout amount (expressed in
        seconds).
        """
        signal.signal(signal.SIGALRM, self.handle_death_penalty)
        signal.alarm(self._timeout)

    def cancel_death_penalty(self):
        """Removes the death penalty alarm and puts back the system into
        default signal handling.
        """
        signal.alarm(0)
        signal.signal(signal.SIGALRM, signal.SIG_DFL)


# Correctly spelled alias.  The original (misspelled) name is kept because
# existing code imports it.
death_penalty_after = death_pentalty_after
def main_work_horse(self, job):
    """This is the entry point of the newly spawned work horse.

    Reseeds the random generator, marks this process as the horse, gives
    it its own 'horse' logger and performs the given job.  The process is
    then terminated with os._exit(): exit status 0 when perform_job()
    returned a truthy value, 1 otherwise.
    """
    # After fork()'ing, always assure we are generating random sequences
    # that are different from the worker.
    random.seed()
    self._is_horse = True
    self.log = Logger('horse')

    success = self.perform_job(job)

    # os._exit() is the way to exit from children after a fork(), in
    # contrast to the regular sys.exit()
    os._exit(int(not success))
def slow(f):
    """Decorator that marks a test as slow.

    The wrapped callable is only invoked when the ONLY_RUN_FAST_TESTS
    environment variable is unset or empty.  When the variable is set, the
    call is a silent no-op that returns None.
    """
    import os
    from functools import wraps

    @wraps(f)
    def _inner(*args, **kwargs):
        # The run_tests script exports ONLY_RUN_FAST_TESTS by default and
        # unsets it when invoked with -f, so slow tests must be skipped
        # when the variable IS set, not when it is missing.
        if os.environ.get('ONLY_RUN_FAST_TESTS'):
            return None
        return f(*args, **kwargs)

    return _inner
def create_file_after_timeout(path, timeout):
    """Creates a file at the given path, but only after sleeping for
    `timeout` seconds first.
    """
    time.sleep(timeout)
    create_file(path)
@slow  # noqa
def test_timeouts(self):
    """Worker kills jobs after timeout."""
    import errno

    sentinel_file = '/tmp/.rq_sentinel'

    q = Queue()
    w = Worker([q])

    # Put it on the queue with a timeout value that is shorter than the
    # time the job sleeps before it creates the sentinel file
    res = q.enqueue(
            create_file_after_timeout, sentinel_file, 4,
            timeout=1)

    # Make sure no sentinel is left over from an earlier run.  A missing
    # file is the expected case; any other OSError (e.g. a permission
    # problem) would invalidate the test and must not be swallowed.
    try:
        os.unlink(sentinel_file)
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise

    self.assertEquals(os.path.exists(sentinel_file), False)
    w.work(burst=True)
    # The job must have been killed before it got to create the file
    self.assertEquals(os.path.exists(sentinel_file), False)

    # TODO: Having to do the manual refresh() here is really ugly!
    res.refresh()
    self.assertIn('JobTimeoutException', res.exc_info)