Skip to content

Commit

Permalink
[4654] Added backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
etsybaev committed Mar 16, 2022
1 parent b557731 commit 94720ba
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,13 @@ private static DataSource createDataSource(final JsonNode config) {

final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?",
config.get("host").asText()));
final String username = config.get("username").asText();

final Properties properties = new Properties();

final JsonNode credentials = config.get("credentials");
if (credentials.has("auth_type") && "Client".equals(credentials.get("auth_type").asText())) {
if (credentials != null && credentials.has("auth_type") && "Client".equals(credentials.get("auth_type").asText())) {
LOGGER.info("OAuth login mode is used");
// OAuth login option is selected on UI
final String accessToken;
try {
Expand All @@ -77,11 +79,20 @@ private static DataSource createDataSource(final JsonNode config) {
properties.put("authenticator", "oauth");
properties.put("token", accessToken);
// the username is required for DBT normalization in OAuth connection
properties.put("username", config.get("username").asText());
} else {
properties.put("username", username);
SnowflakeDestination.isAlive = true; //is used to enable another thread to refresh oauth token

} else if(credentials != null && credentials.has("password")) {
LOGGER.info("User/password login mode is used");
// Username and pass login option is selected on UI
dataSource.setUsername(config.get("username").asText());
dataSource.setUsername(username);
dataSource.setPassword(credentials.get("password").asText());

} else {
LOGGER.warn("Obsolete User/password login mode is used. Please re-create a connection to use the latest connector's version");
// case to keep the backward compatibility
dataSource.setUsername(username);
dataSource.setPassword(config.get("password").asText());
}

properties.put("warehouse", config.get("warehouse").asText());
Expand Down Expand Up @@ -128,13 +139,15 @@ private static String getAccessTokenUsingRefreshToken(final String hostName,
.map(key -> key + "=" + URLEncoder.encode(requestBody.get(key), StandardCharsets.UTF_8))
.collect(joining("&")));

final byte[] authorization = Base64.getEncoder()
.encode((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8));

final HttpRequest request = HttpRequest.newBuilder()
.POST(bodyPublisher)
.uri(URI.create(refreshTokenUri))
.header("Content-Type", "application/x-www-form-urlencoded")
.header("Accept", "application/json")
.header("Authorization", "Basic " + new String(
Base64.getEncoder().encode((clientId + ":" + clientSecret).getBytes())))
.header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8))
.build();

final HttpResponse<String> response = httpClient.send(request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestinat

@Override
public void close() throws Exception {
isAlive = false;
shutDown();
}

enum DestinationType {
Expand All @@ -33,4 +33,8 @@ public static void main(final String[] args) throws Exception {
new IntegrationRunner(destination).run(args);
}

private static void shutDown(){
isAlive = false;
}

}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -45,4 +49,5 @@ public static Map<DestinationType, Destination> getTypeToDestination() {
DestinationType.COPY_AZURE_BLOB, azureBlobStorageDestination,
DestinationType.INTERNAL_STAGING, internalStagingDestination);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@

public class SnowflakeInternalStagingSqlOperations extends SnowflakeSqlOperations implements StagingOperations {

public static final String CREATE_STAGE_QUERY = "CREATE STAGE IF NOT EXISTS %s encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');";
public static final String CREATE_STAGE_QUERY =
"CREATE STAGE IF NOT EXISTS %s encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');";
public static final String COPY_QUERY = "COPY INTO %s.%s FROM @%s file_format = " +
"(type = csv field_delimiter = ',' skip_header = 0 FIELD_OPTIONALLY_ENCLOSED_BY = '\"')";
public static final String DROP_STAGE_QUERY = "DROP STAGE IF EXISTS %s;";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
"warehouse",
"database",
"schema",
"username",
"credentials"
"username"
],
"additionalProperties": true,
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.SQLException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -28,8 +26,8 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.integrations.destination.snowflake.SnowflakeDestination.DestinationType;
import io.airbyte.integrations.destination.staging.StagingConsumerFactory;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
Expand Down Expand Up @@ -125,11 +123,9 @@ private static Stream<Arguments> destinationTypeToConfig() {
return Stream.of(
arguments("copy_gcs_config.json", DestinationType.COPY_GCS),
arguments("copy_s3_config.json", DestinationType.COPY_S3),
arguments("insert_config.json", DestinationType.INTERNAL_STAGING)
);
arguments("insert_config.json", DestinationType.INTERNAL_STAGING));
}


private List<AirbyteMessage> generateTestMessages() {
return IntStream.range(0, 3)
.boxed()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -8,12 +12,14 @@ class SnowflakeInternalStagingSqlOperationsTest {

public static final String SCHEMA_NAME = "schemaName";
public static final String STAGE_NAME = "stageName";
private final SnowflakeInternalStagingSqlOperations snowflakeStagingSqlOperations = new SnowflakeInternalStagingSqlOperations(new SnowflakeSQLNameTransformer());
private final SnowflakeInternalStagingSqlOperations snowflakeStagingSqlOperations =
new SnowflakeInternalStagingSqlOperations(new SnowflakeSQLNameTransformer());

@Test
void createStageIfNotExists() {
String actualCreateStageQuery = snowflakeStagingSqlOperations.getCreateStageQuery(STAGE_NAME);
String expectedCreateStageQuery = "CREATE STAGE IF NOT EXISTS " + STAGE_NAME + " encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');";
String expectedCreateStageQuery =
"CREATE STAGE IF NOT EXISTS " + STAGE_NAME + " encryption = (type = 'SNOWFLAKE_SSE') copy_options = (on_error='skip_file');";
assertEquals(expectedCreateStageQuery, actualCreateStageQuery);
}

Expand All @@ -32,4 +38,4 @@ void dropStageIfExists() {
assertEquals(expectedQuery, actualDropQuery);
}

}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -48,4 +52,5 @@ void insertRecordsInternal() throws SQLException {
snowflakeSqlOperations.insertRecordsInternal(db, List.of(new AirbyteRecordMessage()), SCHEMA_NAME, TABLE_NAME);
verify(db, times(1)).execute(any(CheckedConsumer.class));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -86,15 +87,17 @@ protected Map<String, Object> completeOAuthFlow(final String clientId,
final JsonNode oAuthParamConfig)
throws IOException {
final var accessTokenUrl = getAccessTokenUrl(inputOAuthConfiguration);

final byte[] authorization = Base64.getEncoder()
.encode((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8));
final HttpRequest request = HttpRequest.newBuilder()
.POST(HttpRequest.BodyPublishers
.ofString(tokenReqContentType.getConverter().apply(
getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl))))
.uri(URI.create(accessTokenUrl))
.header("Content-Type", tokenReqContentType.getContentType())
.header("Accept", "application/json")
.header("Authorization", "Basic " + new String(
Base64.getEncoder().encode((clientId + ":" + clientSecret).getBytes())))
.header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8))
.build();
try {
final HttpResponse<String> response = httpClient.send(request,
Expand Down

1 comment on commit 94720ba

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarQube Report

SonarQube report for Airbyte Connectors Destination Snowflake(#11093)

Measures

Name Value Name Value Name Value
Reliability Rating C Quality Gate Status OK Bugs 1
Coverage 0.0 Security Rating A Lines to Cover 407
Vulnerabilities 0 Duplicated Lines (%) 0.0 Code Smells 21
Lines of Code 991 Duplicated Blocks 0 Blocker Issues 0
Critical Issues 7 Major Issues 13 Minor Issues 2

Detected Issues

Rule File Description Message
java:S1192 (CRITICAL) snowflake/SnowflakeDatabase.java:85 String literals should not be duplicated Define a constant instead of duplicating this literal "password" 3 times.
java:S112 (MAJOR) snowflake/SnowflakeAzureBlobStorageStreamCopier.java:61 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1192 (CRITICAL) snowflake/SnowflakeDestinationResolver.java:28 String literals should not be duplicated Define a constant instead of duplicating this literal "loading_method" 9 times.
java:S1118 (MAJOR) snowflake/SnowflakeDestinationResolver.java:13 Utility classes should not have public constructors Add a private constructor to hide the implicit public one.
java:S1192 (CRITICAL) snowflake/SnowflakeDatabase.java:70 String literals should not be duplicated Define a constant instead of duplicating this literal "client_id" 4 times.
java:S1192 (CRITICAL) snowflake/SnowflakeDatabase.java:71 String literals should not be duplicated Define a constant instead of duplicating this literal "client_secret" 4 times.
java:S1192 (CRITICAL) snowflake/SnowflakeDatabase.java:71 String literals should not be duplicated Define a constant instead of duplicating this literal "refresh_token" 6 times.
java:S112 (MAJOR) snowflake/SnowflakeDatabase.java:73 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) snowflake/SnowflakeDatabase.java:160 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S2142 (MAJOR) snowflake/SnowflakeDatabase.java:163 "InterruptedException" should not be ignored Either re-interrupt this method or rethrow the "InterruptedException" that can be caught here.
java:S1104 (MINOR) snowflake/SnowflakeDestination.java:13 Class variable fields should not have public accessibility Make isAlive a static final constant or non-public and provide accessors if needed.
java:S1444 (MINOR) snowflake/SnowflakeDestination.java:13 "public static" fields should be constant Make this "public static isAlive" field final
java:S112 (MAJOR) snowflake/SnowflakeGcsStreamCopier.java:79 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) snowflake/SnowflakeParallelCopyStreamCopier.java:40 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) snowflake/SnowflakeS3StreamCopier.java:119 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) snowflake/SnowflakeInternalStagingSqlOperations.java:79 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1192 (CRITICAL) snowflake/SnowflakeInternalStagingSqlOperations.java:90 String literals should not be duplicated Define a constant instead of duplicating this literal "stage" 3 times.
java:S112 (MAJOR) snowflake/SnowflakeInternalStagingDestination.java:61 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S5361 (CRITICAL) snowflake/SnowflakeInternalStagingDestination.java:64 "String#replace" should be preferred to "String#replaceAll" Replace this call to "replaceAll()" by a call to the "replace()" method.
java:S112 (MAJOR) snowflake/SnowflakeInternalStagingSqlOperations.java:75 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1068 (MAJOR) snowflake/SnowflakeInsertDestination.java:19 Unused "private" fields should be removed Remove this unused "LOGGER" private field.
java:S1118 (MAJOR) snowflake/SnowflakeDatabase.java:36 Utility classes should not have public constructors Add a private constructor to hide the implicit public one.

Coverage (0.0%)

File Coverage File Coverage
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeAzureBlobStorageStreamCopier.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeAzureBlobStorageStreamCopierFactory.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyAzureBlobStorageDestination.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyGcsDestination.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationResolver.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopier.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopierFactory.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestination.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeParallelCopyStreamCopier.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java 0.0

Please sign in to comment.