Skip to content

Commit

Permalink
chore(sdk): write local execution logs to stdout #localexecution (kub…
Browse files Browse the repository at this point in the history
  • Loading branch information
connor-mccarthy authored and stijntratsaertit committed Feb 16, 2024
1 parent 55252e0 commit 816ce99
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 21 deletions.
7 changes: 6 additions & 1 deletion sdk/python/kfp/local/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import contextlib
import datetime
import logging
import sys
from typing import Any, Dict, Generator, List

from kfp import dsl
Expand Down Expand Up @@ -52,7 +53,11 @@ def local_logger_context() -> Generator[None, None, None]:
fmt='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%H:%M:%S.%f',
)
handler = logging.StreamHandler()
# use sys.stdout so that both inner process and outer process logs
# go to stdout
# this is needed for logs to present sequentially in a colab notebook,
# since stderr will print above stdout
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
logger.handlers.clear()
logger.addHandler(handler)
Expand Down
43 changes: 23 additions & 20 deletions sdk/python/kfp/local/task_dispatcher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"""
import io
import os
import re
import sys
import unittest
from unittest import mock
Expand Down Expand Up @@ -199,11 +200,9 @@ def fail_comp():
)

@mock.patch('sys.stdout', new_callable=io.StringIO)
@mock.patch('sys.stderr', new_callable=io.StringIO)
def test_user_code_no_exception_if_not_raise_on_error(
self,
runner,
mock_stderr,
mock_stdout,
):
local.init(runner=runner, raise_on_error=False)
Expand All @@ -216,7 +215,7 @@ def fail_comp():
self.assertDictEqual(task.outputs, {})

self.assertRegex(
mock_stderr.getvalue(),
mock_stdout.getvalue(),
r"\d+:\d+:\d+\.\d+ - ERROR - Task \x1b\[96m'fail-comp'\x1b\[0m finished with status \x1b\[91mFAILURE\x1b\[0m",
)
self.assertIn(
Expand All @@ -225,11 +224,9 @@ def fail_comp():
)

@mock.patch('sys.stdout', new_callable=io.StringIO)
@mock.patch('sys.stderr', new_callable=io.StringIO)
def test_all_logs(
self,
runner,
mock_stderr,
mock_stdout,
):
local.init(runner=runner)
Expand All @@ -245,24 +242,30 @@ def many_type_component(

many_type_component(num=2)

# outer process logs in stderr
outer_log_regex = (
r"\d+:\d+:\d+\.\d+ - INFO - Executing task \x1b\[96m'many-type-component'\x1b\[0m\n"
+ r'\d+:\d+:\d+\.\d+ - INFO - Streamed logs:\n\n' +
r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m finished with status \x1b\[92mSUCCESS\x1b\[0m\n"
+
r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m outputs:\n Output: hellohello\n model: Model\( name=model,\n uri=[a-zA-Z0-9/_\.-]+/local_outputs/many-type-component-\d+-\d+-\d+-\d+-\d+-\d+-\d+/many-type-component/model,\n metadata={'foo': 'bar'} \)\n\n"
)
# inner process logs correctly nested inside outer process logs
outer_log_regex_sections = [
r"\d+:\d+:\d+\.\d+ - INFO - Executing task \x1b\[96m'many-type-component'\x1b\[0m\n",
r'\d+:\d+:\d+\.\d+ - INFO - Streamed logs:\n\n',
r'.*',
r'Looking for component ',
r'.*',
r'Loading KFP component ',
r'.*',
r'Got executor_input:',
r'.*',
r'Inside of my component!',
r'.*',
r'Wrote executor output file to',
r'.*',
r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m finished with status \x1b\[92mSUCCESS\x1b\[0m\n",
r"\d+:\d+:\d+\.\d+ - INFO - Task \x1b\[96m'many-type-component'\x1b\[0m outputs:\n Output: hellohello\n model: Model\( name=model,\n uri=[a-zA-Z0-9/_\.-]+/local_outputs/many-type-component-\d+-\d+-\d+-\d+-\d+-\d+-\d+/many-type-component/model,\n metadata={'foo': 'bar'} \)\n\n",
]

self.assertRegex(
mock_stderr.getvalue(),
outer_log_regex,
mock_stdout.getvalue(),
# use dotall os that .* include newline characters
re.compile(''.join(outer_log_regex_sections), re.DOTALL),
)
# inner process logs in stdout
self.assertIn('[KFP Executor', mock_stdout.getvalue())
self.assertIn('Got executor_input:', mock_stdout.getvalue())
self.assertIn('Inside of my component!', mock_stdout.getvalue())
self.assertIn('Wrote executor output file to', mock_stdout.getvalue())


@parameterized.parameters(ALL_RUNNERS)
Expand Down

0 comments on commit 816ce99

Please sign in to comment.