Source S3: use AirbyteTracedException (#18602)
* #750 #837 #904 Source S3: use AirbyteTracedException

* source s3: upd changelog

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
2 people authored and nataly committed Nov 3, 2022
1 parent 9d94a06 commit b34d328
Showing 16 changed files with 105 additions and 30 deletions.
@@ -1050,7 +1050,7 @@
- name: S3
sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2
dockerRepository: airbyte/source-s3
dockerImageTag: 0.1.24
dockerImageTag: 0.1.25
documentationUrl: https://docs.airbyte.com/integrations/sources/s3
icon: s3.svg
sourceType: file
@@ -10310,7 +10310,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-s3:0.1.24"
- dockerImage: "airbyte/source-s3:0.1.25"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/s3"
changelogUrl: "https://docs.airbyte.com/integrations/sources/s3"
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-s3/Dockerfile
@@ -17,5 +17,5 @@ COPY source_s3 ./source_s3
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.24
LABEL io.airbyte.version=0.1.25
LABEL io.airbyte.name=airbyte/source-s3
@@ -6,7 +6,7 @@
"aws_access_key_id": "123456",
"aws_secret_access_key": "123456key",
"path_prefix": "",
"endpoint": "http://10.0.92.4:9000"
"endpoint": "http://10.0.56.135:9000"
},
"format": {
"filetype": "csv"
@@ -128,8 +128,9 @@ def _stream_records_test_logic(
for file_dict in stream_slice["files"]:
# TODO: if we ever test other filetypes in these tests this will need fixing
file_reader = CsvParser(format)
with file_dict["storage_file"].open(file_reader.is_binary) as f:
expected_columns.extend(list(file_reader.get_inferred_schema(f).keys()))
storage_file = file_dict["storage_file"]
with storage_file.open(file_reader.is_binary) as f:
expected_columns.extend(list(file_reader.get_inferred_schema(f, storage_file.file_info).keys()))
expected_columns = set(expected_columns) # de-dupe

for record in fs.read_records(sync_mode, stream_slice=stream_slice):
35 changes: 35 additions & 0 deletions airbyte-integrations/connectors/source-s3/source_s3/exceptions.py
@@ -0,0 +1,35 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from typing import List, Optional, Union

from airbyte_cdk.models import FailureType
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

from .source_files_abstract.file_info import FileInfo


class S3Exception(AirbyteTracedException):
def __init__(
self,
file_info: Union[List[FileInfo], FileInfo],
internal_message: Optional[str] = None,
message: Optional[str] = None,
failure_type: FailureType = FailureType.system_error,
exception: BaseException = None,
):
file_info = (
file_info
if isinstance(file_info, (list, tuple))
else [
file_info,
]
)
file_names = ", ".join([file.key for file in file_info])
user_friendly_message = f"""
The connector encountered an error while processing the file(s): {file_names}.
{message}
This can also be an input configuration error; please double-check your connection settings.
"""
super().__init__(internal_message=internal_message, message=user_friendly_message, failure_type=failure_type, exception=exception)
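
For illustration, here is roughly how the new class is meant to be raised from a parser: a minimal sketch with a made-up file name and size (the real call sites are in the csv_parser, parquet_parser, and stream diffs below).

from datetime import datetime

from airbyte_cdk.models import FailureType
from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo

# Illustrative FileInfo; in the connector these objects come from the S3 listing.
bad_file = FileInfo(key="input/customers.csv", size=2048, last_modified=datetime.now())

# Accepts a single FileInfo or a list of them; the keys are joined into the
# "while processing the file(s): ..." user-facing message built above.
raise S3Exception(
    bad_file,
    internal_message="pyarrow could not parse the CSV header",  # hypothetical low-level detail
    message="Unable to parse the CSV file.",
    failure_type=FailureType.config_error,
)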
@@ -45,12 +45,13 @@ def is_binary(self) -> bool:
"""

@abstractmethod
def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> dict:
"""
Override this with format-specific logic to infer the schema of the file
Note: needs to return inferred schema with JsonSchema datatypes
:param file: file-like object (opened via StorageFile)
:param file_info: file metadata
:return: mapping of {columns:datatypes} where datatypes are JsonSchema types
"""

@@ -6,6 +6,7 @@

import fastavro
from fastavro import reader
from source_s3.source_files_abstract.file_info import FileInfo

from .abstract_file_parser import AbstractFileParser

@@ -69,18 +70,20 @@ def _get_avro_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
else:
return schema

def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> dict:
"""Return schema
:param file: file-like object (opened via StorageFile)
:param file_info: file metadata
:return: mapping of {columns:datatypes} where datatypes are JsonSchema types
"""
avro_schema = self._get_avro_schema(file)
schema_dict = self._parse_data_type(data_type_mapping, avro_schema)
return schema_dict

def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
"""Stream the data using a generator
:param file: file-like object (opened via StorageFile)
:param file_info: file metadata
:yield: data record as a mapping of {columns:values}
"""
avro_reader = reader(file)
@@ -11,6 +11,8 @@
import pyarrow as pa
import six # type: ignore[import]
from pyarrow import csv as pa_csv
from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo
from source_s3.utils import get_value_or_json_if_empty_string, run_in_external_process

from .abstract_file_parser import AbstractFileParser
@@ -20,6 +22,19 @@
TMP_FOLDER = tempfile.mkdtemp()


def wrap_exception(exceptions: Tuple[type, ...]):
def wrapper(fn: callable):
def inner(self, file: Union[TextIO, BinaryIO], file_info: FileInfo):
try:
return fn(self, file, file_info)
except exceptions as e:
raise S3Exception(file_info, str(e), str(e), exception=e)

return inner

return wrapper


class CsvParser(AbstractFileParser):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
@@ -74,7 +89,8 @@ def _convert_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str
**json.loads(additional_reader_options),
}

def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any]:
@wrap_exception((ValueError,))
def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]:
"""
https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
This now uses multiprocessing in order to time out the schema inference, as it can hang.
@@ -146,7 +162,8 @@ def _get_schema_dict_without_inference(self, file: Union[TextIO, BinaryIO]) -> M
field_names = next(reader)
return {field_name.strip(): pyarrow.string() for field_name in field_names}

def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
@wrap_exception((ValueError,))
def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
"""
https://arrow.apache.org/docs/python/generated/pyarrow.csv.open_csv.html
PyArrow returns lists of values for each column, so we zip() these up into records, which we then yield
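
The wrap_exception decorator introduced above is what converts pyarrow's ValueError into an S3Exception tied to the file being read. Below is a small self-contained sketch of that behavior, assuming wrap_exception remains a module-level function importable from csv_parser.py as this diff defines it; DummyParser is a stand-in, not connector code.

from datetime import datetime

import pytest

from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo
# Assumption: wrap_exception is importable from the module where this diff defines it.
from source_s3.source_files_abstract.formats.csv_parser import wrap_exception


class DummyParser:
    @wrap_exception((ValueError,))
    def get_inferred_schema(self, file, file_info):
        raise ValueError("malformed CSV")  # stands in for a pyarrow parsing error


file_info = FileInfo(key="input/broken.csv", size=10, last_modified=datetime.now())
with pytest.raises(S3Exception):
    DummyParser().get_inferred_schema(None, file_info)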
@@ -7,6 +7,7 @@

import pyarrow as pa
from pyarrow import json as pa_json
from source_s3.source_files_abstract.file_info import FileInfo

from .abstract_file_parser import AbstractFileParser
from .jsonl_spec import JsonlFormat
@@ -73,7 +74,7 @@ def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, A
file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema))
)

def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any]:
def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Mapping[str, Any]:
"""
https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html
The JSON reader supports multithreading, hence there is no need to run it in an external process
@@ -93,7 +94,7 @@ def field_type_to_str(type_: Any) -> str:
schema_dict = {field.name: field_type_to_str(field.type) for field in table.schema}
return self.json_schema_to_pyarrow_schema(schema_dict, reverse=True)

def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
"""
https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html
@@ -5,7 +5,10 @@
from typing import Any, BinaryIO, Iterator, List, Mapping, TextIO, Tuple, Union

import pyarrow.parquet as pq
from airbyte_cdk.models import FailureType
from pyarrow.parquet import ParquetFile
from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo

from .abstract_file_parser import AbstractFileParser
from .parquet_spec import ParquetFormat
@@ -85,7 +88,7 @@ def convert_field_data(logical_type: str, field_value: Any) -> Any:
return func(field_value) if func else field_value
raise TypeError(f"unsupported field type: {logical_type}, value: {field_value}")

def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
def get_inferred_schema(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> dict:
"""
https://arrow.apache.org/docs/python/parquet.html#finer-grained-reading-and-writing
@@ -97,10 +100,10 @@ def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> dict:
}
if not schema_dict:
# pyarrow can parse empty parquet files but a connector can't generate dynamic schema
raise OSError("empty Parquet file")
raise S3Exception(file_info, "empty Parquet file", "The .parquet file is empty!", FailureType.config_error)
return schema_dict

def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
def stream_records(self, file: Union[TextIO, BinaryIO], file_info: FileInfo) -> Iterator[Mapping[str, Any]]:
"""
https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetFile.html
PyArrow reads streaming batches from a Parquet file
@@ -116,7 +119,7 @@ def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str,
}
if not reader.schema:
# pyarrow can parse empty parquet files but a connector can't generate dynamic schema
raise OSError("empty Parquet file")
raise S3Exception(file_info, "empty Parquet file", "The .parquet file is empty!", FailureType.config_error)

args = self._select_options("columns", "batch_size") # type: ignore[arg-type]
self.logger.debug(f"Found the {reader.num_row_groups} Parquet groups")
@@ -12,10 +12,12 @@
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Union

from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import FailureType
from airbyte_cdk.models.airbyte_protocol import SyncMode
from airbyte_cdk.sources.streams import Stream
from wcmatch.glob import GLOBSTAR, SPLIT, globmatch

from ..exceptions import S3Exception
from .file_info import FileInfo
from .formats.abstract_file_parser import AbstractFileParser
from .formats.avro_parser import AvroParser
@@ -221,14 +223,16 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:

file_reader = self.fileformatparser_class(self._format)

processed_files = []
for file_info in self.get_time_ordered_file_infos():
# skip this file if it's earlier than min_datetime
if (min_datetime is not None) and (file_info.last_modified < min_datetime):
continue

storagefile = self.storagefile_class(file_info, self._provider)
with storagefile.open(file_reader.is_binary) as f:
this_schema = file_reader.get_inferred_schema(f)
this_schema = file_reader.get_inferred_schema(f, file_info)
processed_files.append(file_info)

if this_schema == master_schema:
continue # exact schema match so go to next file
@@ -249,15 +253,18 @@ def _get_master_schema(self, min_datetime: datetime = None) -> Dict[str, Any]:
master_schema[col] = broadest_of_types
if override_type or type_explicitly_defined:
LOGGER.warn(
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
f"Detected mismatched datatype on column '{col}', in file '{file_info}'. "
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'. "
+ f"Airbyte will attempt to coerce this to {master_schema[col]} on read."
)
continue
# otherwise throw an error on mismatching datatypes
raise RuntimeError(
f"Detected mismatched datatype on column '{col}', in file '{storagefile.url}'. "
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'."
raise S3Exception(
processed_files,
"Column type mismatch",
f"Detected mismatched datatype on column '{col}', in file '{file_info}'. "
+ f"Should be '{master_schema[col]}', but found '{this_schema[col]}'.",
failure_type=FailureType.config_error,
)

# missing columns in this_schema doesn't affect our master_schema, so we don't check for it here
@@ -343,7 +350,7 @@ def _read_from_slice(
storage_file: StorageFile = file_item["storage_file"]
with storage_file.open(file_reader.is_binary) as f:
# TODO: make this more efficient than mutating every record one-by-one as they stream
for record in file_reader.stream_records(f):
for record in file_reader.stream_records(f, storage_file.file_info):
schema_matched_record = self._match_target_schema(record, list(self._get_schema_map().keys()))
complete_record = self._add_extra_fields_from_map(
schema_matched_record,
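
The practical effect of this change: a column type mismatch no longer raises a bare RuntimeError naming only the current file's URL; the stream now raises S3Exception with the whole processed_files list and FailureType.config_error, so the user-facing message names every file that contributed to the conflicting master schema. A sketch of what a caller observes follows, with made-up file names; it assumes the CDK's AirbyteTracedException keeps the message and failure_type it was constructed with as attributes.

from datetime import datetime

from airbyte_cdk.models import FailureType
from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo

# Illustrative: the files the stream had processed before hitting the mismatch.
processed_files = [
    FileInfo(key="prefix/2022-01.csv", size=100, last_modified=datetime.now()),
    FileInfo(key="prefix/2022-02.csv", size=120, last_modified=datetime.now()),
]

try:
    raise S3Exception(
        processed_files,
        "Column type mismatch",
        "Detected mismatched datatype on column 'id'. Should be 'integer', but found 'string'.",
        failure_type=FailureType.config_error,
    )
except S3Exception as err:
    print(err.failure_type)  # assumed attribute on the CDK base class: FailureType.config_error
    print(err.message)       # lists both prefix/2022-01.csv and prefix/2022-02.csv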
@@ -132,25 +132,28 @@ def _get_readmode(self, file_info: Mapping[str, Any]) -> str:

@memory_limit(1024)
def test_suite_inferred_schema(self, file_info: Mapping[str, Any]) -> None:
file_info_instance = FileInfo(key=file_info["filepath"], size=os.stat(file_info["filepath"]).st_size, last_modified=datetime.now())
with smart_open(file_info["filepath"], self._get_readmode(file_info)) as f:
if "test_get_inferred_schema" in file_info["fails"]:
with pytest.raises(Exception) as e_info:
file_info["AbstractFileParser"].get_inferred_schema(f)
file_info["AbstractFileParser"].get_inferred_schema(f, file_info_instance)
self.logger.debug(str(e_info))
else:
assert file_info["AbstractFileParser"].get_inferred_schema(f) == file_info["inferred_schema"]
assert file_info["AbstractFileParser"].get_inferred_schema(f, file_info_instance) == file_info["inferred_schema"]

@memory_limit(1024)
def test_stream_suite_records(self, file_info: Mapping[str, Any]) -> None:
filepath = file_info["filepath"]
self.logger.info(f"read the file: {filepath}, size: {os.stat(filepath).st_size / (1024 ** 2)}Mb")
file_size = os.stat(filepath).st_size
file_info_instance = FileInfo(key=filepath, size=file_size, last_modified=datetime.now())
self.logger.info(f"read the file: {filepath}, size: {file_size / (1024 ** 2)}Mb")
with smart_open(filepath, self._get_readmode(file_info)) as f:
if "test_stream_records" in file_info["fails"]:
with pytest.raises(Exception) as e_info:
[print(r) for r in file_info["AbstractFileParser"].stream_records(f)]
[print(r) for r in file_info["AbstractFileParser"].stream_records(f, file_info_instance)]
self.logger.debug(str(e_info))
else:
records = [r for r in file_info["AbstractFileParser"].stream_records(f)]
records = [r for r in file_info["AbstractFileParser"].stream_records(f, file_info_instance)]

assert len(records) == file_info["num_records"]
for index, expected_record in file_info["line_checks"].items():
@@ -10,8 +10,10 @@
from pathlib import Path
from typing import Any, List, Mapping, Tuple

import pendulum
import pytest
from smart_open import open as smart_open
from source_s3.source_files_abstract.file_info import FileInfo
from source_s3.source_files_abstract.formats.csv_parser import CsvParser

from .abstract_test_parser import AbstractTestParser, memory_limit
@@ -403,7 +405,7 @@ def test_big_file(self) -> None:
next(expected_file)
read_count = 0
with smart_open(filepath, self._get_readmode({"AbstractFileParser": parser})) as f:
for record in parser.stream_records(f):
for record in parser.stream_records(f, FileInfo(key=filepath, size=file_size, last_modified=pendulum.now())):
record_line = ",".join("" if v is None else str(v) for v in record.values())
expected_line = next(expected_file).strip("\n")
assert record_line == expected_line
@@ -9,6 +9,7 @@
import pytest
from airbyte_cdk import AirbyteLogger
from airbyte_cdk.models import SyncMode
from source_s3.exceptions import S3Exception
from source_s3.source_files_abstract.file_info import FileInfo
from source_s3.source_files_abstract.storagefile import StorageFile
from source_s3.source_files_abstract.stream import IncrementalFileStream
@@ -642,7 +643,7 @@ def test_master_schema(
dataset="dummy", provider={}, format={"filetype": "csv"}, schema=user_schema, path_pattern="**/prefix*.csv"
)
if error_expected:
with pytest.raises(RuntimeError):
with pytest.raises(S3Exception):
stream_instance._get_master_schema(min_datetime=min_datetime)
else:
assert stream_instance._get_master_schema(min_datetime=min_datetime) == expected_schema