From 3d499557b7307686eacd829282d91d06be467b53 Mon Sep 17 00:00:00 2001
From: sivankumar86
Date: Tue, 2 Aug 2022 00:48:23 +1000
Subject: [PATCH] source-S3: Support JSON format (#14213)

* json format support added
* json format support added
* code formatted
* format conversion changed
* format naming conversion changed
* test case issue fixed
* test case issue resolved
* sample file and config added for integration tests
* Json doc added
* update
* sample file and config added for integration tests
* sample file and config added for integration tests
* update jsonl files
* review 1
* review 1
* review 1
* pyarrow version upgrade
* clean integration test folder architecture
* add timestamp record to simple_test.jsonl
* fixed integration test and parser review change
* simplify table read
* doc update
* fix specs
* user sample files
* fix sample files
* add newlines at end of files
* rename json parser
* rename jsonfile to jsonlfile
* schema inference added
* patch review fix
* Update docs/integrations/sources/s3.md

doc update

Co-authored-by: George Claireaux
* changing the version
* changing the title to sync with other type
* fix expected csv records
* fix expected records for avro and parquet
* review fix
* fixed master schema handling
* remove sample configs
* fix expected records
* json doc update

added more details on json parser

* fixed api name
* bump version
* auto-bump connector version [ci skip]

Co-authored-by: alafanechere
Co-authored-by: George Claireaux
Co-authored-by: George Claireaux
Co-authored-by: Octavia Squidington III
---
 .../resources/seed/source_definitions.yaml    |   2 +-
 .../src/main/resources/seed/source_specs.yaml |  47 ++++-
 .../connectors/source-s3/Dockerfile           |   2 +-
 .../source-s3/acceptance-test-config.yml      |  74 ++++++--
 .../integration_tests/config_minio.json       |  16 ++
 .../avro.json}                                |   0
 .../csv.json}                                 |   0
 .../configured_catalogs/jsonl.json            |  15 ++
 .../configured_catalogs/parquet.json          |  15 ++
 .../avro.txt}                                 |   2 +-
 .../csv.txt}                                  |   2 +-
 .../custom_server.txt}                        |   0
 .../expected_records/jsonl.txt                |   2 +
 .../expected_records/jsonl_newlines.txt       |   2 +
 .../parquet.txt}                              |   0
 .../integration_tests/integration_test.py     |  11 +-
 .../sample_files/avrofile/test_sample.avro    | Bin 0 -> 256 bytes
 .../sample_files/csvfile/simple_test.csv      |   9 +
 .../sample_files/jsonlfile/simple_test.jsonl  |   2 +
 .../jsonlfile/simple_test_newlines.jsonl      |  15 ++
 .../parquetfile/sample_test.parquet           | Bin 0 -> 1238 bytes
 .../sample_files/simple_test.jsonl            |   8 +
 .../source-s3/integration_tests/spec.json     |  41 +++++
 .../connectors/source-s3/setup.py             |   2 +-
 .../formats/jsonl_parser.py                   |  73 ++++++++
 .../formats/jsonl_spec.py                     |  46 +++++
 .../source_s3/source_files_abstract/spec.py   |   3 +-
 .../source_s3/source_files_abstract/stream.py |   2 +
 .../sample_files/jsonl/test_file_1.jsonl      |   8 +
 .../jsonl/test_file_2_enc_Big5.jsonl          |   8 +
 .../jsonl/test_file_3_enc_Arabic.jsonl        |   2 +
 .../sample_files/jsonl/test_file_4.jsonl.gz   | Bin 0 -> 328 bytes
 .../jsonl/test_file_6_empty.jsonl             |   0
 .../jsonl/test_file_7_schema.jsonl            |   0
 .../jsonl/test_file_8_structures.jsonl        |   0
 .../jsonl/test_file_9_timestamp.jsonl         |   8 +
 .../source-s3/unit_tests/test_jsonl_parser.py | 163 ++++++++++++++++++
 docs/integrations/sources/s3.md               |   5 +
 38 files changed, 556 insertions(+), 29 deletions(-)
 create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json
 rename airbyte-integrations/connectors/source-s3/integration_tests/{configured_catalog.json => configured_catalogs/avro.json} (100%)
 rename 
