handle invalid unicode in logs #846

Merged 2 commits on Oct 23, 2023
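The gist of the change: archived log lines are now decoded with errors="replace" before JSON parsing, so invalid UTF-8 in container output no longer aborts log streaming. A minimal standalone sketch of that idea (the helper name below is illustrative, not part of the diff):

import json

def decode_log_line(line: bytes) -> dict:
    # errors="replace" maps invalid byte sequences to U+FFFD instead of
    # raising UnicodeDecodeError, so one bad record cannot break the stream.
    return json.loads(line.decode(errors="replace"))

# 0xFF is not valid UTF-8; decoding still succeeds and JSON parsing proceeds.
record = decode_log_line(b'{"time": "2023-10-23T09:30:00", "log": "bad byte \xff"}')
assert record["log"] == "bad byte \ufffd"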
15 changes: 8 additions & 7 deletions platform_monitoring/api.py
@@ -484,7 +484,7 @@ async def port_forward(self, request: Request) -> StreamResponse:


async def _listen(ws: WebSocketResponse) -> None:
# Maintain the WebSocket connetion.
# Maintain the WebSocket connection.
# Handle ping-pong messages and perform the closing handshake.
async for msg in ws:
logger.info(f"Received unexpected WebSocket message: {msg!r}")
@@ -493,12 +493,13 @@ async def _listen(ws: WebSocketResponse) -> None:
async def _forward_bytes_iterating(
ws: WebSocketResponse, it: AsyncIterator[bytes]
) -> None:
async for chunk in it:
if ws.closed:
break
await ws.send_bytes(chunk)
if ws.closed:
break
with suppress(ConnectionResetError):
async for chunk in it:
if ws.closed:
break
await ws.send_bytes(chunk)
if ws.closed:
break


async def _forward_reading(ws: WebSocketResponse, reader: asyncio.StreamReader) -> None:
134 changes: 99 additions & 35 deletions platform_monitoring/logs.py
@@ -1,21 +1,25 @@
from __future__ import annotations

import abc
import asyncio
import io
import json
import logging
import zlib
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import AbstractAsyncContextManager
from contextlib import AbstractAsyncContextManager, suppress
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from os.path import basename
from typing import Any, Optional
from typing import Any

import aiohttp
from aiobotocore.client import AioBaseClient
from aiobotocore.response import StreamingBody
from aioelasticsearch import Elasticsearch, RequestError
from aioelasticsearch.helpers import Scan
from async_timeout import timeout
from iso8601 import UTC, ParseError
from neuro_logging import trace

from .base import LogReader
@@ -125,11 +129,11 @@ def __init__(
client: KubeClient,
pod_name: str,
container_name: str,
client_conn_timeout_s: Optional[float] = None,
client_read_timeout_s: Optional[float] = None,
client_conn_timeout_s: float | None = None,
client_read_timeout_s: float | None = None,
*,
previous: bool = False,
since: Optional[datetime] = None,
since: datetime | None = None,
timestamps: bool = False,
debug: bool = False,
) -> None:
@@ -143,10 +147,10 @@ def __init__(
self._timestamps = timestamps
self._debug = debug

self._stream_cm: Optional[
self._stream_cm: (
AbstractAsyncContextManager[aiohttp.StreamReader]
] = None
self._iterator: Optional[AsyncIterator[bytes]] = None
) | None = None
self._iterator: AsyncIterator[bytes] | None = None

async def __aenter__(self) -> AsyncIterator[bytes]:
await self._client.wait_pod_is_not_waiting(self._pod_name)
@@ -191,7 +195,7 @@ def __init__(
pod_name: str,
container_name: str,
*,
since: Optional[datetime] = None,
since: datetime | None = None,
timestamps: bool = False,
) -> None:
super().__init__(container_runtime=container_runtime, timestamps=timestamps)
@@ -203,8 +207,8 @@ def __init__(
self._pod_name = pod_name
self._container_name = container_name
self._since = since
self._scan: Optional[Scan] = None
self._iterator: Optional[AsyncIterator[bytes]] = None
self._scan: Scan | None = None
self._iterator: AsyncIterator[bytes] | None = None

def _combine_search_query(self) -> dict[str, Any]:
terms = [
@@ -264,6 +268,59 @@ async def _iterate(self) -> AsyncIterator[bytes]:
raise


@dataclass(frozen=True)
class S3LogRecord:
time: datetime
time_str: str
message: str

@classmethod
def parse(cls, line: bytes, fallback_time: datetime) -> S3LogRecord:
data = json.loads(line.decode(errors="replace"))
time_str, time = cls._parse_time(data, fallback_time)
return cls(
time_str=time_str,
time=time,
message=cls._parse_message(data),
)

@classmethod
def _parse_time(
cls, data: dict[str, Any], default_time: datetime
) -> tuple[str, datetime]:
time_str = data.get("time")
if time_str:
return (time_str, parse_date(time_str))
# There are cases when unparsed log records are pushed to
# object storage; the reasons for this behavior are unknown for now.
# If there is no "time" key, the record is considered not parsed by Fluent Bit,
# so we try to recover the time from the container runtime log message.
msg = data.get("log", "")
if result := cls._parse_time_from_message(msg):
return result
time_str = default_time.strftime("%Y-%m-%dT%H:%M:%S.%f")
return (time_str, default_time)

@classmethod
def _parse_time_from_message(cls, msg: str) -> tuple[str, datetime] | None:
time_end = msg.find(" ")
if time_end > 0:
time_str = msg[:time_end]
with suppress(ParseError):
return (time_str, parse_date(time_str))
return None

@classmethod
def _parse_message(cls, data: dict[str, Any]) -> str:
log = data.get("log", "")
if "time" in data:
return log
# If there is no "time" key, the record is considered not parsed by Fluent Bit,
# so the raw message has to be extracted from the container runtime log line.
log_arr = log.split(" ", 3)
return log_arr[-1] if len(log_arr) == 4 else ""


class S3LogReader(LogReader):
def __init__(
self,
@@ -275,7 +332,7 @@ def __init__(
pod_name: str,
container_name: str,
*,
since: Optional[datetime] = None,
since: datetime | None = None,
timestamps: bool = False,
debug: bool = False,
) -> None:
Expand All @@ -289,7 +346,7 @@ def __init__(
self._container_name = container_name
self._since = since
self._debug = debug
self._iterator: Optional[AsyncIterator[bytes]] = None
self._iterator: AsyncIterator[bytes] | None = None

@staticmethod
def get_prefix(
@@ -314,10 +371,10 @@ async def __aenter__(self) -> AsyncIterator[bytes]:
return self._iterator

@trace
async def _load_log_keys(self, since: Optional[datetime]) -> list[str]:
async def _load_log_keys(self, since: datetime | None) -> list[str]:
since_time_str = f"{since:%Y%m%d%H%M}" if since else ""
paginator = self._s3_client.get_paginator("list_objects_v2")
keys: list[tuple[int, int, str]] = []
keys = []
async for page in paginator.paginate(
Bucket=self._bucket_name, Prefix=self._get_prefix()
):
@@ -349,24 +406,31 @@ async def _iterate(self) -> AsyncIterator[bytes]:
line_iterator = self._iter_decompressed_lines(response_body)
else:
line_iterator = response_body.iter_lines()
if self.last_time:
record_fallback_time = max(self.last_time, self._get_key_time(key))
else:
record_fallback_time = self._get_key_time(key)

async with aclosing(line_iterator):
async for line in line_iterator:
try:
event = json.loads(line)
time_str = event["time"]
time = parse_date(time_str)
if since is not None and time < since:
record = S3LogRecord.parse(line, record_fallback_time)
record_fallback_time = record.time
if since is not None and record.time < since:
continue
self.last_time = time
self.last_time = record.time
if self._debug and key:
yield f"~~~ From file {basename(key)}\n".encode()
key = ""
log = event.get("log", "")
yield self.encode_log(time_str, log)
yield self.encode_log(record.time_str, record.message)
except Exception:
logger.exception("Invalid log entry: %r", line)
raise

def _get_key_time(cls, key: str) -> datetime:
time_str = basename(key).split("_")[0]
return datetime.strptime(time_str, "%Y%m%d%H%M").replace(tzinfo=UTC)

@classmethod
async def _iter_decompressed_lines(
cls, body: StreamingBody
@@ -392,14 +456,14 @@ async def get_pod_log_reader(
self,
pod_name: str,
*,
since: Optional[datetime] = None,
separator: Optional[bytes] = None,
since: datetime | None = None,
separator: bytes | None = None,
timestamps: bool = False,
timeout_s: float = 10.0 * 60,
interval_s: float = 1.0,
archive_delay_s: float = DEFAULT_ARCHIVE_DELAY,
debug: bool = False,
stop_func: Optional[Callable[[], Awaitable[bool]]] = None,
stop_func: Callable[[], Awaitable[bool]] | None = None,
) -> AsyncIterator[bytes]:
archive_delay = timedelta(seconds=archive_delay_s)
if stop_func is None:
@@ -408,7 +472,7 @@ async def stop_func() -> bool:
await asyncio.sleep(interval_s)
return False

def get_last_start(status: ContainerStatus) -> Optional[datetime]:
def get_last_start(status: ContainerStatus) -> datetime | None:
if status.is_running:
return status.started_at
elif status.is_pod_terminated:
@@ -548,13 +612,13 @@ def get_last_start(status: ContainerStatus) -> Optional[datetime]:
@abc.abstractmethod
async def get_first_log_entry_time(
self, pod_name: str, *, timeout_s: float = 2.0 * 60
) -> Optional[datetime]:
) -> datetime | None:
pass # pragma: no cover

async def wait_pod_is_running(
self,
name: str,
old_start: Optional[datetime],
old_start: datetime | None,
*,
timeout_s: float = 10.0 * 60,
interval_s: float = 1.0,
@@ -578,7 +642,7 @@ def get_pod_live_log_reader(
self,
pod_name: str,
*,
since: Optional[datetime] = None,
since: datetime | None = None,
timestamps: bool = False,
debug: bool = False,
) -> LogReader:
@@ -589,7 +653,7 @@ def get_pod_archive_log_reader(
self,
pod_name: str,
*,
since: Optional[datetime] = None,
since: datetime | None = None,
timestamps: bool = False,
debug: bool = False,
) -> LogReader:
@@ -613,7 +677,7 @@ def get_pod_live_log_reader(
self,
pod_name: str,
*,
since: Optional[datetime] = None,
since: datetime | None = None,
timestamps: bool = False,
debug: bool = False,
) -> LogReader:
@@ -628,7 +692,7 @@ def get_pod_live_log_reader(

async def get_first_log_entry_time(
self, pod_name: str, *, timeout_s: float = 2.0 * 60
) -> Optional[datetime]:
) -> datetime | None:
return await get_first_log_entry_time(
self._kube_client, pod_name, timeout_s=timeout_s
)
@@ -650,7 +714,7 @@ def get_pod_archive_log_reader(
self,
pod_name: str,
*,
since: Optional[datetime] = None,
since: datetime | None = None,
timestamps: bool = False,
debug: bool = False,
) -> LogReader:
@@ -687,7 +751,7 @@ def get_pod_archive_log_reader(
self,
pod_name: str,
*,
since: Optional[datetime] = None,
since: datetime | None = None,
timestamps: bool = False,
debug: bool = False,
) -> LogReader:
@@ -724,7 +788,7 @@ async def drop_logs(self, pod_name: str) -> None:

async def get_first_log_entry_time(
kube_client: KubeClient, pod_name: str, *, timeout_s: float = 60 * 2.0
) -> Optional[datetime]:
) -> datetime | None:
"""Return the timestamp of the first container log line from Kubernetes.

Return None if the container is not created yet, or there are no logs yet,
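For reference, a sketch of how the new S3LogRecord parsing is meant to behave, assuming it is imported from platform_monitoring.logs as added above; the key name and log payloads below are made-up examples, not data from this PR:

from datetime import datetime, timezone

from platform_monitoring.logs import S3LogRecord

# The fallback time is derived from the S3 key name (see _get_key_time),
# e.g. a key whose basename starts with "202310230930".
fallback = datetime.strptime("202310230930", "%Y%m%d%H%M").replace(tzinfo=timezone.utc)

# A record parsed by Fluent Bit: "time" is present, "log" is the plain message.
ok = S3LogRecord.parse(b'{"time": "2023-10-23T09:30:01.123456", "log": "hello"}', fallback)
assert ok.message == "hello"

# An unparsed record: no "time" key, so the timestamp and message are recovered
# from the raw container runtime line ("<time> <stream> <flag> <message>").
raw = S3LogRecord.parse(b'{"log": "2023-10-23T09:30:02.000000 stdout F world"}', fallback)
assert raw.message == "world"
assert raw.time_str == "2023-10-23T09:30:02.000000"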
8 changes: 6 additions & 2 deletions tests/integration/test_kube.py
@@ -8,7 +8,7 @@
import uuid
from collections.abc import AsyncIterator, Callable, Coroutine, Iterator
from contextlib import AbstractAsyncContextManager
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from unittest import mock
@@ -1386,9 +1386,11 @@ async def coro() -> bytes | Exception:
for i in range(4):
run_log_reader(f"started [{i}]", delay=i * 2, timeout_s=90)
await kube_client.wait_container_is_restarted(job_pod.name, 1)
await kube_client.wait_pod_is_running(job_pod.name)
for i in range(4):
run_log_reader(f"restarted 1 [{i}]", delay=i * 2)
await kube_client.wait_container_is_restarted(job_pod.name, 2)
await kube_client.wait_pod_is_running(job_pod.name)
for i in range(4):
run_log_reader(f"restarted 2 [{i}]", delay=i * 2)
await kube_client.wait_pod_is_terminated(job_pod.name)
@@ -1478,11 +1480,12 @@ async def coro() -> bytes:
assert finished1

await kube_client.wait_container_is_restarted(job_pod.name, 1)
await kube_client.wait_pod_is_running(job_pod.name)
status = await kube_client.get_container_status(job_pod.name)
started2 = status.started_at
assert started2
run_log_reader(since=started2)
run_log_reader(since=datetime.now(tz=timezone.utc))
run_log_reader(since=started2 + timedelta(seconds=2))

await kube_client.wait_pod_is_terminated(job_pod.name)
status = await kube_client.get_container_status(job_pod.name)
Expand All @@ -1491,6 +1494,7 @@ async def coro() -> bytes:
assert finished2

await kube_client.wait_container_is_restarted(job_pod.name, 2)
await kube_client.wait_pod_is_running(job_pod.name)
run_log_reader(since=finished1 - timedelta(seconds=4))
run_log_reader(since=started2)
run_log_reader(since=finished2 - timedelta(seconds=4))