From 690479d221210f9510eef35b30ad85b0dcc6283c Mon Sep 17 00:00:00 2001 From: Alexandre Girard Date: Mon, 14 Aug 2023 18:13:52 -0700 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8Source=20S3=20(v4):=20Set=20decimal=5F?= =?UTF-8?q?as=5Ffloat=20to=20True=20for=20parquet=20files=20(#29342)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [ISSUE #28893] infer csv schema * [ISSUE #28893] align with pyarrow * Automated Commit - Formatting Changes * [ISSUE #28893] legacy inference and infer only when needed * [ISSUE #28893] fix scenario tests * [ISSUE #28893] using discovered schema as part of read * [ISSUE #28893] self-review + cleanup * [ISSUE #28893] fix test * [ISSUE #28893] code review part #1 * [ISSUE #28893] code review part #2 * Fix test * formatcdk * [ISSUE #28893] code review * FIX test log level * Re-adding failing tests * [ISSUE #28893] improve inferrence to consider multiple types per value * set decimal_as_float to True * update * Automated Commit - Formatting Changes * add file adapters for avro, csv, jsonl, and parquet * fix try catch * update * format * pr feedback with a few additional default options set --------- Co-authored-by: maxi297 Co-authored-by: maxi297 Co-authored-by: brianjlai --- .../source_s3/v4/legacy_config_transformer.py | 3 +- .../v4/test_legacy_config_transformer.py | 80 +++++++++---------- 2 files changed, 40 insertions(+), 43 deletions(-) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py index d7ec5c7e955d..b32f835d46c6 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py @@ -96,10 +96,11 @@ def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat, if "autogenerate_column_names" in advanced_options: csv_options["autogenerate_column_names"] = advanced_options["autogenerate_column_names"] return csv_options + elif isinstance(format_options, JsonlFormat): return {"filetype": "jsonl"} elif isinstance(format_options, ParquetFormat): - return {"filetype": "parquet"} + return {"filetype": "parquet", "decimal_as_float": True} else: # This should never happen because it would fail schema validation raise ValueError(f"Format filetype {format_options} is not a supported file type") diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py b/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py index aabbca08da71..e9c38fb18182 100644 --- a/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py +++ b/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py @@ -20,14 +20,13 @@ "aws_secret_access_key": "some_secret", "endpoint": "https://external-s3.com", "path_prefix": "a_folder/", - "start_date": "2022-01-01T01:02:03Z" - + "start_date": "2022-01-01T01:02:03Z", }, "format": { "filetype": "avro", }, "path_pattern": "**/*.avro", - "schema": '{"col1": "string", "col2": "integer"}' + "schema": '{"col1": "string", "col2": "integer"}', }, { "bucket": "test_bucket", @@ -42,13 +41,11 @@ "globs": ["a_folder/**/*.avro"], "validation_policy": "Emit Record", "input_schema": '{"col1": "string", "col2": "integer"}', - "format": { - "filetype": "avro" - } + "format": {"filetype": "avro"}, } - ] - } - , id="test_convert_legacy_config" + ], + }, + id="test_convert_legacy_config", ), pytest.param( { @@ -70,15 +67,13 @@ "file_type": "avro", "globs": ["**/*.avro"], "validation_policy": "Emit Record", - "format": { - "filetype": "avro" - } + "format": {"filetype": "avro"}, } - ] - } - , id="test_convert_no_optional_fields" + ], + }, + id="test_convert_no_optional_fields", ), - ] + ], ) def test_convert_legacy_config(legacy_config, expected_config): parsed_legacy_config = SourceS3Spec(**legacy_config) @@ -101,8 +96,8 @@ def test_convert_legacy_config(legacy_config, expected_config): "encoding": "ansi", "double_quote": False, "newlines_in_values": True, - "additional_reader_options": "{\"strings_can_be_null\": true}", - "advanced_options": "{\"skip_rows\": 3, \"skip_rows_after_names\": 5, \"autogenerate_column_names\": true}", + "additional_reader_options": '{"strings_can_be_null": true}', + "advanced_options": '{"skip_rows": 3, "skip_rows_after_names": 5, "autogenerate_column_names": true}', "blocksize": 20000, }, { @@ -122,7 +117,8 @@ def test_convert_legacy_config(legacy_config, expected_config): "autogenerate_column_names": True, }, None, - id="test_csv_all_legacy_options_set"), + id="test_csv_all_legacy_options_set", + ), pytest.param( "csv", { @@ -145,14 +141,15 @@ def test_convert_legacy_config(legacy_config, expected_config): "strings_can_be_null": False, }, None, - id="test_csv_only_required_options"), + id="test_csv_only_required_options", + ), pytest.param( "csv", {}, { "filetype": "csv", "delimiter": ",", - "quote_char": "\"", + "quote_char": '"', "encoding": "utf8", "double_quote": True, "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"], @@ -162,23 +159,26 @@ def test_convert_legacy_config(legacy_config, expected_config): "strings_can_be_null": False, }, None, - id="test_csv_empty_format"), + id="test_csv_empty_format", + ), pytest.param( "csv", { - "additional_reader_options": "{\"not_valid\": \"at all}", + "additional_reader_options": '{"not_valid": "at all}', }, None, ValueError, - id="test_malformed_additional_reader_options"), + id="test_malformed_additional_reader_options", + ), pytest.param( "csv", { - "advanced_options": "{\"not_valid\": \"at all}", + "advanced_options": '{"not_valid": "at all}', }, None, ValueError, - id="test_malformed_advanced_options"), + id="test_malformed_advanced_options", + ), pytest.param( "jsonl", { @@ -187,11 +187,10 @@ def test_convert_legacy_config(legacy_config, expected_config): "unexpected_field_behavior": "ignore", "block_size": 0, }, - { - "filetype": "jsonl" - }, + {"filetype": "jsonl"}, None, - id="test_jsonl_format"), + id="test_jsonl_format", + ), pytest.param( "parquet", { @@ -200,22 +199,20 @@ def test_convert_legacy_config(legacy_config, expected_config): "batch_size": 65536, "buffer_size": 100, }, - { - "filetype": "parquet" - }, + {"filetype": "parquet", "decimal_as_float": True}, None, - id="test_parquet_format"), + id="test_parquet_format", + ), pytest.param( "avro", { "filetype": "avro", }, - { - "filetype": "avro" - }, + {"filetype": "avro"}, None, - id="test_avro_format"), - ] + id="test_avro_format", + ), + ], ) def test_convert_file_format(file_type, legacy_format_config, expected_format_config, expected_error): legacy_config = { @@ -225,7 +222,6 @@ def test_convert_file_format(file_type, legacy_format_config, expected_format_co "bucket": "test_bucket", "aws_access_key_id": "some_access_key", "aws_secret_access_key": "some_secret", - }, "format": legacy_format_config, "path_pattern": f"**/*.{file_type}", @@ -241,9 +237,9 @@ def test_convert_file_format(file_type, legacy_format_config, expected_format_co "file_type": file_type, "globs": [f"**/*.{file_type}"], "validation_policy": "Emit Record", - "format": expected_format_config + "format": expected_format_config, } - ] + ], } parsed_legacy_config = SourceS3Spec(**legacy_config)