diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java index 51275eed5c6f..54fd133af34e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java @@ -54,11 +54,13 @@ private static DataSource createDataSource(final JsonNode config) { final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?", config.get("host").asText())); + final String username = config.get("username").asText(); final Properties properties = new Properties(); final JsonNode credentials = config.get("credentials"); - if (credentials.has("auth_type") && "Client".equals(credentials.get("auth_type").asText())) { + if (credentials != null && credentials.has("auth_type") && "Client".equals(credentials.get("auth_type").asText())) { + LOGGER.info("OAuth login mode is used"); // OAuth login option is selected on UI final String accessToken; try { @@ -77,11 +79,20 @@ private static DataSource createDataSource(final JsonNode config) { properties.put("authenticator", "oauth"); properties.put("token", accessToken); // the username is required for DBT normalization in OAuth connection - properties.put("username", config.get("username").asText()); - } else { + properties.put("username", username); + SnowflakeDestination.isAlive = true; //is used to enable another thread to refresh oauth token + + } else if(credentials != null && credentials.has("password")) { + LOGGER.info("User/password login mode is used"); // Username and pass login option is selected on UI - dataSource.setUsername(config.get("username").asText()); + dataSource.setUsername(username); dataSource.setPassword(credentials.get("password").asText()); + + } else { + LOGGER.warn("Obsolete User/password login mode is used. Please re-create a connection to use the latest connector's version"); + // case to keep the backward compatibility + dataSource.setUsername(username); + dataSource.setPassword(config.get("password").asText()); } properties.put("warehouse", config.get("warehouse").asText()); @@ -128,13 +139,15 @@ private static String getAccessTokenUsingRefreshToken(final String hostName, .map(key -> key + "=" + URLEncoder.encode(requestBody.get(key), StandardCharsets.UTF_8)) .collect(joining("&"))); + final byte[] authorization = Base64.getEncoder() + .encode((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8)); + final HttpRequest request = HttpRequest.newBuilder() .POST(bodyPublisher) .uri(URI.create(refreshTokenUri)) .header("Content-Type", "application/x-www-form-urlencoded") .header("Accept", "application/json") - .header("Authorization", "Basic " + new String( - Base64.getEncoder().encode((clientId + ":" + clientSecret).getBytes()))) + .header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8)) .build(); final HttpResponse response = httpClient.send(request, diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index 92a1a093be5d..5d961e96d000 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -14,7 +14,7 @@ public class SnowflakeDestination extends SwitchingDestination getTypeToDestination() { DestinationType.COPY_AZURE_BLOB, azureBlobStorageDestination, DestinationType.INTERNAL_STAGING, internalStagingDestination); } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java index b4c513eb017e..2fd44c800f42 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java @@ -22,7 +22,8 @@ public class SnowflakeInternalStagingSqlOperations extends SnowflakeSqlOperations implements StagingOperations { - public static final String CREATE_STAGE_QUERY = "CREATE STAGE IF NOT EXISTS %s encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');"; + public static final String CREATE_STAGE_QUERY = + "CREATE STAGE IF NOT EXISTS %s encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');"; public static final String COPY_QUERY = "COPY INTO %s.%s FROM @%s file_format = " + "(type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')"; public static final String DROP_STAGE_QUERY = "DROP STAGE IF EXISTS %s;"; diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json index 103e9a46d25e..449a8f1bbefe 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/resources/spec.json @@ -14,8 +14,7 @@ "warehouse", "database", "schema", - "username", - "credentials" + "username" ], "additionalProperties": true, "properties": { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.java index 4c47280e9a66..41d776feedcf 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationIntegrationTest.java @@ -15,7 +15,6 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.protocol.models.AirbyteConnectionStatus; import java.io.IOException; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; import java.sql.SQLException; diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java index 2f9d2953f515..52ab30c6be48 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationTest.java @@ -11,10 +11,8 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -28,8 +26,8 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; -import io.airbyte.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.integrations.destination.snowflake.SnowflakeDestination.DestinationType; +import io.airbyte.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.CatalogHelpers; @@ -125,11 +123,9 @@ private static Stream destinationTypeToConfig() { return Stream.of( arguments("copy_gcs_config.json", DestinationType.COPY_GCS), arguments("copy_s3_config.json", DestinationType.COPY_S3), - arguments("insert_config.json", DestinationType.INTERNAL_STAGING) - ); + arguments("insert_config.json", DestinationType.INTERNAL_STAGING)); } - private List generateTestMessages() { return IntStream.range(0, 3) .boxed() diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperationsTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperationsTest.java index cfb9cb5aaff6..a9d3cc3d7a75 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperationsTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperationsTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.snowflake; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -8,12 +12,14 @@ class SnowflakeInternalStagingSqlOperationsTest { public static final String SCHEMA_NAME = "schemaName"; public static final String STAGE_NAME = "stageName"; - private final SnowflakeInternalStagingSqlOperations snowflakeStagingSqlOperations = new SnowflakeInternalStagingSqlOperations(new SnowflakeSQLNameTransformer()); + private final SnowflakeInternalStagingSqlOperations snowflakeStagingSqlOperations = + new SnowflakeInternalStagingSqlOperations(new SnowflakeSQLNameTransformer()); @Test void createStageIfNotExists() { String actualCreateStageQuery = snowflakeStagingSqlOperations.getCreateStageQuery(STAGE_NAME); - String expectedCreateStageQuery = "CREATE STAGE IF NOT EXISTS " + STAGE_NAME + " encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');"; + String expectedCreateStageQuery = + "CREATE STAGE IF NOT EXISTS " + STAGE_NAME + " encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');"; assertEquals(expectedCreateStageQuery, actualCreateStageQuery); } @@ -32,4 +38,4 @@ void dropStageIfExists() { assertEquals(expectedQuery, actualDropQuery); } -} \ No newline at end of file +} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java index c54cd05fa788..b4de44c0aa0e 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperationsTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.snowflake; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -48,4 +52,5 @@ void insertRecordsInternal() throws SQLException { snowflakeSqlOperations.insertRecordsInternal(db, List.of(new AirbyteRecordMessage()), SCHEMA_NAME, TABLE_NAME); verify(db, times(1)).execute(any(CheckedConsumer.class)); } -} \ No newline at end of file + +} diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/DestinationSnowflakeOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/DestinationSnowflakeOAuthFlow.java index 9609b6173625..5f8206d6947a 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/DestinationSnowflakeOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/DestinationSnowflakeOAuthFlow.java @@ -15,6 +15,7 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; import java.util.Base64; import java.util.HashMap; import java.util.Map; @@ -86,6 +87,9 @@ protected Map completeOAuthFlow(final String clientId, final JsonNode oAuthParamConfig) throws IOException { final var accessTokenUrl = getAccessTokenUrl(inputOAuthConfiguration); + + final byte[] authorization = Base64.getEncoder() + .encode((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8)); final HttpRequest request = HttpRequest.newBuilder() .POST(HttpRequest.BodyPublishers .ofString(tokenReqContentType.getConverter().apply( @@ -93,8 +97,7 @@ protected Map completeOAuthFlow(final String clientId, .uri(URI.create(accessTokenUrl)) .header("Content-Type", tokenReqContentType.getContentType()) .header("Accept", "application/json") - .header("Authorization", "Basic " + new String( - Base64.getEncoder().encode((clientId + ":" + clientSecret).getBytes()))) + .header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8)) .build(); try { final HttpResponse response = httpClient.send(request,