Skip to content

Commit

Permalink
🐛 SAT: fix logs slicing (#8848)
Browse files Browse the repository at this point in the history
* fix logs slicing

* add unit test

* add more tests

* bumb version

* send last line if there are not trackback or container error
  • Loading branch information
antixar authored Dec 17, 2021
1 parent 0d7101e commit da40c4f
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.1.36
LABEL io.airbyte.version=0.1.38
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
Original file line number Diff line number Diff line change
Expand Up @@ -115,24 +115,35 @@ def run(self, cmd, config=None, state=None, catalog=None, **kwargs) -> Iterable[
def read(cls, container: Container, command: str = None, with_ext: bool = True) -> Iterable[str]:
"""Reads connector's logs per line"""
buffer = b""
has_exception = False
exception = ""
line = ""
for chunk in container.logs(stdout=True, stderr=True, stream=True, follow=True):

buffer += chunk
found = buffer.find(b"\n")
if found <= -1:
continue
line = buffer[:found].decode("utf-8")
if has_exception or "Traceback (most recent call last)" in line:
has_exception = True
while True:
# every chunk can include several lines
found = buffer.find(b"\n")
if found <= -1:
break

line = buffer[: found + 1].decode("utf-8")
if len(exception) > 0 or line.startswith("Traceback (most recent call last)"):
exception += line
else:
yield line
buffer = buffer[found + 1 :]

if buffer:
# send the latest chunk if exists
line = buffer.decode("utf-8")
if exception:
exception += line
else:
yield line
buffer = buffer[found + 1 :]
if not has_exception and buffer:
yield buffer.decode("utf-8")

exit_status = container.wait()
if exit_status["StatusCode"]:
error = buffer.decode("utf-8") if has_exception else exit_status["Error"]
error = exit_status["Error"] or exception or line
logging.error(f"Docker container was failed, " f'code {exit_status["StatusCode"]}, error:\n{error}')
if with_ext:
raise ContainerError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import random
import string
from functools import partial
from typing import Iterable
from unittest.mock import Mock

import pytest
from docker.errors import ContainerError
from source_acceptance_test.utils.compare import make_hashable
from source_acceptance_test.utils.connector_runner import ConnectorRunner


def not_sorted_data():
Expand Down Expand Up @@ -164,3 +170,82 @@ def test_exclude_fields():
output = map(serializer, data)
for item in output:
assert "organization_id" not in item


class MockContainer:
def __init__(self, status: dict, iter_logs: Iterable):
self.wait = Mock(return_value=status)
self.logs = Mock(return_value=iter(iter_logs))

class Image:
pass

self.image = Image()


def binary_generator(lengths, last_line=None):
data = ""
for length in lengths:
data += "".join(random.choice(string.ascii_uppercase + string.digits) for _ in range(length)) + "\n"
data = data.encode()
chunk_size = random.randint(512, 1024)

while len(data) > chunk_size:
yield data[:chunk_size]
data = data[chunk_size:]
yield data
if last_line:
yield ("bla-1234567890-bla\n" + last_line).encode()


def test_successful_logs_reading():
line_count = 1234
line_lengths = [random.randint(0, 1024 * 20) for _ in range(line_count)]
lines = [
line for line in ConnectorRunner.read(container=MockContainer(status={"StatusCode": 0}, iter_logs=binary_generator(line_lengths)))
]
assert line_count == len(lines)
for line, length in zip(lines, line_lengths):
assert len(line) - 1 == length


@pytest.mark.parametrize(
"traceback,container_error,last_line,expected_error",
(
# container returns a some internal error
(
"Traceback (most recent call last):\n File \"<stdin>\", line 1, in <module>\nKeyError: 'bbbb'",
"Some Container Error",
"Last Container Logs Line",
"Some Container Error",
),
# container returns a raw traceback
(
"Traceback (most recent call last):\n File \"<stdin>\", line 1, in <module>\nKeyError: 'bbbb'",
None,
"Last Container Logs Line",
"Traceback (most recent call last):\n File \"<stdin>\", line 1, in <module>\nKeyError: 'bbbb'",
),
# container doesn't return any tracebacks or errors
(
None,
None,
"Last Container Logs Line",
"Last Container Logs Line",
),
),
)
def test_failed_reading(traceback, container_error, last_line, expected_error):
line_count = 10
line_lengths = [random.randint(0, 523) for _ in range(line_count)]

with pytest.raises(ContainerError) as exc:
list(
ConnectorRunner.read(
container=MockContainer(
status={"StatusCode": 1, "Error": container_error}, iter_logs=binary_generator(line_lengths, traceback or last_line)
)
)
)

assert expected_error == exc.value.stderr

0 comments on commit da40c4f

Please sign in to comment.