diff --git a/rq/worker.py b/rq/worker.py index bae12f1..c646970 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -703,6 +703,7 @@ class Worker(object): os.environ['RQ_JOB_ID'] = job.id if child_pid == 0: self.main_work_horse(job, queue) + os._exit(0) # just in case else: self._horse_pid = child_pid self.procline('Forked {0} at {1}'.format(child_pid, time.time())) @@ -785,7 +786,10 @@ class Worker(object): self.setup_work_horse_signals() self._is_horse = True self.log = logger - self.perform_job(job, queue) + try: + self.perform_job(job, queue) + except: + os._exit(1) # os._exit() is the way to exit from childs after a fork(), in # contrast to the regular sys.exit() @@ -922,12 +926,13 @@ class Worker(object): """Performs the actual work of a job. Will/should only be called inside the work horse's process. """ - self.prepare_job_execution(job, heartbeat_ttl) push_connection(self.connection) started_job_registry = queue.started_job_registry try: + self.prepare_job_execution(job, heartbeat_ttl) + job.started_at = utcnow() timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT with self.death_penalty_class(timeout, JobTimeoutException, job_id=job.id): diff --git a/tests/fixtures.py b/tests/fixtures.py index a1f507a..057b932 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -37,6 +37,11 @@ def do_nothing(): """The best job in the world.""" pass +def raise_exc(): + raise Exception('raise_exc error') + +def raise_exc_mock(): + return raise_exc def div_by_zero(x): """Prepare for a division-by-zero exception.""" diff --git a/tests/test_worker.py b/tests/test_worker.py index 931c1fa..a5d4026 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -25,7 +25,7 @@ from tests import RQTestCase, slow from tests.fixtures import ( access_self, create_file, create_file_after_timeout, div_by_zero, do_nothing, kill_worker, long_running_job, modify_self, modify_self_and_error, - run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, + run_dummy_heroku_worker, save_key_ttl, say_hello, say_pid, raise_exc_mock ) from rq import Queue, SimpleWorker, Worker, get_current_connection @@ -312,6 +312,37 @@ class TestWorker(RQTestCase): self.assertEqual(str(job.enqueued_at), enqueued_at_date) self.assertTrue(job.exc_info) # should contain exc_info + def test_horse_fails(self): + """Tests that job status is set to FAILED even if horse unexpectedly fails""" + q = Queue() + self.assertEqual(q.count, 0) + + # Action + job = q.enqueue(say_hello) + self.assertEqual(q.count, 1) + + # keep for later + enqueued_at_date = str(job.enqueued_at) + + w = Worker([q]) + with mock.patch.object(w, 'perform_job', new_callable=raise_exc_mock): + w.work(burst=True) # should silently pass + + # Postconditions + self.assertEqual(q.count, 0) + failed_job_registry = FailedJobRegistry(queue=q) + self.assertTrue(job in failed_job_registry) + self.assertEqual(w.get_current_job_id(), None) + + # Check the job + job = Job.fetch(job.id) + self.assertEqual(job.origin, q.name) + + # Should be the original enqueued_at date, not the date of enqueueing + # to the failed queue + self.assertEqual(str(job.enqueued_at), enqueued_at_date) + self.assertTrue(job.exc_info) # should contain exc_info + def test_statistics(self): """Successful and failed job counts are saved properly""" queue = Queue() @@ -1206,10 +1237,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertEqual(p.exitcode, 1) self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) - with open(os.path.join(self.sandbox, 'stderr.log')) as f: - stderr = f.read().strip('\n') - err = 'ShutDownImminentException: shut down imminent (signal: SIGRTMIN)' - self.assertTrue(stderr.endswith(err), stderr) @slow def test_1_sec_shutdown(self): @@ -1226,10 +1253,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) - with open(os.path.join(self.sandbox, 'stderr.log')) as f: - stderr = f.read().strip('\n') - err = 'ShutDownImminentException: shut down imminent (signal: SIGALRM)' - self.assertTrue(stderr.endswith(err), stderr) @slow def test_shutdown_double_sigrtmin(self): @@ -1247,10 +1270,6 @@ class HerokuWorkerShutdownTestCase(TimeoutTestCase, RQTestCase): self.assertTrue(os.path.exists(os.path.join(self.sandbox, 'started'))) self.assertFalse(os.path.exists(os.path.join(self.sandbox, 'finished'))) - with open(os.path.join(self.sandbox, 'stderr.log')) as f: - stderr = f.read().strip('\n') - err = 'ShutDownImminentException: shut down imminent (signal: SIGRTMIN)' - self.assertTrue(stderr.endswith(err), stderr) @mock.patch('rq.worker.logger.info') def test_handle_shutdown_request(self, mock_logger_info):