From 8a856e79eae98e887594f6804e6052697e2be425 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 22 Feb 2012 16:54:02 +0100 Subject: [PATCH 1/8] Initial attempt at job timeouts. --- rq/job.py | 10 +++++++-- rq/queue.py | 21 +++++++++++++++--- rq/worker.py | 62 ++++++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 81 insertions(+), 12 deletions(-) diff --git a/rq/job.py b/rq/job.py index c91c781..0939eed 100644 --- a/rq/job.py +++ b/rq/job.py @@ -143,10 +143,10 @@ class Job(object): """ key = self.key properties = ['data', 'created_at', 'origin', 'description', - 'enqueued_at', 'ended_at', 'result', 'exc_info'] + 'enqueued_at', 'ended_at', 'result', 'exc_info', 'timeout'] data, created_at, origin, description, \ enqueued_at, ended_at, result, \ - exc_info = conn.hmget(key, properties) + exc_info, timeout = conn.hmget(key, properties) if data is None: raise NoSuchJobError('No such job: %s' % (key,)) @@ -164,6 +164,10 @@ class Job(object): self.ended_at = to_date(ended_at) self._result = result self.exc_info = exc_info + if timeout is None: + self.timeout = None + else: + self.timeout = int(timeout) def save(self): """Persists the current job instance to its corresponding Redis key.""" @@ -186,6 +190,8 @@ class Job(object): obj['result'] = self._result if self.exc_info is not None: obj['exc_info'] = self.exc_info + if self.timeout is not None: + obj['timeout'] = self.timeout conn.hmset(key, obj) diff --git a/rq/queue.py b/rq/queue.py index 7ec1efe..f783be7 100644 --- a/rq/queue.py +++ b/rq/queue.py @@ -32,10 +32,11 @@ class Queue(object): name = queue_key[len(prefix):] return Queue(name) - def __init__(self, name='default'): + def __init__(self, name='default', default_timeout=None): prefix = self.redis_queue_namespace_prefix self.name = name self._key = '%s%s' % (prefix, name) + self._default_timeout = default_timeout @property def key(self): @@ -99,24 +100,38 @@ class Queue(object): Expects the function to call, along with the arguments and keyword arguments. + + The special keyword `timeout` is reserved for `enqueue()` itself and + it won't be passed to the actual job function. """ if f.__module__ == '__main__': raise ValueError( 'Functions from the __main__ module cannot be processed ' 'by workers.') + timeout = kwargs.pop('timeout', None) job = Job.create(f, *args, **kwargs) - return self.enqueue_job(job) + return self.enqueue_job(job, timeout=timeout) - def enqueue_job(self, job, set_meta_data=True): + def enqueue_job(self, job, timeout=None, set_meta_data=True): """Enqueues a job for delayed execution. + When the `timeout` argument is sent, it will overrides the default + timeout value of 180 seconds. `timeout` may either be a string or + integer. + If the `set_meta_data` argument is `True` (default), it will update the properties `origin` and `enqueued_at`. """ if set_meta_data: job.origin = self.name job.enqueued_at = times.now() + + if timeout: + job.timeout = timeout # _timeout_in_seconds(timeout) + else: + job.timeout = 180 # default + job.save() self.push_job_id(job.id) return job diff --git a/rq/worker.py b/rq/worker.py index 25830aa..71b887d 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -1,4 +1,3 @@ -import sys import os import errno import random @@ -295,14 +294,14 @@ class Worker(object): return did_perform_work def fork_and_perform_job(self, job): + """Spawns a work horse to perform the actual work and passes it a job. + The worker will wait for the work horse and make sure it executes + within the given timeout bounds, or will end the work horse with + SIGALRM. + """ child_pid = os.fork() if child_pid == 0: - self._is_horse = True - random.seed() - self.log = Logger('horse') - - success = self.perform_job(job) - sys.exit(int(not success)) + self.main_work_horse(job) else: self._horse_pid = child_pid self.procline('Forked %d at %d' % (child_pid, time.time())) @@ -320,13 +319,62 @@ class Worker(object): if e.errno != errno.EINTR: raise + def main_work_horse(self, job): + """This is the entry point of the newly spawned work horse.""" + # After fork()'ing, always assure we are generating random sequences + # that are different from the worker. + random.seed() + self._is_horse = True + self.log = Logger('horse') + + success = self.perform_job(job) + + # os._exit() is the way to exit from childs after a fork(), in + # constrast to the regular sys.exit() + os._exit(int(not success)) + + def raise_death_penalty_after(self, timeout): + """Sets up an alarm signal and a signal handler that raises + a JobTimeoutException after the given `timeout` amount (expressed + in seconds). + """ + + class JobTimeoutException(Exception): + """Raised when a job takes longer to complete than the allowed + maximum time. + """ + pass + + # Setup a timeout handler + def timeout_handler(signum, frame): + raise JobTimeoutException('Job exceeded maximum timeout ' + 'value (%d seconds).' % timeout) + + signal.signal(signal.SIGALRM, timeout_handler) + signal.alarm(timeout) + + def cancel_death_penalty(self): + """Removes the death penalty alarm and puts back the system into + default signal handling. + """ + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) + def perform_job(self, job): + """Performs the actual work of a job. Will/should only be called + inside the work horse's process. + """ self.procline('Processing %s from %s since %s' % ( job.func.__name__, job.origin, time.time())) + + # Set up death penalty + self.raise_death_penalty_after(job.timeout or 180) try: rv = job.perform() + self.cancel_death_penalty() except Exception as e: + self.cancel_death_penalty() fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name) From b8305a818f8558266c8973b6680d2cb87b125f1d Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 22 Feb 2012 16:54:27 +0100 Subject: [PATCH 2/8] Safer, and shorter, version of the death penalty. This case protects against JobTimeoutExceptions being raised immediately after the job body has been (successfully) executed. Still, JobTimeoutExceptions pass through naturally, like any other exception, to be handled by the default exception handler that writes failed jobs to the failed queue. Timeouts therefore are reported like any other exception. --- rq/timeouts.py | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++ rq/worker.py | 35 +++------------------------------- 2 files changed, 54 insertions(+), 32 deletions(-) create mode 100644 rq/timeouts.py diff --git a/rq/timeouts.py b/rq/timeouts.py new file mode 100644 index 0000000..83be1e6 --- /dev/null +++ b/rq/timeouts.py @@ -0,0 +1,51 @@ +import signal + + +class JobTimeoutException(Exception): + """Raised when a job takes longer to complete than the allowed maximum + timeout value. + """ + pass + + +class death_pentalty_after(object): + def __init__(self, timeout): + self._timeout = timeout + + def __enter__(self): + self.setup_death_penalty() + + def __exit__(self, type, value, traceback): + # Always cancel immediately, since we're done + try: + self.cancel_death_penalty() + except JobTimeoutException: + # Weird case: we're done with the with body, but now the alarm is + # fired. We may safely ignore this situation and consider the + # body done. + pass + + # __exit__ may return True to supress further exception handling. We + # don't want to suppress any exceptions here, since all errors should + # just pass through, JobTimeoutException being handled as just one of + # them. + return False + + def handle_death_penalty(self, signum, frame): + raise JobTimeoutException('Job exceeded maximum timeout ' + 'value (%d seconds).' % self._timeout) + + def setup_death_penalty(self): + """Sets up an alarm signal and a signal handler that raises + a JobTimeoutException after the timeout amount (expressed in + seconds). + """ + signal.signal(signal.SIGALRM, self.handle_death_penalty) + signal.alarm(self._timeout) + + def cancel_death_penalty(self): + """Removes the death penalty alarm and puts back the system into + default signal handling. + """ + signal.alarm(0) + signal.signal(signal.SIGALRM, signal.SIG_DFL) diff --git a/rq/worker.py b/rq/worker.py index 71b887d..142948a 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -16,6 +16,7 @@ from .queue import Queue, FailedQueue from .proxy import conn from .utils import make_colorizer from .exceptions import NoQueueError, UnpickleError +from .timeouts import death_pentalty_after green = make_colorizer('darkgreen') yellow = make_colorizer('darkyellow') @@ -333,33 +334,6 @@ class Worker(object): # constrast to the regular sys.exit() os._exit(int(not success)) - def raise_death_penalty_after(self, timeout): - """Sets up an alarm signal and a signal handler that raises - a JobTimeoutException after the given `timeout` amount (expressed - in seconds). - """ - - class JobTimeoutException(Exception): - """Raised when a job takes longer to complete than the allowed - maximum time. - """ - pass - - # Setup a timeout handler - def timeout_handler(signum, frame): - raise JobTimeoutException('Job exceeded maximum timeout ' - 'value (%d seconds).' % timeout) - - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(timeout) - - def cancel_death_penalty(self): - """Removes the death penalty alarm and puts back the system into - default signal handling. - """ - signal.alarm(0) - signal.signal(signal.SIGALRM, signal.SIG_DFL) - def perform_job(self, job): """Performs the actual work of a job. Will/should only be called inside the work horse's process. @@ -368,13 +342,10 @@ class Worker(object): job.func.__name__, job.origin, time.time())) - # Set up death penalty - self.raise_death_penalty_after(job.timeout or 180) try: - rv = job.perform() - self.cancel_death_penalty() + with death_pentalty_after(job.timeout or 180): + rv = job.perform() except Exception as e: - self.cancel_death_penalty() fq = self.failed_queue self.log.exception(red(str(e))) self.log.warning('Moving job to %s queue.' % fq.name) From 000849c43039a4b8ae86855cdde9d267b836979e Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 22 Feb 2012 17:36:42 +0100 Subject: [PATCH 3/8] Initialize jobs with timeouts. --- rq/job.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rq/job.py b/rq/job.py index 0939eed..50ab65e 100644 --- a/rq/job.py +++ b/rq/job.py @@ -75,6 +75,7 @@ class Job(object): self.ended_at = None self._result = None self.exc_info = None + self.timeout = None # Data access From 9ac9c234126cddece2fb92a4b419d6a9c16624c7 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 22 Feb 2012 17:39:14 +0100 Subject: [PATCH 4/8] Flake8. --- tests/fixtures.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index 917073a..15dbd12 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -3,30 +3,33 @@ This file contains all jobs that are used in tests. Each of these test fixtures has a slighty different characteristics. """ + def say_hello(name=None): """A job with a single argument and a return value.""" if name is None: name = 'Stranger' return 'Hi there, %s!' % (name,) + def do_nothing(): """The best job in the world.""" pass + def div_by_zero(x): """Prepare for a division-by-zero exception.""" return x / 0 + def some_calculation(x, y, z=1): """Some arbitrary calculation with three numbers. Choose z smartly if you want a division by zero exception. """ return x * y / z + def create_file(path): """Creates a file at the given path. Actually, leaves evidence that the job ran.""" with open(path, 'w') as f: f.write('Just a sentinel.') - - From e807748ee6baaa680c5e1ff9140c620dedcc3322 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 22 Feb 2012 18:01:14 +0100 Subject: [PATCH 5/8] Test the timing out of jobs. Really looking for a way to speed up this test. It takes up a whole second doing nothing now, really. --- tests/fixtures.py | 6 ++++++ tests/test_job.py | 2 +- tests/test_worker.py | 30 +++++++++++++++++++++++++++++- 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/tests/fixtures.py b/tests/fixtures.py index 15dbd12..36d0c8a 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -2,6 +2,7 @@ This file contains all jobs that are used in tests. Each of these test fixtures has a slighty different characteristics. """ +import time def say_hello(name=None): @@ -33,3 +34,8 @@ def create_file(path): job ran.""" with open(path, 'w') as f: f.write('Just a sentinel.') + + +def create_file_after_timeout(path, timeout): + time.sleep(timeout) + create_file(path) diff --git a/tests/test_job.py b/tests/test_job.py index 5669618..ff260d5 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -110,6 +110,7 @@ class TestJob(RQTestCase): ['created_at', 'data', 'description']) def test_store_then_fetch(self): + """Store, then fetch.""" job = Job.create(some_calculation, 3, 4, z=2) job.save() @@ -149,4 +150,3 @@ class TestJob(RQTestCase): self.testconn.hset(job.key, 'data', unimportable_data) with self.assertRaises(UnpickleError): job.refresh() - diff --git a/tests/test_worker.py b/tests/test_worker.py index 19086e2..3f3fe2a 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,6 +1,7 @@ import os from tests import RQTestCase -from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file +from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \ + create_file_after_timeout from tests.helpers import strip_milliseconds from rq import Queue, Worker, Job @@ -129,3 +130,30 @@ class TestWorker(RQTestCase): # results are immediately removed assert self.testconn.ttl(job_with_rv.key) > 0 assert self.testconn.exists(job_without_rv.key) == False + + + def test_timeouts(self): + """Worker kills jobs after timeout.""" + sentinel_file = '/tmp/.rq_sentinel' + + q = Queue() + w = Worker([q]) + + # Put it on the queue with a timeout value + res = q.enqueue( + create_file_after_timeout, sentinel_file, 4, + timeout=1) + + try: + os.unlink(sentinel_file) + except OSError as e: + if e.errno == 2: + pass + + self.assertEquals(os.path.exists(sentinel_file), False) + w.work(burst=True) + self.assertEquals(os.path.exists(sentinel_file), False) + + # TODO: Having to do the manual refresh() here is really ugly! + res.refresh() + self.assertIn('JobTimeoutException', res.exc_info) From 91fff48389064cce580dd5b179e684bf5a463a27 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Wed, 22 Feb 2012 18:02:56 +0100 Subject: [PATCH 6/8] Flake8. --- tests/test_worker.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/test_worker.py b/tests/test_worker.py index 3f3fe2a..b1aa68c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -17,10 +17,12 @@ class TestWorker(RQTestCase): """Worker processes work, then quits.""" fooq, barq = Queue('foo'), Queue('bar') w = Worker([fooq, barq]) - self.assertEquals(w.work(burst=True), False, 'Did not expect any work on the queue.') + self.assertEquals(w.work(burst=True), False, + 'Did not expect any work on the queue.') fooq.enqueue(say_hello, name='Frank') - self.assertEquals(w.work(burst=True), True, 'Expected at least some work done.') + self.assertEquals(w.work(burst=True), True, + 'Expected at least some work done.') def test_work_is_unreadable(self): """Unreadable jobs are put on the failed queue.""" @@ -79,13 +81,13 @@ class TestWorker(RQTestCase): job = Job.fetch(job.id) self.assertEquals(job.origin, q.name) - # should be the original enqueued_at date, not the date of enqueueing to - # the failed queue + # Should be the original enqueued_at date, not the date of enqueueing + # to the failed queue self.assertEquals(job.enqueued_at, enqueued_at_date) self.assertIsNotNone(job.exc_info) # should contain exc_info - def test_cancelled_jobs_arent_executed(self): + def test_cancelled_jobs_arent_executed(self): # noqa """Cancelling jobs.""" SENTINEL_FILE = '/tmp/rq-tests.txt' @@ -132,7 +134,7 @@ class TestWorker(RQTestCase): assert self.testconn.exists(job_without_rv.key) == False - def test_timeouts(self): + def test_timeouts(self): # noqa """Worker kills jobs after timeout.""" sentinel_file = '/tmp/.rq_sentinel' From 844c5ed8c7656cc7bdeaff3b7dfd3aa950bc04c7 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 24 Feb 2012 07:39:44 +0100 Subject: [PATCH 7/8] Add @slow wrapper to avoid running slow tests. Use ./run_tests -f to only run the fast tests. --- rq/timeouts.py | 4 ++-- run_tests | 5 +++++ tests/__init__.py | 12 ++++++++++++ tests/test_worker.py | 5 +++-- 4 files changed, 22 insertions(+), 4 deletions(-) diff --git a/rq/timeouts.py b/rq/timeouts.py index 83be1e6..72b827e 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -27,8 +27,8 @@ class death_pentalty_after(object): # __exit__ may return True to supress further exception handling. We # don't want to suppress any exceptions here, since all errors should - # just pass through, JobTimeoutException being handled as just one of - # them. + # just pass through, JobTimeoutException being handled normally to the + # invoking context. return False def handle_death_penalty(self, signum, frame): diff --git a/run_tests b/run_tests index 4ac4f7e..670b438 100755 --- a/run_tests +++ b/run_tests @@ -11,6 +11,11 @@ else safe_rg=cat fi +export ONLY_RUN_FAST_TESTS=1 +if [ "$1" == '-f' ]; then # Poor man's argparse + unset ONLY_RUN_FAST_TESTS +fi + if check_redis_running; then /usr/bin/env python -m unittest discover -v -s tests $@ 2>&1 | egrep -v '^test_' | $safe_rg else diff --git a/tests/__init__.py b/tests/__init__.py index 76895a4..2d54f29 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -15,6 +15,18 @@ def find_empty_redis_database(): assert False, 'No empty Redis database found to run tests in.' +def slow(f): + import os + from functools import wraps + + @wraps(f) + def _inner(*args, **kwargs): + if os.environ.get('ONLY_RUN_FAST_TESTS'): + f(*args, **kwargs) + + return _inner + + class RQTestCase(unittest.TestCase): """Base class to inherit test cases from for RQ. diff --git a/tests/test_worker.py b/tests/test_worker.py index b1aa68c..92f177c 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -1,5 +1,5 @@ import os -from tests import RQTestCase +from tests import RQTestCase, slow from tests.fixtures import say_hello, div_by_zero, do_nothing, create_file, \ create_file_after_timeout from tests.helpers import strip_milliseconds @@ -134,7 +134,8 @@ class TestWorker(RQTestCase): assert self.testconn.exists(job_without_rv.key) == False - def test_timeouts(self): # noqa + @slow # noqa + def test_timeouts(self): """Worker kills jobs after timeout.""" sentinel_file = '/tmp/.rq_sentinel' From 3c05f20d95ae8aa11146b5693c3b6446e22e4d27 Mon Sep 17 00:00:00 2001 From: Vincent Driessen Date: Fri, 24 Feb 2012 11:36:33 +0100 Subject: [PATCH 8/8] Flake8. --- tests/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/__init__.py b/tests/__init__.py index 2d54f29..1c24bba 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -3,6 +3,7 @@ from redis import Redis from logbook import NullHandler from rq import conn + def find_empty_redis_database(): """Tries to connect to a random Redis database (starting from 4), and will use/connect it when no keys are in there. @@ -64,5 +65,5 @@ class RQTestCase(unittest.TestCase): # Pop the connection to Redis testconn = conn.pop() - assert testconn == cls.testconn, 'Wow, something really nasty happened to the Redis connection stack. Check your setup.' - + assert testconn == cls.testconn, 'Wow, something really nasty ' \ + 'happened to the Redis connection stack. Check your setup.'