From 34add9a82a4cd710187bb83a87b8ef57af2ac82c Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 30 Aug 2021 13:39:14 +0300 Subject: [PATCH 01/19] fixed s3 destination field naming for Parquet and Avro formats --- .../4816b78f-1489-44c1-9060-4b19d5fa9362.json | 2 +- .../seed/destination_definitions.yaml | 2 +- .../connectors/destination-s3/Dockerfile | 2 +- .../destination/s3/S3NameTransformer.java | 43 +++++++++++++++++++ .../s3/avro/JsonToAvroSchemaConverter.java | 4 +- docs/integrations/destinations/s3.md | 1 + 6 files changed, 49 insertions(+), 5 deletions(-) create mode 100644 airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json index 89faf584a79b..9be2b8a0c312 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/4816b78f-1489-44c1-9060-4b19d5fa9362.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "4816b78f-1489-44c1-9060-4b19d5fa9362", "name": "S3", "dockerRepository": "airbyte/destination-s3", - "dockerImageTag": "0.1.10", + "dockerImageTag": "0.1.11", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/s3" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 7bb143ad6db7..567ac79b54b7 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -42,7 +42,7 @@ - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 name: S3 
dockerRepository: airbyte/destination-s3 - dockerImageTag: 0.1.10 + dockerImageTag: 0.1.11 documentationUrl: https://docs.airbyte.io/integrations/destinations/s3 - destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc name: Redshift diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile index 7ff24c157ff0..85327fbe2b80 100644 --- a/airbyte-integrations/connectors/destination-s3/Dockerfile +++ b/airbyte-integrations/connectors/destination-s3/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.10 +LABEL io.airbyte.version=0.1.11 LABEL io.airbyte.name=airbyte/destination-s3 diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java new file mode 100644 index 000000000000..8549597dfc17 --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java @@ -0,0 +1,43 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.destination.s3; + +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.integrations.destination.ExtendedNameTransformer; + +@VisibleForTesting +public class S3NameTransformer extends ExtendedNameTransformer { + + @Override + protected String applyDefaultCase(String input) { + return super.convertStreamName(input).toLowerCase(); + } + + @Override + public String getIdentifier(String name) { + return "_" + convertStreamName(name); + } + +} diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java index d4468c62effe..97954729a26d 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java @@ -29,7 +29,7 @@ import com.google.common.base.Preconditions; import io.airbyte.commons.util.MoreIterators; import io.airbyte.integrations.base.JavaBaseConstants; -import io.airbyte.integrations.destination.StandardNameTransformer; +import io.airbyte.integrations.destination.s3.S3NameTransformer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ 
-62,7 +62,7 @@ public class JsonToAvroSchemaConverter { private static final Logger LOGGER = LoggerFactory.getLogger(JsonToAvroSchemaConverter.class); private static final Schema TIMESTAMP_MILLIS_SCHEMA = LogicalTypes.timestampMillis() .addToSchema(Schema.create(Type.LONG)); - private static final StandardNameTransformer NAME_TRANSFORMER = new StandardNameTransformer(); + private static final S3NameTransformer NAME_TRANSFORMER = new S3NameTransformer(); private final Map standardizedNames = new HashMap<>(); diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 0db07a2e9877..fc3fb34e4501 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -374,6 +374,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.11 | 2021-08-30 | [#5149](https://github.com/airbytehq/airbyte/pull/5149) | Updated the field names to start with `_` for formats `Parquet` and `Avro`. | | 0.1.10 | 2021-09-13 | [#4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator | | 0.1.9 | 2021-07-12 | [#4666](https://github.com/airbytehq/airbyte/pull/4666) | Fix MinIO output for Parquet format. | | 0.1.8 | 2021-07-07 | [#4613](https://github.com/airbytehq/airbyte/pull/4613) | Patched schema converter to support combined restrictions. 
| From a8fbb9dee78a8e358fda5317388dfdeb2db7df5b Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 30 Aug 2021 13:44:36 +0300 Subject: [PATCH 02/19] pull request number update --- docs/integrations/destinations/s3.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index fc3fb34e4501..7cd03f5ab8c5 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -374,7 +374,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | -| 0.1.11 | 2021-08-30 | [#5149](https://github.com/airbytehq/airbyte/pull/5149) | Updated the field names to start with `_` for formats `Parquet` and `Avro`. | +| 0.1.11 | 2021-08-30 | [#5729](https://github.com/airbytehq/airbyte/pull/5729) | Updated the field names to start with `_` for formats `Parquet` and `Avro`. | | 0.1.10 | 2021-09-13 | [#4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator | | 0.1.9 | 2021-07-12 | [#4666](https://github.com/airbytehq/airbyte/pull/4666) | Fix MinIO output for Parquet format. | | 0.1.8 | 2021-07-07 | [#4613](https://github.com/airbytehq/airbyte/pull/4613) | Patched schema converter to support combined restrictions. 
| From 266928ee316dab488925cc36aa94492ec1b0207a Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 3 Sep 2021 11:30:39 +0300 Subject: [PATCH 03/19] updated resources for parquet --- .../get_avro_schema.json | 76 ++++++++++++------- .../json_schema_converter/get_field_type.json | 9 ++- 2 files changed, 56 insertions(+), 29 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json index 409a7ce58e28..133660444d94 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json @@ -13,12 +13,14 @@ }, "avroSchema": { "type": "record", - "name": "simple_schema", + "name": "_simple_schema", "namespace": "namespace1", + "doc":"_airbyte_original_name:simple_schema", "fields": [ { - "name": "node_id", + "name": "_node_id", "type": ["null", "string"], + "doc":"_airbyte_original_name:node_id", "default": null } ] @@ -49,36 +51,42 @@ }, "avroSchema": { "type": "record", - "name": "nested_record", + "name": "_nested_record", "namespace": "namespace2", + "doc":"_airbyte_original_name:nested_record", "fields": [ { - "name": "node_id", + "name": "_node_id", "type": ["null", "string"], + "doc":"_airbyte_original_name:node_id", "default": null }, { - "name": "user", + "name": "_user", "type": [ "null", { "type": "record", - "name": "user", + "name": "_user", "namespace": "", + "doc":"_airbyte_original_name:user", "fields": [ { - "name": "first_name", + "name": "_first_name", "type": ["null", "string"], + "doc":"_airbyte_original_name:first_name", "default": null }, { - "name": "last_name", + "name": "_last_name", "type": ["null", "string"], + "doc":"_airbyte_original_name:last_name", 
"default": null } ] } ], + "doc":"_airbyte_original_name:user", "default": null } ] @@ -98,8 +106,9 @@ }, "avroSchema": { "type": "record", - "name": "record_with_airbyte_fields", + "name": "_record_with_airbyte_fields", "namespace": "namespace3", + "doc":"_airbyte_original_name:record_with_airbyte_fields", "fields": [ { "name": "_airbyte_ab_id", @@ -116,8 +125,9 @@ } }, { - "name": "node_id", + "name": "_node_id", "type": ["null", "string"], + "doc":"_airbyte_original_name:node_id", "default": null } ] @@ -137,12 +147,12 @@ }, "avroSchema": { "type": "record", - "name": "name_with_special_characters", + "name": "_name_with_special_characters", "namespace": "namespace4", "doc": "_airbyte_original_name:name_with:spécial:characters", "fields": [ { - "name": "node_id", + "name": "_node_id", "doc": "_airbyte_original_name:node:id", "type": ["null", "string"], "default": null @@ -164,12 +174,14 @@ }, "avroSchema": { "type": "record", - "name": "record_with_union_type", + "name": "_record_with_union_type", "namespace": "namespace5", + "doc":"_airbyte_original_name:record_with_union_type", "fields": [ { - "name": "identifier", + "name": "_identifier", "type": ["null", "double", "string"], + "doc":"_airbyte_original_name:identifier", "default": null } ] @@ -192,11 +204,12 @@ }, "avroSchema": { "type": "record", - "name": "array_with_same_type", + "name": "_array_with_same_type", "namespace": "namespace6", + "doc":"_airbyte_original_name:array_with_same_type", "fields": [ { - "name": "identifier", + "name": "_identifier", "type": [ "null", { @@ -204,6 +217,7 @@ "items": ["null", "string"] } ], + "doc":"_airbyte_original_name:identifier", "default": null } ] @@ -237,11 +251,12 @@ }, "avroSchema": { "type": "record", - "name": "array_with_union_type", + "name": "_array_with_union_type", "namespace": "namespace7", + "doc":"_airbyte_original_name:array_with_union_type", "fields": [ { - "name": "identifiers", + "name": "_identifiers", "type": [ "null", { @@ -249,6 +264,7 @@ 
"items": ["null", "string", "int", "boolean"] } ], + "doc":"_airbyte_original_name:identifiers", "default": null } ] @@ -278,12 +294,14 @@ }, "avroSchema": { "type": "record", - "name": "field_with_combined_restriction", + "name": "_field_with_combined_restriction", "namespace": "namespace8", + "doc":"_airbyte_original_name:field_with_combined_restriction", "fields": [ { - "name": "created_at", + "name": "_created_at", "type": ["null", "string", "int"], + "doc":"_airbyte_original_name:created_at", "default": null } ] @@ -318,26 +336,30 @@ }, "avroSchema": { "type": "record", - "name": "record_with_combined_restriction_field", + "name": "_record_with_combined_restriction_field", "namespace": "namespace9", + "doc":"_airbyte_original_name:record_with_combined_restriction_field", "fields": [ { - "name": "user", + "name": "_user", "type": [ "null", { "type": "record", - "name": "user", + "name": "_user", "namespace": "", + "doc":"_airbyte_original_name:user", "fields": [ { - "name": "created_at", + "name": "_created_at", "type": ["null", "string", "int"], + "doc":"_airbyte_original_name:created_at", "default": null } ] } ], + "doc":"_airbyte_original_name:user", "default": null } ] @@ -364,11 +386,12 @@ }, "avroSchema": { "type": "record", - "name": "array_with_combined_restriction_field", + "name": "_array_with_combined_restriction_field", "namespace": "namespace10", + "doc":"_airbyte_original_name:array_with_combined_restriction_field", "fields": [ { - "name": "identifiers", + "name": "_identifiers", "type": [ "null", { @@ -376,6 +399,7 @@ "items": ["null", "int", "string", "boolean"] } ], + "doc":"_airbyte_original_name:identifiers", "default": null } ] diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json index 6dd9a503e984..c19412f7c96d 100644 --- 
a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json @@ -88,16 +88,19 @@ "null", { "type": "record", - "name": "object_field", + "name": "_object_field", + "doc":"_airbyte_original_name:object_field", "fields": [ { - "name": "id", + "name": "_id", "type": ["null", "int"], + "doc":"_airbyte_original_name:id", "default": null }, { - "name": "node_id", + "name": "_node_id", "type": ["null", "string"], + "doc":"_airbyte_original_name:node_id", "default": null } ] From beb4030d6ea9f77b595e8f1e5209520d747731cd Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 8 Sep 2021 21:39:01 +0300 Subject: [PATCH 04/19] snowflake s3 destination COPY is writing records from different table in the same raw table fix --- .../424892c4-daac-4491-b35d-c6688ba547ba.json | 2 +- .../seed/destination_definitions.yaml | 2 +- .../connectors/destination-jdbc/Dockerfile | 2 +- .../jdbc/copy/s3/S3StreamCopier.java | 47 +++++++++++++++++-- .../destination-snowflake/Dockerfile | 2 +- .../snowflake/SnowflakeS3StreamCopier.java | 2 +- docs/integrations/destinations/snowflake.md | 1 + 7 files changed, 50 insertions(+), 8 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json index 6391160f1556..2edb43670955 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba", "name": "Snowflake", "dockerRepository": 
"airbyte/destination-snowflake", - "dockerImageTag": "0.3.13", + "dockerImageTag": "0.3.14", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 176461a5ce0f..6c9f2f0ef7b1 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -42,7 +42,7 @@ - destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba name: Snowflake dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.3.13 + dockerImageTag: 0.3.14 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 name: S3 diff --git a/airbyte-integrations/connectors/destination-jdbc/Dockerfile b/airbyte-integrations/connectors/destination-jdbc/Dockerfile index db6a5127037a..612b4d004600 100644 --- a/airbyte-integrations/connectors/destination-jdbc/Dockerfile +++ b/airbyte-integrations/connectors/destination-jdbc/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.3 +LABEL io.airbyte.version=0.3.4 LABEL io.airbyte.name=airbyte/destination-jdbc diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 13313d02ceef..4a3dee7c9f11 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ 
b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -92,7 +92,7 @@ public S3StreamCopier(String stagingFolder, this.s3Client = client; this.s3Config = s3Config; - this.s3StagingFile = String.join("/", stagingFolder, schemaName, streamName); + this.s3StagingFile = prepareS3StagingFile(stagingFolder, streamName); LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize()); // The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not // have support for streaming multipart uploads; @@ -101,13 +101,50 @@ public S3StreamCopier(String stagingFolder, // Data is chunked into parts. A part is sent off to a queue to be uploaded once it has reached it's // configured part size. // Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations. + this.multipartUploadManager = + new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client) + .numUploadThreads(DEFAULT_UPLOAD_THREADS) + .queueCapacity(DEFAULT_QUEUE_CAPACITY) + .partSize(s3Config.getPartSize()); + // We only need one output stream as we only have one input stream. This is reasonably performant. + // See the above comment. 
+ this.outputStream = multipartUploadManager.getMultiPartOutputStreams().get(0); + + var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8); + try { + this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public S3StreamCopier(String stagingFolder, + DestinationSyncMode destSyncMode, + String schema, + String streamName, + String s3FileName, + AmazonS3 client, + JdbcDatabase db, + S3Config s3Config, + ExtendedNameTransformer nameTransformer, + SqlOperations sqlOperations) { + this.destSyncMode = destSyncMode; + this.schemaName = schema; + this.streamName = streamName; + this.db = db; + this.nameTransformer = nameTransformer; + this.sqlOperations = sqlOperations; + this.tmpTableName = nameTransformer.getTmpTableName(streamName); + this.s3Client = client; + this.s3Config = s3Config; + + this.s3StagingFile = prepareS3StagingFile(stagingFolder, s3FileName); + LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize()); this.multipartUploadManager = new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client) .numUploadThreads(DEFAULT_UPLOAD_THREADS) .queueCapacity(DEFAULT_QUEUE_CAPACITY) .partSize(s3Config.getPartSize()); - // We only need one output stream as we only have one input stream. This is reasonably performant. - // See the above comment. 
this.outputStream = multipartUploadManager.getMultiPartOutputStreams().get(0); var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8); @@ -118,6 +155,10 @@ public S3StreamCopier(String stagingFolder, } } + private String prepareS3StagingFile(String stagingFolder, String s3FileName) { + return String.join("/", stagingFolder, schemaName, s3FileName); + } + @Override public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception { csvPrinter.printRecord(id, jsonDataString, emittedAt); diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index c5d2b2996002..d0fd15acba2d 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.13 +LABEL io.airbyte.version=0.3.14 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java index 2c060519e75d..a24819747ea0 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java @@ -44,7 +44,7 @@ public SnowflakeS3StreamCopier(String stagingFolder, S3Config s3Config, ExtendedNameTransformer nameTransformer, SqlOperations sqlOperations) { - super(stagingFolder, destSyncMode, schema, streamName, client, db, s3Config, 
nameTransformer, sqlOperations); + super(stagingFolder, destSyncMode, schema, streamName, streamName + ".csv", client, db, s3Config, nameTransformer, sqlOperations); } @Override diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 8d54bd44a029..d93b51b562ad 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -189,6 +189,7 @@ Finally, you need to add read/write permissions to your bucket with that email. | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.3.14 | 2021-09-08 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table | | 0.3.13 | 2021-09-01 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Updated query timeout from 30 minutes to 3 hours | | 0.3.12 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json | | 0.3.11 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer | From fd3cfe94fe117993959731bad0c5c39570780acc Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 8 Sep 2021 23:09:34 +0300 Subject: [PATCH 05/19] fixed s3 destination name transformer --- .../destination/s3/S3NameTransformer.java | 10 ++- .../get_avro_schema.json | 76 +++++++------------ .../json_schema_converter/get_field_type.json | 9 +-- 3 files changed, 38 insertions(+), 57 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java index 8549597dfc17..b14210670164 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java 
+++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java @@ -37,7 +37,15 @@ protected String applyDefaultCase(String input) { @Override public String getIdentifier(String name) { - return "_" + convertStreamName(name); + return checkFirsCharInStreamName(convertStreamName(name)); + } + + private String checkFirsCharInStreamName(String name) { + if (name.substring(0, 1).matches("\\d")) { + return "_" + name; + } else { + return name; + } } } diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json index 133660444d94..409a7ce58e28 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json @@ -13,14 +13,12 @@ }, "avroSchema": { "type": "record", - "name": "_simple_schema", + "name": "simple_schema", "namespace": "namespace1", - "doc":"_airbyte_original_name:simple_schema", "fields": [ { - "name": "_node_id", + "name": "node_id", "type": ["null", "string"], - "doc":"_airbyte_original_name:node_id", "default": null } ] @@ -51,42 +49,36 @@ }, "avroSchema": { "type": "record", - "name": "_nested_record", + "name": "nested_record", "namespace": "namespace2", - "doc":"_airbyte_original_name:nested_record", "fields": [ { - "name": "_node_id", + "name": "node_id", "type": ["null", "string"], - "doc":"_airbyte_original_name:node_id", "default": null }, { - "name": "_user", + "name": "user", "type": [ "null", { "type": "record", - "name": "_user", + "name": "user", "namespace": "", - "doc":"_airbyte_original_name:user", "fields": [ { - "name": "_first_name", + "name": "first_name", "type": ["null", "string"], - 
"doc":"_airbyte_original_name:first_name", "default": null }, { - "name": "_last_name", + "name": "last_name", "type": ["null", "string"], - "doc":"_airbyte_original_name:last_name", "default": null } ] } ], - "doc":"_airbyte_original_name:user", "default": null } ] @@ -106,9 +98,8 @@ }, "avroSchema": { "type": "record", - "name": "_record_with_airbyte_fields", + "name": "record_with_airbyte_fields", "namespace": "namespace3", - "doc":"_airbyte_original_name:record_with_airbyte_fields", "fields": [ { "name": "_airbyte_ab_id", @@ -125,9 +116,8 @@ } }, { - "name": "_node_id", + "name": "node_id", "type": ["null", "string"], - "doc":"_airbyte_original_name:node_id", "default": null } ] @@ -147,12 +137,12 @@ }, "avroSchema": { "type": "record", - "name": "_name_with_special_characters", + "name": "name_with_special_characters", "namespace": "namespace4", "doc": "_airbyte_original_name:name_with:spécial:characters", "fields": [ { - "name": "_node_id", + "name": "node_id", "doc": "_airbyte_original_name:node:id", "type": ["null", "string"], "default": null @@ -174,14 +164,12 @@ }, "avroSchema": { "type": "record", - "name": "_record_with_union_type", + "name": "record_with_union_type", "namespace": "namespace5", - "doc":"_airbyte_original_name:record_with_union_type", "fields": [ { - "name": "_identifier", + "name": "identifier", "type": ["null", "double", "string"], - "doc":"_airbyte_original_name:identifier", "default": null } ] @@ -204,12 +192,11 @@ }, "avroSchema": { "type": "record", - "name": "_array_with_same_type", + "name": "array_with_same_type", "namespace": "namespace6", - "doc":"_airbyte_original_name:array_with_same_type", "fields": [ { - "name": "_identifier", + "name": "identifier", "type": [ "null", { @@ -217,7 +204,6 @@ "items": ["null", "string"] } ], - "doc":"_airbyte_original_name:identifier", "default": null } ] @@ -251,12 +237,11 @@ }, "avroSchema": { "type": "record", - "name": "_array_with_union_type", + "name": "array_with_union_type", 
"namespace": "namespace7", - "doc":"_airbyte_original_name:array_with_union_type", "fields": [ { - "name": "_identifiers", + "name": "identifiers", "type": [ "null", { @@ -264,7 +249,6 @@ "items": ["null", "string", "int", "boolean"] } ], - "doc":"_airbyte_original_name:identifiers", "default": null } ] @@ -294,14 +278,12 @@ }, "avroSchema": { "type": "record", - "name": "_field_with_combined_restriction", + "name": "field_with_combined_restriction", "namespace": "namespace8", - "doc":"_airbyte_original_name:field_with_combined_restriction", "fields": [ { - "name": "_created_at", + "name": "created_at", "type": ["null", "string", "int"], - "doc":"_airbyte_original_name:created_at", "default": null } ] @@ -336,30 +318,26 @@ }, "avroSchema": { "type": "record", - "name": "_record_with_combined_restriction_field", + "name": "record_with_combined_restriction_field", "namespace": "namespace9", - "doc":"_airbyte_original_name:record_with_combined_restriction_field", "fields": [ { - "name": "_user", + "name": "user", "type": [ "null", { "type": "record", - "name": "_user", + "name": "user", "namespace": "", - "doc":"_airbyte_original_name:user", "fields": [ { - "name": "_created_at", + "name": "created_at", "type": ["null", "string", "int"], - "doc":"_airbyte_original_name:created_at", "default": null } ] } ], - "doc":"_airbyte_original_name:user", "default": null } ] @@ -386,12 +364,11 @@ }, "avroSchema": { "type": "record", - "name": "_array_with_combined_restriction_field", + "name": "array_with_combined_restriction_field", "namespace": "namespace10", - "doc":"_airbyte_original_name:array_with_combined_restriction_field", "fields": [ { - "name": "_identifiers", + "name": "identifiers", "type": [ "null", { @@ -399,7 +376,6 @@ "items": ["null", "int", "string", "boolean"] } ], - "doc":"_airbyte_original_name:identifiers", "default": null } ] diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json 
b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json index c19412f7c96d..6dd9a503e984 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_field_type.json @@ -88,19 +88,16 @@ "null", { "type": "record", - "name": "_object_field", - "doc":"_airbyte_original_name:object_field", + "name": "object_field", "fields": [ { - "name": "_id", + "name": "id", "type": ["null", "int"], - "doc":"_airbyte_original_name:id", "default": null }, { - "name": "_node_id", + "name": "node_id", "type": ["null", "string"], - "doc":"_airbyte_original_name:node_id", "default": null } ] From 9f437f4b467b65011376ffb93c21cfbdb3689dfa Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 9 Sep 2021 09:29:13 +0300 Subject: [PATCH 06/19] updated s3 destination name transformer --- .../integrations/destination/s3/S3NameTransformer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java index b14210670164..522e68978f0f 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java @@ -41,10 +41,10 @@ public String getIdentifier(String name) { } private String checkFirsCharInStreamName(String name) { - if (name.substring(0, 1).matches("\\d")) { - return "_" + name; - } else { + if (name.substring(0, 1).matches("[A-Za-z_]")) { return name; + } else { + return "_" + name; } } From 
c173b898b31b4a26b1d9c6b10827cd1dc56768b7 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 9 Sep 2021 11:48:57 +0300 Subject: [PATCH 07/19] updated snowflake s3 file name --- .../destination/snowflake/SnowflakeS3StreamCopier.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java index a24819747ea0..d6d39c7acbfd 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java @@ -25,16 +25,22 @@ package io.airbyte.integrations.destination.snowflake; import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; import io.airbyte.protocol.models.DestinationSyncMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.sql.SQLException; public class SnowflakeS3StreamCopier extends S3StreamCopier { + private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StreamCopier.class); + public SnowflakeS3StreamCopier(String stagingFolder, DestinationSyncMode destSyncMode, String schema, @@ -44,7 +50,7 @@ public SnowflakeS3StreamCopier(String stagingFolder, S3Config s3Config, ExtendedNameTransformer nameTransformer, SqlOperations sqlOperations) { - super(stagingFolder, 
destSyncMode, schema, streamName, streamName + ".csv", client, db, s3Config, nameTransformer, sqlOperations); + super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", 3) + streamName, client, db, s3Config, nameTransformer, sqlOperations); } @Override From 43b4ec7bdbd42a2ac01965cf0579c1a3810c67ca Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 9 Sep 2021 12:24:35 +0300 Subject: [PATCH 08/19] updated snowflake documentation --- docs/integrations/destinations/snowflake.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index d93b51b562ad..af38eeaa9fce 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -189,7 +189,7 @@ Finally, you need to add read/write permissions to your bucket with that email. | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | -| 0.3.14 | 2021-09-08 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table | +| 0.3.14 | 2021-09-08 | [#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table | | 0.3.13 | 2021-09-01 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Updated query timeout from 30 minutes to 3 hours | | 0.3.12 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json | | 0.3.11 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer | From 529886207845b79e00c080aab09eb3ea0d40f229 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 9 Sep 2021 12:55:22 +0300 Subject: [PATCH 09/19] updated snowflake documentation --- .../destination/snowflake/SnowflakeS3StreamCopier.java | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java index d6d39c7acbfd..185ec57f2c6e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java @@ -50,7 +50,7 @@ public SnowflakeS3StreamCopier(String stagingFolder, S3Config s3Config, ExtendedNameTransformer nameTransformer, SqlOperations sqlOperations) { - super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", 3) + streamName, client, db, s3Config, nameTransformer, sqlOperations); + super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "_", 3) + streamName, client, db, s3Config, nameTransformer, sqlOperations); } @Override From 25d0e9bea575cae0c32989cda9621079e51c35ec Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 9 Sep 2021 13:01:37 +0300 Subject: [PATCH 10/19] updated snowflake documentation --- .../destination/snowflake/SnowflakeS3StreamCopier.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java index 185ec57f2c6e..54f1850f1e5e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java @@ -50,7 +50,7 @@ public SnowflakeS3StreamCopier(String stagingFolder, S3Config s3Config, ExtendedNameTransformer nameTransformer, SqlOperations sqlOperations) { - super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "_", 3) + streamName, client, db, s3Config, nameTransformer, sqlOperations); + super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", 3) + "_" + streamName, client, db, s3Config, nameTransformer, sqlOperations); } @Override From 6ef8937d8f789be4e0541ea08eaaa798f55d5bf6 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 9 Sep 2021 14:24:05 +0300 Subject: [PATCH 11/19] updated code style --- .../destination/jdbc/copy/s3/S3StreamCopier.java | 8 ++++---- .../destination/snowflake/SnowflakeS3StreamCopier.java | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 4a3dee7c9f11..e350099a3716 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -102,10 +102,10 @@ public S3StreamCopier(String stagingFolder, // configured part size. // Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations. 
this.multipartUploadManager = - new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client) - .numUploadThreads(DEFAULT_UPLOAD_THREADS) - .queueCapacity(DEFAULT_QUEUE_CAPACITY) - .partSize(s3Config.getPartSize()); + new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client) + .numUploadThreads(DEFAULT_UPLOAD_THREADS) + .queueCapacity(DEFAULT_QUEUE_CAPACITY) + .partSize(s3Config.getPartSize()); // We only need one output stream as we only have one input stream. This is reasonably performant. // See the above comment. this.outputStream = multipartUploadManager.getMultiPartOutputStreams().get(0); diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java index 54f1850f1e5e..cc600119b65c 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java @@ -32,11 +32,10 @@ import io.airbyte.integrations.destination.jdbc.copy.s3.S3Config; import io.airbyte.integrations.destination.jdbc.copy.s3.S3StreamCopier; import io.airbyte.protocol.models.DestinationSyncMode; +import java.sql.SQLException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.SQLException; - public class SnowflakeS3StreamCopier extends S3StreamCopier { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StreamCopier.class); @@ -50,7 +49,8 @@ public SnowflakeS3StreamCopier(String stagingFolder, S3Config s3Config, ExtendedNameTransformer nameTransformer, SqlOperations sqlOperations) { - super(stagingFolder, destSyncMode, schema, streamName, 
Strings.addRandomSuffix("", "", 3) + "_" + streamName, client, db, s3Config, nameTransformer, sqlOperations); + super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", 3) + "_" + streamName, client, db, s3Config, + nameTransformer, sqlOperations); } @Override From d4539537a8c2296fb50213ad3970292cba90b8b7 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 9 Sep 2021 16:33:19 +0300 Subject: [PATCH 12/19] updated code style --- .../src/main/java/io/airbyte/db/mongodb/MongoUtils.java | 4 ++-- .../sources/MongoDbSourceAcceptanceTest.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index 8acb9c89d4b0..b3fa7aadd0d6 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -110,8 +110,8 @@ private static void readBson(final Document document, final ObjectNode o, final } /** - * Gets 10.000 documents from collection, gathers all unique fields and its type. In case when one field has different types in 2 and more - * documents, the type is set to String. + * Gets 10.000 documents from collection, gathers all unique fields and its type. In case when one + * field has different types in 2 and more documents, the type is set to String. 
* * @param collection mongo collection * @return map of unique fields and its type diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAcceptanceTest.java index 84b0a3866d56..c9bd7adee725 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAcceptanceTest.java @@ -115,10 +115,10 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { .withDestinationSyncMode(DestinationSyncMode.APPEND) .withCursorField(List.of("_id")) .withStream(CatalogHelpers.createAirbyteStream( - "test.acceptance_test", - Field.of("_id", JsonSchemaPrimitive.STRING), - Field.of("id", JsonSchemaPrimitive.STRING), - Field.of("name", JsonSchemaPrimitive.STRING)) + "test.acceptance_test", + Field.of("_id", JsonSchemaPrimitive.STRING), + Field.of("id", JsonSchemaPrimitive.STRING), + Field.of("name", JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL)) .withDefaultCursorField(List.of("_id"))))); } From a7cd3ca28357be7f171bd883fa8fe39be02ddffa Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Thu, 9 Sep 2021 16:36:46 +0300 Subject: [PATCH 13/19] updated code style --- .../src/main/java/io/airbyte/db/mongodb/MongoUtils.java | 4 ++-- .../sources/MongoDbSourceAcceptanceTest.java | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java 
b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java index 8acb9c89d4b0..b3fa7aadd0d6 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/mongodb/MongoUtils.java @@ -110,8 +110,8 @@ private static void readBson(final Document document, final ObjectNode o, final } /** - * Gets 10.000 documents from collection, gathers all unique fields and its type. In case when one field has different types in 2 and more - * documents, the type is set to String. + * Gets 10.000 documents from collection, gathers all unique fields and its type. In case when one + * field has different types in 2 and more documents, the type is set to String. * * @param collection mongo collection * @return map of unique fields and its type diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAcceptanceTest.java index 84b0a3866d56..c9bd7adee725 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAcceptanceTest.java @@ -115,10 +115,10 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() throws Exception { .withDestinationSyncMode(DestinationSyncMode.APPEND) .withCursorField(List.of("_id")) .withStream(CatalogHelpers.createAirbyteStream( - "test.acceptance_test", - Field.of("_id", JsonSchemaPrimitive.STRING), - Field.of("id", JsonSchemaPrimitive.STRING), - Field.of("name", JsonSchemaPrimitive.STRING)) + "test.acceptance_test", + Field.of("_id", 
JsonSchemaPrimitive.STRING), + Field.of("id", JsonSchemaPrimitive.STRING), + Field.of("name", JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL)) .withDefaultCursorField(List.of("_id"))))); } From a63832bfca2ccc640051433c750ddd48e05fd14e Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 10 Sep 2021 12:58:11 +0300 Subject: [PATCH 14/19] updated redshift destination --- .../f7a7d195-377f-cf5b-70a5-be6b819019dc.json | 2 +- .../seed/destination_definitions.yaml | 2 +- .../src/main/resources/edge_case_catalog.json | 29 +++++++++++++++++++ .../src/main/resources/edge_case_messages.txt | 7 +++++ .../destination-redshift/Dockerfile | 2 +- .../redshift/RedshiftStreamCopier.java | 4 ++- docs/integrations/destinations/redshift.md | 1 + 7 files changed, 43 insertions(+), 4 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json index 1adb51301c7c..931c2492a8c8 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/f7a7d195-377f-cf5b-70a5-be6b819019dc.json @@ -2,7 +2,7 @@ "destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc", "name": "Redshift", "dockerRepository": "airbyte/destination-redshift", - "dockerImageTag": "0.3.13", + "dockerImageTag": "0.3.14", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift", "icon": "redshift.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 6c9f2f0ef7b1..a73b7ed79965 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ 
b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -52,7 +52,7 @@ - destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc name: Redshift dockerRepository: airbyte/destination-redshift - dockerImageTag: 0.3.13 + dockerImageTag: 0.3.14 documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift icon: redshift.svg - destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_catalog.json b/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_catalog.json index cb9ae8c50cb6..20c4651c143e 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_catalog.json +++ b/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_catalog.json @@ -96,6 +96,35 @@ } } } + }, + { + "name": "stream_name", + "json_schema": { + "properties": { + "some_id": { + "type": "integer" + }, + "some_field": { + "type": "string" + }, + "some_next_field": { + "type": "string" + } + } + } + }, + { + "name": "stream_name_next", + "json_schema": { + "properties": { + "some_id": { + "type": "integer" + }, + "next_field_name": { + "type": "string" + } + } + } } ] } diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_messages.txt b/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_messages.txt index 0d325f6e5d86..bb14b273a14c 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_messages.txt +++ b/airbyte-integrations/bases/standard-destination-test/src/main/resources/edge_case_messages.txt @@ -7,4 +7,11 @@ {"type": "RECORD", "record": {"stream": "reserved_keywords", "emitted_at": 1602637589000, "data": { "order" : "ascending" }}} {"type": "RECORD", "record": {"stream": "groups", "emitted_at": 1602637589000, "data": { "authorization" : "into" }}} 
{"type": "RECORD", "record": {"stream": "ProperCase", "emitted_at": 1602637589000, "data": { "ProperCase" : true }}} +{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589200, "data": { "some_id" : 101, "some_field" : "some_field_1", "some_next_field" : "some_next_field_1" }}} +{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589250, "data": { "some_id" : 102, "some_field" : "some_field_2" }}} +{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589300, "data": { "some_id" : 103, "some_next_field" : "some_next_field_3" }}} +{"type": "RECORD", "record": {"stream": "stream_name", "emitted_at": 1602637589350, "data": { "some_id" : 104 }}} +{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589400, "data": { "some_id" : 201, "next_field_name" : "next_field_name_1" }}} +{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589450, "data": { "some_id" : 202, "next_field_name" : "next_field_name_2" }}} +{"type": "RECORD", "record": {"stream": "stream_name_next", "emitted_at": 1602637589500, "data": { "some_id" : 203 }}} {"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}} diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile index 4bfa3c758c28..e0c2dd6d1a55 100644 --- a/airbyte-integrations/connectors/destination-redshift/Dockerfile +++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.13 +LABEL io.airbyte.version=0.3.14 LABEL io.airbyte.name=airbyte/destination-redshift diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java 
b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java index 60733d6b0d62..f2de11d45c2c 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java @@ -25,6 +25,7 @@ package io.airbyte.integrations.destination.redshift; import com.amazonaws.services.s3.AmazonS3; +import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; @@ -44,7 +45,8 @@ public RedshiftStreamCopier(String stagingFolder, S3Config s3Config, ExtendedNameTransformer nameTransformer, SqlOperations sqlOperations) { - super(stagingFolder, destSyncMode, schema, streamName, client, db, s3Config, nameTransformer, sqlOperations); + super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", 3) + "_" + streamName, client, db, s3Config, + nameTransformer, sqlOperations); } @Override diff --git a/docs/integrations/destinations/redshift.md b/docs/integrations/destinations/redshift.md index ff10bd3aac69..d259dfa2e002 100644 --- a/docs/integrations/destinations/redshift.md +++ b/docs/integrations/destinations/redshift.md @@ -107,6 +107,7 @@ See [docs](https://docs.aws.amazon.com/redshift/latest/dg/r_Character_types.html | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.3.14 | 2021-10-08 | [5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table | | 0.3.13 | 2021-09-02 | [5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance 
| | 0.3.12 | 2021-07-21 | [3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs | | 0.3.11 | 2021-07-20 | [4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec | From 9bcc52b3ad7118fe39dcd5cd3bf6f951c3ae2103 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 10 Sep 2021 14:49:50 +0300 Subject: [PATCH 15/19] added test data for test filed with bad first char --- .../get_avro_schema.json | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json index 409a7ce58e28..34a11244dd21 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json @@ -380,5 +380,31 @@ } ] } + }, + { + "schemaName": "field_with_bad_first_char", + "namespace": "namespace11", + "appendAirbyteFields": false, + "jsonSchema": { + "type": "object", + "properties": { + "5filed_name": { + "type": ["null", "string"] + } + } + }, + "avroSchema": { + "type": "record", + "name": "field_with_bad_first_char", + "namespace": "namespace11", + "fields": [ + { + "name": "_5filed_name", + "type": ["null", "string"], + "doc":"_airbyte_original_name:5filed_name", + "default": null + } + ] + } } ] From a2676b1b298405069cfdc64b0f9100534590216c Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 10 Sep 2021 15:03:23 +0300 Subject: [PATCH 16/19] updated s3 documentation --- docs/integrations/destinations/s3.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index 
7cd03f5ab8c5..3f426715e46d 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -133,8 +133,9 @@ Under the hood, an Airbyte data stream in Json schema is converted to an Avro sc 2. Keyword `not` is not supported, as there is no equivalent validation mechanism in Avro schema. 3. Only alphanumeric characters and underscores (`/a-zA-Z0-9_/`) are allowed in a stream or field name. Any special character will be converted to an alphabet or underscore. For example, `spécial:character_names` will become `special_character_names`. The original names will be stored in the `doc` property in this format: `_airbyte_original_name:`. -4. All field will be nullable. For example, a `string` Json field will be typed as `["null", "string"]` in Avro. This is necessary because the incoming data stream may have optional fields. -5. For array fields in Json schema, when the `items` property is an array, it means that each element in the array should follow its own schema sequentially. For example, the following specification means the first item in the array should be a string, and the second a number. +4. A field name cannot start with a digit; in that case an underscore will be prepended to the field name. +5. All fields will be nullable. For example, a `string` Json field will be typed as `["null", "string"]` in Avro. This is necessary because the incoming data stream may have optional fields. +6. For array fields in Json schema, when the `items` property is an array, it means that each element in the array should follow its own schema sequentially. For example, the following specification means the first item in the array should be a string, and the second a number. 
```json { @@ -374,7 +375,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | -| 0.1.11 | 2021-08-30 | [#5729](https://github.com/airbytehq/airbyte/pull/5729) | Updated the field names to start with `_` for formats `Parquet` and `Avro`. | +| 0.1.11 | 2021-10-08 | [#5729](https://github.com/airbytehq/airbyte/pull/5729) | For field names that start with a digit, a `_` will be prepended to the name for the `Parquet` and `Avro` formats. | | 0.1.10 | 2021-09-13 | [#4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator | | 0.1.9 | 2021-07-12 | [#4666](https://github.com/airbytehq/airbyte/pull/4666) | Fix MinIO output for Parquet format. | | 0.1.8 | 2021-07-07 | [#4613](https://github.com/airbytehq/airbyte/pull/4613) | Patched schema converter to support combined restrictions. | From e36d1abfed3f0e751230271fd6fe71aa4c464a91 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 10 Sep 2021 18:04:21 +0300 Subject: [PATCH 17/19] fixed remarks --- .../airbyte/integrations/destination/s3/S3NameTransformer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java index 522e68978f0f..40875b060848 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3NameTransformer.java @@ -24,10 +24,8 @@ package io.airbyte.integrations.destination.s3; -import com.google.common.annotations.VisibleForTesting; import io.airbyte.integrations.destination.ExtendedNameTransformer; -@VisibleForTesting public class S3NameTransformer
extends ExtendedNameTransformer { @Override From 9ffa857613bfd8417d68d21ffa71a4a391dce006 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 13 Sep 2021 10:38:50 +0300 Subject: [PATCH 18/19] fixed code style --- .../parquet/json_schema_converter/get_avro_schema.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json index 34a11244dd21..77a415baa277 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/get_avro_schema.json @@ -401,7 +401,7 @@ { "name": "_5filed_name", "type": ["null", "string"], - "doc":"_airbyte_original_name:5filed_name", + "doc": "_airbyte_original_name:5filed_name", "default": null } ] From e3f783a008d68820422105f85ff429d3714c9979 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Mon, 13 Sep 2021 16:50:29 +0300 Subject: [PATCH 19/19] fixed s3 tests --- .../destination/s3/S3DestinationAcceptanceTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java index 12eb55522f06..9f0bae2af0fa 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java @@ -53,6 +53,8 @@ import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.airbyte.integrations.destination.s3.S3DestinationConstants.NAME_TRANSFORMER; + /** * When adding a new S3 destination acceptance test, extend this class and do the following: *
  • Implement {@link #getFormatConfig} that returns a {@link S3FormatConfig}
  • @@ -112,6 +114,7 @@ protected List getAllSyncedObjects(String streamName, String na .listObjects(config.getBucketName(), outputPrefix) .getObjectSummaries() .stream() + .filter(o -> o.getKey().contains(NAME_TRANSFORMER.convertStreamName(streamName) + "/")) .sorted(Comparator.comparingLong(o -> o.getLastModified().getTime())) .collect(Collectors.toList()); LOGGER.info(