handled unhandled exceptions in horse (#1303)

* handled unhandled exceptions in horse to prevent a job from being silently dropped without going into FailedRegistry

* changes after review

* made sure that work_horse always terminates in a proper way with tests

* minor refactoring

* fix for failing test

* fixes for the other tests

- removed exception handling (done in monitor_work_horse)
- adjusted some tests for the checks that are not relevant anymore

* review suggested changes

* cleanup

Co-authored-by: Ruslan Mullakhmetov <ruslan@twentythree.net>
main
Ruslan Mullakhmetov 4 years ago committed by GitHub
parent 57f286eac4
commit c2931b45b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -703,6 +703,7 @@ class Worker(object):
os.environ['RQ_JOB_ID'] = job.id
if child_pid == 0:
self.main_work_horse(job, queue)
os._exit(0) # just in case
else:
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
@ -785,7 +786,10 @@ class Worker(object):
self.setup_work_horse_signals()
self._is_horse = True
self.log = logger
try:
self.perform_job(job, queue)
except:
os._exit(1)
# os._exit() is the way to exit from childs after a fork(), in
# contrast to the regular sys.exit()
@ -922,12 +926,13 @@ class Worker(object):
"""Performs the actual work of a job. Will/should only be called
inside the work horse's process.
"""
self.prepare_job_execution(job, heartbeat_ttl)
push_connection(self.connection)
started_job_registry = queue.started_job_registry
try:
self.prepare_job_execution(job, heartbeat_ttl)
job.started_at = utcnow()
timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id):

@ -37,6 +37,11 @@ def do_nothing():
"""The best job in the world."""
pass
def raise_exc():
raise Exception('raise_exc error')
def raise_exc_mock():
return raise_exc
def div_by_zero(x):
"""Prepare for a division-by-zero exception."""

@ -25,7 +25,7 @@ from tests import RQTestCase, slow
from tests.fixtures import (
access_self, create_file, create_file_after_timeout, div_by_zero, do_nothing,
kill_worker, long_running_job, modify_self, modify_self_and_error,
run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid,
run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, raise_exc_mock
)
from rq import Queue, SimpleWorker, Worker, get_current_connection
@ -312,6 +312,37 @@ class TestWorker(RQTestCase):
self.assertEqual(str(job.enqueued_at), enqueued_at_date)
self.assertTrue(job.exc_info) # should contain exc_info
def test_horse_fails(self):
"""Tests that job status is set to FAILED even if horse unexpectedly fails"""
q = Queue()
self.assertEqual(q.count, 0)
# Action
job = q.enqueue(say_hello)
self.assertEqual(q.count, 1)
# keep for later
enqueued_at_date = str(job.enqueued_at)
w = Worker([q])
with mock.patch.object(w, 'perform_job', new_callable=raise_exc_mock):
w.work(burst=True) # should silently pass
# Postconditions
self.assertEqual(q.count, 0)
failed_job_registry = FailedJobRegistry(queue=q)
self.assertTrue(job in failed_job_registry)
self.assertEqual(w.get_current_job_id(), None)
# Check the job
job = Job.fetch(job.id)
self.assertEqual(job.origin, q.name)
# Should be the original enqueued_at date, not the date of enqueueing
# to the failed queue
self.assertEqual(str(job.enqueued_at), enqueued_at_date)
self.assertTrue(job.exc_info) # should contain exc_info
def test_statistics(self):
"""Successful and failed job counts are saved properly"""
queue = Queue()
@ -1206,10 +1237,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertEqual(p.exitcode, 1)
self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
with open(os.path.join(self.sandbox, 'stderr.log')) as f:
stderr = f.read().strip('\n')
err = 'ShutDownImminentException: shut down imminent (signal: SIGRTMIN)'
self.assertTrue(stderr.endswith(err), stderr)
@slow
def test_1_sec_shutdown(self):
@ -1226,10 +1253,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
with open(os.path.join(self.sandbox, 'stderr.log')) as f:
stderr = f.read().strip('\n')
err = 'ShutDownImminentException: shut down imminent (signal: SIGALRM)'
self.assertTrue(stderr.endswith(err), stderr)
@slow
def test_shutdown_double_sigrtmin(self):
@ -1247,10 +1270,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started')))
self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished')))
with open(os.path.join(self.sandbox, 'stderr.log')) as f:
stderr = f.read().strip('\n')
err = 'ShutDownImminentException: shut down imminent (signal: SIGRTMIN)'
self.assertTrue(stderr.endswith(err), stderr)
@mock.patch('rq.worker.logger.info')
def test_handle_shutdown_request(self, mock_logger_info):

Loading…
Cancel
Save