diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index 69b9f8470706..572f770bf6dd 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./ COPY source_acceptance_test ./source_acceptance_test RUN pip install . -LABEL io.airbyte.version=0.1.36 +LABEL io.airbyte.version=0.1.38 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"] diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py index 28afee88f54e..3d88cd59148c 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py @@ -115,24 +115,35 @@ def run(self, cmd, config=None, state=None, catalog=None, **kwargs) -> Iterable[ def read(cls, container: Container, command: str = None, with_ext: bool = True) -> Iterable[str]: """Reads connector's logs per line""" buffer = b"" - has_exception = False + exception = "" + line = "" for chunk in container.logs(stdout=True, stderr=True, stream=True, follow=True): + buffer += chunk - found = buffer.find(b"\n") - if found <= -1: - continue - line = buffer[:found].decode("utf-8") - if has_exception or "Traceback (most recent call last)" in line: - has_exception = True + while True: + # every chunk can include several lines + found = buffer.find(b"\n") + if found <= -1: + break + + line = buffer[: found + 1].decode("utf-8") + if len(exception) > 0 or line.startswith("Traceback (most recent call last)"): + exception += line + else: + yield line + buffer = buffer[found + 
1 :] + + if buffer: + # send the latest chunk if exists + line = buffer.decode("utf-8") + if exception: + exception += line else: yield line - buffer = buffer[found + 1 :] - if not has_exception and buffer: - yield buffer.decode("utf-8") exit_status = container.wait() if exit_status["StatusCode"]: - error = buffer.decode("utf-8") if has_exception else exit_status["Error"] + error = exit_status["Error"] or exception or line logging.error(f"Docker container was failed, " f'code {exit_status["StatusCode"]}, error:\n{error}') if with_ext: raise ContainerError( diff --git a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_utils.py b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_utils.py index 50320605a7a1..f116a29bb51c 100644 --- a/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_utils.py +++ b/airbyte-integrations/bases/source-acceptance-test/unit_tests/test_utils.py @@ -2,10 +2,16 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
#
import random
import string
from functools import partial
from typing import Iterable
from unittest.mock import Mock

import pytest
from docker.errors import ContainerError
from source_acceptance_test.utils.compare import make_hashable
from source_acceptance_test.utils.connector_runner import ConnectorRunner

# Traceback text used both as the tail of the fake log stream and as the expected error.
RAW_TRACEBACK = "Traceback (most recent call last):\n File \"\", line 1, in \nKeyError: 'bbbb'"


class MockContainer:
    """Minimal stand-in for the container object handed to ``ConnectorRunner.read``.

    :param status: value returned by ``wait()``; the tests below pass a dict with
        ``StatusCode`` and, for failures, ``Error``.
    :param iter_logs: iterable of byte chunks handed out by ``logs()``.
    """

    def __init__(self, status: dict, iter_logs: Iterable):
        self.wait = Mock(return_value=status)
        # Every ``logs()`` call returns the same iterator, so the chunks can be consumed only once.
        self.logs = Mock(return_value=iter(iter_logs))

        class Image:
            pass

        # Opaque placeholder attribute.
        # NOTE(review): presumably read when the runner builds its ContainerError - confirm.
        self.image = Image()


def binary_generator(lengths, last_line=None):
    """Yield a fake container log stream as byte chunks.

    One random line (upper-case ASCII letters and digits, terminated by a newline) is
    built for every entry of ``lengths``. The encoded text is then cut into chunks of a
    single randomly chosen size between 512 and 1024 bytes, independently of where the
    lines end. Nothing is yielded for an empty ``lengths``.

    :param lengths: iterable of line lengths, not counting the trailing newline.
    :param last_line: optional text sent as one final chunk, preceded by a
        ``bla-1234567890-bla`` line and not terminated by a newline.
    """
    alphabet = string.ascii_uppercase + string.digits
    # random.choices(k=...) draws a whole line per call, and a single join avoids
    # re-copying the growing payload for every line.
    data = "".join("".join(random.choices(alphabet, k=length)) + "\n" for length in lengths).encode()
    chunk_size = random.randint(512, 1024)

    # Slice by offset instead of repeatedly re-assigning the remainder, which would copy
    # the rest of the payload once per chunk.
    for start in range(0, len(data), chunk_size):
        yield data[start : start + chunk_size]
    if last_line:
        yield ("bla-1234567890-bla\n" + last_line).encode()


def test_successful_logs_reading():
    """With a zero status code, ``read`` yields one item per generated line, each of the generated length."""
    line_count = 1234
    line_lengths = [random.randint(0, 1024 * 20) for _ in range(line_count)]
    container = MockContainer(status={"StatusCode": 0}, iter_logs=binary_generator(line_lengths))
    lines = list(ConnectorRunner.read(container=container))
    assert line_count == len(lines)
    for line, length in zip(lines, line_lengths):
        # each yielded line still carries its trailing newline
        assert len(line) - 1 == length


@pytest.mark.parametrize(
    "traceback,container_error,last_line,expected_error",
    (
        # container returns some internal error
        (
            RAW_TRACEBACK,
            "Some Container Error",
            "Last Container Logs Line",
            "Some Container Error",
        ),
        # container returns a raw traceback
        (
            RAW_TRACEBACK,
            None,
            "Last Container Logs Line",
            RAW_TRACEBACK,
        ),
        # container doesn't return any tracebacks or errors
        (
            None,
            None,
            "Last Container Logs Line",
            "Last Container Logs Line",
        ),
    ),
)
def test_failed_reading(traceback, container_error, last_line, expected_error):
    """With a non-zero status code, ``read`` raises ``ContainerError`` whose ``stderr`` is the expected text.

    The three cases expect, in order: the container's own ``Error`` value, the traceback
    found in the logs, and the last log line.
    """
    line_count = 10
    line_lengths = [random.randint(0, 523) for _ in range(line_count)]
    container = MockContainer(
        status={"StatusCode": 1, "Error": container_error},
        iter_logs=binary_generator(line_lengths, traceback or last_line),
    )

    with pytest.raises(ContainerError) as exc:
        list(ConnectorRunner.read(container=container))

    assert expected_error == exc.value.stderr