✨ [file-based cdk] S3 file format adapter (airbytehq#29353)
* [ISSUE airbytehq#28893] infer csv schema

* [ISSUE airbytehq#28893] align with pyarrow

* Automated Commit - Formatting Changes

* [ISSUE airbytehq#28893] legacy inference and infer only when needed

* [ISSUE airbytehq#28893] fix scenario tests

* [ISSUE airbytehq#28893] using discovered schema as part of read

* [ISSUE airbytehq#28893] self-review + cleanup

* [ISSUE airbytehq#28893] fix test

* [ISSUE airbytehq#28893] code review part #1

* [ISSUE airbytehq#28893] code review part #2

* Fix test

* format cdk

* [ISSUE airbytehq#28893] code review

* FIX test log level

* Re-adding failing tests

* [ISSUE airbytehq#28893] improve inference to consider multiple types per value

* Automated Commit - Formatting Changes

* add file adapters for avro, csv, jsonl, and parquet

* fix try catch

* pr feedback with a few additional default options set

* fix things from the rebase of master

---------

Co-authored-by: maxi297 <maxime@airbyte.io>
Co-authored-by: maxi297 <maxi297@users.noreply.github.com>
3 people authored Aug 14, 2023
1 parent 86ba823 commit 82b8274
Showing 2 changed files with 238 additions and 15 deletions.
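The two changed files below appear to be the S3 connector's legacy config transformer and its unit tests (file paths are not shown on this page). At a glance, the transformer now emits a per-stream format block derived from the legacy top-level format. A minimal sketch of the new mapping for an Avro config, using only names visible in the diff and fixtures; the source_s3.v4 module path is an assumption about the connector layout:

# Hedged sketch of the converted output; import path assumed, field names from the fixtures below.
from source_s3.source import SourceS3Spec
from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer  # assumed location

legacy = SourceS3Spec(
    dataset="test_data",
    provider={"storage": "S3", "bucket": "test_bucket"},
    format={"filetype": "avro"},
    path_pattern="**/*.avro",
)
transformed = LegacyConfigTransformer.convert(legacy)
# The legacy top-level format block now also lands on the stream entry:
assert transformed["streams"][0]["format"] == {"filetype": "avro"}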
@@ -2,10 +2,15 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
+import json
 from datetime import datetime
-from typing import Any, List, Mapping
+from typing import Any, List, Mapping, Optional, Union
 
 from source_s3.source import SourceS3Spec
+from source_s3.source_files_abstract.formats.avro_spec import AvroFormat
+from source_s3.source_files_abstract.formats.csv_spec import CsvFormat
+from source_s3.source_files_abstract.formats.jsonl_spec import JsonlFormat
+from source_s3.source_files_abstract.formats.parquet_spec import ParquetFormat
 
 SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
 MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
@@ -27,7 +32,6 @@ def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
                     "file_type": legacy_config.format.filetype,
                     "globs": cls._create_globs(legacy_config.path_pattern, legacy_config.provider.path_prefix),
                     "validation_policy": "Emit Record",
-                    # todo: add formats on a per-type basis in follow up PRs
                 }
             ],
         }
@@ -42,6 +46,8 @@ def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
             transformed_config["endpoint"] = legacy_config.provider.endpoint
         if legacy_config.user_schema and legacy_config.user_schema != "{}":
             transformed_config["streams"][0]["input_schema"] = legacy_config.user_schema
+        if legacy_config.format:
+            transformed_config["streams"][0]["format"] = cls._transform_file_format(legacy_config.format)
 
         return transformed_config

@@ -55,6 +61,53 @@ def _create_globs(cls, path_pattern: str, path_prefix: str) -> List[str]:
     def _transform_seconds_to_micros(cls, datetime_str: str) -> str:
         try:
             parsed_datetime = datetime.strptime(datetime_str, SECONDS_FORMAT)
-            return datetime.strftime(parsed_datetime, MICROS_FORMAT)
+            return parsed_datetime.strftime(MICROS_FORMAT)
         except ValueError as e:
-            raise e
+            raise ValueError("Timestamp could not be parsed when transforming legacy connector config") from e
 
+    @classmethod
+    def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat, AvroFormat, JsonlFormat]) -> Mapping[str, Any]:
+        if isinstance(format_options, AvroFormat):
+            return {"filetype": "avro"}
+        elif isinstance(format_options, CsvFormat):
+            additional_reader_options = cls.parse_config_options_str("additional_reader_options", format_options.additional_reader_options)
+            advanced_options = cls.parse_config_options_str("advanced_options", format_options.advanced_options)
+
+            csv_options = {
+                "filetype": "csv",
+                "delimiter": format_options.delimiter,
+                "quote_char": format_options.quote_char,
+                "double_quote": format_options.double_quote,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "Primitive Types Only" if format_options.infer_datatypes else "None",
+                "strings_can_be_null": additional_reader_options.get("strings_can_be_null", False),
+            }
+
+            if format_options.escape_char:
+                csv_options["escape_char"] = format_options.escape_char
+            if format_options.encoding:
+                csv_options["encoding"] = format_options.encoding
+            if "skip_rows" in advanced_options:
+                csv_options["skip_rows_before_header"] = advanced_options["skip_rows"]
+            if "skip_rows_after_names" in advanced_options:
+                csv_options["skip_rows_after_header"] = advanced_options["skip_rows_after_names"]
+            if "autogenerate_column_names" in advanced_options:
+                csv_options["autogenerate_column_names"] = advanced_options["autogenerate_column_names"]
+            return csv_options
+        elif isinstance(format_options, JsonlFormat):
+            return {"filetype": "jsonl"}
+        elif isinstance(format_options, ParquetFormat):
+            return {"filetype": "parquet"}
+        else:
+            # This should never happen because it would fail schema validation
+            raise ValueError(f"Format filetype {format_options} is not a supported file type")
+
+    @classmethod
+    def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Mapping[str, Any]:
+        options_str = options_value or "{}"
+        try:
+            return json.loads(options_str)
+        except json.JSONDecodeError as error:
+            raise ValueError(f"Malformed {options_field} config json: {error}. Please ensure that it is a valid JSON.")
@@ -24,14 +24,9 @@
         },
         "format": {
-            "filetype": "csv",
-            "delimiter": "^",
-            "quote_char": "|",
-            "escape_char": "!",
-            "double_quote": True,
-            "quoting_behavior": "Quote All"
+            "filetype": "avro",
         },
-        "path_pattern": "**/*.csv",
+        "path_pattern": "**/*.avro",
         "schema": '{"col1": "string", "col2": "integer"}'
     },
     {
@@ -43,10 +38,13 @@
         "streams": [
             {
                 "name": "test_data",
-                "file_type": "csv",
-                "globs": ["a_folder/**/*.csv"],
+                "file_type": "avro",
+                "globs": ["a_folder/**/*.avro"],
                 "validation_policy": "Emit Record",
                 "input_schema": '{"col1": "string", "col2": "integer"}',
+                "format": {
+                    "filetype": "avro"
+                }
             }
         ]
     }
@@ -62,16 +60,19 @@
         "format": {
             "filetype": "avro",
         },
-        "path_pattern": "**/*.csv",
+        "path_pattern": "**/*.avro",
     },
     {
         "bucket": "test_bucket",
         "streams": [
             {
                 "name": "test_data",
                 "file_type": "avro",
-                "globs": ["**/*.csv"],
+                "globs": ["**/*.avro"],
                 "validation_policy": "Emit Record",
+                "format": {
+                    "filetype": "avro"
+                }
             }
         ]
     }
@@ -84,3 +85,172 @@ def test_convert_legacy_config(legacy_config, expected_config):
     actual_config = LegacyConfigTransformer.convert(parsed_legacy_config)
 
     assert actual_config == expected_config
+
+
+@pytest.mark.parametrize(
+    "file_type,legacy_format_config,expected_format_config,expected_error",
+    [
+        pytest.param(
+            "csv",
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "infer_datatypes": False,
+                "quote_char": "^",
+                "escape_char": "$",
+                "encoding": "ansi",
+                "double_quote": False,
+                "newlines_in_values": True,
+                "additional_reader_options": "{\"strings_can_be_null\": true}",
+                "advanced_options": "{\"skip_rows\": 3, \"skip_rows_after_names\": 5, \"autogenerate_column_names\": true}",
+                "blocksize": 20000,
+            },
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "quote_char": "^",
+                "escape_char": "$",
+                "encoding": "ansi",
+                "double_quote": False,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "None",
+                "strings_can_be_null": True,
+                "skip_rows_before_header": 3,
+                "skip_rows_after_header": 5,
+                "autogenerate_column_names": True,
+            },
+            None,
+            id="test_csv_all_legacy_options_set"),
+        pytest.param(
+            "csv",
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "quote_char": "^",
+                "double_quote": True,
+                "newlines_in_values": False,
+            },
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "quote_char": "^",
+                "encoding": "utf8",
+                "double_quote": True,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "Primitive Types Only",
+                "strings_can_be_null": False,
+            },
+            None,
+            id="test_csv_only_required_options"),
+        pytest.param(
+            "csv",
+            {},
+            {
+                "filetype": "csv",
+                "delimiter": ",",
+                "quote_char": "\"",
+                "encoding": "utf8",
+                "double_quote": True,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "Primitive Types Only",
+                "strings_can_be_null": False,
+            },
+            None,
+            id="test_csv_empty_format"),
+        pytest.param(
+            "csv",
+            {
+                "additional_reader_options": "{\"not_valid\": \"at all}",
+            },
+            None,
+            ValueError,
+            id="test_malformed_additional_reader_options"),
+        pytest.param(
+            "csv",
+            {
+                "advanced_options": "{\"not_valid\": \"at all}",
+            },
+            None,
+            ValueError,
+            id="test_malformed_advanced_options"),
+        pytest.param(
+            "jsonl",
+            {
+                "filetype": "jsonl",
+                "newlines_in_values": True,
+                "unexpected_field_behavior": "ignore",
+                "block_size": 0,
+            },
+            {
+                "filetype": "jsonl"
+            },
+            None,
+            id="test_jsonl_format"),
+        pytest.param(
+            "parquet",
+            {
+                "filetype": "parquet",
+                "columns": ["test"],
+                "batch_size": 65536,
+                "buffer_size": 100,
+            },
+            {
+                "filetype": "parquet"
+            },
+            None,
+            id="test_parquet_format"),
+        pytest.param(
+            "avro",
+            {
+                "filetype": "avro",
+            },
+            {
+                "filetype": "avro"
+            },
+            None,
+            id="test_avro_format"),
+    ]
+)
+def test_convert_file_format(file_type, legacy_format_config, expected_format_config, expected_error):
+    legacy_config = {
+        "dataset": "test_data",
+        "provider": {
+            "storage": "S3",
+            "bucket": "test_bucket",
+            "aws_access_key_id": "some_access_key",
+            "aws_secret_access_key": "some_secret",
+        },
+        "format": legacy_format_config,
+        "path_pattern": f"**/*.{file_type}",
+    }
+
+    expected_config = {
+        "bucket": "test_bucket",
+        "aws_access_key_id": "some_access_key",
+        "aws_secret_access_key": "some_secret",
+        "streams": [
+            {
+                "name": "test_data",
+                "file_type": file_type,
+                "globs": [f"**/*.{file_type}"],
+                "validation_policy": "Emit Record",
+                "format": expected_format_config
+            }
+        ]
+    }
+
+    parsed_legacy_config = SourceS3Spec(**legacy_config)
+
+    if expected_error:
+        with pytest.raises(expected_error):
+            LegacyConfigTransformer.convert(parsed_legacy_config)
+    else:
+        actual_config = LegacyConfigTransformer.convert(parsed_legacy_config)
+        assert actual_config == expected_config
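To exercise just the new parametrized cases locally, a pytest keyword filter is enough, e.g. pytest -k test_convert_file_format, run from the connector's unit-test directory (the exact test-file path is not shown on this page).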
