From c00d3681f95ce04af5342a4b68cb6f9239fe701d Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Wed, 25 May 2016 15:17:37 -0400
Subject: [PATCH 01/13] Failing test to demonstrate issue #702

Test that demonstrates that if a work-horse process is terminated
unexpectedly, the job being processed could be stuck in the "Started"
state (https://github.com/nvie/rq/issues/702)
---
 tests/test_worker.py | 35 ++++++++++++++++++++++++++++++++++-
 1 file changed, 34 insertions(+), 1 deletion(-)

diff --git a/tests/test_worker.py b/tests/test_worker.py
index 7df27f0..8d0368f 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -14,7 +14,8 @@ import subprocess
 from tests import RQTestCase, slow
 from tests.fixtures import (create_file, create_file_after_timeout,
                             div_by_zero, do_nothing, say_hello, say_pid,
-                            run_dummy_heroku_worker, access_self)
+                            run_dummy_heroku_worker, access_self,
+                            long_running_job)
 from tests.helpers import strip_microseconds
 
 from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
@@ -577,6 +578,9 @@ def kill_worker(pid, double_kill):
         time.sleep(0.5)
         os.kill(pid, signal.SIGTERM)
 
+def kill_work_horse(pid):
+    os.kill(pid, signal.SIGKILL)
+
 
 class TimeoutTestCase:
     def setUp(self):
@@ -649,6 +653,35 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
         self.assertIsNotNone(shutdown_requested_date)
         self.assertEqual(type(shutdown_requested_date).__name__, 'datetime')
 
+    @slow
+    def test_work_horse_death_sets_job_failed(self):
+        """worker with an ongoing job whose work horse dies unexpectedly should
+        set the job's status either to FINISHED or FAILED
+        """
+        fooq = Queue('foo')
+        failed_q = get_failed_queue()
+        self.assertEqual(failed_q.count, 0)
+        self.assertEqual(fooq.count, 0)
+        w = Worker(fooq)
+        registry = StartedJobRegistry(connection=self.testconn)
+        sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
+        if os.path.exists(sentinel_file):
+            os.remove(sentinel_file)
+        fooq.enqueue(create_file_after_timeout, sentinel_file, 100)
+        job, queue = w.dequeue_job_and_maintain_ttl(5)
+        w.fork_work_horse(job, queue)
+        p = Process(target=kill_work_horse, args=(w._horse_pid,))
+        p.start()
+        p.join(1)
+        w.monitor_work_horse(job)
+        job_status = job.get_status()
+        if os.path.exists(sentinel_file):
+            self.assertEqual(job_status, JobStatus.FINISHED)
+            os.remove(sentinel_file)
+        else:
+            self.assertEqual(job_status, JobStatus.FAILED)
+            self.assertEqual(failed_q.count, 1)
+            self.assertEqual(fooq.count, 0)
 
 def schedule_access_self():
     q = Queue('default', connection=get_current_connection())

From 93d286a6c77e47148d315ed884a46aea1ace6654 Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Wed, 25 May 2016 15:23:09 -0400
Subject: [PATCH 02/13] Split execute_job to expose issue 702

In order to create a test for issue 702 we had to split execute_job
into a fork_work_horse function and a monitor_work_horse function.
---
 rq/worker.py | 50 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 31 insertions(+), 19 deletions(-)

diff --git a/rq/worker.py b/rq/worker.py
index f7c8c63..01145cb 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -503,13 +503,9 @@ class Worker(object):
         self.log.debug('Sent heartbeat to prevent worker timeout. '
                        'Next one should arrive within {0} seconds.'.format(timeout))
 
-    def execute_job(self, job, queue):
+    def fork_work_horse(self, job, queue):
         """Spawns a work horse to perform the actual work and passes it a job.
-        The worker will wait for the work horse and make sure it executes
-        within the given timeout bounds, or will end the work horse with
-        SIGALRM.
         """
-        self.set_state('busy')
         child_pid = os.fork()
         os.environ['RQ_WORKER_ID'] = self.name
         os.environ['RQ_JOB_ID'] = job.id
@@ -518,20 +514,36 @@ class Worker(object):
         else:
             self._horse_pid = child_pid
             self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
-            while True:
-                try:
-                    os.waitpid(child_pid, 0)
-                    self.set_state('idle')
-                    break
-                except OSError as e:
-                    # In case we encountered an OSError due to EINTR (which is
-                    # caused by a SIGINT or SIGTERM signal during
-                    # os.waitpid()), we simply ignore it and enter the next
-                    # iteration of the loop, waiting for the child to end. In
-                    # any other case, this is some other unexpected OS error,
-                    # which we don't want to catch, so we re-raise those ones.
-                    if e.errno != errno.EINTR:
-                        raise
+
+    def monitor_work_horse(self, job):
+        """The worker will wait for the work horse and make sure it executes
+        within the given timeout bounds, or will end the work horse with
+        SIGALRM.
+        """
+        while True:
+            try:
+                _, ret_val = os.waitpid(self._horse_pid, 0)
+                break
+            except OSError as e:
+                # In case we encountered an OSError due to EINTR (which is
+                # caused by a SIGINT or SIGTERM signal during
+                # os.waitpid()), we simply ignore it and enter the next
+                # iteration of the loop, waiting for the child to end. In
+                # any other case, this is some other unexpected OS error,
+                # which we don't want to catch, so we re-raise those ones.
+                if e.errno != errno.EINTR:
+                    raise
+
+    def execute_job(self, job, queue):
+        """Spawns a work horse to perform the actual work and passes it a job.
+        The worker will wait for the work horse and make sure it executes
+        within the given timeout bounds, or will end the work horse with
+        SIGALRM.
+        """
+        self.set_state('busy')
+        self.fork_work_horse(job, queue)
+        self.monitor_work_horse(job)
+        self.set_state('idle')
 
     def main_work_horse(self, job, queue):
         """This is the entry point of the newly spawned work horse."""

From f9d58979227dd6498545ff62c0d3378475a6db4e Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Wed, 25 May 2016 15:24:34 -0400
Subject: [PATCH 03/13] Solves issue 702

In order to solve issue 702 we have to check whether a work-horse
terminated unexpectedly (by inspecting the exit code of the work-horse
process).

If it exited unexpectedly, we check whether the job has already reached
a valid state (e.g. been marked as finished or failed). If it is not in
any valid state, we mark it as failed and move it to the failed queue.

Since the process was terminated unexpectedly (think OOM), we do not
have any exception context and we cannot run any custom exception
handlers.

There is still a chance that the job will finish successfully but the
work-horse process will be killed before the job is marked as finished,
in which case we will erroneously mark it as failed. Users should take
care to write idempotent jobs.
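
For reference, a condensed sketch of the check this patch adds to
monitor_work_horse (names follow the diff below; the registry and
pipeline bookkeeping are omitted here for brevity):

    _, ret_val = os.waitpid(self._horse_pid, 0)  # blocks until the horse exits
    if not (ret_val == os.EX_OK):                # non-zero status: crash, OOM kill, ...
        if job.get_status() not in [JobStatus.FINISHED, JobStatus.FAILED]:
            # No exception context exists, so custom exception handlers
            # cannot run; mark the job failed and quarantine it directly.
            job.set_status(JobStatus.FAILED)
            self.move_to_failed_queue_unhandled(
                job, "Work-horse process was terminated unexpectedly")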
---
 rq/worker.py | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/rq/worker.py b/rq/worker.py
index 01145cb..efeaf92 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -523,6 +523,25 @@ class Worker(object):
         while True:
             try:
                 _, ret_val = os.waitpid(self._horse_pid, 0)
+                if not (ret_val == os.EX_OK):
+                    job_status = job.get_status()
+                    if job_status is None:
+                        # Job completed and its ttl has expired
+                        break
+                    if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
+                        with self.connection._pipeline() as pipeline:
+                            job.set_status(JobStatus.FAILED, pipeline=pipeline)
+                            started_job_registry = StartedJobRegistry(job.origin, self.connection)
+                            started_job_registry.remove(job, pipeline=pipeline)
+                            self.set_current_job_id(None, pipeline=pipeline)
+                            try:
+                                pipeline.execute()
+                            except Exception:
+                                pass
+                        self.move_to_failed_queue_unhandled(
+                            job,
+                            "Work-horse process was terminated unexpectedly"
+                        )
                 break
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
                 # caused by a SIGINT or SIGTERM signal during
                 # os.waitpid()), we simply ignore it and enter the next
                 # iteration of the loop, waiting for the child to end. In
                 # any other case, this is some other unexpected OS error,
                 # which we don't want to catch, so we re-raise those ones.
                 if e.errno != errno.EINTR:
                     raise
@@ -690,6 +709,11 @@ class Worker(object):
         self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
         self.failed_queue.quarantine(job, exc_info=exc_string)
 
+    def move_to_failed_queue_unhandled(self, job, message):
+        """Unhandled failure default handler: move the job to the failed queue."""
+        self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
+        self.failed_queue.quarantine(job, exc_info=message)
+
     def push_exc_handler(self, handler_func):
         """Pushes an exception handler onto the exc handler stack."""
         self._exc_handlers.append(handler_func)

From c4fd1659ea3a2c57fd467042c1ae865b5a91b655 Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Thu, 30 Jun 2016 13:16:41 -0400
Subject: [PATCH 04/13] Fix docstring for monitor_work_horse

---
 rq/worker.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/rq/worker.py b/rq/worker.py
index efeaf92..3a192ef 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -516,9 +516,9 @@ class Worker(object):
             self.procline('Forked {0} at {1}'.format(child_pid, time.time()))
 
     def monitor_work_horse(self, job):
-        """The worker will wait for the work horse and make sure it executes
-        within the given timeout bounds, or will end the work horse with
-        SIGALRM.
+        """The worker will monitor the work horse and make sure that it
+        either executes successfully or the status of the job is set to
+        failed.
         """
         while True:
             try:

From 08de4190e7262956f3eb4cc61f3fc9723d806733 Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Thu, 30 Jun 2016 13:17:28 -0400
Subject: [PATCH 05/13] DRY the code.

Extract handling of a failed current job into a method
---
 rq/worker.py | 37 ++++++++++++++++++++++++++++++-------
 1 file changed, 30 insertions(+), 7 deletions(-)

diff --git a/rq/worker.py b/rq/worker.py
index 3a192ef..0a1d090 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -530,10 +530,10 @@ class Worker(object):
                         break
                     if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
                         with self.connection._pipeline() as pipeline:
-                            job.set_status(JobStatus.FAILED, pipeline=pipeline)
-                            started_job_registry = StartedJobRegistry(job.origin, self.connection)
-                            started_job_registry.remove(job, pipeline=pipeline)
-                            self.set_current_job_id(None, pipeline=pipeline)
+                            self.handle_current_job_failure(
+                                job=job,
+                                pipeline=pipeline
+                            )
                             try:
                                 pipeline.execute()
                             except Exception:
                                 pass
@@ -611,6 +611,27 @@ class Worker(object):
             msg = 'Processing {0} from {1} since {2}'
             self.procline(msg.format(job.func_name, job.origin, time.time()))
 
+    def handle_current_job_failure(
+        self,
+        job,
+        started_job_registry=None,
+        pipeline=None
+    ):
+        """Handles the failure of an executing job by:
+        1. Setting the job status to failed
+        2. Removing the job from the started_job_registry
+        3. Setting the worker's current job to None
+        """
+
+        if started_job_registry is None:
+            started_job_registry = StartedJobRegistry(
+                job.origin,
+                self.connection
+            )
+        job.set_status(JobStatus.FAILED, pipeline=pipeline)
+        started_job_registry.remove(job, pipeline=pipeline)
+        self.set_current_job_id(None, pipeline=pipeline)
+
     def perform_job(self, job, queue):
         """Performs the actual work of a job. Will/should only be called
         inside the work horse's process.
@@ -651,9 +672,11 @@ class Worker(object):
                 pipeline.execute()
 
             except Exception:
-                job.set_status(JobStatus.FAILED, pipeline=pipeline)
-                started_job_registry.remove(job, pipeline=pipeline)
-                self.set_current_job_id(None, pipeline=pipeline)
+                self.handle_current_job_failure(
+                    job=job,
+                    started_job_registry=started_job_registry,
+                    pipeline=pipeline
+                )
                 try:
                     pipeline.execute()
                 except Exception:

From 3362fe2ba5c3a1f18810472f29fbbb03ea479212 Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Thu, 30 Jun 2016 13:19:12 -0400
Subject: [PATCH 06/13] Test killing work_horse after we start monitoring

---
 tests/test_worker.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/test_worker.py b/tests/test_worker.py
index 8d0368f..89e0996 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -578,7 +578,8 @@ def kill_worker(pid, double_kill):
         time.sleep(0.5)
         os.kill(pid, signal.SIGTERM)
 
-def kill_work_horse(pid):
+def wait_and_kill_work_horse(pid, time_to_wait=0.0):
+    time.sleep(time_to_wait)
     os.kill(pid, signal.SIGKILL)
 
 
@@ -663,18 +664,17 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
         self.assertEqual(failed_q.count, 0)
         self.assertEqual(fooq.count, 0)
         w = Worker(fooq)
-        registry = StartedJobRegistry(connection=self.testconn)
         sentinel_file = '/tmp/.rq_sentinel_work_horse_death'
         if os.path.exists(sentinel_file):
             os.remove(sentinel_file)
         fooq.enqueue(create_file_after_timeout, sentinel_file, 100)
         job, queue = w.dequeue_job_and_maintain_ttl(5)
         w.fork_work_horse(job, queue)
-        p = Process(target=kill_work_horse, args=(w._horse_pid,))
+        p = Process(target=wait_and_kill_work_horse, args=(w._horse_pid, 0.5))
         p.start()
-        p.join(1)
         w.monitor_work_horse(job)
         job_status = job.get_status()
+        p.join(1)
         if os.path.exists(sentinel_file):
             self.assertEqual(job_status, JobStatus.FINISHED)
             os.remove(sentinel_file)
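
The failure mode these tests exercise is easy to reproduce outside RQ.
The following standalone POSIX sketch (illustrative only, not part of
the series) shows why os.waitpid reports a non-zero status when a forked
child is SIGKILLed, which is the condition monitor_work_horse checks for:

    import os, signal, time

    pid = os.fork()
    if pid == 0:                      # child: stands in for the work horse
        time.sleep(100)               # simulates a long-running job
        os._exit(os.EX_OK)
    os.kill(pid, signal.SIGKILL)      # parent: simulate an OOM-style kill
    _, status = os.waitpid(pid, 0)
    assert status != os.EX_OK         # died from a signal, so status is non-zero
    print(os.WIFSIGNALED(status))     # True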
From 9b774771dbef2c00eecde93bf6cd13820f616349 Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Sat, 16 Jul 2016 02:26:34 -0400
Subject: [PATCH 07/13] Fix style of conditional

---
 rq/worker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/rq/worker.py b/rq/worker.py
index 0a1d090..952ad0c 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -523,7 +523,7 @@ class Worker(object):
         while True:
             try:
                 _, ret_val = os.waitpid(self._horse_pid, 0)
-                if not (ret_val == os.EX_OK):
+                if ret_val != os.EX_OK:
                     job_status = job.get_status()
                     if job_status is None:
                         # Job completed and its ttl has expired
                         break

From 94d5caed1525e7a64a517993667d3d0535c4e6a7 Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Sun, 24 Jul 2016 09:22:29 -0400
Subject: [PATCH 08/13] Make the purpose of the test clearer

---
 tests/test_worker.py | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/tests/test_worker.py b/tests/test_worker.py
index 89e0996..d44990b 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -656,8 +656,8 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
 
     @slow
     def test_work_horse_death_sets_job_failed(self):
-        """worker with an ongoing job whose work horse dies unexpectedly should
-        set the job's status either to FINISHED or FAILED
+        """worker with an ongoing job whose work horse dies unexpectedly (before
+        completing the job) should set the job's status to FAILED
         """
         fooq = Queue('foo')
         failed_q = get_failed_queue()
@@ -675,13 +675,9 @@ class WorkerShutdownTestCase(TimeoutTestCase, RQTestCase):
         w.monitor_work_horse(job)
         job_status = job.get_status()
         p.join(1)
-        if os.path.exists(sentinel_file):
-            self.assertEqual(job_status, JobStatus.FINISHED)
-            os.remove(sentinel_file)
-        else:
-            self.assertEqual(job_status, JobStatus.FAILED)
-            self.assertEqual(failed_q.count, 1)
-            self.assertEqual(fooq.count, 0)
+        self.assertEqual(job_status, JobStatus.FAILED)
+        self.assertEqual(failed_q.count, 1)
+        self.assertEqual(fooq.count, 0)
 
 def schedule_access_self():
     q = Queue('default', connection=get_current_connection())

From 14d87832120a4f0f648668e86cc190739b271c33 Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Sun, 24 Jul 2016 09:23:31 -0400
Subject: [PATCH 09/13] Rename function

---
 rq/worker.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/rq/worker.py b/rq/worker.py
index 952ad0c..fd55b44 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -530,7 +530,7 @@ class Worker(object):
                         break
                     if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
                         with self.connection._pipeline() as pipeline:
-                            self.handle_current_job_failure(
+                            self.handle_job_failure(
                                 job=job,
                                 pipeline=pipeline
                             )
@@ -611,7 +611,7 @@ class Worker(object):
             msg = 'Processing {0} from {1} since {2}'
             self.procline(msg.format(job.func_name, job.origin, time.time()))
 
-    def handle_current_job_failure(
+    def handle_job_failure(
         self,
         job,
         started_job_registry=None,
@@ -672,7 +672,7 @@ class Worker(object):
                 pipeline.execute()
 
             except Exception:
-                self.handle_current_job_failure(
+                self.handle_job_failure(
                     job=job,
                     started_job_registry=started_job_registry,
                     pipeline=pipeline
                 )

From b38aaab8a71310c5800a6fa8e60f7cff2abf8ac7 Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Sun, 24 Jul 2016 09:23:58 -0400
Subject: [PATCH 10/13] Inline unhandled failure handler

---
 rq/worker.py | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/rq/worker.py b/rq/worker.py
index fd55b44..dc1a621 100644
--- a/rq/worker.py
+++ b/rq/worker.py
@@ -538,9 +538,19 @@ class Worker(object):
                                 pipeline.execute()
                             except Exception:
                                 pass
-                        self.move_to_failed_queue_unhandled(
+
+                        # Unhandled failure: move the job to the failed queue
+                        self.log.warning(
+                            'Moving job to {0!r} queue'.format(
+                                self.failed_queue.name
+                            )
+                        )
+                        self.failed_queue.quarantine(
                             job,
-                            "Work-horse process was terminated unexpectedly"
+                            exc_info=(
+                                "Work-horse process "
+                                "was terminated unexpectedly"
+                            )
                         )
                 break
             except OSError as e:
@@ -732,11 +742,6 @@ class Worker(object):
         self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
         self.failed_queue.quarantine(job, exc_info=exc_string)
 
-    def move_to_failed_queue_unhandled(self, job, message):
-        """Unhandled failure default handler: move the job to the failed queue."""
-        self.log.warning('Moving job to {0!r} queue'.format(self.failed_queue.name))
-        self.failed_queue.quarantine(job, exc_info=message)
-
     def push_exc_handler(self, handler_func):
         """Pushes an exception handler onto the exc handler stack."""
         self._exc_handlers.append(handler_func)

From 4d9ced42bc9faf78326ad27f803aa02a9676a309 Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Sun, 24 Jul 2016 09:43:44 -0400
Subject: [PATCH 11/13] Investigating intermittent failures in Travis

---
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.travis.yml b/.travis.yml
index c804c42..d6c6e93 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -16,6 +16,6 @@ install:
   - pip install coveralls
   #- pip install pytest # installed by Travis by default already
 script:
-  - RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=5
+  - RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=10
 after_success:
   - coveralls

From fbb29ec3331412eeac22cb6032eb95c52b99069a Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Sun, 24 Jul 2016 09:58:44 -0400
Subject: [PATCH 12/13] Investigating timeouts

---
 .travis.yml          | 2 +-
 tests/test_worker.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index d6c6e93..c804c42 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -16,6 +16,6 @@ install:
   - pip install coveralls
   #- pip install pytest # installed by Travis by default already
 script:
-  - RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=10
+  - RUN_SLOW_TESTS_TOO=1 py.test --cov rq --durations=5
 after_success:
   - coveralls
diff --git a/tests/test_worker.py b/tests/test_worker.py
index d44990b..d0db63b 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -587,7 +587,7 @@ class TimeoutTestCase:
     def setUp(self):
         # we want tests to fail if signal are ignored and the work remain
         # running, so set a signal to kill them after X seconds
-        self.killtimeout = 10
+        self.killtimeout = 20
         signal.signal(signal.SIGALRM, self._timeout)
         signal.alarm(self.killtimeout)

From 4a8aa0921fc50eb1ed02e29f1cc3a4711ec35c8c Mon Sep 17 00:00:00 2001
From: Yannis Spiliopoulos
Date: Sun, 24 Jul 2016 12:46:14 -0400
Subject: [PATCH 13/13] Set test timeout to original value

---
 tests/test_worker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/test_worker.py b/tests/test_worker.py
index d0db63b..d44990b 100644
--- a/tests/test_worker.py
+++ b/tests/test_worker.py
@@ -587,7 +587,7 @@ class TimeoutTestCase:
     def setUp(self):
         # we want tests to fail if signal are ignored and the work remain
        # running, so set a signal to kill them after X seconds
-        self.killtimeout = 20
+        self.killtimeout = 10
         signal.signal(signal.SIGALRM, self._timeout)
         signal.alarm(self.killtimeout)
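
A closing note on the status check used throughout the series: the value
returned by os.waitpid is an encoded status word rather than a plain exit
code, so ret_val != os.EX_OK is true both when the work horse exits with
a non-zero code and when it is killed by a signal. If finer reporting
were wanted, the two cases could be told apart with the standard os
helpers; a minimal sketch (illustrative only, not part of the patches;
horse_pid stands for the forked child's pid):

    _, status = os.waitpid(horse_pid, 0)
    if os.WIFEXITED(status):
        # normal termination: recover the exit code
        print('exited with code', os.WEXITSTATUS(status))
    elif os.WIFSIGNALED(status):
        # killed by a signal (e.g. SIGKILL from the OOM killer)
        print('killed by signal', os.WTERMSIG(status))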