diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java index 54fd133af34e..4ff1d5544d13 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDatabase.java @@ -21,6 +21,7 @@ import java.net.http.HttpResponse; import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.time.LocalTime; import java.util.Base64; import java.util.HashMap; import java.util.Map; @@ -59,7 +60,8 @@ private static DataSource createDataSource(final JsonNode config) { final Properties properties = new Properties(); final JsonNode credentials = config.get("credentials"); - if (credentials != null && credentials.has("auth_type") && "Client".equals(credentials.get("auth_type").asText())) { + if (credentials != null && credentials.has("auth_type") && "Client".equals( + credentials.get("auth_type").asText())) { LOGGER.info("OAuth login mode is used"); // OAuth login option is selected on UI final String accessToken; @@ -82,14 +84,15 @@ private static DataSource createDataSource(final JsonNode config) { properties.put("username", username); SnowflakeDestination.isAlive = true; //is used to enable another thread to refresh oauth token - } else if(credentials != null && credentials.has("password")) { + } else if (credentials != null && credentials.has("password")) { LOGGER.info("User/password login mode is used"); // Username and pass login option is selected on UI dataSource.setUsername(username); dataSource.setPassword(credentials.get("password").asText()); } else { - LOGGER.warn("Obsolete User/password login mode is used. Please re-create a connection to use the latest connector's version"); + LOGGER.warn( + "Obsolete User/password login mode is used. Please re-create a connection to use the latest connector's version"); // case to keep the backward compatibility dataSource.setUsername(username); dataSource.setPassword(config.get("password").asText()); @@ -125,9 +128,9 @@ private static DataSource createDataSource(final JsonNode config) { } private static String getAccessTokenUsingRefreshToken(final String hostName, - final String clientId, - final String clientSecret, - final String refreshCode) + final String clientId, + final String clientSecret, + final String refreshCode) throws IOException { final var refreshTokenUri = String.format(REFRESH_TOKEN_URL, hostName); final Map requestBody = new HashMap<>(); @@ -147,7 +150,7 @@ private static String getAccessTokenUsingRefreshToken(final String hostName, .uri(URI.create(refreshTokenUri)) .header("Content-Type", "application/x-www-form-urlencoded") .header("Accept", "application/json") - .header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8)) + .header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8)) .build(); final HttpResponse response = httpClient.send(request, @@ -173,6 +176,7 @@ public static JdbcDatabase getDatabase(final JsonNode config) { private static class DbConfigurationLoader extends Thread { private final HikariDataSource hikariDataSource; + LocalTime timeToCountFrom = LocalTime.now(); public DbConfigurationLoader(HikariDataSource hikariDataSource) { this.hikariDataSource = hikariDataSource; @@ -181,19 +185,27 @@ public DbConfigurationLoader(HikariDataSource hikariDataSource) { @Override public void run() { LOGGER.info("Refresh token thread's started"); + while (SnowflakeDestination.isAlive) { - var properties = hikariDataSource.getDataSourceProperties(); - try { - var token = getAccessTokenUsingRefreshToken(properties.getProperty("host"), - properties.getProperty("client_id"), properties.getProperty("client_secret"), - properties.getProperty("refresh_token")); - properties.setProperty("token", token); - LOGGER.info("New refresh token has been obtained"); - } catch (IOException e) { - LOGGER.error("Failed to obtain a fresh accessToken:" + e); + // refresh token every 7 minutes + if (LocalTime.now().plusMinutes(PAUSE_BETWEEN_TOKEN_REFRESH_MIN).isAfter(timeToCountFrom)) { + var properties = hikariDataSource.getDataSourceProperties(); + try { + var token = getAccessTokenUsingRefreshToken(properties.getProperty("host"), + properties.getProperty("client_id"), properties.getProperty("client_secret"), + properties.getProperty("refresh_token")); + properties.setProperty("token", token); + + timeToCountFrom = LocalTime.now(); + LOGGER.info("New refresh token has been obtained"); + } catch (IOException e) { + LOGGER.error("Failed to obtain a fresh accessToken:" + e); + } } + + // made some pause to do not use much resources, but if set it higher - then some processes will fail by timeout errors try { - TimeUnit.MINUTES.sleep(PAUSE_BETWEEN_TOKEN_REFRESH_MIN); + TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index 5d961e96d000..bd48f653ea84 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -8,14 +8,10 @@ import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination; -public class SnowflakeDestination extends SwitchingDestination implements AutoCloseable { +public class SnowflakeDestination extends SwitchingDestination { - public static boolean isAlive; + public static boolean isAlive; // used for refresh Oauth token thread - @Override - public void close() throws Exception { - shutDown(); - } enum DestinationType { COPY_S3, @@ -31,9 +27,6 @@ public SnowflakeDestination() { public static void main(final String[] args) throws Exception { final Destination destination = new SnowflakeDestination(); new IntegrationRunner(destination).run(args); - } - - private static void shutDown(){ isAlive = false; }