Skip to content

Commit

Permalink
fixed oauth connection
Browse files Browse the repository at this point in the history
  • Loading branch information
etsybaev committed Mar 17, 2022
1 parent 36503d8 commit 24403aa
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalTime;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -59,7 +60,8 @@ private static DataSource createDataSource(final JsonNode config) {
final Properties properties = new Properties();

final JsonNode credentials = config.get("credentials");
if (credentials != null && credentials.has("auth_type") && "Client".equals(credentials.get("auth_type").asText())) {
if (credentials != null && credentials.has("auth_type") && "Client".equals(
credentials.get("auth_type").asText())) {
LOGGER.info("OAuth login mode is used");
// OAuth login option is selected on UI
final String accessToken;
Expand All @@ -82,14 +84,15 @@ private static DataSource createDataSource(final JsonNode config) {
properties.put("username", username);
SnowflakeDestination.isAlive = true; //is used to enable another thread to refresh oauth token

} else if(credentials != null && credentials.has("password")) {
} else if (credentials != null && credentials.has("password")) {
LOGGER.info("User/password login mode is used");
// Username and pass login option is selected on UI
dataSource.setUsername(username);
dataSource.setPassword(credentials.get("password").asText());

} else {
LOGGER.warn("Obsolete User/password login mode is used. Please re-create a connection to use the latest connector's version");
LOGGER.warn(
"Obsolete User/password login mode is used. Please re-create a connection to use the latest connector's version");
// case to keep the backward compatibility
dataSource.setUsername(username);
dataSource.setPassword(config.get("password").asText());
Expand Down Expand Up @@ -125,9 +128,9 @@ private static DataSource createDataSource(final JsonNode config) {
}

private static String getAccessTokenUsingRefreshToken(final String hostName,
final String clientId,
final String clientSecret,
final String refreshCode)
final String clientId,
final String clientSecret,
final String refreshCode)
throws IOException {
final var refreshTokenUri = String.format(REFRESH_TOKEN_URL, hostName);
final Map<String, String> requestBody = new HashMap<>();
Expand All @@ -147,7 +150,7 @@ private static String getAccessTokenUsingRefreshToken(final String hostName,
.uri(URI.create(refreshTokenUri))
.header("Content-Type", "application/x-www-form-urlencoded")
.header("Accept", "application/json")
.header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8))
.header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8))
.build();

final HttpResponse<String> response = httpClient.send(request,
Expand All @@ -173,6 +176,7 @@ public static JdbcDatabase getDatabase(final JsonNode config) {
private static class DbConfigurationLoader extends Thread {

private final HikariDataSource hikariDataSource;
LocalTime timeToCountFrom = LocalTime.now();

public DbConfigurationLoader(HikariDataSource hikariDataSource) {
this.hikariDataSource = hikariDataSource;
Expand All @@ -181,19 +185,27 @@ public DbConfigurationLoader(HikariDataSource hikariDataSource) {
@Override
public void run() {
LOGGER.info("Refresh token thread's started");

while (SnowflakeDestination.isAlive) {
var properties = hikariDataSource.getDataSourceProperties();
try {
var token = getAccessTokenUsingRefreshToken(properties.getProperty("host"),
properties.getProperty("client_id"), properties.getProperty("client_secret"),
properties.getProperty("refresh_token"));
properties.setProperty("token", token);
LOGGER.info("New refresh token has been obtained");
} catch (IOException e) {
LOGGER.error("Failed to obtain a fresh accessToken:" + e);
// refresh token every 7 minutes
if (LocalTime.now().plusMinutes(PAUSE_BETWEEN_TOKEN_REFRESH_MIN).isAfter(timeToCountFrom)) {
var properties = hikariDataSource.getDataSourceProperties();
try {
var token = getAccessTokenUsingRefreshToken(properties.getProperty("host"),
properties.getProperty("client_id"), properties.getProperty("client_secret"),
properties.getProperty("refresh_token"));
properties.setProperty("token", token);

timeToCountFrom = LocalTime.now();
LOGGER.info("New refresh token has been obtained");
} catch (IOException e) {
LOGGER.error("Failed to obtain a fresh accessToken:" + e);
}
}

// made some pause to do not use much resources, but if set it higher - then some processes will fail by timeout errors
try {
TimeUnit.MINUTES.sleep(PAUSE_BETWEEN_TOKEN_REFRESH_MIN);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;

public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestination.DestinationType> implements AutoCloseable {
public class SnowflakeDestination extends SwitchingDestination<SnowflakeDestination.DestinationType> {

public static boolean isAlive;
public static boolean isAlive; // used for refresh Oauth token thread

@Override
public void close() throws Exception {
shutDown();
}

enum DestinationType {
COPY_S3,
Expand All @@ -31,9 +27,6 @@ public SnowflakeDestination() {
public static void main(final String[] args) throws Exception {
final Destination destination = new SnowflakeDestination();
new IntegrationRunner(destination).run(args);
}

private static void shutDown(){
isAlive = false;
}

Expand Down

1 comment on commit 24403aa

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarQube Report

SonarQube report for Airbyte Connectors Destination Snowflake(#11093)

Measures

Name Value Name Value Name Value
Duplicated Lines (%) 0.0 Code Smells 21 Duplicated Blocks 0
Quality Gate Status OK Security Rating A Lines to Cover 8
Coverage 0.0 Lines of Code 992 Reliability Rating A
Bugs 1 Vulnerabilities 0 Blocker Issues 0
Critical Issues 7 Major Issues 13 Minor Issues 2

Detected Issues

Rule File Description Message
java:S1192 (CRITICAL) snowflake/SnowflakeDatabase.java:87 String literals should not be duplicated Define a constant instead of duplicating this literal "password" 3 times.
java:S112 (MAJOR) snowflake/SnowflakeAzureBlobStorageStreamCopier.java:61 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1192 (CRITICAL) snowflake/SnowflakeDestinationResolver.java:28 String literals should not be duplicated Define a constant instead of duplicating this literal "loading_method" 9 times.
java:S1118 (MAJOR) snowflake/SnowflakeDestinationResolver.java:13 Utility classes should not have public constructors Add a private constructor to hide the implicit public one.
java:S1192 (CRITICAL) snowflake/SnowflakeDatabase.java:72 String literals should not be duplicated Define a constant instead of duplicating this literal "client_id" 4 times.
java:S1192 (CRITICAL) snowflake/SnowflakeDatabase.java:73 String literals should not be duplicated Define a constant instead of duplicating this literal "client_secret" 4 times.
java:S1192 (CRITICAL) snowflake/SnowflakeDatabase.java:73 String literals should not be duplicated Define a constant instead of duplicating this literal "refresh_token" 6 times.
java:S112 (MAJOR) snowflake/SnowflakeDatabase.java:75 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) snowflake/SnowflakeDatabase.java:163 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S2142 (MAJOR) snowflake/SnowflakeDatabase.java:166 "InterruptedException" should not be ignored Either re-interrupt this method or rethrow the "InterruptedException" that can be caught here.
java:S1104 (MINOR) snowflake/SnowflakeDestination.java:13 Class variable fields should not have public accessibility Make isAlive a static final constant or non-public and provide accessors if needed.
java:S1444 (MINOR) snowflake/SnowflakeDestination.java:13 "public static" fields should be constant Make this "public static isAlive" field final
java:S112 (MAJOR) snowflake/SnowflakeGcsStreamCopier.java:79 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) snowflake/SnowflakeParallelCopyStreamCopier.java:40 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) snowflake/SnowflakeS3StreamCopier.java:119 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) snowflake/SnowflakeInternalStagingSqlOperations.java:79 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1192 (CRITICAL) snowflake/SnowflakeInternalStagingSqlOperations.java:90 String literals should not be duplicated Define a constant instead of duplicating this literal "stage" 3 times.
java:S112 (MAJOR) snowflake/SnowflakeInternalStagingDestination.java:61 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S5361 (CRITICAL) snowflake/SnowflakeInternalStagingDestination.java:64 "String#replace" should be preferred to "String#replaceAll" Replace this call to "replaceAll()" by a call to the "replace()" method.
java:S112 (MAJOR) snowflake/SnowflakeInternalStagingSqlOperations.java:75 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1068 (MAJOR) snowflake/SnowflakeInsertDestination.java:19 Unused "private" fields should be removed Remove this unused "LOGGER" private field.
java:S1118 (MAJOR) snowflake/SnowflakeDatabase.java:37 Utility classes should not have public constructors Add a private constructor to hide the implicit public one.

Coverage (0.0%)

File Coverage File Coverage
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeAzureBlobStorageStreamCopier.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeAzureBlobStorageStreamCopierFactory.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyAzureBlobStorageDestination.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyGcsDestination.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestinationResolver.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopier.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeGcsStreamCopierFactory.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInsertDestination.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingSqlOperations.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeParallelCopyStreamCopier.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java 0.0
src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSQLNameTransformer.java 0.0 src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeSqlOperations.java 0.0

Please sign in to comment.