✨ [file-based cdk] S3 file format adapter #29353

Merged Aug 14, 2023 · 24 commits

Commits:
eaba483  [ISSUE #28893] infer csv schema (maxi297, Aug 4, 2023)
60757c1  [ISSUE #28893] align with pyarrow (maxi297, Aug 4, 2023)
f49235d  Automated Commit - Formatting Changes (maxi297, Aug 4, 2023)
a394666  [ISSUE #28893] legacy inference and infer only when needed (maxi297, Aug 7, 2023)
4f9d162  [ISSUE #28893] fix scenario tests (maxi297, Aug 7, 2023)
0617c82  [ISSUE #28893] using discovered schema as part of read (maxi297, Aug 7, 2023)
d157aa3  [ISSUE #28893] self-review + cleanup (maxi297, Aug 8, 2023)
57b011f  [ISSUE #28893] fix test (maxi297, Aug 8, 2023)
71cdca9  [ISSUE #28893] code review part #1 (maxi297, Aug 9, 2023)
ef8f5f5  Merge branch 'master' into issue-28893/infer-schema-csv (maxi297, Aug 9, 2023)
f651d03  [ISSUE #28893] code review part #2 (maxi297, Aug 9, 2023)
a573a89  Fix test (maxi297, Aug 9, 2023)
0ce37e5  formatcdk (maxi297, Aug 9, 2023)
82db6c3  [ISSUE #28893] code review (maxi297, Aug 9, 2023)
3027a4f  FIX test log level (maxi297, Aug 9, 2023)
ac91730  Re-adding failing tests (maxi297, Aug 9, 2023)
f1a60ba  [ISSUE #28893] improve inferrence to consider multiple types per value (maxi297, Aug 10, 2023)
7113603  Merge branch 'master' into issue-28893/infer-schema-csv (maxi297, Aug 10, 2023)
c9e2004  Automated Commit - Formatting Changes (maxi297, Aug 10, 2023)
e6a0b4d  add file adapters for avro, csv, jsonl, and parquet (brianjlai, Aug 11, 2023)
2f827c0  fix try catch (brianjlai, Aug 11, 2023)
868a597  pr feedback with a few additional default options set (brianjlai, Aug 11, 2023)
e48b9d5  Merge branch 'master' into brian/s3_csv_format_adapter (brianjlai, Aug 14, 2023)
bbfbd80  fix things from the rebase of master (brianjlai, Aug 14, 2023)
Files changed

First file (the LegacyConfigTransformer):

@@ -2,10 +2,15 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
+import json
 from datetime import datetime
-from typing import Any, List, Mapping
+from typing import Any, List, Mapping, Optional, Union
 
 from source_s3.source import SourceS3Spec
+from source_s3.source_files_abstract.formats.avro_spec import AvroFormat
+from source_s3.source_files_abstract.formats.csv_spec import CsvFormat
+from source_s3.source_files_abstract.formats.jsonl_spec import JsonlFormat
+from source_s3.source_files_abstract.formats.parquet_spec import ParquetFormat
 
 SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
 MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
@@ -27,7 +32,6 @@ def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
                     "file_type": legacy_config.format.filetype,
                     "globs": cls._create_globs(legacy_config.path_pattern, legacy_config.provider.path_prefix),
                     "validation_policy": "Emit Record",
-                    # todo: add formats on a per-type basis in follow up PRs
                 }
             ],
         }
@@ -42,6 +46,8 @@ def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
             transformed_config["endpoint"] = legacy_config.provider.endpoint
         if legacy_config.user_schema and legacy_config.user_schema != "{}":
             transformed_config["streams"][0]["input_schema"] = legacy_config.user_schema
+        if legacy_config.format:
+            transformed_config["streams"][0]["format"] = cls._transform_file_format(legacy_config.format)
 
         return transformed_config
@@ -55,6 +61,53 @@ def _create_globs(cls, path_pattern: str, path_prefix: str) -> List[str]:
     def _transform_seconds_to_micros(cls, datetime_str: str) -> str:
         try:
             parsed_datetime = datetime.strptime(datetime_str, SECONDS_FORMAT)
-            return datetime.strftime(parsed_datetime, MICROS_FORMAT)
+            return parsed_datetime.strftime(MICROS_FORMAT)
         except ValueError as e:
-            raise e
+            raise ValueError("Timestamp could not be parsed when transforming legacy connector config") from e
+
+    @classmethod
+    def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat, AvroFormat, JsonlFormat]) -> Mapping[str, Any]:
+        if isinstance(format_options, AvroFormat):
+            return {"filetype": "avro"}
+        elif isinstance(format_options, CsvFormat):
+            additional_reader_options = cls.parse_config_options_str("additional_reader_options", format_options.additional_reader_options)
+            advanced_options = cls.parse_config_options_str("advanced_options", format_options.advanced_options)
+
+            csv_options = {
+                "filetype": "csv",
+                "delimiter": format_options.delimiter,
+                "quote_char": format_options.quote_char,
+                "double_quote": format_options.double_quote,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "Primitive Types Only" if format_options.infer_datatypes else "None",
+                "strings_can_be_null": additional_reader_options.get("strings_can_be_null", False),
+            }
+
+            if format_options.escape_char:
+                csv_options["escape_char"] = format_options.escape_char
+            if format_options.encoding:
+                csv_options["encoding"] = format_options.encoding
+            if "skip_rows" in advanced_options:
+                csv_options["skip_rows_before_header"] = advanced_options["skip_rows"]
+            if "skip_rows_after_names" in advanced_options:
+                csv_options["skip_rows_after_header"] = advanced_options["skip_rows_after_names"]
+            if "autogenerate_column_names" in advanced_options:
+                csv_options["autogenerate_column_names"] = advanced_options["autogenerate_column_names"]
+            return csv_options
+        elif isinstance(format_options, JsonlFormat):
+            return {"filetype": "jsonl"}
+        elif isinstance(format_options, ParquetFormat):
+            return {"filetype": "parquet"}
+        else:
+            # This should never happen because it would fail schema validation
+            raise ValueError(f"Format filetype {format_options} is not a supported file type")
+
+    @classmethod
+    def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Mapping[str, Any]:
+        options_str = options_value or "{}"
+        try:
+            return json.loads(options_str)
+        except json.JSONDecodeError as error:
+            raise ValueError(f"Malformed {options_field} config json: {error}. Please ensure that it is a valid JSON.")

Review comment (Contributor), anchored on the csv_options block:

    We should also pass the options that are hidden in the CsvSpec.advanced_options:

      • advanced_options["skip_rows"] -> skip_rows_before_header
      • advanced_options["skip_rows_after_names"] -> skip_rows_after_header
      • advanced_options["autogenerate_column_names"] -> autogenerate_column_names

Reply (Author):

    got it. will add!
Second file (unit tests for the transformer):

@@ -24,14 +24,9 @@
 
            },
            "format": {
-               "filetype": "csv",
-               "delimiter": "^",
-               "quote_char": "|",
-               "escape_char": "!",
-               "double_quote": True,
-               "quoting_behavior": "Quote All"
+               "filetype": "avro",
            },
-           "path_pattern": "**/*.csv",
+           "path_pattern": "**/*.avro",
            "schema": '{"col1": "string", "col2": "integer"}'
        },
        {
@@ -43,10 +38,13 @@
            "streams": [
                {
                    "name": "test_data",
-                   "file_type": "csv",
-                   "globs": ["a_folder/**/*.csv"],
+                   "file_type": "avro",
+                   "globs": ["a_folder/**/*.avro"],
                    "validation_policy": "Emit Record",
                    "input_schema": '{"col1": "string", "col2": "integer"}',
+                   "format": {
+                       "filetype": "avro"
+                   }
                }
            ]
        }
@@ -62,16 +60,19 @@
            "format": {
                "filetype": "avro",
            },
-           "path_pattern": "**/*.csv",
+           "path_pattern": "**/*.avro",
        },
        {
            "bucket": "test_bucket",
            "streams": [
                {
                    "name": "test_data",
                    "file_type": "avro",
-                   "globs": ["**/*.csv"],
+                   "globs": ["**/*.avro"],
                    "validation_policy": "Emit Record",
+                   "format": {
+                       "filetype": "avro"
+                   }
                }
            ]
        }
@@ -84,3 +85,172 @@ def test_convert_legacy_config(legacy_config, expected_config):
     actual_config = LegacyConfigTransformer.convert(parsed_legacy_config)
 
     assert actual_config == expected_config
+
+
+@pytest.mark.parametrize(
+    "file_type,legacy_format_config,expected_format_config,expected_error",
+    [
+        pytest.param(
+            "csv",
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "infer_datatypes": False,
+                "quote_char": "^",
+                "escape_char": "$",
+                "encoding": "ansi",
+                "double_quote": False,
+                "newlines_in_values": True,
+                "additional_reader_options": "{\"strings_can_be_null\": true}",
+                "advanced_options": "{\"skip_rows\": 3, \"skip_rows_after_names\": 5, \"autogenerate_column_names\": true}",
+                "blocksize": 20000,
+            },
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "quote_char": "^",
+                "escape_char": "$",
+                "encoding": "ansi",
+                "double_quote": False,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "None",
+                "strings_can_be_null": True,
+                "skip_rows_before_header": 3,
+                "skip_rows_after_header": 5,
+                "autogenerate_column_names": True,
+            },
+            None,
+            id="test_csv_all_legacy_options_set"),
+        pytest.param(
+            "csv",
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "quote_char": "^",
+                "double_quote": True,
+                "newlines_in_values": False,
+            },
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "quote_char": "^",
+                "encoding": "utf8",
+                "double_quote": True,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "Primitive Types Only",
+                "strings_can_be_null": False,
+            },
+            None,
+            id="test_csv_only_required_options"),
+        pytest.param(
+            "csv",
+            {},
+            {
+                "filetype": "csv",
+                "delimiter": ",",
+                "quote_char": "\"",
+                "encoding": "utf8",
+                "double_quote": True,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "Primitive Types Only",
+                "strings_can_be_null": False,
+            },
+            None,
+            id="test_csv_empty_format"),
+        pytest.param(
+            "csv",
+            {
+                "additional_reader_options": "{\"not_valid\": \"at all}",
+            },
+            None,
+            ValueError,
+            id="test_malformed_additional_reader_options"),
+        pytest.param(
+            "csv",
+            {
+                "advanced_options": "{\"not_valid\": \"at all}",
+            },
+            None,
+            ValueError,
+            id="test_malformed_advanced_options"),
+        pytest.param(
+            "jsonl",
+            {
+                "filetype": "jsonl",
+                "newlines_in_values": True,
+                "unexpected_field_behavior": "ignore",
+                "block_size": 0,
+            },
+            {
+                "filetype": "jsonl"
+            },
+            None,
+            id="test_jsonl_format"),
+        pytest.param(
+            "parquet",
+            {
+                "filetype": "parquet",
+                "columns": ["test"],
+                "batch_size": 65536,
+                "buffer_size": 100,
+            },
+            {
+                "filetype": "parquet"
+            },
+            None,
+            id="test_parquet_format"),
+        pytest.param(
+            "avro",
+            {
+                "filetype": "avro",
+            },
+            {
+                "filetype": "avro"
+            },
+            None,
+            id="test_avro_format"),
+    ]
+)
+def test_convert_file_format(file_type, legacy_format_config, expected_format_config, expected_error):
+    legacy_config = {
+        "dataset": "test_data",
+        "provider": {
+            "storage": "S3",
+            "bucket": "test_bucket",
+            "aws_access_key_id": "some_access_key",
+            "aws_secret_access_key": "some_secret",
+        },
+        "format": legacy_format_config,
+        "path_pattern": f"**/*.{file_type}",
+    }
+
+    expected_config = {
+        "bucket": "test_bucket",
+        "aws_access_key_id": "some_access_key",
+        "aws_secret_access_key": "some_secret",
+        "streams": [
+            {
+                "name": "test_data",
+                "file_type": file_type,
+                "globs": [f"**/*.{file_type}"],
+                "validation_policy": "Emit Record",
+                "format": expected_format_config
+            }
+        ]
+    }
+
+    parsed_legacy_config = SourceS3Spec(**legacy_config)
+
+    if expected_error:
+        with pytest.raises(expected_error):
+            LegacyConfigTransformer.convert(parsed_legacy_config)
+    else:
+        actual_config = LegacyConfigTransformer.convert(parsed_legacy_config)
+        assert actual_config == expected_config
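As a quick illustration of the error path covered by the two test_malformed_* cases above: parse_config_options_str wraps the underlying json.JSONDecodeError in a ValueError. A small sketch, reusing the assumed import path from the earlier example:

```python
from source_s3.v4.legacy_config_transformer import LegacyConfigTransformer  # assumed path

# A well-formed options string parses into a plain dict.
assert LegacyConfigTransformer.parse_config_options_str("advanced_options", '{"skip_rows": 3}') == {"skip_rows": 3}

# None falls back to "{}", so omitted options become an empty dict.
assert LegacyConfigTransformer.parse_config_options_str("advanced_options", None) == {}

# Malformed JSON surfaces as a ValueError, which pytest.raises(ValueError)
# asserts on in the malformed-options cases above.
try:
    LegacyConfigTransformer.parse_config_options_str("advanced_options", '{"not_valid": "at all}')
except ValueError as error:
    print(f"caught: {error}")
```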