From a02136df825ee602121dc8e6e0cfb21b013f9506 Mon Sep 17 00:00:00 2001 From: Ivan Zubenko Date: Fri, 20 Oct 2023 17:13:35 +0300 Subject: [PATCH 1/2] handle invalid unicode in logs --- platform_monitoring/api.py | 15 ++-- platform_monitoring/logs.py | 134 ++++++++++++++++++++++++--------- tests/integration/test_kube.py | 8 +- tests/unit/test_logs.py | 76 ++++++++++++++++++- 4 files changed, 186 insertions(+), 47 deletions(-) diff --git a/platform_monitoring/api.py b/platform_monitoring/api.py index 4ed478f0..99bae7c7 100644 --- a/platform_monitoring/api.py +++ b/platform_monitoring/api.py @@ -484,7 +484,7 @@ async def port_forward(self, request: Request) -> StreamResponse: async def _listen(ws: WebSocketResponse) -> None: - # Maintain the WebSocket connetion. + # Maintain the WebSocket connection. # Process ping-pong game and perform closing handshake. async for msg in ws: logger.info(f"Received unexpected WebSocket message: {msg!r}") @@ -493,12 +493,13 @@ async def _listen(ws: WebSocketResponse) -> None: async def _forward_bytes_iterating( ws: WebSocketResponse, it: AsyncIterator[bytes] ) -> None: - async for chunk in it: - if ws.closed: - break - await ws.send_bytes(chunk) - if ws.closed: - break + with suppress(ConnectionResetError): + async for chunk in it: + if ws.closed: + break + await ws.send_bytes(chunk) + if ws.closed: + break async def _forward_reading(ws: WebSocketResponse, reader: asyncio.StreamReader) -> None: diff --git a/platform_monitoring/logs.py b/platform_monitoring/logs.py index 18a656a3..6a1055f7 100644 --- a/platform_monitoring/logs.py +++ b/platform_monitoring/logs.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import abc import asyncio import io @@ -5,10 +7,11 @@ import logging import zlib from collections.abc import AsyncIterator, Awaitable, Callable -from contextlib import AbstractAsyncContextManager +from contextlib import AbstractAsyncContextManager, suppress +from dataclasses import dataclass from datetime import datetime, timedelta, timezone from os.path import basename -from typing import Any, Optional +from typing import Any import aiohttp from aiobotocore.client import AioBaseClient @@ -16,6 +19,7 @@ from aioelasticsearch import Elasticsearch, RequestError from aioelasticsearch.helpers import Scan from async_timeout import timeout +from iso8601 import UTC, ParseError from neuro_logging import trace from .base import LogReader @@ -125,11 +129,11 @@ def __init__( client: KubeClient, pod_name: str, container_name: str, - client_conn_timeout_s: Optional[float] = None, - client_read_timeout_s: Optional[float] = None, + client_conn_timeout_s: float | None = None, + client_read_timeout_s: float | None = None, *, previous: bool = False, - since: Optional[datetime] = None, + since: datetime | None = None, timestamps: bool = False, debug: bool = False, ) -> None: @@ -143,10 +147,10 @@ def __init__( self._timestamps = timestamps self._debug = debug - self._stream_cm: Optional[ + self._stream_cm: ( AbstractAsyncContextManager[aiohttp.StreamReader] - ] = None - self._iterator: Optional[AsyncIterator[bytes]] = None + ) | None = None + self._iterator: AsyncIterator[bytes] | None = None async def __aenter__(self) -> AsyncIterator[bytes]: await self._client.wait_pod_is_not_waiting(self._pod_name) @@ -191,7 +195,7 @@ def __init__( pod_name: str, container_name: str, *, - since: Optional[datetime] = None, + since: datetime | None = None, timestamps: bool = False, ) -> None: super().__init__(container_runtime=container_runtime, timestamps=timestamps) @@ -203,8 +207,8 
@@ def __init__(
         self._pod_name = pod_name
         self._container_name = container_name
         self._since = since
-        self._scan: Optional[Scan] = None
-        self._iterator: Optional[AsyncIterator[bytes]] = None
+        self._scan: Scan | None = None
+        self._iterator: AsyncIterator[bytes] | None = None

     def _combine_search_query(self) -> dict[str, Any]:
         terms = [
@@ -264,6 +268,59 @@ async def _iterate(self) -> AsyncIterator[bytes]:
             raise


+@dataclass(frozen=True)
+class S3LogRecord:
+    time: datetime
+    time_str: str
+    message: str
+
+    @classmethod
+    def parse(cls, line: bytes, fallback_time: datetime) -> S3LogRecord:
+        data = json.loads(line.decode(errors="replace"))
+        time_str, time = cls._parse_time(data, fallback_time)
+        return cls(
+            time_str=time_str,
+            time=time,
+            message=cls._parse_message(data),
+        )
+
+    @classmethod
+    def _parse_time(
+        cls, data: dict[str, Any], default_time: datetime
+    ) -> tuple[str, datetime]:
+        time_str = data.get("time")
+        if time_str:
+            return (time_str, parse_date(time_str))
+        # There are cases when unparsed log records are pushed to
+        # object storage; the reasons for this behavior are unknown for now.
+        # If there is no time key, the record is considered not parsed by Fluent Bit.
+        # We can try to parse the time from the container runtime log message.
+        msg = data.get("log", "")
+        if result := cls._parse_time_from_message(msg):
+            return result
+        time_str = default_time.strftime("%Y-%m-%dT%H:%M:%S.%f")
+        return (time_str, default_time)
+
+    @classmethod
+    def _parse_time_from_message(cls, msg: str) -> tuple[str, datetime] | None:
+        time_end = msg.find(" ")
+        if time_end > 0:
+            time_str = msg[:time_end]
+            with suppress(ParseError):
+                return (time_str, parse_date(time_str))
+        return None
+
+    @classmethod
+    def _parse_message(cls, data: dict[str, Any]) -> str:
+        log = data.get("log", "")
+        if "time" in data:
+            return log
+        # If there is no time key, the record is considered not parsed by Fluent Bit.
+        # We need to extract the raw message from the container runtime log message.
+ log_arr = log.split(" ", 3) + return log_arr[-1] if len(log_arr) == 4 else "" + + class S3LogReader(LogReader): def __init__( self, @@ -275,7 +332,7 @@ def __init__( pod_name: str, container_name: str, *, - since: Optional[datetime] = None, + since: datetime | None = None, timestamps: bool = False, debug: bool = False, ) -> None: @@ -289,7 +346,7 @@ def __init__( self._container_name = container_name self._since = since self._debug = debug - self._iterator: Optional[AsyncIterator[bytes]] = None + self._iterator: AsyncIterator[bytes] | None = None @staticmethod def get_prefix( @@ -314,10 +371,10 @@ async def __aenter__(self) -> AsyncIterator[bytes]: return self._iterator @trace - async def _load_log_keys(self, since: Optional[datetime]) -> list[str]: + async def _load_log_keys(self, since: datetime | None) -> list[str]: since_time_str = f"{since:%Y%m%d%H%M}" if since else "" paginator = self._s3_client.get_paginator("list_objects_v2") - keys: list[tuple[int, int, str]] = [] + keys = [] async for page in paginator.paginate( Bucket=self._bucket_name, Prefix=self._get_prefix() ): @@ -349,24 +406,31 @@ async def _iterate(self) -> AsyncIterator[bytes]: line_iterator = self._iter_decompressed_lines(response_body) else: line_iterator = response_body.iter_lines() + if self.last_time: + record_default_time = max(self.last_time, self._get_key_time(key)) + else: + record_default_time = self._get_key_time(key) + async with aclosing(line_iterator): async for line in line_iterator: try: - event = json.loads(line) - time_str = event["time"] - time = parse_date(time_str) - if since is not None and time < since: + record = S3LogRecord.parse(line, record_default_time) + record_default_time = record.time + if since is not None and record.time < since: continue - self.last_time = time + self.last_time = record.time if self._debug and key: yield f"~~~ From file {basename(key)}\n".encode() key = "" - log = event.get("log", "") - yield self.encode_log(time_str, log) + yield self.encode_log(record.time_str, record.message) except Exception: logger.exception("Invalid log entry: %r", line) raise + def _get_key_time(cls, key: str) -> datetime: + time_str = basename(key).split("_")[0] + return datetime.strptime(time_str, "%Y%m%d%H%M").replace(tzinfo=UTC) + @classmethod async def _iter_decompressed_lines( cls, body: StreamingBody @@ -392,14 +456,14 @@ async def get_pod_log_reader( self, pod_name: str, *, - since: Optional[datetime] = None, - separator: Optional[bytes] = None, + since: datetime | None = None, + separator: bytes | None = None, timestamps: bool = False, timeout_s: float = 10.0 * 60, interval_s: float = 1.0, archive_delay_s: float = DEFAULT_ARCHIVE_DELAY, debug: bool = False, - stop_func: Optional[Callable[[], Awaitable[bool]]] = None, + stop_func: Callable[[], Awaitable[bool]] | None = None, ) -> AsyncIterator[bytes]: archive_delay = timedelta(seconds=archive_delay_s) if stop_func is None: @@ -408,7 +472,7 @@ async def stop_func() -> bool: await asyncio.sleep(interval_s) return False - def get_last_start(status: ContainerStatus) -> Optional[datetime]: + def get_last_start(status: ContainerStatus) -> datetime | None: if status.is_running: return status.started_at elif status.is_pod_terminated: @@ -548,13 +612,13 @@ def get_last_start(status: ContainerStatus) -> Optional[datetime]: @abc.abstractmethod async def get_first_log_entry_time( self, pod_name: str, *, timeout_s: float = 2.0 * 60 - ) -> Optional[datetime]: + ) -> datetime | None: pass # pragma: no cover async def wait_pod_is_running( self, name: 
str, - old_start: Optional[datetime], + old_start: datetime | None, *, timeout_s: float = 10.0 * 60, interval_s: float = 1.0, @@ -578,7 +642,7 @@ def get_pod_live_log_reader( self, pod_name: str, *, - since: Optional[datetime] = None, + since: datetime | None = None, timestamps: bool = False, debug: bool = False, ) -> LogReader: @@ -589,7 +653,7 @@ def get_pod_archive_log_reader( self, pod_name: str, *, - since: Optional[datetime] = None, + since: datetime | None = None, timestamps: bool = False, debug: bool = False, ) -> LogReader: @@ -613,7 +677,7 @@ def get_pod_live_log_reader( self, pod_name: str, *, - since: Optional[datetime] = None, + since: datetime | None = None, timestamps: bool = False, debug: bool = False, ) -> LogReader: @@ -628,7 +692,7 @@ def get_pod_live_log_reader( async def get_first_log_entry_time( self, pod_name: str, *, timeout_s: float = 2.0 * 60 - ) -> Optional[datetime]: + ) -> datetime | None: return await get_first_log_entry_time( self._kube_client, pod_name, timeout_s=timeout_s ) @@ -650,7 +714,7 @@ def get_pod_archive_log_reader( self, pod_name: str, *, - since: Optional[datetime] = None, + since: datetime | None = None, timestamps: bool = False, debug: bool = False, ) -> LogReader: @@ -687,7 +751,7 @@ def get_pod_archive_log_reader( self, pod_name: str, *, - since: Optional[datetime] = None, + since: datetime | None = None, timestamps: bool = False, debug: bool = False, ) -> LogReader: @@ -724,7 +788,7 @@ async def drop_logs(self, pod_name: str) -> None: async def get_first_log_entry_time( kube_client: KubeClient, pod_name: str, *, timeout_s: float = 60 * 2.0 -) -> Optional[datetime]: +) -> datetime | None: """Return the timestamp of the first container log line from Kubernetes. Return None if the container is not created yet, or there are no logs yet, diff --git a/tests/integration/test_kube.py b/tests/integration/test_kube.py index d82306f8..e36013cc 100644 --- a/tests/integration/test_kube.py +++ b/tests/integration/test_kube.py @@ -8,7 +8,7 @@ import uuid from collections.abc import AsyncIterator, Callable, Coroutine, Iterator from contextlib import AbstractAsyncContextManager -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from pathlib import Path from typing import Any from unittest import mock @@ -1386,9 +1386,11 @@ async def coro() -> bytes | Exception: for i in range(4): run_log_reader(f"started [{i}]", delay=i * 2, timeout_s=90) await kube_client.wait_container_is_restarted(job_pod.name, 1) + await kube_client.wait_pod_is_running(job_pod.name) for i in range(4): run_log_reader(f"restarted 1 [{i}]", delay=i * 2) await kube_client.wait_container_is_restarted(job_pod.name, 2) + await kube_client.wait_pod_is_running(job_pod.name) for i in range(4): run_log_reader(f"restarted 2 [{i}]", delay=i * 2) await kube_client.wait_pod_is_terminated(job_pod.name) @@ -1478,11 +1480,12 @@ async def coro() -> bytes: assert finished1 await kube_client.wait_container_is_restarted(job_pod.name, 1) + await kube_client.wait_pod_is_running(job_pod.name) status = await kube_client.get_container_status(job_pod.name) started2 = status.started_at assert started2 run_log_reader(since=started2) - run_log_reader(since=datetime.now(tz=timezone.utc)) + run_log_reader(since=started2 + timedelta(seconds=2)) await kube_client.wait_pod_is_terminated(job_pod.name) status = await kube_client.get_container_status(job_pod.name) @@ -1491,6 +1494,7 @@ async def coro() -> bytes: assert finished2 await 
kube_client.wait_container_is_restarted(job_pod.name, 2) + await kube_client.wait_pod_is_running(job_pod.name) run_log_reader(since=finished1 - timedelta(seconds=4)) run_log_reader(since=started2) run_log_reader(since=finished2 - timedelta(seconds=4)) diff --git a/tests/unit/test_logs.py b/tests/unit/test_logs.py index 8bdebb03..92b11705 100644 --- a/tests/unit/test_logs.py +++ b/tests/unit/test_logs.py @@ -5,8 +5,9 @@ from unittest import mock import pytest +from iso8601 import UTC -from platform_monitoring.logs import S3LogReader +from platform_monitoring.logs import S3LogReader, S3LogRecord class TestS3LogReader: @@ -38,9 +39,9 @@ def setup_s3_key_content( ) -> Callable[[dict[str, list[str]]], None]: def _setup(content: dict[str, list[str]]) -> None: async def get_object(Key: str, *args: Any, **kwargs: Any) -> dict[str, Any]: - async def _iter() -> AsyncIterator[str]: + async def _iter() -> AsyncIterator[bytes]: for line in content[Key]: - yield line + yield line.encode() body = mock.AsyncMock() body.iter_lines = mock.MagicMock() @@ -114,3 +115,72 @@ def later_iso(sec: int = 0) -> str: async for chunk in it: res.append(chunk.decode()[:-1]) # -1 implies removal of \n assert log_lines == res + + +class TestS3LogRecord: + def test_parse(self) -> None: + record = S3LogRecord.parse( + b'{"time": "2023-10-01T12:34:56.123456789", "log": "hello"}', datetime.now() + ) + + assert record == S3LogRecord( + time=datetime(2023, 10, 1, 12, 34, 56, 123456, UTC), + time_str="2023-10-01T12:34:56.123456789", + message="hello", + ) + + def test_parse__no_log(self) -> None: + record = S3LogRecord.parse( + b'{"time": "2023-10-01T12:34:56.123456789"}', datetime.now() + ) + + assert record == S3LogRecord( + time=datetime(2023, 10, 1, 12, 34, 56, 123456, UTC), + time_str="2023-10-01T12:34:56.123456789", + message="", + ) + + def test_parse__invalid_unicode(self) -> None: + record = S3LogRecord.parse( + b'{"time": "2023-10-01T12:34:56.123456789", "log": "hello\xff"}', + datetime.now(), + ) + + assert record == S3LogRecord( + time=datetime(2023, 10, 1, 12, 34, 56, 123456, UTC), + time_str="2023-10-01T12:34:56.123456789", + message="hello�", + ) + + def test_parse__no_time_key(self) -> None: + record = S3LogRecord.parse( + b'{"log": "2023-10-01T12:34:56.123456789 stdout P hello"}', + datetime.now(), + ) + + assert record == S3LogRecord( + time=datetime(2023, 10, 1, 12, 34, 56, 123456, UTC), + time_str="2023-10-01T12:34:56.123456789", + message="hello", + ) + + def test_parse__no_time_key__invalid_time(self) -> None: + fallback_time = datetime(2023, 10, 1, 12, 34, 56, 123456, UTC) + record = S3LogRecord.parse(b'{"log": "invalid stdout P hello"}', fallback_time) + + assert record == S3LogRecord( + time=fallback_time, + time_str="2023-10-01T12:34:56.123456", + message="hello", + ) + + def test_parse__no_time_key__no_message(self) -> None: + record = S3LogRecord.parse( + b'{"log": "2023-10-01T12:34:56.123456789 stdout P "}', datetime.now() + ) + + assert record == S3LogRecord( + time=datetime(2023, 10, 1, 12, 34, 56, 123456, UTC), + time_str="2023-10-01T12:34:56.123456789", + message="", + ) From 5d8eeb4657e3c224132b3fd031c2f2a692abcea5 Mon Sep 17 00:00:00 2001 From: Ivan Zubenko Date: Fri, 20 Oct 2023 17:48:57 +0300 Subject: [PATCH 2/2] cleanup --- platform_monitoring/logs.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/platform_monitoring/logs.py b/platform_monitoring/logs.py index 6a1055f7..f1538f7a 100644 --- a/platform_monitoring/logs.py +++ b/platform_monitoring/logs.py @@ -407,15 
+407,15 @@ async def _iterate(self) -> AsyncIterator[bytes]: else: line_iterator = response_body.iter_lines() if self.last_time: - record_default_time = max(self.last_time, self._get_key_time(key)) + record_fallback_time = max(self.last_time, self._get_key_time(key)) else: - record_default_time = self._get_key_time(key) + record_fallback_time = self._get_key_time(key) async with aclosing(line_iterator): async for line in line_iterator: try: - record = S3LogRecord.parse(line, record_default_time) - record_default_time = record.time + record = S3LogRecord.parse(line, record_fallback_time) + record_fallback_time = record.time if since is not None and record.time < since: continue self.last_time = record.time