diff --git a/docs/docs/testing.md b/docs/docs/testing.md index 879e319..d340868 100644 --- a/docs/docs/testing.md +++ b/docs/docs/testing.md @@ -5,7 +5,7 @@ layout: docs ## Workers inside unit tests -You may wish to include your RQ tasks inside unit tests. However many frameworks (such as Django) use in-memory databases which do not play nicely with the default `fork()` behaviour of RQ. +You may wish to include your RQ tasks inside unit tests. However, many frameworks (such as Django) use in-memory databases, which do not play nicely with the default `fork()` behaviour of RQ. Therefore, you must use the SimpleWorker class to avoid fork(); @@ -21,6 +21,25 @@ worker.work(burst=True) # Runs enqueued job ``` +## Testing on Windows + +If you are testing on a Windows machine you can use the approach above, but with a slight tweak. +You will need to subclass SimpleWorker to override the default timeout mechanism of the worker. +Reason: Windows OS does not implement some underlying signals utilized by the default SimpleWorker. + +To subclass SimpleWorker for Windows you can do the following: + +```python +from rq import SimpleWorker +from rq.timeouts import TimerDeathPenalty + +class WindowsSimpleWorker(SimpleWorker): + death_penalty_class = TimerDeathPenalty +``` + +Now you can use WindowsSimpleWorker for running tasks on Windows. + + ## Running Jobs in unit tests Another solution for testing purposes is to use the `is_async=False` queue diff --git a/rq/timeouts.py b/rq/timeouts.py index f995443..eee061d 100644 --- a/rq/timeouts.py +++ b/rq/timeouts.py @@ -2,7 +2,9 @@ from __future__ import (absolute_import, division, print_function, unicode_literals) +import ctypes import signal +import threading class BaseTimeoutException(Exception): @@ -76,3 +78,47 @@ class UnixSignalDeathPenalty(BaseDeathPenalty): """ signal.alarm(0) signal.signal(signal.SIGALRM, signal.SIG_DFL) + + +class TimerDeathPenalty(BaseDeathPenalty): + def __init__(self, timeout, exception=JobTimeoutException, **kwargs): + super().__init__(timeout, exception, **kwargs) + self._target_thread_id = threading.current_thread().ident + self._timer = None + + # Monkey-patch exception with the message ahead of time + # since PyThreadState_SetAsyncExc can only take a class + def init_with_message(self, *args, **kwargs): # noqa + super(exception, self).__init__( + "Task exceeded maximum timeout value ({0} seconds)".format(timeout) + ) + + self._exception.__init__ = init_with_message + + def new_timer(self): + """Returns a new timer since timers can only be used once.""" + return threading.Timer(self._timeout, self.handle_death_penalty) + + def handle_death_penalty(self): + """Raises an asynchronous exception in another thread. + + Reference http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc for more info. + """ + ret = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(self._target_thread_id), ctypes.py_object(self._exception) + ) + if ret == 0: + raise ValueError("Invalid thread ID {}".format(self._target_thread_id)) + elif ret > 1: + ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self._target_thread_id), 0) + raise SystemError("PyThreadState_SetAsyncExc failed") + + def setup_death_penalty(self): + """Starts the timer.""" + self._timer = self.new_timer() + self._timer.start() + + def cancel_death_penalty(self): + """Cancels the timer.""" + self._timer.cancel() + self._timer = None diff --git a/rq/worker.py b/rq/worker.py index 7bed707..793e01f 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -40,7 +40,8 @@ from .queue import Queue from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries from .scheduler import RQScheduler from .suspension import is_suspended -from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty +from .timeouts import (JobTimeoutException, HorseMonitorTimeoutException, + UnixSignalDeathPenalty, TimerDeathPenalty) from .utils import (backend_class, ensure_list, get_version, make_colorizer, utcformat, utcnow, utcparse) from .version import VERSION @@ -1191,7 +1192,6 @@ class Worker: class SimpleWorker(Worker): - def execute_job(self, job, queue): """Execute job in same thread/process, do not fork()""" self.set_state(WorkerStatus.BUSY) diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py new file mode 100644 index 0000000..2872ee0 --- /dev/null +++ b/tests/test_timeouts.py @@ -0,0 +1,44 @@ +import time + +from rq import Queue, SimpleWorker +from rq.timeouts import TimerDeathPenalty +from rq.registry import FailedJobRegistry, FinishedJobRegistry +from tests import RQTestCase + + +class TimerBasedWorker(SimpleWorker): + death_penalty_class = TimerDeathPenalty + + +def thread_friendly_sleep_func(seconds): + end_at = time.time() + seconds + while True: + if time.time() > end_at: + break + + +class TestTimeouts(RQTestCase): + def test_timer_death_penalty(self): + """Ensure TimerDeathPenalty works correctly.""" + q = Queue(connection=self.testconn) + q.empty() + finished_job_registry = FinishedJobRegistry(connection=self.testconn) + failed_job_registry = FailedJobRegistry(connection=self.testconn) + + # make sure death_penalty_class persists + w = TimerBasedWorker([q], connection=self.testconn) + self.assertIsNotNone(w) + self.assertEqual(w.death_penalty_class, TimerDeathPenalty) + + # Test short-running job doesn't raise JobTimeoutException + job = q.enqueue(thread_friendly_sleep_func, args=(1,), job_timeout=3) + w.work(burst=True) + job.refresh() + self.assertIn(job, finished_job_registry) + + # Test long-running job raises JobTimeoutException + job = q.enqueue(thread_friendly_sleep_func, args=(5,), job_timeout=3) + w.work(burst=True) + self.assertIn(job, failed_job_registry) + job.refresh() + self.assertIn("rq.timeouts.JobTimeoutException", job.exc_info)