diff --git a/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py b/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py
index 8406b682f249..d7ec5c7e955d 100644
--- a/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py
+++ b/airbyte-integrations/connectors/source-s3/source_s3/v4/legacy_config_transformer.py
@@ -2,10 +2,15 @@
 # Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 #
 
+import json
 from datetime import datetime
-from typing import Any, List, Mapping
+from typing import Any, List, Mapping, Optional, Union
 
 from source_s3.source import SourceS3Spec
+from source_s3.source_files_abstract.formats.avro_spec import AvroFormat
+from source_s3.source_files_abstract.formats.csv_spec import CsvFormat
+from source_s3.source_files_abstract.formats.jsonl_spec import JsonlFormat
+from source_s3.source_files_abstract.formats.parquet_spec import ParquetFormat
 
 SECONDS_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
 MICROS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
@@ -27,7 +32,6 @@ def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
                     "file_type": legacy_config.format.filetype,
                     "globs": cls._create_globs(legacy_config.path_pattern, legacy_config.provider.path_prefix),
                     "validation_policy": "Emit Record",
-                    # todo: add formats on a per-type basis in follow up PRs
                 }
             ],
         }
@@ -42,6 +46,8 @@ def convert(cls, legacy_config: SourceS3Spec) -> Mapping[str, Any]:
             transformed_config["endpoint"] = legacy_config.provider.endpoint
         if legacy_config.user_schema and legacy_config.user_schema != "{}":
             transformed_config["streams"][0]["input_schema"] = legacy_config.user_schema
+        if legacy_config.format:
+            transformed_config["streams"][0]["format"] = cls._transform_file_format(legacy_config.format)
 
         return transformed_config
 
@@ -55,6 +61,53 @@ def _create_globs(cls, path_pattern: str, path_prefix: str) -> List[str]:
     def _transform_seconds_to_micros(cls, datetime_str: str) -> str:
         try:
             parsed_datetime = datetime.strptime(datetime_str, SECONDS_FORMAT)
-            return datetime.strftime(parsed_datetime, MICROS_FORMAT)
+            return parsed_datetime.strftime(MICROS_FORMAT)
         except ValueError as e:
-            raise e
+            raise ValueError("Timestamp could not be parsed when transforming legacy connector config") from e
+
+    @classmethod
+    def _transform_file_format(cls, format_options: Union[CsvFormat, ParquetFormat, AvroFormat, JsonlFormat]) -> Mapping[str, Any]:
+        if isinstance(format_options, AvroFormat):
+            return {"filetype": "avro"}
+        elif isinstance(format_options, CsvFormat):
+            additional_reader_options = cls.parse_config_options_str("additional_reader_options", format_options.additional_reader_options)
+            advanced_options = cls.parse_config_options_str("advanced_options", format_options.advanced_options)
+
+            csv_options = {
+                "filetype": "csv",
+                "delimiter": format_options.delimiter,
+                "quote_char": format_options.quote_char,
+                "double_quote": format_options.double_quote,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "Primitive Types Only" if format_options.infer_datatypes else "None",
+                "strings_can_be_null": additional_reader_options.get("strings_can_be_null", False),
+            }
+
+            if format_options.escape_char:
+                csv_options["escape_char"] = format_options.escape_char
+            if format_options.encoding:
+                csv_options["encoding"] = format_options.encoding
+            if "skip_rows" in advanced_options:
+                csv_options["skip_rows_before_header"] = advanced_options["skip_rows"]
+            if "skip_rows_after_names" in advanced_options:
+                csv_options["skip_rows_after_header"] = advanced_options["skip_rows_after_names"]
+            if "autogenerate_column_names" in advanced_options:
+                csv_options["autogenerate_column_names"] = advanced_options["autogenerate_column_names"]
+            return csv_options
+        elif isinstance(format_options, JsonlFormat):
+            return {"filetype": "jsonl"}
+        elif isinstance(format_options, ParquetFormat):
+            return {"filetype": "parquet"}
+        else:
+            # This should never happen because it would fail schema validation
+            raise ValueError(f"Format filetype {format_options} is not a supported file type")
+
+    @classmethod
+    def parse_config_options_str(cls, options_field: str, options_value: Optional[str]) -> Mapping[str, Any]:
+        options_str = options_value or "{}"
+        try:
+            return json.loads(options_str)
+        except json.JSONDecodeError as error:
+            raise ValueError(f"Malformed {options_field} config json: {error}. Please ensure that it is a valid JSON.")
diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py b/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py
index e0b5dd777be1..aabbca08da71 100644
--- a/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py
+++ b/airbyte-integrations/connectors/source-s3/unit_tests/v4/test_legacy_config_transformer.py
@@ -24,14 +24,9 @@
 
                 },
                 "format": {
-                    "filetype": "csv",
-                    "delimiter": "^",
-                    "quote_char": "|",
-                    "escape_char": "!",
-                    "double_quote": True,
-                    "quoting_behavior": "Quote All"
+                    "filetype": "avro",
                 },
-                "path_pattern": "**/*.csv",
+                "path_pattern": "**/*.avro",
                 "schema": '{"col1": "string", "col2": "integer"}'
             },
             {
@@ -43,10 +38,13 @@
                 "streams": [
                     {
                         "name": "test_data",
-                        "file_type": "csv",
-                        "globs": ["a_folder/**/*.csv"],
+                        "file_type": "avro",
+                        "globs": ["a_folder/**/*.avro"],
                         "validation_policy": "Emit Record",
                         "input_schema": '{"col1": "string", "col2": "integer"}',
+                        "format": {
+                            "filetype": "avro"
+                        }
                     }
                 ]
             }
