diff --git a/sdk/python/kfp/local/logging_utils.py b/sdk/python/kfp/local/logging_utils.py index dd4d2d90843..a2b7a0eaa25 100644 --- a/sdk/python/kfp/local/logging_utils.py +++ b/sdk/python/kfp/local/logging_utils.py @@ -16,6 +16,7 @@ import contextlib import datetime import logging +import sys from typing import Any, Dict, Generator, List from kfp import dsl @@ -52,7 +53,11 @@ def local_logger_context() -> Generator[None, None, None]: fmt='%(asctime)s - %(levelname)s - %(message)s', datefmt='%H:%M:%S.%f', ) - handler = logging.StreamHandler() + # use sys.stdout so that both inner process and outer process logs + # go to stdout + # this is needed for logs to present sequentially in a colab notebook, + # since stderr will print above stdout + handler = logging.StreamHandler(sys.stdout) handler.setFormatter(formatter) logger.handlers.clear() logger.addHandler(handler) diff --git a/sdk/python/kfp/local/task_dispatcher_test.py b/sdk/python/kfp/local/task_dispatcher_test.py index c876bac27e5..36b1b4519e1 100755 --- a/sdk/python/kfp/local/task_dispatcher_test.py +++ b/sdk/python/kfp/local/task_dispatcher_test.py @@ -21,6 +21,7 @@ """ import io import os +import re import sys import unittest from unittest import mock @@ -199,11 +200,9 @@ def fail_comp(): ) @mock.patch('sys.stdout', new_callable=io.StringIO) - @mock.patch('sys.stderr', new_callable=io.StringIO) def test_user_code_no_exception_if_not_raise_on_error( self, runner, - mock_stderr, mock_stdout, ): local.init(runner=runner, raise_on_error=False) @@ -216,7 +215,7 @@ def fail_comp(): self.assertDictEqual(task.outputs, {}) self.assertRegex( - mock_stderr.getvalue(), + mock_stdout.getvalue(), r"\d+:\d+:\d+\.\d+ - ERROR - Task \x1b\[96m'fail-comp'\x1b\[0m finished with status \x1b\[91mFAILURE\x1b\[0m", ) self.assertIn( @@ -225,11 +224,9 @@ def fail_comp(): ) @mock.patch('sys.stdout', new_callable=io.StringIO) - @mock.patch('sys.stderr', new_callable=io.StringIO) def test_all_logs( self, runner, - mock_stderr, mock_stdout, ): local.init(runner=runner) @@ -245,24 +242,30 @@ def many_type_component( many_type_component(num=2) - # outer process logs in stderr - outer_log_regex = ( - r"\d+:\d+:\d+\.\d+ - INFO - Executing task \x1b\[96m'many-type-component'\x1b\[0m\n" - + r'\d+:\d+:\d+\.\d+ - INFO - Streamed logs:\n\n' + - r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m finished with status \x1b\[92mSUCCESS\x1b\[0m\n" - + - r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m outputs:\n Output: hellohello\n model: Model\( name=model,\n uri=[a-zA-Z0-9/_\.-]+/local_outputs/many-type-component-\d+-\d+-\d+-\d+-\d+-\d+-\d+/many-type-component/model,\n metadata={'foo': 'bar'} \)\n\n" - ) + # inner process logs correctly nested inside outer process logs + outer_log_regex_sections = [ + r"\d+:\d+:\d+\.\d+ - INFO - Executing task \x1b\[96m'many-type-component'\x1b\[0m\n", + r'\d+:\d+:\d+\.\d+ - INFO - Streamed logs:\n\n', + r'.*', + r'Looking for component ', + r'.*', + r'Loading KFP component ', + r'.*', + r'Got executor_input:', + r'.*', + r'Inside of my component!', + r'.*', + r'Wrote executor output file to', + r'.*', + r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m finished with status \x1b\[92mSUCCESS\x1b\[0m\n", + r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m outputs:\n Output: hellohello\n model: Model\( name=model,\n uri=[a-zA-Z0-9/_\.-]+/local_outputs/many-type-component-\d+-\d+-\d+-\d+-\d+-\d+-\d+/many-type-component/model,\n metadata={'foo': 'bar'} \)\n\n", + ] self.assertRegex( - mock_stderr.getvalue(), - outer_log_regex, + mock_stdout.getvalue(), + # use dotall os that .* include newline characters + re.compile(''.join(outer_log_regex_sections), re.DOTALL), ) - # inner process logs in stdout - self.assertIn('[KFP Executor', mock_stdout.getvalue()) - self.assertIn('Got executor_input:', mock_stdout.getvalue()) - self.assertIn('Inside of my component!', mock_stdout.getvalue()) - self.assertIn('Wrote executor output file to', mock_stdout.getvalue()) @parameterized.parameters(ALL_RUNNERS)