airbyte-integrations/connectors/source-s3/integration_tests/{parquet_configured_catalog.json => configured_catalogs/csv.json} (100%) create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/jsonl.json create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/parquet.json rename airbyte-integrations/connectors/source-s3/integration_tests/{expected_records_avro.txt => expected_records/avro.txt} (99%) rename airbyte-integrations/connectors/source-s3/integration_tests/{expected_records.txt => expected_records/csv.txt} (97%) rename airbyte-integrations/connectors/source-s3/integration_tests/{expected_records_custom_server.txt => expected_records/custom_server.txt} (100%) create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl.txt create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl_newlines.txt rename airbyte-integrations/connectors/source-s3/integration_tests/{parquet_expected_records.txt => expected_records/parquet.txt} (100%) create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/sample_files/avrofile/test_sample.avro create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/sample_files/csvfile/simple_test.csv create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/sample_files/jsonlfile/simple_test.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/sample_files/jsonlfile/simple_test_newlines.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/sample_files/parquetfile/sample_test.parquet create mode 100644 airbyte-integrations/connectors/source-s3/integration_tests/sample_files/simple_test.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py create mode 100644 airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_1.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_2_enc_Big5.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_3_enc_Arabic.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_4.jsonl.gz create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_6_empty.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_7_schema.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_8_structures.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_9_timestamp.jsonl create mode 100644 airbyte-integrations/connectors/source-s3/unit_tests/test_jsonl_parser.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 1e490c431214..92539c2da9e6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -833,7 +833,7 @@ - name: S3 sourceDefinitionId: 69589781-7828-43c5-9f63-8925b1c1ccc2 dockerRepository: airbyte/source-s3 - 
dockerImageTag: 0.1.17 + dockerImageTag: 0.1.18 documentationUrl: https://docs.airbyte.io/integrations/sources/s3 icon: s3.svg sourceType: file diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index e5ca41da048b..49cfcb0b65c4 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7892,7 +7892,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-s3:0.1.17" +- dockerImage: "airbyte/source-s3:0.1.18" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/s3" changelogUrl: "https://docs.airbyte.io/integrations/sources/s3" @@ -8071,6 +8071,51 @@ title: "Filetype" const: "avro" type: "string" + - title: "Jsonl" + description: "This connector uses PyArrow for JSON Lines (jsonl) file parsing." + type: "object" + properties: + filetype: + title: "Filetype" + const: "jsonl" + type: "string" + newlines_in_values: + title: "Allow newlines in values" + description: "Whether newline characters are allowed in JSON values.\ + \ Turning this on may affect performance. Leave blank to default\ + \ to False." + default: false + order: 0 + type: "boolean" + unexpected_field_behavior: + title: "Unexpected field behavior" + description: "How JSON fields outside of explicit_schema (if given)\ + \ are treated. Check PyArrow documentation for details" + default: "infer" + examples: + - "ignore" + - "infer" + - "error" + order: 1 + allOf: + - title: "UnexpectedFieldBehaviorEnum" + description: "An enumeration." + enum: + - "ignore" + - "infer" + - "error" + type: "string" + block_size: + title: "Block Size" + description: "The chunk size in bytes to process at a time in memory\ + \ from each file. If your data is particularly wide and failing\ + \ during schema detection, increasing this should solve it. Beware\ + \ of raising this too high as you could hit OOM errors." 
+ default: 10000 + order: 2 + type: "integer" schema: title: "Manually enforced data schema (Optional)" description: "Optionally provide a schema to enforce, as a valid JSON string.\ diff --git a/airbyte-integrations/connectors/source-s3/Dockerfile b/airbyte-integrations/connectors/source-s3/Dockerfile index 0229bab4038c..2511e5a681c8 100644 --- a/airbyte-integrations/connectors/source-s3/Dockerfile +++ b/airbyte-integrations/connectors/source-s3/Dockerfile @@ -17,5 +17,5 @@ COPY source_s3 ./source_s3 ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.17 +LABEL io.airbyte.version=0.1.18 LABEL io.airbyte.name=airbyte/source-s3 diff --git a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml index bf55f54e0d9d..abbd23b5e02d 100644 --- a/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-s3/acceptance-test-config.yml @@ -13,6 +13,12 @@ tests: status: "succeed" # # for Avro format - config_path: "secrets/avro_config.json" + status: + "succeed" + # for JSON format + - config_path: "secrets/jsonl_config.json" + status: "succeed" + - config_path: "secrets/jsonl_newlines_config.json" status: "succeed" # for custom server - config_path: "integration_tests/config_minio.json" @@ -24,65 +30,92 @@ tests: - config_path: "secrets/config.json" # for Parquet format - config_path: "secrets/parquet_config.json" - # # for Avro format + # for Avro format - config_path: "secrets/avro_config.json" + # for JSON format + - config_path: "secrets/jsonl_config.json" + - config_path: "secrets/jsonl_newlines_config.json" # for custom server - config_path: "integration_tests/config_minio.json" basic_read: # for CSV format - config_path: "secrets/config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" expect_records: - path: "integration_tests/expected_records.txt" + path: "integration_tests/expected_records/csv.txt" # for Parquet format - config_path: "secrets/parquet_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/parquet_configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/parquet.json" expect_records: - path: "integration_tests/parquet_expected_records.txt" + path: "integration_tests/expected_records/parquet.txt" # for Avro format - config_path: "secrets/avro_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/avro.json" + expect_records: + path: "integration_tests/expected_records/avro.txt" + # for JSONL format + - config_path: "secrets/jsonl_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" expect_records: - path: "integration_tests/expected_records_avro.txt" + path: "integration_tests/expected_records/jsonl.txt" + - config_path: "secrets/jsonl_newlines_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" + expect_records: + path: "integration_tests/expected_records/jsonl_newlines.txt" # for custom server - config_path: "integration_tests/config_minio.json" timeout_seconds: 1800 - configured_catalog_path: 
"integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" # expected records contains _ab_source_file_last_modified property which # is modified all the time s3 file changed and for custom server it is # file creating date and it always new. Uncomment this line when SAT # would have ability to ignore specific fields from expected records. # expect_records: - # path: "integration_tests/expected_records_custom_server.txt.txt" + # path: "integration_tests/expected_records/custom_server.txt" incremental: # for CSV format - config_path: "secrets/config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" # for Parquet format - config_path: "secrets/parquet_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/parquet_configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/parquet.json" cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" # for Avro format - config_path: "secrets/avro_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/avro.json" + cursor_paths: + test: ["_ab_source_file_last_modified"] + future_state_path: "integration_tests/abnormal_state.json" + # for JSON format + - config_path: "secrets/jsonl_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" + cursor_paths: + test: ["_ab_source_file_last_modified"] + future_state_path: "integration_tests/abnormal_state.json" + - config_path: "secrets/jsonl_newlines_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" # for custom server - config_path: "integration_tests/config_minio.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" cursor_paths: test: ["_ab_source_file_last_modified"] future_state_path: "integration_tests/abnormal_state.json" @@ -91,16 +124,23 @@ tests: # for CSV format - config_path: "secrets/config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" # for Parquet format - config_path: "secrets/parquet_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/parquet_configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/parquet.json" # for Avro format - config_path: "secrets/avro_config.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/avro.json" + # for JSON format + - config_path: "secrets/jsonl_config.json" + timeout_seconds: 1800 + configured_catalog_path: "integration_tests/configured_catalogs/jsonl.json" + - config_path: "secrets/jsonl_newlines_config.json" + timeout_seconds: 1800 + configured_catalog_path: 
"integration_tests/configured_catalogs/jsonl.json" # for custom server - config_path: "integration_tests/config_minio.json" timeout_seconds: 1800 - configured_catalog_path: "integration_tests/configured_catalog.json" + configured_catalog_path: "integration_tests/configured_catalogs/csv.json" diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json new file mode 100644 index 000000000000..5f2a21abc237 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/config_minio.json @@ -0,0 +1,16 @@ +{ + "dataset": "test", + "provider": { + "storage": "S3", + "bucket": "test-bucket", + "aws_access_key_id": "123456", + "aws_secret_access_key": "123456key", + "path_prefix": "", + "endpoint": "http://10.0.3.185:9000" + }, + "format": { + "filetype": "csv" + }, + "path_pattern": "*.csv", + "schema": "{}" +} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/avro.json similarity index 100% rename from airbyte-integrations/connectors/source-s3/integration_tests/configured_catalog.json rename to airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/avro.json diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/parquet_configured_catalog.json b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/csv.json similarity index 100% rename from airbyte-integrations/connectors/source-s3/integration_tests/parquet_configured_catalog.json rename to airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/csv.json diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/jsonl.json b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/jsonl.json new file mode 100644 index 000000000000..631648d6329c --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/jsonl.json @@ -0,0 +1,15 @@ +{ + "streams": [ + { + "stream": { + "name": "test", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["_ab_source_file_last_modified"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/parquet.json b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/parquet.json new file mode 100644 index 000000000000..631648d6329c --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/configured_catalogs/parquet.json @@ -0,0 +1,15 @@ +{ + "streams": [ + { + "stream": { + "name": "test", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["_ab_source_file_last_modified"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_avro.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/avro.txt similarity index 99% rename from airbyte-integrations/connectors/source-s3/integration_tests/expected_records_avro.txt rename to airbyte-integrations/connectors/source-s3/integration_tests/expected_records/avro.txt 
index d4836fa10530..697501058c33 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_avro.txt +++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/avro.txt @@ -7,4 +7,4 @@ {"stream": "test", "data": {"id": 6, "fullname_and_valid": {"fullname": "MRNMXFkXZo", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} {"stream": "test", "data": {"id": 7, "fullname_and_valid": {"fullname": "MXvEWMgnIr", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} {"stream": "test", "data": {"id": 8, "fullname_and_valid": {"fullname": "rqmFGqZqdF", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} -{"stream": "test", "data": {"id": 9, "fullname_and_valid": {"fullname": "lmPpQTcPFM", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} \ No newline at end of file +{"stream": "test", "data": {"id": 9, "fullname_and_valid": {"fullname": "lmPpQTcPFM", "valid": true}, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-05-11T11:54:11+0000", "_ab_source_file_url": "test_sample.avro"}, "emitted_at": 10000000} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/csv.txt similarity index 97% rename from airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt rename to airbyte-integrations/connectors/source-s3/integration_tests/expected_records/csv.txt index aaeffe0694a5..c0d6452ed123 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records.txt +++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/csv.txt @@ -1,4 +1,4 @@ -{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000} +{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 162727468000} {"stream": "test", "data": {"id": 2, "name": "j4DyXTS7", "valid": true, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000} {"stream": "test", "data": {"id": 3, "name": "v0w8fTME", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000} {"stream": "test", "data": {"id": 4, "name": "1q6jD8Np", "valid": false, "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2021-07-25T15:33:04+0000", "_ab_source_file_url": "simple_test.csv"}, "emitted_at": 1627227468000} diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt 
b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/custom_server.txt similarity index 100% rename from airbyte-integrations/connectors/source-s3/integration_tests/expected_records_custom_server.txt rename to airbyte-integrations/connectors/source-s3/integration_tests/expected_records/custom_server.txt diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl.txt new file mode 100644 index 000000000000..fc86bc1a3d06 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl.txt @@ -0,0 +1,2 @@ +{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false,"value": 1.2, "event_date": "2022-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T08:31:02+0000", "_ab_source_file_url": "simple_test.jsonl"}, "emitted_at": 162727468000} +{"stream": "test", "data": {"id": 2, "name": "ABCDEF", "valid": true,"value": 1.0, "event_date": "2023-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T08:31:02+0000", "_ab_source_file_url": "simple_test.jsonl"}, "emitted_at": 162727468000} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl_newlines.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl_newlines.txt new file mode 100644 index 000000000000..d4d6e09f1663 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/jsonl_newlines.txt @@ -0,0 +1,2 @@ +{"stream": "test", "data": {"id": 1, "name": "PVdhmjb1", "valid": false,"value": 1.2, "event_date": "2022-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T10:07:00+0000", "_ab_source_file_url": "simple_test_newlines.jsonl"}, "emitted_at": 162727468000} +{"stream": "test", "data": {"id": 2, "name": "ABCDEF", "valid": true,"value": 1.0, "event_date": "2023-01-01T00:00:00Z", "_ab_additional_properties": {}, "_ab_source_file_last_modified": "2022-07-15T10:07:00+0000", "_ab_source_file_url": "simple_test_newlines.jsonl"}, "emitted_at": 162727468000} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/parquet_expected_records.txt b/airbyte-integrations/connectors/source-s3/integration_tests/expected_records/parquet.txt similarity index 100% rename from airbyte-integrations/connectors/source-s3/integration_tests/parquet_expected_records.txt rename to airbyte-integrations/connectors/source-s3/integration_tests/expected_records/parquet.txt diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py index 4fd769e179d4..5ae058b46e93 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-s3/integration_tests/integration_test.py @@ -29,6 +29,8 @@ class TestIncrementalFileStreamS3(AbstractTestIncrementalFileStream): + region = "eu-west-3" + @property def stream_class(self) -> type: return IncrementalFileStreamS3 @@ -47,12 +49,11 @@ def provider(self, bucket_name: str) -> Mapping: return {"storage": "S3", "bucket": bucket_name} def _s3_connect(self, credentials: Mapping) -> None: - region = "eu-west-3" self.s3_client = boto3.client( "s3", 
aws_access_key_id=credentials["aws_access_key_id"], aws_secret_access_key=credentials["aws_secret_access_key"], - region_name=region, + region_name=self.region, ) self.s3_resource = boto3.resource( "s3", aws_access_key_id=credentials["aws_access_key_id"], aws_secret_access_key=credentials["aws_secret_access_key"] @@ -60,8 +61,8 @@ def _s3_connect(self, credentials: Mapping) -> None: def cloud_files(self, cloud_bucket_name: str, credentials: Mapping, files_to_upload: List, private: bool = True) -> Iterator[str]: self._s3_connect(credentials) - region = "eu-west-3" - location = {"LocationConstraint": region} + + location = {"LocationConstraint": self.region} bucket_name = cloud_bucket_name print("\n") @@ -133,5 +134,5 @@ def test_big_file(self, minio_credentials: Dict[str, Any]) -> None: minio_credentials["path_pattern"] = "big_files/file.csv" minio_credentials["format"]["block_size"] = 5 * 1024**2 source = SourceS3() - catalog = source.read_catalog(HERE / "configured_catalog.json") + catalog = source.read_catalog(HERE / "configured_catalogs/csv.json") assert self.read_source(minio_credentials, catalog) == expected_count diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/avrofile/test_sample.avro b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/avrofile/test_sample.avro new file mode 100644 index 0000000000000000000000000000000000000000..49472c0cda54eb7eb13f35e918079b10a690a053 GIT binary patch literal 256 zcmeZI%3@>@Nh~YM*GtY%NloU+E6vFf1M`cMGg5OCPcT(0l~fj_Dp@Hg6{RNU7o{la zC@AG6=7L4Q3R5c*a|?1(^+JnNi$IcTnW;G`#Xt?w2(6iUCCK_RQBuU;HlV0U%FoY9P0T}9RF;?nGAXu}Vfv*A<<|>-ozeJufgc5FKwE#}z4xD%P@*C3>mVLtE;QFHn%!G;JwOAPFsL5eM8P#0g28CM6*k z#1U~qocIAfa6tS3PMjjdPeL4+v16p7asa{F-Wl(kc{}sga!FKdF7uZX-z)GKm*)U( z$r8*+z-A&cgl1Kc@d@Lj%&OT4_(Ppq4qW4xmnf4fTrqVuKy+1_#%Qp3&MJZz78%Yl zDHM7&`DVRkCYOz*42q$%R-H2~rhR1EYi7N$)IT9MmarI^Gr;1=0GZDc|0D$+K7K@1 zmLAmIZu`I{zn#dB_jc~5rKmYv9c7;E6xY$Uk5lb@=57z&blOcgtzrq?LgQMpI&RgZ zXyN%o*R{9D9ik}4PvV(W>&~x|q_F)UPFDj;OgP*A8^z?&Y5Ve_(`|SHd|}#zS*Ky^ zYsmZ!(K+n`zsTq4P{2PFNzx_2WAH5Q9MOH@h64=lbBlpE2y7vxc`4Tze6oJ)wyfub$f! 
z^s}^S{$|oWsP1|Q@~uztvdERGzD*%&B2TCfdU2w>buJ&ah#8%yIAcE}%EvAJrimdC z1^t_#U+Fph<7bVNt@S8OATpP7HdJN!6H_(%HzB1_ZP literal 0 HcmV?d00001 diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/simple_test.jsonl b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/simple_test.jsonl new file mode 100644 index 000000000000..9697815f76c8 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/integration_tests/sample_files/simple_test.jsonl @@ -0,0 +1,8 @@ +{"id":1,"name":"PVdhmjb1","valid":false, "value": 1.2} +{"id":2,"name":"j4DyXTS7","valid":true, "value": 1.3} +{"id":3,"name":"v0w8fTME","valid":false, "value": 1.4} +{"id":4,"name":"1q6jD8Np","valid":false, "value": 1.5} +{"id":5,"name":"77h4aiMP","valid":true, "value": 1.6} +{"id":6,"name":"Le35Wyic","valid":true, "value": 1.7} +{"id":7,"name":"xZhh1Kyl","valid":false, "value": 1.8} +{"id":8,"name":"M2t286iJ","valid":false, "value": 1.9} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json index ea64990e246b..d3825b9a78cb 100644 --- a/airbyte-integrations/connectors/source-s3/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-s3/integration_tests/spec.json @@ -160,6 +160,47 @@ "type": "string" } } + }, + { + "title": "Jsonl", + "description": "This connector uses PyArrow for JSON Lines (jsonl) file parsing.", + "type": "object", + "properties": { + "filetype": { + "title": "Filetype", + "const": "jsonl", + "type": "string" + }, + "newlines_in_values": { + "title": "Allow newlines in values", + "description": "Whether newline characters are allowed in JSON values. Turning this on may affect performance. Leave blank to default to False.", + "default": false, + "order": 0, + "type": "boolean" + }, + "unexpected_field_behavior": { + "title": "Unexpected field behavior", + "description": "How JSON fields outside of explicit_schema (if given) are treated. Check PyArrow documentation for details", + "default": "infer", + "examples": ["ignore", "infer", "error"], + "order": 1, + "allOf": [ + { + "title": "UnexpectedFieldBehaviorEnum", + "description": "An enumeration.", + "enum": ["ignore", "infer", "error"], + "type": "string" + } + ] + }, + "block_size": { + "title": "Block Size", + "description": "The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. 
Beware of raising this too high as you could hit OOM errors.",
+          "default": 10000,
+          "order": 2,
+          "type": "integer"
+        }
+      }
     }
   ]
 },
diff --git a/airbyte-integrations/connectors/source-s3/setup.py b/airbyte-integrations/connectors/source-s3/setup.py
index 67ed4f65b379..d88ba96ecb1b 100644
--- a/airbyte-integrations/connectors/source-s3/setup.py
+++ b/airbyte-integrations/connectors/source-s3/setup.py
@@ -7,7 +7,7 @@
 MAIN_REQUIREMENTS = [
     "airbyte-cdk",
-    "pyarrow==4.0.1",
+    "pyarrow==8.0.0",
     "smart-open[s3]==5.1.0",
     "wcmatch==8.2",
     "dill==0.3.4",
diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
new file mode 100644
index 000000000000..9337ed515090
--- /dev/null
+++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_parser.py
@@ -0,0 +1,73 @@
+#
+# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+#
+
+
+from typing import Any, BinaryIO, Iterator, Mapping, TextIO, Union
+
+import pyarrow as pa
+from pyarrow import json as pa_json
+
+from .abstract_file_parser import AbstractFileParser
+from .jsonl_spec import JsonlFormat
+
+
+class JsonlParser(AbstractFileParser):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+        self.format_model = None
+
+    @property
+    def is_binary(self) -> bool:
+        return True
+
+    @property
+    def format(self) -> JsonlFormat:
+        if self.format_model is None:
+            self.format_model = JsonlFormat.parse_obj(self._format)
+        return self.format_model
+
+    def _read_options(self) -> Mapping[str, Any]:
+        """
+        https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
+        build ReadOptions object like: pa.json.ReadOptions(**self._read_options())
+        """
+        return {"block_size": self.format.block_size, "use_threads": True}
+
+    def _parse_options(self, json_schema: Mapping[str, Any] = None) -> Mapping[str, Any]:
+        """
+        https://arrow.apache.org/docs/python/generated/pyarrow.json.ParseOptions.html
+        build ParseOptions object like: pa.json.ParseOptions(**self._parse_options())
+        :param json_schema: if this is passed in, pyarrow will attempt to enforce this schema on read, defaults to None
+        """
+        parse_options = {
+            "newlines_in_values": self.format.newlines_in_values,
+            "unexpected_field_behavior": self.format.unexpected_field_behavior,
+        }
+        if json_schema:
+            parse_options["explicit_schema"] = pa.schema(self.json_schema_to_pyarrow_schema(json_schema))
+
+        return parse_options
+
+    def _read_table(self, file: Union[TextIO, BinaryIO], json_schema: Mapping[str, Any] = None) -> pa.Table:
+        return pa_json.read_json(
+            file, pa.json.ReadOptions(**self._read_options()), pa.json.ParseOptions(**self._parse_options(json_schema))
+        )
+
+    def get_inferred_schema(self, file: Union[TextIO, BinaryIO]) -> Mapping[str, Any]:
+        """
+        https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html
+        The JSON reader supports multithreading, so there is no need to add an external process
+        https://arrow.apache.org/docs/python/generated/pyarrow.json.ReadOptions.html
+        """
+        table = self._read_table(file)
+        schema_dict = {field.name: field.type for field in table.schema}
+        return self.json_schema_to_pyarrow_schema(schema_dict, reverse=True)
+
+    def stream_records(self, file: Union[TextIO, BinaryIO]) -> Iterator[Mapping[str, Any]]:
+        """
+        https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html
+
+        """
+        table = 
self._read_table(file, self._master_schema) + yield from table.to_pylist() diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py new file mode 100644 index 000000000000..6af676c9159e --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/formats/jsonl_spec.py @@ -0,0 +1,46 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +from enum import Enum + +from pydantic import BaseModel, Field + + +class UnexpectedFieldBehaviorEnum(str, Enum): + ignore = "ignore" + infer = "infer" + error = "error" + + +class JsonlFormat(BaseModel): + 'This connector uses PyArrow for JSON Lines (jsonl) file parsing.' + + class Config: + title = "Jsonl" + + filetype: str = Field( + "jsonl", + const=True, + ) + + newlines_in_values: bool = Field( + title="Allow newlines in values", + default=False, + description="Whether newline characters are allowed in JSON values. Turning this on may affect performance. Leave blank to default to False.", + order=0, + ) + + unexpected_field_behavior: UnexpectedFieldBehaviorEnum = Field( + title="Unexpected field behavior", + default="infer", + description='How JSON fields outside of explicit_schema (if given) are treated. Check PyArrow documentation for details', + examples=["ignore", "infer", "error"], + order=1, + ) + + block_size: int = Field( + default=10000, + description="The chunk size in bytes to process at a time in memory from each file. If your data is particularly wide and failing during schema detection, increasing this should solve it. Beware of raising this too high as you could hit OOM errors.", + order=2, + ) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py index 9a9a852e41af..5845a91e3b30 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/spec.py @@ -12,6 +12,7 @@ from .formats.avro_spec import AvroFormat from .formats.csv_spec import CsvFormat +from .formats.jsonl_spec import JsonlFormat from .formats.parquet_spec import ParquetFormat # To implement your provider specific spec, inherit from SourceFilesAbstractSpec and add provider-specific settings e.g.: @@ -60,7 +61,7 @@ class SourceFilesAbstractSpec(BaseModel): order=10, ) - format: Union[CsvFormat, ParquetFormat, AvroFormat] = Field( + format: Union[CsvFormat, ParquetFormat, AvroFormat, JsonlFormat] = Field( default="csv", title="File Format", description="The format of the files you'd like to replicate", order=20 ) diff --git a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py index 7f5fb1575268..9f2937a9d620 100644 --- a/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py +++ b/airbyte-integrations/connectors/source-s3/source_s3/source_files_abstract/stream.py @@ -20,6 +20,7 @@ from .formats.abstract_file_parser import AbstractFileParser from .formats.avro_parser import AvroParser from .formats.csv_parser import CsvParser +from .formats.jsonl_parser import JsonlParser from .formats.parquet_parser import ParquetParser from .storagefile import StorageFile @@ -40,6 +41,7 @@ def fileformatparser_map(self) 
-> Mapping[str, type]: "csv": CsvParser, "parquet": ParquetParser, "avro": AvroParser, + "jsonl": JsonlParser, } # TODO: make these user configurable in spec.json diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_1.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_1.jsonl new file mode 100644 index 000000000000..a614e0374589 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_1.jsonl @@ -0,0 +1,8 @@ +{"id": 1, "name": "PVdhmjb1", "valid": false, "code": 12, "degrees": -31.3, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224125"} +{"id": 2, "name": "j4DyXTS7", "valid": true, "code": -8, "degrees": 41.6, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224383"} +{"id": 3, "name": "v0w8fTME", "valid": false, "code": 7, "degrees": -27.5, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224527"} +{"id": 4, "name": "1q6jD8Np", "valid": false, "code": -8, "degrees": -6.7, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224741"} +{"id": 5, "name": "77h4aiMP", "valid": true, "code": -15, "degrees": -13.7, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.224907"} +{"id": 6, "name": "Le35Wyic", "valid": true, "code": 3, "degrees": 35.3, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.225033"} +{"id": 7, "name": "xZhh1Kyl", "valid": false, "code": 10, "degrees": -9.2, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.225145"} +{"id": 8, "name": "M2t286iJ", "valid": false, "code": 4, "degrees": -3.5, "birthday": "2021-07-14", "last_seen": "2021-07-14 15:30:09.225320"} diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_2_enc_Big5.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_2_enc_Big5.jsonl new file mode 100644 index 000000000000..0ec51938a6a6 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_2_enc_Big5.jsonl @@ -0,0 +1,8 @@ +{"id": 1,"name": "PVdhmjb1", "valid": false} +{"id": 2,"name": "j4DyXTS7", "valid": true} +{"id": 3,"name": "變形金剛,偽裝的機器人", "valid": false} +{"id": 4,"name": "1q6jD8Np", "valid": false} +{"id": 5,"name": "77h4aiMP", "valid": true} +{"id": 6,"name": "變形金剛,偽裝的機器人", "valid": true} +{"id": 7,"name": "xZhh1Kyl", "valid": false} +{"id": 8,"name": "M2t286iJ", "valid": false} diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_3_enc_Arabic.jsonl b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_3_enc_Arabic.jsonl new file mode 100644 index 000000000000..5c0b8f106dc1 --- /dev/null +++ b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_3_enc_Arabic.jsonl @@ -0,0 +1,2 @@ +{"id": 1, "notes": "البايت الجوي هو الأفضل", "valid": false} +{"id": 2, "notes": "البايت الجوي هو الأفضل", "valid": true} diff --git a/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_4.jsonl.gz b/airbyte-integrations/connectors/source-s3/unit_tests/sample_files/jsonl/test_file_4.jsonl.gz new file mode 100644 index 0000000000000000000000000000000000000000..05858a56e8667117ead76ec3875acac0f74e2760 GIT binary patch literal 328 zcmV-O0k{4iiwFq5<(y#v19W9`bYEs^Y-L|DE^2dcZUC**Jx{|h5C-6#UlDmGk733I#sn})zRr%nAcjDdSo9C8TZKVg^Nu{R_B;ZTG?>FRA2}Rx4)jBB zSJ}lxKFW`WsYC{QBvYeDAP>By)k5FAPIyl8iPJ;{dz66DBaug`LYOjJPJ90*ABzI6`g&@-s2`6j80-Oorb{DF#yY0c 
a@^Zo>PNqG)n Mapping[str, Any]: + return { + "basic_normal_test": { + "AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_1.jsonl"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, + "master_schema_test": { + "AbstractFileParser": JsonlParser( + format={"filetype": "jsonl"}, + master_schema={ + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + ), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_1.jsonl"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, + "encoding_Big5": { + "AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_2_enc_Big5.jsonl"), + "num_records": 8, + "inferred_schema": {"id": "integer", "name": "string", "valid": "boolean"}, + "line_checks": {}, + "fails": [], + }, + "encoding_Arabic_(Windows 1256)": { + "AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_3_enc_Arabic.jsonl"), + "num_records": 2, + "inferred_schema": {"id": "integer", "notes": "string", "valid": "boolean"}, + "line_checks": {}, + "fails": [], + }, + "compression_gz": { + "AbstractFileParser": JsonlParser( + format={"filetype": "jsonl"}, + master_schema={ + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + ), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_4.jsonl.gz"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": { + 7: { + "id": 7, + "name": "xZhh1Kyl", + "valid": False, + "code": 10, + "degrees": -9.2, + "birthday": "2021-07-14", + "last_seen": "2021-07-14 15:30:09.225145", + } + }, + "fails": [], + }, + "extra_columns_in_master_schema": { + # tests extra columns in master schema + "AbstractFileParser": JsonlParser( + format={"filetype": "jsonl"}, + master_schema={ + "EXTRA_COLUMN_1": "boolean", + "EXTRA_COLUMN_2": "number", + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + ), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_1.jsonl"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + "degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, + "missing_columns_in_master_schema": { + # tests missing columns in master schema + "AbstractFileParser": JsonlParser(format={"filetype": "jsonl"}, master_schema={"id": "integer", "name": "string"}), + "filepath": os.path.join(SAMPLE_DIRECTORY, "jsonl/test_file_1.jsonl"), + "num_records": 8, + "inferred_schema": { + "id": "integer", + "name": "string", + "valid": "boolean", + "code": "integer", + 
"degrees": "number", + "birthday": "string", + "last_seen": "string", + }, + "line_checks": {}, + "fails": [], + }, + } diff --git a/docs/integrations/sources/s3.md b/docs/integrations/sources/s3.md index 5e3d07e219ab..3eeaddfde2dd 100644 --- a/docs/integrations/sources/s3.md +++ b/docs/integrations/sources/s3.md @@ -191,10 +191,15 @@ You can find details on [here](https://arrow.apache.org/docs/python/generated/py The avro parser uses [fastavro](https://fastavro.readthedocs.io/en/latest/). Currently, no additional options are supported. +### Jsonl + +The Jsonl parser uses pyarrow hence,only the line-delimited JSON format is supported.For more detailed info, please refer to the [docs] (https://arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html) + ## Changelog | Version | Date | Pull Request | Subject | |:--------|:-----------|:----------------------------------------------------------------------------------------------------------------|:----------------------------------------------------------------------------------------| +| 0.1.18 | 2022-08-01 | [14213](https://github.com/airbytehq/airbyte/pull/14213) | Add support for jsonl format files. | | 0.1.17 | 2022-07-21 | [14911](https://github.com/airbytehq/airbyte/pull/14911) | "decimal" type added for parquet | | 0.1.16 | 2022-07-13 | [14669](https://github.com/airbytehq/airbyte/pull/14669) | Fixed bug when extra columns apeared to be non-present in master schema | | 0.1.15 | 2022-05-31 | [12568](https://github.com/airbytehq/airbyte/pull/12568) | Fixed possible case of files being missed during incremental syncs |