From bf8615a4b44612c93c8e286885fb232f615dad6a Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 1 Sep 2021 14:00:02 +0300 Subject: [PATCH 01/14] fixed snowflake. split the snowflake insert query into multiple queries with a specific batch size --- .../airbyte/db/jdbc/DefaultJdbcDatabase.java | 4 ++ .../destination/jdbc/SqlOperationsUtils.java | 42 +++++++++++++++++++ .../snowflake/SnowflakeDatabase.java | 5 ++- .../snowflake/SnowflakeSqlOperations.java | 18 ++++++-- .../src/main/resources/spec.json | 9 +++- 5 files changed, 72 insertions(+), 6 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java index 134d8f38cb5d..3e061d0fa602 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java @@ -136,6 +136,10 @@ public interface CloseableConnectionSupplier extends AutoCloseable { } + public CloseableConnectionSupplier getConnectionSupplier() { + return connectionSupplier; + } + public static final class DataSourceConnectionSupplier implements CloseableConnectionSupplier { private final DataSource dataSource; diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java index 2785b4fe1bae..99fbaf94e6a9 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java @@ -26,8 +26,11 @@ import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.lang.Exceptions; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.protocol.models.AirbyteRecordMessage; + +import javax.annotation.Nullable; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; @@ -35,6 +38,8 @@ import java.util.List; import java.util.UUID; import java.util.function.Supplier; +import java.util.stream.IntStream; +import java.util.stream.Stream; public class SqlOperationsUtils { @@ -57,6 +62,33 @@ public static void insertRawRecordsInSingleQuery(String insertQueryComponent, insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID, true); } + /** + * Inserts "raw" records in batches queries. The purpose of helper to abstract away database-specific + * SQL syntax from this query. Internally uses {@code insertRawRecordsInSingleQuery} method. + * + * @param insertQueryComponent the first line of the query e.g. INSERT INTO public.users (ab_id, + * data, emitted_at) + * @param recordQueryComponent query template for a full record e.g. (?, ?::jsonb ?) 
+ * @param jdbcDatabase jdbc database + * @param records records to write + * @param batchSize batch size + * @throws SQLException exception + */ + public static void insertRawRecordsInBatchesQueries(String insertQueryComponent, + String recordQueryComponent, + JdbcDatabase jdbcDatabase, + List records, + @Nullable Integer batchSize) + throws SQLException { + if (batchSize == null || batchSize <= 0) { + insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID, true); + } else { + batches(records, batchSize).forEach(recordList -> Exceptions.toRuntime(() -> { + insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, recordList, UUID::randomUUID, true); + })); + } + } + /** * Inserts "raw" records in a single query. The purpose of helper to abstract away database-specific * SQL syntax from this query. @@ -119,4 +151,14 @@ static void insertRawRecordsInSingleQuery(String insertQueryComponent, }); } + private static Stream> batches(List source, int length) { + int size = source.size(); + if (size <= 0) { + return Stream.empty(); + } + int fullChunks = (size - 1) / length; + return IntStream.range(0, fullChunks + 1).mapToObj( + n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length)); + } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java index a0e6f01acf41..a07522289231 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java @@ -70,7 +70,7 @@ public static JdbcDatabase getDatabase(JsonNode config) { return new DefaultJdbcDatabase(new SnowflakeConnectionSupplier(config)); } - private static final class SnowflakeConnectionSupplier implements CloseableConnectionSupplier { + public static final class SnowflakeConnectionSupplier implements CloseableConnectionSupplier { private final JsonNode config; @@ -88,6 +88,9 @@ public void close() { // no op. 
} + public JsonNode getConfig() { + return config; + } } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index fcec0f55fde5..39fb57e6b59d 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -24,17 +24,22 @@ package io.airbyte.integrations.destination.snowflake; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.jdbc.DefaultSqlOperations; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.SqlOperationsUtils; import io.airbyte.protocol.models.AirbyteRecordMessage; -import java.sql.SQLException; -import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + class SnowflakeSqlOperations extends DefaultSqlOperations implements SqlOperations { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class); @@ -53,7 +58,12 @@ public void createTableIfNotExists(JdbcDatabase database, String schemaName, Str @Override public void insertRecords(JdbcDatabase database, List records, String schemaName, String tableName) throws SQLException { - LOGGER.info("actual size of batch: {}", records.size()); + LOGGER.info("actual size of records: {}", records.size()); + var config = ((SnowflakeDatabase.SnowflakeConnectionSupplier) ((DefaultJdbcDatabase) database).getConnectionSupplier()).getConfig(); + Map map = new HashMap<>(); + config.fields().forEachRemaining(entry -> map.put(entry.getKey(), entry.getValue())); + Integer batchSize = map.containsKey("batch_size") ? map.get("batch_size").intValue() : null; + LOGGER.info("batch size: {}", batchSize == null ? records.size() : batchSize); // snowflake query syntax: // requires selecting from a set of values in order to invoke the parse_json function. 
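
A minimal, self-contained sketch of the chunking that `batch_size` feeds into, based on the `batches(...)` helper added to `SqlOperationsUtils` in this series: the record list is split into ceil(size / batchSize) sublists and each sublist is flushed with its own `insertRawRecordsInSingleQuery` call (one INSERT ... FROM VALUES statement per sublist). This sketch is not part of the patch; the class name and sample values are illustrative only, and the generic type parameters are written out explicitly.

```java
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class BatchSplitSketch {

  // Same arithmetic as the batches(...) helper in SqlOperationsUtils:
  // fullChunks is the index of the last (possibly shorter) sublist.
  static <T> Stream<List<T>> batches(List<T> source, int length) {
    int size = source.size();
    if (size <= 0) {
      return Stream.empty();
    }
    int fullChunks = (size - 1) / length;
    return IntStream.range(0, fullChunks + 1)
        .mapToObj(n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
  }

  public static void main(String[] args) {
    // 10 records with batch_size = 4 -> sublists of 4, 4 and 2 records,
    // i.e. three separate INSERT queries instead of one.
    batches(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 4).forEach(System.out::println);
  }

}
```
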
@@ -65,7 +75,7 @@ public void insertRecords(JdbcDatabase database, List reco "INSERT INTO %s.%s (%s, %s, %s) SELECT column1, parse_json(column2), column3 FROM VALUES\n", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); final String recordQuery = "(?, ?, ?),\n"; - SqlOperationsUtils.insertRawRecordsInSingleQuery(insertQuery, recordQuery, database, records); + SqlOperationsUtils.insertRawRecordsInBatchesQueries(insertQuery, recordQuery, database, records, batchSize); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json index 77ffaf50f1b5..ad96c48578c9 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json @@ -68,11 +68,18 @@ "title": "Password", "order": 6 }, + "batch_size": { + "description": "Batch size.", + "title": "Batch size that will be written to the table with one query. If the batch size is not specified, all records will be added one query.", + "type": "integer", + "examples": ["1000000"], + "order": 7 + }, "loading_method": { "type": "object", "title": "Loading Method", "description": "Loading method used to send data to Snowflake.", - "order": 7, + "order": 8, "oneOf": [ { "title": "Standard Inserts", From 13cca447fc47fd33d820d2e39d3f85c3479e38e2 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 1 Sep 2021 14:17:56 +0300 Subject: [PATCH 02/14] fixed snowflake. split the snowflake insert query into multiple queries with a specific batch size --- .../destination/snowflake/SnowflakeSqlOperations.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index bd29880da21e..db78f5ddd5e9 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -35,13 +35,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOperations { import java.sql.SQLException; import java.util.HashMap; import java.util.List; import java.util.Map; -class SnowflakeSqlOperations extends DefaultSqlOperations implements SqlOperations { +class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOperations { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class); @@ -65,7 +64,7 @@ public void insertRecordsInternal(JdbcDatabase database, List map = new HashMap<>(); config.fields().forEachRemaining(entry -> map.put(entry.getKey(), entry.getValue())); Integer batchSize = map.containsKey("batch_size") ? map.get("batch_size").intValue() : null; - LOGGER.info("batch size: {}", batchSize == null ? records.size() : batchSize); + LOGGER.info("batch size: {}", batchSize == null || batchSize <=0 ? 
records.size() : batchSize); // snowflake query syntax: From cc61a8d2c21b4608194c3d82cb0ac02214f7cc4a Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 1 Sep 2021 14:21:54 +0300 Subject: [PATCH 03/14] fixed code style --- .../destination/snowflake/SnowflakeSqlOperations.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index db78f5ddd5e9..4a410bbb2297 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -64,8 +64,7 @@ public void insertRecordsInternal(JdbcDatabase database, List map = new HashMap<>(); config.fields().forEachRemaining(entry -> map.put(entry.getKey(), entry.getValue())); Integer batchSize = map.containsKey("batch_size") ? map.get("batch_size").intValue() : null; - LOGGER.info("batch size: {}", batchSize == null || batchSize <=0 ? records.size() : batchSize); - + LOGGER.info("batch size: {}", batchSize == null || batchSize <= 0 ? records.size() : batchSize); // snowflake query syntax: // requires selecting from a set of values in order to invoke the parse_json function. From 81a282a83991f72ee7fe76dcee7d7d1948d3a4ae Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 1 Sep 2021 15:38:11 +0300 Subject: [PATCH 04/14] fixed snowflake destination code style --- .../destination/jdbc/SqlOperationsUtils.java | 12 ++++++------ .../destination/snowflake/SnowflakeDatabase.java | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java index 99fbaf94e6a9..b8e5ad27d4ca 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java @@ -29,8 +29,6 @@ import io.airbyte.commons.lang.Exceptions; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.protocol.models.AirbyteRecordMessage; - -import javax.annotation.Nullable; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Timestamp; @@ -40,6 +38,7 @@ import java.util.function.Supplier; import java.util.stream.IntStream; import java.util.stream.Stream; +import javax.annotation.Nullable; public class SqlOperationsUtils { @@ -63,8 +62,9 @@ public static void insertRawRecordsInSingleQuery(String insertQueryComponent, } /** - * Inserts "raw" records in batches queries. The purpose of helper to abstract away database-specific - * SQL syntax from this query. Internally uses {@code insertRawRecordsInSingleQuery} method. + * Inserts "raw" records in batches queries. The purpose of helper to abstract away + * database-specific SQL syntax from this query. Internally uses + * {@code insertRawRecordsInSingleQuery} method. * * @param insertQueryComponent the first line of the query e.g. 
INSERT INTO public.users (ab_id, * data, emitted_at) @@ -79,7 +79,7 @@ public static void insertRawRecordsInBatchesQueries(String insertQueryComponent, JdbcDatabase jdbcDatabase, List records, @Nullable Integer batchSize) - throws SQLException { + throws SQLException { if (batchSize == null || batchSize <= 0) { insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID, true); } else { @@ -158,7 +158,7 @@ private static Stream> batches(List source, int length) { } int fullChunks = (size - 1) / length; return IntStream.range(0, fullChunks + 1).mapToObj( - n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length)); + n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length)); } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java index a07522289231..ea21bbd56611 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java @@ -91,6 +91,7 @@ public void close() { public JsonNode getConfig() { return config; } + } } From c1d1a179adb229dff6a501ce997638dfce0f9ba0 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 1 Sep 2021 15:46:55 +0300 Subject: [PATCH 05/14] fixed snowflake destination code style --- .../destination/snowflake/SnowflakeSqlOperations.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index 4a410bbb2297..af154ff77a47 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -32,13 +32,12 @@ import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.SqlOperationsUtils; import io.airbyte.protocol.models.AirbyteRecordMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.sql.SQLException; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class SnowflakeSqlOperations extends JdbcSqlOperations implements SqlOperations { From 54c11e87ae998f4457bca42abf8faa85443a6ef6 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 1 Sep 2021 17:37:55 +0300 Subject: [PATCH 06/14] updated documentation and snowflake dockerImageTag --- .../424892c4-daac-4491-b35d-c6688ba547ba.json | 2 +- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../connectors/destination-snowflake/Dockerfile | 2 +- docs/integrations/destinations/snowflake.md | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json 
b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json index 994c239bd047..6391160f1556 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba", "name": "Snowflake", "dockerRepository": "airbyte/destination-snowflake", - "dockerImageTag": "0.3.12", + "dockerImageTag": "0.3.13", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 24797629fe83..94f7d836919a 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -42,7 +42,7 @@ - destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba name: Snowflake dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.3.12 + dockerImageTag: 0.3.13 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 name: S3 diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 4b9a429fb607..c5d2b2996002 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.12 +LABEL io.airbyte.version=0.3.13 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 5f101d04337c..8d54bd44a029 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -189,6 +189,7 @@ Finally, you need to add read/write permissions to your bucket with that email. 
| Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.3.13 | 2021-09-01 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Updated query timeout from 30 minutes to 3 hours | | 0.3.12 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json | | 0.3.11 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer | | 0.3.10 | 2021-07-12 | [#4713](https://github.com/airbytehq/airbyte/pull/4713)| Tag traffic with `airbyte` label to enable optimization opportunities from Snowflake | From 5509439aa6d23eb47c90400f4283906384d67f54 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 1 Sep 2021 17:51:13 +0300 Subject: [PATCH 07/14] updated documentation and snowflake dockerImageTag --- .../424892c4-daac-4491-b35d-c6688ba547ba.json | 2 +- .../init/src/main/resources/seed/destination_definitions.yaml | 2 +- .../connectors/destination-snowflake/Dockerfile | 2 +- docs/integrations/destinations/snowflake.md | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json index 6391160f1556..2edb43670955 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba", "name": "Snowflake", "dockerRepository": "airbyte/destination-snowflake", - "dockerImageTag": "0.3.13", + "dockerImageTag": "0.3.14", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 94f7d836919a..6014b50bab82 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -42,7 +42,7 @@ - destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba name: Snowflake dockerRepository: airbyte/destination-snowflake - dockerImageTag: 0.3.13 + dockerImageTag: 0.3.14 documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake - destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 name: S3 diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index c5d2b2996002..d0fd15acba2d 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.13 +LABEL io.airbyte.version=0.3.14 LABEL io.airbyte.name=airbyte/destination-snowflake diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index 8d54bd44a029..e73606dd6dfb 100644 --- a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -189,6 +189,7 @@ Finally, you need to add 
read/write permissions to your bucket with that email. | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.3.14 | 2021-09-01 | [#5783](https://github.com/airbytehq/airbyte/pull/5783) | Added `batch size` to spec.json and split the snowflake insert query into multiple queries with a specific batch size | | 0.3.13 | 2021-09-01 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Updated query timeout from 30 minutes to 3 hours | | 0.3.12 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json | | 0.3.11 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer | From de34252d858a8c0ba49950bc8ce601a37b95fe5c Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 3 Sep 2021 17:25:47 +0300 Subject: [PATCH 08/14] updated documentation --- .../connectors/destination-snowflake/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/README.md b/airbyte-integrations/connectors/destination-snowflake/README.md index 6709d48b7006..2ac4db8bddf5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/README.md +++ b/airbyte-integrations/connectors/destination-snowflake/README.md @@ -14,7 +14,8 @@ "database": "AIRBYTE_DATABASE", "schema": "AIRBYTE_SCHEMA", "username": "AIRBYTE_USER", - "password": "SOMEPASSWORD" + "password": "SOMEPASSWORD", + "batch_size": "BATCH SIZE" - not required field } ``` From 332144ebbb4ce41833df736d727b5686120e5d2f Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 17 Sep 2021 14:43:38 +0300 Subject: [PATCH 09/14] updated snowflake s3 and gcs stream copiers and streamCopierFactories --- .../airbyte/db/jdbc/DefaultJdbcDatabase.java | 4 - .../destination/jdbc/SqlOperationsUtils.java | 38 ------ .../jdbc/copy/CopyConsumerFactory.java | 3 +- .../destination/jdbc/copy/StreamCopier.java | 9 +- .../jdbc/copy/gcs/GcsStreamCopier.java | 60 +++++--- .../jdbc/copy/s3/S3StreamCopier.java | 129 +++++++++--------- .../destination-snowflake/README.md | 3 +- .../snowflake/SnowflakeDatabase.java | 4 - .../snowflake/SnowflakeSqlOperations.java | 9 +- .../src/main/resources/spec.json | 9 +- ...flakeGcsCopyDestinationAcceptanceTest.java | 2 +- docs/integrations/destinations/snowflake.md | 1 - 12 files changed, 118 insertions(+), 153 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java index 471f769a8929..28986cc639b0 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java @@ -146,10 +146,6 @@ public interface CloseableConnectionSupplier extends AutoCloseable { } - public CloseableConnectionSupplier getConnectionSupplier() { - return connectionSupplier; - } - public static final class DataSourceConnectionSupplier implements CloseableConnectionSupplier { private final DataSource dataSource; diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java index b8e5ad27d4ca..18195c472656 100644 --- 
a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java @@ -61,34 +61,6 @@ public static void insertRawRecordsInSingleQuery(String insertQueryComponent, insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID, true); } - /** - * Inserts "raw" records in batches queries. The purpose of helper to abstract away - * database-specific SQL syntax from this query. Internally uses - * {@code insertRawRecordsInSingleQuery} method. - * - * @param insertQueryComponent the first line of the query e.g. INSERT INTO public.users (ab_id, - * data, emitted_at) - * @param recordQueryComponent query template for a full record e.g. (?, ?::jsonb ?) - * @param jdbcDatabase jdbc database - * @param records records to write - * @param batchSize batch size - * @throws SQLException exception - */ - public static void insertRawRecordsInBatchesQueries(String insertQueryComponent, - String recordQueryComponent, - JdbcDatabase jdbcDatabase, - List records, - @Nullable Integer batchSize) - throws SQLException { - if (batchSize == null || batchSize <= 0) { - insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, records, UUID::randomUUID, true); - } else { - batches(records, batchSize).forEach(recordList -> Exceptions.toRuntime(() -> { - insertRawRecordsInSingleQuery(insertQueryComponent, recordQueryComponent, jdbcDatabase, recordList, UUID::randomUUID, true); - })); - } - } - /** * Inserts "raw" records in a single query. The purpose of helper to abstract away database-specific * SQL syntax from this query. @@ -151,14 +123,4 @@ static void insertRawRecordsInSingleQuery(String insertQueryComponent, }); } - private static Stream> batches(List source, int length) { - int size = source.size(); - if (size <= 0) { - return Stream.empty(); - } - int fullChunks = (size - 1) / length; - return IntStream.range(0, fullChunks + 1).mapToObj( - n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length)); - } - } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java index 9dcb075575bb..7317531165db 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java @@ -111,13 +111,14 @@ private static RecordWriter recordWriterFunction(Map pairToIgnoredRecordCount) { return (AirbyteStreamNameNamespacePair pair, List records) -> { + var fileName = pairToCopier.get(pair).prepareStagingFile(); for (AirbyteRecordMessage recordMessage : records) { var id = UUID.randomUUID(); if (sqlOperations.isValidData(recordMessage.getData())) { // TODO Truncate json data instead of throwing whole record away? // or should we upload it into a special rejected record folder in s3 instead? 
var emittedAt = Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt())); - pairToCopier.get(pair).write(id, Jsons.serialize(recordMessage.getData()), emittedAt); + pairToCopier.get(pair).write(id, Jsons.serialize(recordMessage.getData()), emittedAt, fileName); } else { pairToIgnoredRecordCount.put(pair, pairToIgnoredRecordCount.getOrDefault(pair, 0L) + 1L); } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java index 54d2f25ea3c4..bbff700d8b35 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/StreamCopier.java @@ -36,7 +36,7 @@ public interface StreamCopier { /** * Writes a value to a staging file for the stream. */ - void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception; + void write(UUID id, String jsonDataString, Timestamp emittedAt, String s3FileName) throws Exception; /** * Closes the writer for the stream to the staging persistence. This method should block until all @@ -78,4 +78,11 @@ public interface StreamCopier { */ void removeFileAndDropTmpTable() throws Exception; + /** + * Creates the staging file and all the necessary items to write data to this file. + * + * @return the name of the staging file + */ + String prepareStagingFile(); + } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java index c74ee13e8389..dd7979434163 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java @@ -30,6 +30,7 @@ import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageOptions; +import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; @@ -44,6 +45,9 @@ import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; import java.util.UUID; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVPrinter; @@ -54,11 +58,11 @@ public abstract class GcsStreamCopier implements StreamCopier { private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class); - private final String gcsStagingFile; + //private final String gcsStagingFile; private final Storage storageClient; private final GcsConfig gcsConfig; - private final WriteChannel channel; - private final CSVPrinter csvPrinter; + //private final WriteChannel channel; + //private final CSVPrinter csvPrinter; private final String tmpTableName; private final DestinationSyncMode destSyncMode; private final String schemaName; @@ -66,6 +70,10 @@ public abstract class GcsStreamCopier implements 
StreamCopier { private final JdbcDatabase db; private final ExtendedNameTransformer nameTransformer; private final SqlOperations sqlOperations; + private final Set gcsStagingFiles = new HashSet<>(); + private final HashMap channels = new HashMap<>(); + private final HashMap csvPrinters = new HashMap<>(); + private final String stagingFolder; public GcsStreamCopier(String stagingFolder, DestinationSyncMode destSyncMode, @@ -79,57 +87,73 @@ public GcsStreamCopier(String stagingFolder, this.destSyncMode = destSyncMode; this.schemaName = schema; this.streamName = streamName; + this.stagingFolder = stagingFolder; this.db = db; this.nameTransformer = nameTransformer; this.sqlOperations = sqlOperations; this.tmpTableName = nameTransformer.getTmpTableName(streamName); this.storageClient = storageClient; this.gcsConfig = gcsConfig; + } - this.gcsStagingFile = String.join("/", stagingFolder, schemaName, streamName); - - var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile); + @Override + public String prepareStagingFile() { + var name = String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + streamName); + gcsStagingFiles.add(name); + var blobId = BlobId.of(gcsConfig.getBucketName(), name); var blobInfo = BlobInfo.newBuilder(blobId).build(); var blob = storageClient.create(blobInfo); - this.channel = blob.writer(); + var channel = blob.writer(); + channels.put(name, channel); OutputStream outputStream = Channels.newOutputStream(channel); var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8); try { - this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); + csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT)); } catch (IOException e) { throw new RuntimeException(e); } + return name; } @Override - public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception { - csvPrinter.printRecord(id, jsonDataString, emittedAt); + public void write(UUID id, String jsonDataString, Timestamp emittedAt, String gcsFileName) throws Exception { + if (csvPrinters.containsKey(gcsFileName)) { + csvPrinters.get(gcsFileName).printRecord(id, jsonDataString, emittedAt); + } } @Override public void closeStagingUploader(boolean hasFailed) throws Exception { LOGGER.info("Uploading remaining data for {} stream.", streamName); - csvPrinter.close(); - channel.close(); + for (var csvPrinter : csvPrinters.values()) { + csvPrinter.close(); + } + for (var channel : channels.values()) { + channel.close(); + } LOGGER.info("All data for {} stream uploaded.", streamName); } @Override public void copyStagingFileToTemporaryTable() throws Exception { LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}.", tmpTableName, streamName, schemaName); - copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.getBucketName(), gcsStagingFile), schemaName, tmpTableName, gcsConfig); + for (var gcsStagingFile : gcsStagingFiles) { + copyGcsCsvFileIntoTable(db, getFullGcsPath(gcsConfig.getBucketName(), gcsStagingFile), schemaName, tmpTableName, gcsConfig); + } LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); } @Override public void removeFileAndDropTmpTable() throws Exception { - LOGGER.info("Begin cleaning gcs staging file {}.", gcsStagingFile); - var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile); - if (storageClient.get(blobId).exists()) { - storageClient.delete(blobId); + for (var gcsStagingFile : gcsStagingFiles) { + LOGGER.info("Begin cleaning gcs 
staging file {}.", gcsStagingFile); + var blobId = BlobId.of(gcsConfig.getBucketName(), gcsStagingFile); + if (storageClient.get(blobId).exists()) { + storageClient.delete(blobId); + } + LOGGER.info("GCS staging file {} cleaned.", gcsStagingFile); } - LOGGER.info("GCS staging file {} cleaned.", gcsStagingFile); LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName); sqlOperations.dropTableIfExists(db, schemaName, tmpTableName); diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index e350099a3716..3c96ae4402c0 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -32,39 +32,45 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import io.airbyte.commons.lang.Exceptions; +import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; import io.airbyte.protocol.models.DestinationSyncMode; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.sql.Timestamp; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.UUID; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public abstract class S3StreamCopier implements StreamCopier { private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamCopier.class); - private static final int DEFAULT_UPLOAD_THREADS = 10; // The S3 cli uses 10 threads by default. + private static final int DEFAULT_UPLOAD_THREADS = 1; // The S3 cli uses 10 threads by default. private static final int DEFAULT_QUEUE_CAPACITY = DEFAULT_UPLOAD_THREADS; // The smallest part size is 5MB. An S3 upload can be maximally formed of 10,000 parts. This gives // us an upper limit of 10,000 * 10 / 1000 = 100 GB per table with a 10MB part size limit. // WARNING: Too large a part size can cause potential OOM errors. 
public static final int DEFAULT_PART_SIZE_MB = 10; - private final String s3StagingFile; private final AmazonS3 s3Client; private final S3Config s3Config; - private final StreamTransferManager multipartUploadManager; - private final MultiPartOutputStream outputStream; - private final CSVPrinter csvPrinter; private final String tmpTableName; private final DestinationSyncMode destSyncMode; private final String schemaName; @@ -72,11 +78,18 @@ public abstract class S3StreamCopier implements StreamCopier { private final JdbcDatabase db; private final ExtendedNameTransformer nameTransformer; private final SqlOperations sqlOperations; + private final Set s3StagingFiles = new HashSet<>(); + private final Map multipartUploadManagers = new HashMap<>(); + private final Map outputStreams = new HashMap<>(); + private final Map csvPrinters = new HashMap<>(); + private final String s3FileName; + private final String stagingFolder; public S3StreamCopier(String stagingFolder, DestinationSyncMode destSyncMode, String schema, String streamName, + String s3FileName, AmazonS3 client, JdbcDatabase db, S3Config s3Config, @@ -85,14 +98,20 @@ public S3StreamCopier(String stagingFolder, this.destSyncMode = destSyncMode; this.schemaName = schema; this.streamName = streamName; + this.s3FileName = s3FileName; + this.stagingFolder = stagingFolder; this.db = db; this.nameTransformer = nameTransformer; this.sqlOperations = sqlOperations; this.tmpTableName = nameTransformer.getTmpTableName(streamName); this.s3Client = client; this.s3Config = s3Config; + } - this.s3StagingFile = prepareS3StagingFile(stagingFolder, streamName); + @Override + public String prepareStagingFile() { + var name = String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + s3FileName); + s3StagingFiles.add(name); LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize()); // The stream transfer manager lets us greedily stream into S3. The native AWS SDK does not // have support for streaming multipart uploads; @@ -101,73 +120,37 @@ public S3StreamCopier(String stagingFolder, // Data is chunked into parts. A part is sent off to a queue to be uploaded once it has reached it's // configured part size. // Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations. - this.multipartUploadManager = - new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client) + var manager = new StreamTransferManager(s3Config.getBucketName(), name, s3Client) .numUploadThreads(DEFAULT_UPLOAD_THREADS) .queueCapacity(DEFAULT_QUEUE_CAPACITY) .partSize(s3Config.getPartSize()); + multipartUploadManagers.put(name, manager); + var outputStream = manager.getMultiPartOutputStreams().get(0); // We only need one output stream as we only have one input stream. This is reasonably performant. // See the above comment. 
- this.outputStream = multipartUploadManager.getMultiPartOutputStreams().get(0); - - var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8); - try { - this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public S3StreamCopier(String stagingFolder, - DestinationSyncMode destSyncMode, - String schema, - String streamName, - String s3FileName, - AmazonS3 client, - JdbcDatabase db, - S3Config s3Config, - ExtendedNameTransformer nameTransformer, - SqlOperations sqlOperations) { - this.destSyncMode = destSyncMode; - this.schemaName = schema; - this.streamName = streamName; - this.db = db; - this.nameTransformer = nameTransformer; - this.sqlOperations = sqlOperations; - this.tmpTableName = nameTransformer.getTmpTableName(streamName); - this.s3Client = client; - this.s3Config = s3Config; - - this.s3StagingFile = prepareS3StagingFile(stagingFolder, s3FileName); - LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize()); - this.multipartUploadManager = - new StreamTransferManager(s3Config.getBucketName(), s3StagingFile, client) - .numUploadThreads(DEFAULT_UPLOAD_THREADS) - .queueCapacity(DEFAULT_QUEUE_CAPACITY) - .partSize(s3Config.getPartSize()); - this.outputStream = multipartUploadManager.getMultiPartOutputStreams().get(0); - + outputStreams.put(name, outputStream); var writer = new PrintWriter(outputStream, true, StandardCharsets.UTF_8); try { - this.csvPrinter = new CSVPrinter(writer, CSVFormat.DEFAULT); + csvPrinters.put(name, new CSVPrinter(writer, CSVFormat.DEFAULT)); } catch (IOException e) { throw new RuntimeException(e); } - } - - private String prepareS3StagingFile(String stagingFolder, String s3FileName) { - return String.join("/", stagingFolder, schemaName, s3FileName); + return name; } @Override - public void write(UUID id, String jsonDataString, Timestamp emittedAt) throws Exception { - csvPrinter.printRecord(id, jsonDataString, emittedAt); + public void write(UUID id, String jsonDataString, Timestamp emittedAt, String s3FileName) throws Exception { + if (csvPrinters.containsKey(s3FileName)) { + csvPrinters.get(s3FileName).printRecord(id, jsonDataString, emittedAt); + } } @Override public void closeStagingUploader(boolean hasFailed) throws Exception { if (hasFailed) { - multipartUploadManager.abort(); + for (var multipartUploadManager : multipartUploadManagers.values()) { + multipartUploadManager.abort(); + } } closeAndWaitForUpload(); } @@ -187,7 +170,9 @@ public void createTemporaryTable() throws Exception { @Override public void copyStagingFileToTemporaryTable() throws Exception { LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName); - copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), s3StagingFile), schemaName, tmpTableName, s3Config); + s3StagingFiles.forEach(s3StagingFile -> Exceptions.toRuntime(() -> { + copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), s3StagingFile), schemaName, tmpTableName, s3Config); + })); LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); } @@ -215,11 +200,13 @@ public String generateMergeStatement(String destTableName) throws Exception { @Override public void removeFileAndDropTmpTable() throws Exception { - LOGGER.info("Begin cleaning s3 staging file {}.", s3StagingFile); - if (s3Client.doesObjectExist(s3Config.getBucketName(), s3StagingFile)) { - 
s3Client.deleteObject(s3Config.getBucketName(), s3StagingFile); - } - LOGGER.info("S3 staging file {} cleaned.", s3StagingFile); + s3StagingFiles.forEach(s3StagingFile -> { + LOGGER.info("Begin cleaning s3 staging file {}.", s3StagingFile); + if (s3Client.doesObjectExist(s3Config.getBucketName(), s3StagingFile)) { + s3Client.deleteObject(s3Config.getBucketName(), s3StagingFile); + } + LOGGER.info("S3 staging file {} cleaned.", s3StagingFile); + }); LOGGER.info("Begin cleaning {} tmp table in destination.", tmpTableName); sqlOperations.dropTableIfExists(db, schemaName, tmpTableName); @@ -235,9 +222,15 @@ private static String getFullS3Path(String s3BucketName, String s3StagingFile) { */ private void closeAndWaitForUpload() throws IOException { LOGGER.info("Uploading remaining data for {} stream.", streamName); - csvPrinter.close(); - outputStream.close(); - multipartUploadManager.complete(); + for (var csvPrinter : csvPrinters.values()) { + csvPrinter.close(); + } + for (var outputStream : outputStreams.values()) { + outputStream.close(); + } + for (var multipartUploadManager : multipartUploadManagers.values()) { + multipartUploadManager.complete(); + } LOGGER.info("All data for {} stream uploaded.", streamName); } diff --git a/airbyte-integrations/connectors/destination-snowflake/README.md b/airbyte-integrations/connectors/destination-snowflake/README.md index 2ac4db8bddf5..6709d48b7006 100644 --- a/airbyte-integrations/connectors/destination-snowflake/README.md +++ b/airbyte-integrations/connectors/destination-snowflake/README.md @@ -14,8 +14,7 @@ "database": "AIRBYTE_DATABASE", "schema": "AIRBYTE_SCHEMA", "username": "AIRBYTE_USER", - "password": "SOMEPASSWORD", - "batch_size": "BATCH SIZE" - not required field + "password": "SOMEPASSWORD" } ``` diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java index 6d18b23cc0da..1f6b2c656d87 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java @@ -88,10 +88,6 @@ public void close() { // no op. 
} - public JsonNode getConfig() { - return config; - } - } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index af154ff77a47..5f445a485302 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -58,12 +58,7 @@ public void createTableIfNotExists(JdbcDatabase database, String schemaName, Str @Override public void insertRecordsInternal(JdbcDatabase database, List records, String schemaName, String tableName) throws SQLException { - LOGGER.info("actual size of records: {}", records.size()); - var config = ((SnowflakeDatabase.SnowflakeConnectionSupplier) ((DefaultJdbcDatabase) database).getConnectionSupplier()).getConfig(); - Map map = new HashMap<>(); - config.fields().forEachRemaining(entry -> map.put(entry.getKey(), entry.getValue())); - Integer batchSize = map.containsKey("batch_size") ? map.get("batch_size").intValue() : null; - LOGGER.info("batch size: {}", batchSize == null || batchSize <= 0 ? records.size() : batchSize); + LOGGER.info("actual size of batch: {}", records.size()); // snowflake query syntax: // requires selecting from a set of values in order to invoke the parse_json function. @@ -75,7 +70,7 @@ public void insertRecordsInternal(JdbcDatabase database, List Date: Fri, 17 Sep 2021 14:48:51 +0300 Subject: [PATCH 10/14] updated code style --- .../destination/jdbc/SqlOperationsUtils.java | 4 ---- .../jdbc/copy/gcs/GcsStreamCopier.java | 5 +---- .../jdbc/copy/s3/S3StreamCopier.java | 20 ++++++++----------- .../s3/S3DestinationAcceptanceTest.java | 4 ++-- .../snowflake/SnowflakeSqlOperations.java | 4 ---- 5 files changed, 11 insertions(+), 26 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java index 18195c472656..2785b4fe1bae 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/SqlOperationsUtils.java @@ -26,7 +26,6 @@ import com.google.common.annotations.VisibleForTesting; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.lang.Exceptions; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.protocol.models.AirbyteRecordMessage; import java.sql.PreparedStatement; @@ -36,9 +35,6 @@ import java.util.List; import java.util.UUID; import java.util.function.Supplier; -import java.util.stream.IntStream; -import java.util.stream.Stream; -import javax.annotation.Nullable; public class SqlOperationsUtils { diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java index dd7979434163..418721d16e8d 100644 --- 
a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java @@ -58,11 +58,8 @@ public abstract class GcsStreamCopier implements StreamCopier { private static final Logger LOGGER = LoggerFactory.getLogger(GcsStreamCopier.class); - //private final String gcsStagingFile; private final Storage storageClient; private final GcsConfig gcsConfig; - //private final WriteChannel channel; - //private final CSVPrinter csvPrinter; private final String tmpTableName; private final DestinationSyncMode destSyncMode; private final String schemaName; @@ -98,7 +95,7 @@ public GcsStreamCopier(String stagingFolder, @Override public String prepareStagingFile() { - var name = String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + streamName); + var name = String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + streamName); gcsStagingFiles.add(name); var blobId = BlobId.of(gcsConfig.getBucketName(), name); var blobInfo = BlobInfo.newBuilder(blobId).build(); diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 3c96ae4402c0..09c3117630bf 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -32,7 +32,6 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.S3ObjectInputStream; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.string.Strings; import io.airbyte.db.jdbc.JdbcDatabase; @@ -40,14 +39,7 @@ import io.airbyte.integrations.destination.jdbc.SqlOperations; import io.airbyte.integrations.destination.jdbc.copy.StreamCopier; import io.airbyte.protocol.models.DestinationSyncMode; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVPrinter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; import java.io.PrintWriter; import java.nio.charset.StandardCharsets; import java.sql.SQLException; @@ -57,6 +49,10 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVPrinter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class S3StreamCopier implements StreamCopier { @@ -121,9 +117,9 @@ public String prepareStagingFile() { // configured part size. // Memory consumption is queue capacity * part size = 10 * 10 = 100 MB at current configurations. 
var manager = new StreamTransferManager(s3Config.getBucketName(), name, s3Client) - .numUploadThreads(DEFAULT_UPLOAD_THREADS) - .queueCapacity(DEFAULT_QUEUE_CAPACITY) - .partSize(s3Config.getPartSize()); + .numUploadThreads(DEFAULT_UPLOAD_THREADS) + .queueCapacity(DEFAULT_QUEUE_CAPACITY) + .partSize(s3Config.getPartSize()); multipartUploadManagers.put(name, manager); var outputStream = manager.getMultiPartOutputStreams().get(0); // We only need one output stream as we only have one input stream. This is reasonably performant. @@ -170,7 +166,7 @@ public void createTemporaryTable() throws Exception { @Override public void copyStagingFileToTemporaryTable() throws Exception { LOGGER.info("Starting copy to tmp table: {} in destination for stream: {}, schema: {}, .", tmpTableName, streamName, schemaName); - s3StagingFiles.forEach(s3StagingFile -> Exceptions.toRuntime(() -> { + s3StagingFiles.forEach(s3StagingFile -> Exceptions.toRuntime(() -> { copyS3CsvFileIntoTable(db, getFullS3Path(s3Config.getBucketName(), s3StagingFile), schemaName, tmpTableName, s3Config); })); LOGGER.info("Copy to tmp table {} in destination for stream {} complete.", tmpTableName, streamName); diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java index 9f0bae2af0fa..20a92927dfa6 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/java/io/airbyte/integrations/destination/s3/S3DestinationAcceptanceTest.java @@ -24,6 +24,8 @@ package io.airbyte.integrations.destination.s3; +import static io.airbyte.integrations.destination.s3.S3DestinationConstants.NAME_TRANSFORMER; + import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; @@ -53,8 +55,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.airbyte.integrations.destination.s3.S3DestinationConstants.NAME_TRANSFORMER; - /** * When adding a new S3 destination acceptance test, extend this class and do the following: *
  • Implement {@link #getFormatConfig} that returns a {@link S3FormatConfig}
  • diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java index 5f445a485302..057c8c9da398 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java @@ -24,8 +24,6 @@ package io.airbyte.integrations.destination.snowflake; -import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations; @@ -33,9 +31,7 @@ import io.airbyte.integrations.destination.jdbc.SqlOperationsUtils; import io.airbyte.protocol.models.AirbyteRecordMessage; import java.sql.SQLException; -import java.util.HashMap; import java.util.List; -import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 9dfb63fd48895db79527b952fe1c87be354b15ed Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 17 Sep 2021 14:56:32 +0300 Subject: [PATCH 11/14] updated code style --- .../snowflake/SnowflakeGcsCopyDestinationAcceptanceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyDestinationAcceptanceTest.java index ba0a396e00d3..5a68845aed66 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsCopyDestinationAcceptanceTest.java @@ -34,7 +34,7 @@ public class SnowflakeGcsCopyDestinationAcceptanceTest extends SnowflakeInsertDe @Override public JsonNode getStaticConfig() { - final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/config.json"))); + final JsonNode copyConfig = Jsons.deserialize(IOs.readFile(Path.of("secrets/copy_gcs_config.json"))); Preconditions.checkArgument(SnowflakeDestination.isGcsCopy(copyConfig)); Preconditions.checkArgument(!SnowflakeDestination.isS3Copy(copyConfig)); return copyConfig; From 47358c014b9074a0cdc5a9affea3cd78fb3f83a7 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Fri, 17 Sep 2021 14:57:59 +0300 Subject: [PATCH 12/14] updated SnowflakeDatabase --- .../integrations/destination/snowflake/SnowflakeDatabase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java index 1f6b2c656d87..347af87c5bfa 100644 --- 
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java @@ -70,7 +70,7 @@ public static JdbcDatabase getDatabase(JsonNode config) { return new DefaultJdbcDatabase(new SnowflakeConnectionSupplier(config)); } - public static final class SnowflakeConnectionSupplier implements CloseableConnectionSupplier { + private static final class SnowflakeConnectionSupplier implements CloseableConnectionSupplier { private final JsonNode config; From a57459683d121f0e945d7f13670ef4fbb66352d7 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Tue, 21 Sep 2021 13:31:26 +0300 Subject: [PATCH 13/14] fixed remarks --- .../destination/jdbc/copy/CopyConsumerFactory.java | 2 -- .../destination/jdbc/copy/gcs/GcsStreamCopier.java | 10 +++++++--- .../destination/jdbc/copy/s3/S3StreamCopier.java | 12 ++++++++---- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java index 0ddd2962f430..7baa0874ff89 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java @@ -36,8 +36,6 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; - -import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java index 22f8d870167a..7981469f3f61 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/gcs/GcsStreamCopier.java @@ -96,9 +96,13 @@ public GcsStreamCopier(String stagingFolder, this.gcsConfig = gcsConfig; } + private String prepareGcsStagingFile() { + return String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + streamName); + } + @Override public String prepareStagingFile() { - var name = String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + streamName); + var name = prepareGcsStagingFile(); gcsStagingFiles.add(name); var blobId = BlobId.of(gcsConfig.getBucketName(), name); var blobInfo = BlobInfo.newBuilder(blobId).build(); @@ -120,8 +124,8 @@ public String prepareStagingFile() { public void write(UUID id, AirbyteRecordMessage recordMessage, String gcsFileName) throws Exception { if (csvPrinters.containsKey(gcsFileName)) { csvPrinters.get(gcsFileName).printRecord(id, - Jsons.serialize(recordMessage.getData()), - Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()))); + 
Jsons.serialize(recordMessage.getData()), + Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()))); } } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 5ad41c36a113..b8481fc1e156 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -32,9 +32,9 @@ import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.string.Strings; -import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.SqlOperations; @@ -107,9 +107,13 @@ public S3StreamCopier(String stagingFolder, this.s3Config = s3Config; } + private String prepareS3StagingFile() { + return String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + s3FileName); + } + @Override public String prepareStagingFile() { - var name = String.join("/", stagingFolder, schemaName, Strings.addRandomSuffix("", "", 3) + "_" + s3FileName); + var name = prepareS3StagingFile(); s3StagingFiles.add(name); LOGGER.info("S3 upload part size: {} MB", s3Config.getPartSize()); // The stream transfer manager lets us greedily stream into S3. 
The native AWS SDK does not @@ -141,8 +145,8 @@ public String prepareStagingFile() { public void write(UUID id, AirbyteRecordMessage recordMessage, String s3FileName) throws Exception { if (csvPrinters.containsKey(s3FileName)) { csvPrinters.get(s3FileName).printRecord(id, - Jsons.serialize(recordMessage.getData()), - Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()))); + Jsons.serialize(recordMessage.getData()), + Timestamp.from(Instant.ofEpochMilli(recordMessage.getEmittedAt()))); } } From 3c83807c20939f5313b5f7d081bb4cf58c67fb57 Mon Sep 17 00:00:00 2001 From: andriikorotkov Date: Wed, 22 Sep 2021 10:41:11 +0300 Subject: [PATCH 14/14] updated DatabricksStreamCopier --- .../destination/databricks/DatabricksStreamCopier.java | 9 ++++++++- .../connectors/destination-jdbc/Dockerfile | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java index 9844d684711c..6b780528d067 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java @@ -75,6 +75,7 @@ public class DatabricksStreamCopier implements StreamCopier { private final S3ParquetWriter parquetWriter; private final String tmpTableLocation; private final String destTableLocation; + private final String stagingFolder; public DatabricksStreamCopier(String stagingFolder, String schema, @@ -98,6 +99,7 @@ public DatabricksStreamCopier(String stagingFolder, this.tmpTableName = nameTransformer.getTmpTableName(streamName); this.destTableName = nameTransformer.getIdentifier(streamName); + this.stagingFolder = stagingFolder; S3DestinationConfig stagingS3Config = getStagingS3DestinationConfig(s3Config, stagingFolder); this.parquetWriter = (S3ParquetWriter) writerFactory.create(stagingS3Config, s3Client, configuredStream, uploadTime); @@ -116,7 +118,12 @@ public DatabricksStreamCopier(String stagingFolder, } @Override - public void write(UUID id, AirbyteRecordMessage recordMessage) throws Exception { + public String prepareStagingFile() { + return String.join("/", s3Config.getBucketPath(), stagingFolder); + } + + @Override + public void write(UUID id, AirbyteRecordMessage recordMessage, String fileName) throws Exception { parquetWriter.write(id, recordMessage); } diff --git a/airbyte-integrations/connectors/destination-jdbc/Dockerfile b/airbyte-integrations/connectors/destination-jdbc/Dockerfile index 612b4d004600..1de711a53ec7 100644 --- a/airbyte-integrations/connectors/destination-jdbc/Dockerfile +++ b/airbyte-integrations/connectors/destination-jdbc/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.4 +LABEL io.airbyte.version=0.3.5 LABEL io.airbyte.name=airbyte/destination-jdbc
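Taken together, the copier changes in this series move to multiple staging files per stream: prepareStagingFile() returns the name of a new staging location (and, for the GCS/S3 copiers, registers it in gcsStagingFiles / s3StagingFiles), write(...) takes that name so each record is routed to the matching file, and copyStagingFileToTemporaryTable() later iterates over every registered file. A minimal caller-side sketch of that flow (hypothetical helper and class names, not code from this patch series):

    import io.airbyte.integrations.destination.jdbc.copy.StreamCopier;
    import io.airbyte.protocol.models.AirbyteRecordMessage;
    import java.util.List;
    import java.util.UUID;

    public final class StagingFlowSketch {

      // Hypothetical helper, not part of these patches: pairs prepareStagingFile() with the
      // new write(id, record, fileName) signature so one batch of records lands in one staging file.
      static void writeBatchToStaging(final StreamCopier copier, final List<AirbyteRecordMessage> batch) throws Exception {
        final String stagingFile = copier.prepareStagingFile();    // create and register a new staging file
        for (final AirbyteRecordMessage record : batch) {
          copier.write(UUID.randomUUID(), record, stagingFile);    // route each record to that file
        }
      }

    }

Because each prepareStagingFile() call adds the new name to the copier's staging-file collection, the later steps shown in these diffs (copy to the temporary table, file cleanup) can operate on all files created for the stream.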