@@ -62,7 +60,7 @@
                 "format": {
                     "filetype": "avro",
                 },
-                "path_pattern": "**/*.csv",
+                "path_pattern": "**/*.avro",
             },
             {
                 "bucket": "test_bucket",
@@ -70,8 +68,11 @@
                     {
                         "name": "test_data",
                         "file_type": "avro",
-                        "globs": ["**/*.csv"],
+                        "globs": ["**/*.avro"],
                         "validation_policy": "Emit Record",
+                        "format": {
+                            "filetype": "avro"
+                        }
                     }
                 ]
             }
@@ -84,3 +85,172 @@ def test_convert_legacy_config(legacy_config, expected_config):
     actual_config = LegacyConfigTransformer.convert(parsed_legacy_config)
 
     assert actual_config == expected_config
+
+
+@pytest.mark.parametrize(
+    "file_type,legacy_format_config,expected_format_config, expected_error",
+    [
+        pytest.param(
+            "csv",
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "infer_datatypes": False,
+                "quote_char": "^",
+                "escape_char": "$",
+                "encoding": "ansi",
+                "double_quote": False,
+                "newlines_in_values": True,
+                "additional_reader_options": "{\"strings_can_be_null\": true}",
+                "advanced_options": "{\"skip_rows\": 3, \"skip_rows_after_names\": 5, \"autogenerate_column_names\": true}",
+                "blocksize": 20000,
+            },
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "quote_char": "^",
+                "escape_char": "$",
+                "encoding": "ansi",
+                "double_quote": False,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "None",
+                "strings_can_be_null": True,
+                "skip_rows_before_header": 3,
+                "skip_rows_after_header": 5,
+                "autogenerate_column_names": True,
+            },
+            None,
+            id="test_csv_all_legacy_options_set"),
+        pytest.param(
+            "csv",
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "quote_char": "^",
+                "double_quote": True,
+                "newlines_in_values": False,
+            },
+            {
+                "filetype": "csv",
+                "delimiter": "&",
+                "quote_char": "^",
+                "encoding": "utf8",
+                "double_quote": True,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "Primitive Types Only",
+                "strings_can_be_null": False,
+            },
+            None,
+            id="test_csv_only_required_options"),
+        pytest.param(
+            "csv",
+            {},
+            {
+                "filetype": "csv",
+                "delimiter": ",",
+                "quote_char": "\"",
+                "encoding": "utf8",
+                "double_quote": True,
+                "null_values": ["", "null", "NULL", "N/A", "NA", "NaN", "None"],
+                "true_values": ["y", "yes", "t", "true", "on", "1"],
+                "false_values": ["n", "no", "f", "false", "off", "0"],
+                "inference_type": "Primitive Types Only",
+                "strings_can_be_null": False,
+            },
+            None,
+            id="test_csv_empty_format"),
+        pytest.param(
+            "csv",
+            {
+                "additional_reader_options": "{\"not_valid\": \"at all}",
+            },
+            None,
+            ValueError,
+            id="test_malformed_additional_reader_options"),
+        pytest.param(
+            "csv",
+            {
+                "advanced_options": "{\"not_valid\": \"at all}",
+            },
+            None,
+            ValueError,
+            id="test_malformed_advanced_options"),
+        pytest.param(
+            "jsonl",
+            {
+                "filetype": "jsonl",
+                "newlines_in_values": True,
+                "unexpected_field_behavior": "ignore",
+                "block_size": 0,
+            },
+            {
+                "filetype": "jsonl"
+            },
+            None,
+            id="test_jsonl_format"),
+        pytest.param(
+            "parquet",
+            {
+                "filetype": "parquet",
+                "columns": ["test"],
+                "batch_size": 65536,
+                "buffer_size": 100,
+            },
+            {
+                "filetype": "parquet"
+            },
+            None,
+            id="test_parquet_format"),
+        pytest.param(
+            "avro",
+            {
+                "filetype": "avro",
+            },
+            {
+                "filetype": "avro"
+            },
+            None,
+            id="test_avro_format"),
+    ]
+)
+def test_convert_file_format(file_type, legacy_format_config, expected_format_config, expected_error):
+    legacy_config = {
+        "dataset": "test_data",
+        "provider": {
+            "storage": "S3",
+            "bucket": "test_bucket",
+            "aws_access_key_id": "some_access_key",
+            "aws_secret_access_key": "some_secret",
+
+        },
+        "format": legacy_format_config,
+        "path_pattern": f"**/*.{file_type}",
+    }
+
+    expected_config = {
+        "bucket": "test_bucket",
+        "aws_access_key_id": "some_access_key",
+        "aws_secret_access_key": "some_secret",
+        "streams": [
+            {
+                "name": "test_data",
+                "file_type": file_type,
+                "globs": [f"**/*.{file_type}"],
+                "validation_policy": "Emit Record",
+                "format": expected_format_config
+            }
+        ]
+    }
+
+    parsed_legacy_config = SourceS3Spec(**legacy_config)
+
+    if expected_error:
+        with pytest.raises(expected_error):
+            LegacyConfigTransformer.convert(parsed_legacy_config)
+    else:
+        actual_config = LegacyConfigTransformer.convert(parsed_legacy_config)
+        assert actual_config == expected_config