diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 1e490c431214..92539c2da9e6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -833,7 +833,7 @@ - name: S3 sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 dockerRepository: airbyte/source-s3 - dockerImageTag: 0.1.17 + dockerImageTag: 0.1.18 documentationUrl: https://docs.airbyte.io/integrations/sources/s3 icon: s3.svg sourceType: file diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e5ca41da048b..49cfcb0b65c4 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7892,7 +7892,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-s3:0.1.17" +- dockerImage: "airbyte/source-s3:0.1.18" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/s3" changelogUrl: "https://docs.airbyte.io/integrations/sources/s3" @@ -8071,6 +8071,51 @@ title: "Filetype" const: "avro" type: "string" + - title: "Jsonl" + description: "This connector uses PyArrow for JSON Lines (jsonl) file parsing." + type: "object" + properties: + filetype: + title: "Filetype" + const: "jsonl" + type: "string" + newlines_in_values: + title: "Allow newlines in values" + description: "Whether newline characters are allowed in JSON values.\ + \ Turning this on may affect performance. Leave blank to default\ + \ to False." + default: false + order: 0 + type: "boolean" + unexpected_field_behavior: + title: "Unexpected field behavior" + description: "How JSON fields outside of explicit_schema (if given)\ + \ are treated. 
Check PyArrow documentation for details" + default: "infer" + examples: + - "ignore" + - "infer" + - "error" + order: 1 + allOf: + - title: "UnexpectedFieldBehaviorEnum" + description: "An enumeration." + enum: + - "ignore" + - "infer" + - "error" + type: "string" + block_size: + title: "Block Size" + description: "The chunk size in bytes to process at a time in memory\ + \ from each file. If your data is particularly wide and failing\ + \ during schema detection, increasing this should solve it. Beware\ + \ of raising this too high as you could hit OOM errors." + default: 10000 + order: 2 + type: "integer" schema: title: "Manually enforced data schema (Optional)" description: "Optionally provide a schema to enforce, as a valid JSON string.\ diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index 0229bab4038c..2511e5a681c8 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.17 +LABEL io.airbyte.version=0.1.18 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml index bf55f54e0d9d..abbd23b5e02d 100644 --- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml @@ -13,6 +13,12 @@ tests: status: "succeed" # # for Avro format - config_path: "secrets/avro_config.json" + status: + "succeed" + # for JSON format + - config_path: "secrets/jsonl_config.json" + status: "succeed" + - config_path: "secrets/jsonl_newlines_config.json" status: "succeed" # for custom server - config_path: 
"integration_tests/config_minio.json" @@ -24,65 +30,92 @@ tests: - config_path: "secrets/config.json" # for Parquet format - config_path: "secrets/parquet_config.json" - # # for Avro format + # for Avro format - config_path: "secrets/avro_config.json" + # for JSON format + - config_path: "secrets/jsonl_config.json" + - config_path: "secrets/jsonl_newlines_config.json" # for custom server - config_path: "integration_tests/config_minio.json" basic_read: # for CSV format - config_path: "secrets/config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" expect_records: - path: "integration_tests/expected_records.txt" + path: "integration_tests/expected_records/csv.txt" # for Parquet format - config_path: "secrets/parquet_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/parquet_configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/parquet.json" expect_records: - path: "integration_tests/parquet_expected_records.txt" + path: "integration_tests/expected_records/parquet.txt" # for Avro format - config_path: "secrets/avro_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/avro.json" + expect_records: + path: "integration_tests/expected_records/avro.txt" + # for JSONL format + - config_path: "secrets/jsonl_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" expect_records: - path: "integration_tests/expected_records_avro.txt" + path: "integration_tests/expected_records/jsonl.txt" + - config_path: "secrets/jsonl_newlines_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" + expect_records: + path: 
"integration_tests/expected_records/jsonl_newlines.txt" # for custom server - config_path: "integration_tests/config_minio.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" # expected records contains _ab_source_file_last_modified property which # is modified all the time s3 file changed and for custom server it is # file creating date and it always new. Uncomment this line when SAT # would have ability to ignore specific fields from expected records. # expect_records: - # path: "integration_tests/expected_records_custom_server.txt.txt" + # path: "integration_tests/expected_records/custom_server.txt" incremental: # for CSV format - config_path: "secrets/config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" # for Parquet format - config_path: "secrets/parquet_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/parquet_configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/parquet.json" cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" # for Avro format - config_path: "secrets/avro_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/avro.json" + cursor_paths: + test: ["_ab_source_file_last_modified"] + future_state_path: "integration_tests/abnormal_state.json" + # for JSON format + - config_path: "secrets/jsonl_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" + cursor_paths: + test: 
["_ab_source_file_last_modified"] + future_state_path: "integration_tests/abnormal_state.json" + - config_path: "secrets/jsonl_newlines_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" # for custom server - config_path: "integration_tests/config_minio.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" @@ -91,16 +124,23 @@ tests: # for CSV format - config_path: "secrets/config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" # for Parquet format - config_path: "secrets/parquet_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/parquet_configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/parquet.json" # for Avro format - config_path: "secrets/avro_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/avro.json" + # for JSON format + - config_path: "secrets/jsonl_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" + - config_path: "secrets/jsonl_newlines_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" # for custom server - config_path: "integration_tests/config_minio.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: 
"integration_tests/configured_catalogs/csv.json" diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json new file mode 100644 index 000000000000..5f2a21abc237 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json @@ -0,0 +1,16 @@ +{ + "dataset": "test", + "provider": { + "storage": "S3", + "bucket": "test-bucket", + "aws_access_key_id": "123456", + "aws_secret_access_key": "123456key", + "path_prefix": "", + "endpoint": "http://10.0.3.185:9000" + }, + "format": { + "filetype": "csv" + }, + "path_pattern": "*.csv", + "schema": "{}" +} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/avro.json similarity index 100% rename from airbyte-integrations/connectors/source-s3/integration_tests/configured_catalog.json rename to airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/avro.json diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/parquet_configured_catalog.json b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/csv.json similarity index 100% rename from airbyte-integrations/connectors/source-s3/integration_tests/parquet_configured_catalog.json rename to airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/csv.json diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/jsonl.json b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/jsonl.json new file mode 100644 index 000000000000..631648d6329c --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/jsonl.json @@ -0,0 +1,15 @@ +{ + "streams": [ + { + "stream": { + "name": "test", + "json_schema": {}, + 
"supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["_ab_source_file_last_modified"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/parquet.json b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/parquet.json new file mode 100644 index 000000000000..631648d6329c --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/parquet.json @@ -0,0 +1,15 @@ +{ + "streams": [ + { + "stream": { + "name": "test", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["_ab_source_file_last_modified"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_avro.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/avro.txt similarity index 99% rename from airbyte-integrations/connectors/source-s3/integration_tests/expected_records_avro.txt rename to airbyte-integrations/connectors/source-s3/integration_tests/expected_records/avro.txt index d4836fa10530..697501058c33 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_avro.txt +++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/avro.txt @@ -7,4 +7,4 @@ {"stream": "test", "data": {"id": 6, "fullname_and_valid": {"fullname": "MRNMXFkXZo", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} {"stream": "test", "data": {"id": 7, "fullname_and_valid": {"fullname": "MXvEWMgnIr", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": 
"2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} {"stream": "test", "data": {"id": 8, "fullname_and_valid": {"fullname": "rqmFGqZqdF", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} -{"stream": "test", "data": {"id": 9, "fullname_and_valid": {"fullname": "lmPpQTcPFM", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} \ No newline at end of file +{"stream": "test", "data": {"id": 9, "fullname_and_valid": {"fullname": "lmPpQTcPFM", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/csv.txt similarity index 97% rename from airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt rename to airbyte-integrations/connectors/source-s3/integration_tests/expected_records/csv.txt index aaeffe0694a5..c0d6452ed123 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt +++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/csv.txt @@ -1,4 +1,4 @@ -{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000} +{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, 
"emitted_at": 162727468000} {"stream": "test", "data": {"id": 2, "name": "j4DyXTS7", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000} {"stream": "test", "data": {"id": 3, "name": "v0w8fTME", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000} {"stream": "test", "data": {"id": 4, "name": "1q6jD8Np", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/custom_server.txt similarity index 100% rename from airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt rename to airbyte-integrations/connectors/source-s3/integration_tests/expected_records/custom_server.txt diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl.txt new file mode 100644 index 000000000000..fc86bc1a3d06 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl.txt @@ -0,0 +1,2 @@ +{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false,"value": 1.2, "event_date": "2022-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T08:31:02+0000", "_ab_source_file_url": "simple_test.jsonl"}, "emitted_at": 162727468000} +{"stream": "test", "data": {"id": 2, "name": "ABCDEF", "valid": true,"value": 1.0, "event_date": "2023-01-01T00:00:00Z", "_ab_additional_properties": {}, 
"_ab_source_file_last_modified": "2022-07-15T08:31:02+0000", "_ab_source_file_url": "simple_test.jsonl"}, "emitted_at": 162727468000} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl_newlines.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl_newlines.txt new file mode 100644 index 000000000000..d4d6e09f1663 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl_newlines.txt @@ -0,0 +1,2 @@ +{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false,"value": 1.2, "event_date": "2022-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T10:07:00+0000", "_ab_source_file_url": "simple_test_newlines.jsonl"}, "emitted_at": 162727468000} +{"stream": "test", "data": {"id": 2, "name": "ABCDEF", "valid": true,"value": 1.0, "event_date": "2023-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T10:07:00+0000", "_ab_source_file_url": "simple_test_newlines.jsonl"}, "emitted_at": 162727468000} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/parquet_expected_records.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/parquet.txt similarity index 100% rename from airbyte-integrations/connectors/source-s3/integration_tests/parquet_expected_records.txt rename to airbyte-integrations/connectors/source-s3/integration_tests/expected_records/parquet.txt diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py index 4fd769e179d4..5ae058b46e93 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py @@ -29,6 +29,8 
@@ class TestIncrementalFileStreamS3(AbstractTestIncrementalFileStream): + region = "eu-west-3" + @property def stream_class(self) -> type: return IncrementalFileStreamS3 @@ -47,12 +49,11 @@ def provider(self, bucket_name: str) -> Mapping: return {"storage": "S3", "bucket": bucket_name} def _s3_connect(self, credentials: Mapping) -> None: - region = "eu-west-3" self.s3_client = boto3.client( "s3", aws_access_key_id=credentials["aws_access_key_id"], aws_secret_access_key=credentials["aws_secret_access_key"], - region_name=region, + region_name=self.region, ) self.s3_resource = boto3.resource( "s3", aws_access_key_id=credentials["aws_access_key_id"], aws_secret_access_key=credentials["aws_secret_access_key"] @@ -60,8 +61,8 @@ def _s3_connect(self, credentials: Mapping) -> None: def cloud_files(self, cloud_bucket_name: str, credentials: Mapping, files_to_upload: List, private: bool = True) -> Iterator[str]: self._s3_connect(credentials) - region = "eu-west-3" - location = {"LocationConstraint": region} + + location = {"LocationConstraint": self.region} bucket_name = cloud_bucket_name print("\n") @@ -133,5 +134,5 @@ def test_big_file(self, minio_credentials: Dict[str, Any]) -> None: minio_credentials["path_pattern"] = "big_files/file.csv" minio_credentials["format"]["block_size"] = 5 * 1024**2 source = SourceS3() - catalog = source.read_catalog(HERE / "configured_catalog.json") + catalog = source.read_catalog(HERE / "configured_catalogs/csv.json") assert self.read_source(minio_credentials, catalog) == expected_count diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/avrofile/test_sample.avro b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/avrofile/test_sample.avro new file mode 100644 index 000000000000..49472c0cda54 Binary files /dev/null and b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/avrofile/test_sample.avro differ diff --git 
a/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/csvfile/simple_test.csv b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/csvfile/simple_test.csv new file mode 100644 index 000000000000..a9cb2f626f52 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/csvfile/simple_test.csv @@ -0,0 +1,9 @@ +id,name,valid +1,PVdhmjb1,False +2,j4DyXTS7,True +3,v0w8fTME,False +4,1q6jD8Np,False +5,77h4aiMP,True +6,Le35Wyic,True +7,xZhh1Kyl,False +8,M2t286iJ,False diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/jsonlfile/simple_test.jsonl b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/jsonlfile/simple_test.jsonl new file mode 100644 index 000000000000..8af29a877feb --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/jsonlfile/simple_test.jsonl @@ -0,0 +1,2 @@ +{"id":1,"name":"PVdhmjb1","valid":false, "value": 1.2, "event_date": "2022-01-01T00:00:00Z"} +{"id":2,"name":"ABCDEF","valid":true, "value": 1, "event_date": "2023-01-01T00:00:00Z"} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/jsonlfile/simple_test_newlines.jsonl b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/jsonlfile/simple_test_newlines.jsonl new file mode 100644 index 000000000000..d4e5ba24a8cd --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/jsonlfile/simple_test_newlines.jsonl @@ -0,0 +1,15 @@ +{ + "id":1, + "name":"PVdhmjb1", + "valid":false, + "value": 1.2, + "event_date": "2022-01-01T00:00:00Z" +} +{ + "id":2, + "name":"ABCDEF", + "valid":true, + "value": 1, + "event_date": + "2023-01-01T00:00:00Z" +} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/parquetfile/sample_test.parquet 
b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/parquetfile/sample_test.parquet new file mode 100644 index 000000000000..4c54b2725f94 Binary files /dev/null and b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/parquetfile/sample_test.parquet differ diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/simple_test.jsonl b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/simple_test.jsonl new file mode 100644 index 000000000000..9697815f76c8 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/simple_test.jsonl @@ -0,0 +1,8 @@ +{"id":1,"name":"PVdhmjb1","valid":false, "value": 1.2} +{"id":2,"name":"j4DyXTS7","valid":true, "value": 1.3} +{"id":3,"name":"v0w8fTME","valid":false, "value": 1.4} +{"id":4,"name":"1q6jD8Np","valid":false, "value": 1.5} +{"id":5,"name":"77h4aiMP","valid":true, "value": 1.6} +{"id":6,"name":"Le35Wyic","valid":true, "value": 1.7} +{"id":7,"name":"xZhh1Kyl","valid":false, "value": 1.8} +{"id":8,"name":"M2t286iJ","valid":false, "value": 1.9} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json index ea64990e246b..d3825b9a78cb 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json @@ -160,6 +160,47 @@ "type": "string" } } + }, + { + "title": "Jsonl", + "description": "This connector uses PyArrow for JSON Lines (jsonl) file parsing.", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "const": "jsonl", + "type": "string" + }, + "newlines_in_values": { + "title": "Allow newlines in values", + "description": "Whether newline characters are allowed in JSON values. Turning this on may affect performance. 
Leave blank to default to False.", + "default": false, + "order": 0, + "type": "boolean" + }, + "unexpected_field_behavior": { + "title": "Unexpected field behavior", + "description": "How JSON fields outside of explicit_schema (if given) are treated. Check PyArrow documentation for details", + "default": "infer", + "examples": ["ignore", "infer", "error"], + "order": 1, + "allOf": [ + { + "title": "UnexpectedFieldBehaviorEnum", + "description": "An enumeration.", + "enum": ["ignore", "infer", "error"], + "type": "string" + } + ] + }, + "block_size": { + "title": "Block Size", + "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.", + "default": 10000, + "order": 2, + "type": "integer" + } + } } ] }, diff --git a/airbyte-integrations/connectors/source-s3/setup.py b/airbyte-integrations/connectors/source-s3/setup.py index 67ed4f65b379..d88ba96ecb1b 100644 --- a/airbyte-integrations/connectors/source-s3/setup.py +++ b/airbyte-integrations/connectors/source-s3/setup.py @@ -7,7 +7,7 @@ MAIN_REQUIREMENTS = [ "airbyte-cdk", - "pyarrow==4.0.1", + "pyarrow==8.0.0", "smart-open[s3]==5.1.0", "wcmatch==8.2", "dill==0.3.4", diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py new file mode 100644 index 000000000000..9337ed515090 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py @@ -0,0 +1,73 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + + +from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union + +import pyarrow as pa +from pyarrow import json as pa_json + +from .abstract_file_parser import AbstractFileParser +from .jsonl_spec import JsonlFormat + + +class JsonlParser(AbstractFileParser): + def __init__(self, *args: Any, **kwargs: Any) -> None: + super().__init__(*args, **kwargs) + self.format_model = None + + @property + def is_binary(self) -> bool: + return True + + @property + def format(self) -> JsonlFormat: + if self.format_model is None: + self.format_model = JsonlFormat.parse_obj(self._format) + return self.format_model + + def _read_options(self) -> Mapping[str, str]: + """ + https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html + build ReadOptions object like: pa.json.ReadOptions(**self._read_options()) + """ + return {**{"block_size": self.format.block_size, "use_threads": True}} + + def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str, str]: + """ + https://arrow.apache.org/docs/python/generated/pyarrow.json.ParseOptions.html + build ParseOptions object like: pa.json.ParseOptions(**self._parse_options()) + :param json_schema: if this is passed in, pyarrow will attempt to enforce this schema on read, defaults to None + """ + parse_options = { + "newlines_in_values": self.format.newlines_in_values, + "unexpected_field_behavior": self.format.unexpected_field_behavior, + } + if json_schema: + parse_options["explicit_schema"] = pa.schema(self.json_schema_to_pyarrow_schema(json_schema)) + + return parse_options + + def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, Any] = None) -> pa.Table: + return pa_json.read_json( + file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema)) + ) + + def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any]: + """ + https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html + 
Json reader supports multi-threading, hence we do not need to add an external process + https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html + """ + table = self._read_table(file) + schema_dict = {field.name: field.type for field in table.schema} + return self.json_schema_to_pyarrow_schema(schema_dict, reverse=True) + + def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]: + """ + https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html + + """ + table = self._read_table(file, self._master_schema) + yield from table.to_pylist() diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py new file mode 100644 index 000000000000..6af676c9159e --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py @@ -0,0 +1,46 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from enum import Enum + +from pydantic import BaseModel, Field + + +class UnexpectedFieldBehaviorEnum(str, Enum): + ignore = "ignore" + infer = "infer" + error = "error" + + +class JsonlFormat(BaseModel): + 'This connector uses PyArrow for JSON Lines (jsonl) file parsing.' + + class Config: + title = "Jsonl" + + filetype: str = Field( + "jsonl", + const=True, + ) + + newlines_in_values: bool = Field( + title="Allow newlines in values", + default=False, + description="Whether newline characters are allowed in JSON values. Turning this on may affect performance. Leave blank to default to False.", + order=0, + ) + + unexpected_field_behavior: UnexpectedFieldBehaviorEnum = Field( + title="Unexpected field behavior", + default="infer", + description='How JSON fields outside of explicit_schema (if given) are treated. 
Check PyArrow documentation for details', + examples=["ignore", "infer", "error"], + order=1, + ) + + block_size: int = Field( + default=10000, + description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.", + order=2, + ) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py index 9a9a852e41af..5845a91e3b30 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py @@ -12,6 +12,7 @@ from .formats.avro_spec import AvroFormat from .formats.csv_spec import CsvFormat +from .formats.jsonl_spec import JsonlFormat from .formats.parquet_spec import ParquetFormat # To implement your provider specific spec, inherit from SourceFilesAbstractSpec and add provider-specific settings e.g.: @@ -60,7 +61,7 @@ class SourceFilesAbstractSpec(BaseModel): order=10, ) - format: Union[CsvFormat, ParquetFormat, AvroFormat] = Field( + format: Union[CsvFormat, ParquetFormat, AvroFormat, JsonlFormat] = Field( default="csv", title="File Format", description="The format of the files you'd like to replicate", order=20 ) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 7f5fb1575268..9f2937a9d620 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -20,6 +20,7 @@ from .formats.abstract_file_parser import AbstractFileParser from .formats.avro_parser import AvroParser from .formats.csv_parser 
import CsvParser +from .formats.jsonl_parser import JsonlParser from .formats.parquet_parser import ParquetParser from .storagefile import StorageFile @@ -40,6 +41,7 @@ def fileformatparser_map(self) -> Mapping[str, type]: "csv": CsvParser, "parquet": ParquetParser, "avro": AvroParser, + "jsonl": JsonlParser, } # TODO: make these user configurable in spec.json diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_1.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_1.jsonl new file mode 100644 index 000000000000..a614e0374589 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_1.jsonl @@ -0,0 +1,8 @@ +{"id": 1, "name": "PVdhmjb1", "valid": false, "code": 12, "degrees": -31.3, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224125"} +{"id": 2, "name": "j4DyXTS7", "valid": true, "code": -8, "degrees": 41.6, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224383"} +{"id": 3, "name": "v0w8fTME", "valid": false, "code": 7, "degrees": -27.5, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224527"} +{"id": 4, "name": "1q6jD8Np", "valid": false, "code": -8, "degrees": -6.7, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224741"} +{"id": 5, "name": "77h4aiMP", "valid": true, "code": -15, "degrees": -13.7, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224907"} +{"id": 6, "name": "Le35Wyic", "valid": true, "code": 3, "degrees": 35.3, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.225033"} +{"id": 7, "name": "xZhh1Kyl", "valid": false, "code": 10, "degrees": -9.2, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.225145"} +{"id": 8, "name": "M2t286iJ", "valid": false, "code": 4, "degrees": -3.5, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.225320"} diff --git 
a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_2_enc_Big5.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_2_enc_Big5.jsonl new file mode 100644 index 000000000000..0ec51938a6a6 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_2_enc_Big5.jsonl @@ -0,0 +1,8 @@ +{"id": 1,"name": "PVdhmjb1", "valid": false} +{"id": 2,"name": "j4DyXTS7", "valid": true} +{"id": 3,"name": "變形金剛,偽裝的機器人", "valid": false} +{"id": 4,"name": "1q6jD8Np", "valid": false} +{"id": 5,"name": "77h4aiMP", "valid": true} +{"id": 6,"name": "變形金剛,偽裝的機器人", "valid": true} +{"id": 7,"name": "xZhh1Kyl", "valid": false} +{"id": 8,"name": "M2t286iJ", "valid": false} diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_3_enc_Arabic.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_3_enc_Arabic.jsonl new file mode 100644 index 000000000000..5c0b8f106dc1 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_3_enc_Arabic.jsonl @@ -0,0 +1,2 @@ +{"id": 1, "notes": "البايت الجوي هو الأفضل", "valid": false} +{"id": 2, "notes": "البايت الجوي هو الأفضل", "valid": true} diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_4.jsonl.gz b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_4.jsonl.gz new file mode 100644 index 000000000000..05858a56e866 Binary files /dev/null and b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_4.jsonl.gz differ diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_6_empty.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_6_empty.jsonl new file mode 100644 index 000000000000..e69de29bb2d1 diff --git 
a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_7_schema.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_7_schema.jsonl new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_8_structures.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_8_structures.jsonl new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_9_timestamp.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_9_timestamp.jsonl new file mode 100644 index 000000000000..17477034496e --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_9_timestamp.jsonl @@ -0,0 +1,8 @@ +{"id": 1, "name": "PVdhmjb1", "valid": false, "code": 12, "degrees": -31.3, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224125", "date": "2021-07-14", "timestamp": "2021-07-14 15:30:09.224125"} +{"id": 2, "name": "j4DyXTS7", "valid": true, "code": -8, "degrees": 41.6, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224383", "date": "2021-07-14", "timestamp": "2021-07-14 15:30:09.224383"} +{"id": 3, "name": "v0w8fTME", "valid": false, "code": 7, "degrees": -27.5, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224527", "date": "2021-07-14", "timestamp": "2021-07-14 15:30:09.224527"} +{"id": 4, "name": "1q6jD8Np", "valid": false, "code": -8, "degrees": -6.7, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224741", "date": "2021-07-14", "timestamp": "2021-07-14 15:30:09.224741"} +{"id": 5, "name": "77h4aiMP", "valid": true, "code": -15, "degrees": -13.7, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224907", "date": "2021-07-14", "timestamp": "2021-07-14 15:30:09.224907"} +{"id": 6, "name": 
"Le35Wyic", "valid": true, "code": 3, "degrees": 35.3, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.225033", "date": "2021-07-14", "timestamp": "2021-07-14 15:30:09.225033"} +{"id": 7, "name": "xZhh1Kyl", "valid": false, "code": 10, "degrees": -9.2, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.225145", "date": "2021-07-14", "timestamp": "2021-07-14 15:30:09.225145"} +{"id": 8, "name": "M2t286iJ", "valid": false, "code": 4, "degrees": -3.5, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.225320", "date": "2021-07-14", "timestamp": "2021-07-14 15:30:09.225320"} diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/test_jsonl_parser.py b/airbyte-integrations/connectors/source-s3/unit_tests/test_jsonl_parser.py new file mode 100644 index 000000000000..6ab682893abc --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/test_jsonl_parser.py @@ -0,0 +1,163 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import os +from pathlib import Path +from typing import Any, Mapping + +from source_s3.source_files_abstract.formats.jsonl_parser import JsonlParser + +from .abstract_test_parser import AbstractTestParser + +SAMPLE_DIRECTORY = Path(__file__).resolve().parent.joinpath("sample_files/") + + +class TestJsonlParser(AbstractTestParser): + @classmethod + def cases(cls) -> Mapping[str, Any]: + return { + "basic_normal_test": { + "AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_1.jsonl"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, + "master_schema_test": { + "AbstractFileParser": JsonlParser( + format={"filetype": "jsonl"}, + master_schema={ + "id": "integer", + "name": "string", + "valid": "boolean", + "code": 
"integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + ), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_1.jsonl"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, + "encoding_Big5": { + "AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_2_enc_Big5.jsonl"), + "num_records": 8, + "inferred_schema": {"id": "integer", "name": "string", "valid": "boolean"}, + "line_checks": {}, + "fails": [], + }, + "encoding_Arabic_(Windows 1256)": { + "AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_3_enc_Arabic.jsonl"), + "num_records": 2, + "inferred_schema": {"id": "integer", "notes": "string", "valid": "boolean"}, + "line_checks": {}, + "fails": [], + }, + "compression_gz": { + "AbstractFileParser": JsonlParser( + format={"filetype": "jsonl"}, + master_schema={ + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + ), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_4.jsonl.gz"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": { + 7: { + "id": 7, + "name": "xZhh1Kyl", + "valid": False, + "code": 10, + "degrees": -9.2, + "birthday": "2021-07-14", + "last_seen": "2021-07-14 15:30:09.225145", + } + }, + "fails": [], + }, + "extra_columns_in_master_schema": { + # tests extra columns in master schema + "AbstractFileParser": JsonlParser( + format={"filetype": "jsonl"}, + master_schema={ + "EXTRA_COLUMN_1": 
"boolean", + "EXTRA_COLUMN_2": "number", + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + ), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_1.jsonl"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, + "missing_columns_in_master_schema": { + # tests missing columns in master schema + "AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}, master_schema={"id": "integer", "name": "string"}), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_1.jsonl"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, + } diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 5e3d07e219ab..3eeaddfde2dd 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -191,10 +191,15 @@ You can find details on [here](https://arrow.apache.org/docs/python/generated/py The avro parser uses [fastavro](https://fastavro.readthedocs.io/en/latest/). Currently, no additional options are supported. 
+### Jsonl + +The Jsonl parser uses PyArrow, hence only the line-delimited JSON format is supported. For more detailed info, please refer to the [docs](https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html) + ## Changelog | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------| +| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. | | 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet | | 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns apeared to be non-present in master schema | | 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs |