Cross platform simple worker (#1629)

* Added CrossPlatformDeathPenalty that doesn't rely on signals

* Updated `SimpleWorker`'s `death_penalty_class` to utilize `CrossPlatformDeathPenalty` to allow use on Windows machines

* Changed `CrossPlatformDeathPenalty` to `TimerDeathPenalty`

* Removed overridden `death_penalty_class` in `SimpleWorker` until feature matures

* Added section in testing.md explaining how to utilize `SimpleWorker` on Windows OS

* Replaced usage of chatting with .format for python 3.5 compatibility

* Add tests for new timeout feature

* Explicitly set defaults.CALLBACK_TIMEOUT

* Implemented cross-thread method of raising errors by using ctypes

* Finished writing tests for new TimerDeathPenalty
main
Michael Hill 3 years ago committed by GitHub
parent 609c068ef8
commit f4602d30d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -5,7 +5,7 @@ layout: docs
## Workers inside unit tests ## Workers inside unit tests
You may wish to include your RQ tasks inside unit tests. However many frameworks (such as Django) use in-memory databases which do not play nicely with the default `fork()` behaviour of RQ. You may wish to include your RQ tasks inside unit tests. However, many frameworks (such as Django) use in-memory databases, which do not play nicely with the default `fork()` behaviour of RQ.
Therefore, you must use the SimpleWorker class to avoid fork(); Therefore, you must use the SimpleWorker class to avoid fork();
@ -21,6 +21,25 @@ worker.work(burst=True) # Runs enqueued job
``` ```
## Testing on Windows
If you are testing on a Windows machine you can use the approach above, but with a slight tweak.
You will need to subclass SimpleWorker to override the default timeout mechanism of the worker.
Reason: Windows OS does not implement some underlying signals utilized by the default SimpleWorker.
To subclass SimpleWorker for Windows you can do the following:
```python
from rq import SimpleWorker
from rq.timeouts import TimerDeathPenalty
class WindowsSimpleWorker(SimpleWorker):
death_penalty_class = TimerDeathPenalty
```
Now you can use WindowsSimpleWorker for running tasks on Windows.
## Running Jobs in unit tests ## Running Jobs in unit tests
Another solution for testing purposes is to use the `is_async=False` queue Another solution for testing purposes is to use the `is_async=False` queue

@ -2,7 +2,9 @@
from __future__ import (absolute_import, division, print_function, from __future__ import (absolute_import, division, print_function,
unicode_literals) unicode_literals)
import ctypes
import signal import signal
import threading
class BaseTimeoutException(Exception): class BaseTimeoutException(Exception):
@ -76,3 +78,47 @@ class UnixSignalDeathPenalty(BaseDeathPenalty):
""" """
signal.alarm(0) signal.alarm(0)
signal.signal(signal.SIGALRM, signal.SIG_DFL) signal.signal(signal.SIGALRM, signal.SIG_DFL)
class TimerDeathPenalty(BaseDeathPenalty):
def __init__(self, timeout, exception=JobTimeoutException, **kwargs):
super().__init__(timeout, exception, **kwargs)
self._target_thread_id = threading.current_thread().ident
self._timer = None
# Monkey-patch exception with the message ahead of time
# since PyThreadState_SetAsyncExc can only take a class
def init_with_message(self, *args, **kwargs): # noqa
super(exception, self).__init__(
"Task exceeded maximum timeout value ({0} seconds)".format(timeout)
)
self._exception.__init__ = init_with_message
def new_timer(self):
"""Returns a new timer since timers can only be used once."""
return threading.Timer(self._timeout, self.handle_death_penalty)
def handle_death_penalty(self):
"""Raises an asynchronous exception in another thread.
Reference http://docs.python.org/c-api/init.html#PyThreadState_SetAsyncExc for more info.
"""
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self._target_thread_id), ctypes.py_object(self._exception)
)
if ret == 0:
raise ValueError("Invalid thread ID {}".format(self._target_thread_id))
elif ret > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self._target_thread_id), 0)
raise SystemError("PyThreadState_SetAsyncExc failed")
def setup_death_penalty(self):
"""Starts the timer."""
self._timer = self.new_timer()
self._timer.start()
def cancel_death_penalty(self):
"""Cancels the timer."""
self._timer.cancel()
self._timer = None

@ -40,7 +40,8 @@ from .queue import Queue
from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries from .registry import FailedJobRegistry, StartedJobRegistry, clean_registries
from .scheduler import RQScheduler from .scheduler import RQScheduler
from .suspension import is_suspended from .suspension import is_suspended
from .timeouts import JobTimeoutException, HorseMonitorTimeoutException, UnixSignalDeathPenalty from .timeouts import (JobTimeoutException, HorseMonitorTimeoutException,
UnixSignalDeathPenalty, TimerDeathPenalty)
from .utils import (backend_class, ensure_list, get_version, from .utils import (backend_class, ensure_list, get_version,
make_colorizer, utcformat, utcnow, utcparse) make_colorizer, utcformat, utcnow, utcparse)
from .version import VERSION from .version import VERSION
@ -1191,7 +1192,6 @@ class Worker:
class SimpleWorker(Worker): class SimpleWorker(Worker):
def execute_job(self, job, queue): def execute_job(self, job, queue):
"""Execute job in same thread/process, do not fork()""" """Execute job in same thread/process, do not fork()"""
self.set_state(WorkerStatus.BUSY) self.set_state(WorkerStatus.BUSY)

@ -0,0 +1,44 @@
import time
from rq import Queue, SimpleWorker
from rq.timeouts import TimerDeathPenalty
from rq.registry import FailedJobRegistry, FinishedJobRegistry
from tests import RQTestCase
class TimerBasedWorker(SimpleWorker):
death_penalty_class = TimerDeathPenalty
def thread_friendly_sleep_func(seconds):
end_at = time.time() + seconds
while True:
if time.time() > end_at:
break
class TestTimeouts(RQTestCase):
def test_timer_death_penalty(self):
"""Ensure TimerDeathPenalty works correctly."""
q = Queue(connection=self.testconn)
q.empty()
finished_job_registry = FinishedJobRegistry(connection=self.testconn)
failed_job_registry = FailedJobRegistry(connection=self.testconn)
# make sure death_penalty_class persists
w = TimerBasedWorker([q], connection=self.testconn)
self.assertIsNotNone(w)
self.assertEqual(w.death_penalty_class, TimerDeathPenalty)
# Test short-running job doesn't raise JobTimeoutException
job = q.enqueue(thread_friendly_sleep_func, args=(1,), job_timeout=3)
w.work(burst=True)
job.refresh()
self.assertIn(job, finished_job_registry)
# Test long-running job raises JobTimeoutException
job = q.enqueue(thread_friendly_sleep_func, args=(5,), job_timeout=3)
w.work(burst=True)
self.assertIn(job, failed_job_registry)
job.refresh()
self.assertIn("rq.timeouts.JobTimeoutException", job.exc_info)
Loading…
Cancel
Save