diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 63dae61c6306..3ec994c85491 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -751,7 +751,7 @@ - name: Snowflake sourceDefinitionId: e2d65910-8c8b-40a1-ae7d-ee2416b2bfa2 dockerRepository: airbyte/source-snowflake - dockerImageTag: 0.1.10 + dockerImageTag: 0.1.11 documentationUrl: https://docs.airbyte.io/integrations/sources/snowflake icon: snowflake.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 0bb0de7c5594..333e61f9bada 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -7955,7 +7955,7 @@ - - "client_secret" oauthFlowOutputParameters: - - "refresh_token" -- dockerImage: "airbyte/source-snowflake:0.1.10" +- dockerImage: "airbyte/source-snowflake:0.1.11" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/snowflake" connectionSpecification: @@ -7968,10 +7968,77 @@ - "warehouse" - "database" - "schema" - - "username" - - "password" - additionalProperties: false + additionalProperties: true properties: + credentials: + title: "Authorization Method" + type: "object" + oneOf: + - type: "object" + title: "OAuth2.0" + order: 0 + required: + - "client_id" + - "client_secret" + - "auth_type" + properties: + auth_type: + type: "string" + const: "OAuth" + default: "OAuth" + order: 0 + client_id: + type: "string" + title: "Client ID" + description: "The Client ID of your Snowflake developer application." + airbyte_secret: true + order: 1 + client_secret: + type: "string" + title: "Client Secret" + description: "The Client Secret of your Snowflake developer application." + airbyte_secret: true + order: 2 + access_token: + type: "string" + title: "Access Token" + description: "Access Token for making authenticated requests." + airbyte_secret: true + order: 3 + refresh_token: + type: "string" + title: "Refresh Token" + description: "Refresh Token for making authenticated requests." + airbyte_secret: true + order: 4 + - title: "Username and Password" + type: "object" + required: + - "username" + - "password" + - "auth_type" + order: 1 + properties: + auth_type: + type: "string" + const: "username/password" + default: "username/password" + order: 0 + username: + description: "The username you created to allow Airbyte to access\ + \ the database." + examples: + - "AIRBYTE_USER" + type: "string" + title: "Username" + order: 1 + password: + description: "The password associated with the username." + type: "string" + airbyte_secret: true + title: "Password" + order: 2 + order: 0 host: description: "The host domain of the snowflake instance (must include the\ \ account, region, cloud environment, and end with snowflakecomputing.com)." @@ -7979,58 +8046,96 @@ - "accountname.us-east-2.aws.snowflakecomputing.com" type: "string" title: "Account Name" - order: 0 + order: 1 role: description: "The role you created for Airbyte to access Snowflake." examples: - "AIRBYTE_ROLE" type: "string" title: "Role" - order: 1 + order: 2 warehouse: description: "The warehouse you created for Airbyte to access data." examples: - "AIRBYTE_WAREHOUSE" type: "string" title: "Warehouse" - order: 2 + order: 3 database: description: "The database you created for Airbyte to access data." examples: - "AIRBYTE_DATABASE" type: "string" title: "Database" - order: 3 + order: 4 schema: description: "The source Snowflake schema tables." examples: - "AIRBYTE_SCHEMA" type: "string" title: "Schema" - order: 4 - username: - description: "The username you created to allow Airbyte to access the database." - examples: - - "AIRBYTE_USER" - type: "string" - title: "Username" order: 5 - password: - description: "The password associated with the username." - type: "string" - airbyte_secret: true - title: "Password" - order: 6 jdbc_url_params: description: "Additional properties to pass to the JDBC URL string when\ \ connecting to the database formatted as 'key=value' pairs separated\ \ by the symbol '&'. (example: key1=value1&key2=value2&key3=value3)." title: "JDBC URL Params" type: "string" - order: 7 + order: 6 supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] + advanced_auth: + auth_flow_type: "oauth2.0" + predicate_key: + - "credentials" + - "auth_type" + predicate_value: "OAuth" + oauth_config_specification: + oauth_user_input_from_connector_config_specification: + type: "object" + additionalProperties: false + properties: + host: + type: "string" + path_in_connector_config: + - "host" + complete_oauth_output_specification: + type: "object" + additionalProperties: false + properties: + access_token: + type: "string" + path_in_connector_config: + - "credentials" + - "access_token" + refresh_token: + type: "string" + path_in_connector_config: + - "credentials" + - "refresh_token" + complete_oauth_server_input_specification: + type: "object" + additionalProperties: false + properties: + client_id: + type: "string" + client_secret: + type: "string" + complete_oauth_server_output_specification: + type: "object" + additionalProperties: false + properties: + client_id: + type: "string" + path_in_connector_config: + - "credentials" + - "client_id" + client_secret: + type: "string" + path_in_connector_config: + - "credentials" + - "client_secret" - dockerImage: "airbyte/source-square:0.1.4" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/square" diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java index 02aeaacb3fde..9bc01f2f7208 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/AbstractSourceConnectorTest.java @@ -140,6 +140,13 @@ protected StandardCheckConnectionOutput runCheck() throws Exception { .run(new StandardCheckConnectionInput().withConnectionConfiguration(getConfig()), jobRoot); } + protected String runCheckAndGetStatusAsString(JsonNode config) throws Exception { + return new DefaultCheckConnectionWorker( + workerConfigs, + new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory, workerConfigs.getResourceRequirements())) + .run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot).getStatus().toString(); + } + protected AirbyteCatalog runDiscover() throws Exception { return new DefaultDiscoverCatalogWorker( workerConfigs, diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index f85b4eebc0c5..1d6c237a3659 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -289,7 +289,7 @@ public JdbcDatabase createDatabase(final JsonNode config) throws SQLException { final JsonNode jdbcConfig = toDatabaseConfig(config); final JdbcDatabase database = Databases.createStreamingJdbcDatabase( - jdbcConfig.get("username").asText(), + jdbcConfig.has("username") ? jdbcConfig.get("username").asText() : null, jdbcConfig.has("password") ? jdbcConfig.get("password").asText() : null, jdbcConfig.get("jdbc_url").asText(), driverClass, diff --git a/airbyte-integrations/connectors/source-snowflake/Dockerfile b/airbyte-integrations/connectors/source-snowflake/Dockerfile index 2756403f5960..f2d1461977b5 100644 --- a/airbyte-integrations/connectors/source-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/source-snowflake/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-snowflake COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.10 +LABEL io.airbyte.version=0.1.11 LABEL io.airbyte.name=airbyte/source-snowflake diff --git a/airbyte-integrations/connectors/source-snowflake/README.md b/airbyte-integrations/connectors/source-snowflake/README.md index 124f7a6c5ccf..759a0a649388 100644 --- a/airbyte-integrations/connectors/source-snowflake/README.md +++ b/airbyte-integrations/connectors/source-snowflake/README.md @@ -13,10 +13,28 @@ "warehouse": "AIRBYTE_WAREHOUSE", "database": "AIRBYTE_DATABASE", "schema": "AIRBYTE_SCHEMA", - "username": "AIRBYTE_USER", - "password": "SOMEPASSWORD" + "credentails" { + "auth_type": "username/password", + "username": "AIRBYTE_USER", + "password": "SOMEPASSWORD" + } +} +``` +3. Create a file at `secrets/config_auth.json` with the following format: +``` +{ + "host": "ACCOUNT.REGION.PROVIDER.snowflakecomputing.com", + "role": "AIRBYTE_ROLE", + "warehouse": "AIRBYTE_WAREHOUSE", + "database": "AIRBYTE_DATABASE", + "schema": "AIRBYTE_SCHEMA", + "credentails" { + "auth_type": "OAuth", + "client_id": "client_id", + "client_secret": "client_secret", + "refresh_token": "refresh_token" + } } ``` - ## For Airbyte employees Put the contents of the `Snowflake Insert Test Creds` secret on Lastpass into `secrets/config.json` to be able to run integration tests locally. diff --git a/airbyte-integrations/connectors/source-snowflake/build.gradle b/airbyte-integrations/connectors/source-snowflake/build.gradle index 84f73f77e172..c641b62056d6 100644 --- a/airbyte-integrations/connectors/source-snowflake/build.gradle +++ b/airbyte-integrations/connectors/source-snowflake/build.gradle @@ -17,6 +17,7 @@ dependencies { implementation project(':airbyte-protocol:models') implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) implementation group: 'net.snowflake', name: 'snowflake-jdbc', version: '3.13.9' + implementation 'com.zaxxer:HikariCP:5.0.1' testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) testImplementation project(':airbyte-test-utils') diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeDataSourceUtils.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeDataSourceUtils.java new file mode 100644 index 000000000000..9d0351943116 --- /dev/null +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeDataSourceUtils.java @@ -0,0 +1,197 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.snowflake; + +import static java.util.stream.Collectors.joining; + +import com.fasterxml.jackson.databind.JsonNode; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import io.airbyte.commons.json.Jsons; +import java.io.IOException; +import java.net.URI; +import java.net.URLEncoder; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpRequest.BodyPublisher; +import java.net.http.HttpRequest.BodyPublishers; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SnowflakeDataSourceUtils { + + public static final String OAUTH_METHOD = "OAuth"; + public static final String USERNAME_PASSWORD_METHOD = "username/password"; + public static final String UNRECOGNIZED = "Unrecognized"; + + private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDataSourceUtils.class); + private static final int PAUSE_BETWEEN_TOKEN_REFRESH_MIN = 7; // snowflake access token's TTL is 10min and can't be modified + private static final String REFRESH_TOKEN_URL = "https://%s/oauth/token-request"; + private static final HttpClient httpClient = HttpClient.newBuilder() + .version(HttpClient.Version.HTTP_2) + .connectTimeout(Duration.ofSeconds(10)) + .build(); + + /** + * Snowflake OAuth access token expires in 10 minutes. For the cases when sync duration is more than + * 10 min, it requires updating 'token' property after the start of connection pool. + * HikariDataSource brings support for this requirement. + * + * @param config source config JSON + * @return datasource + */ + public static HikariDataSource createDataSource(final JsonNode config) { + HikariDataSource dataSource = new HikariDataSource(); + dataSource.setJdbcUrl(buildJDBCUrl(config)); + + if (config.has("credentials")) { + JsonNode credentials = config.get("credentials"); + final String authType = credentials.has("auth_type") ? credentials.get("auth_type").asText() : UNRECOGNIZED; + switch (authType) { + case OAUTH_METHOD -> { + LOGGER.info("Authorization mode is OAuth"); + dataSource.setDataSourceProperties(buildAuthProperties(config)); + // thread to keep the refresh token up to date + SnowflakeSource.SCHEDULED_EXECUTOR_SERVICE.scheduleAtFixedRate( + getAccessTokenTask(dataSource), + PAUSE_BETWEEN_TOKEN_REFRESH_MIN, PAUSE_BETWEEN_TOKEN_REFRESH_MIN, TimeUnit.MINUTES); + } + case USERNAME_PASSWORD_METHOD -> { + LOGGER.info("Authorization mode is 'Username and password'"); + populateUsernamePasswordConfig(dataSource, config.get("credentials")); + } + default -> throw new IllegalArgumentException("Unrecognized auth type: " + authType); + } + } else { + LOGGER.info("Authorization mode is deprecated 'Username and password'. Please update your source configuration"); + populateUsernamePasswordConfig(dataSource, config); + } + + return dataSource; + } + + /** + * Method to make request for a new access token using refresh token and client credentials. + * + * @return access token + */ + public static String getAccessTokenUsingRefreshToken(final String hostName, + final String clientId, + final String clientSecret, + final String refreshToken) + throws IOException { + final var refreshTokenUri = String.format(REFRESH_TOKEN_URL, hostName); + final Map requestBody = new HashMap<>(); + requestBody.put("grant_type", "refresh_token"); + requestBody.put("refresh_token", refreshToken); + + try { + final BodyPublisher bodyPublisher = BodyPublishers.ofString(requestBody.keySet().stream() + .map(key -> key + "=" + URLEncoder.encode(requestBody.get(key), StandardCharsets.UTF_8)) + .collect(joining("&"))); + + final byte[] authorization = Base64.getEncoder() + .encode((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8)); + + final HttpRequest request = HttpRequest.newBuilder() + .POST(bodyPublisher) + .uri(URI.create(refreshTokenUri)) + .header("Content-Type", "application/x-www-form-urlencoded") + .header("Accept", "application/json") + .header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8)) + .build(); + + final HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + final JsonNode jsonResponse = Jsons.deserialize(response.body()); + if (jsonResponse.has("access_token")) { + return jsonResponse.get("access_token").asText(); + } else { + LOGGER.error("Failed to obtain accessToken using refresh token. " + jsonResponse); + throw new RuntimeException( + "Failed to obtain accessToken using refresh token."); + } + } catch (final InterruptedException e) { + throw new IOException("Failed to refreshToken", e); + } + } + + public static String buildJDBCUrl(JsonNode config) { + final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?", + config.get("host").asText())); + + // Add required properties + jdbcUrl.append(String.format( + "role=%s&warehouse=%s&database=%s&schema=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s", + config.get("role").asText(), + config.get("warehouse").asText(), + config.get("database").asText(), + config.get("schema").asText(), + // Needed for JDK17 - see + // https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow + "JSON", + true)); + + // https://docs.snowflake.com/en/user-guide/jdbc-configure.html#jdbc-driver-connection-string + if (config.has("jdbc_url_params")) { + jdbcUrl.append("&").append(config.get("jdbc_url_params").asText()); + } + return jdbcUrl.toString(); + } + + private static Runnable getAccessTokenTask(final HikariDataSource dataSource) { + return () -> { + LOGGER.info("Refresh token process started"); + var props = dataSource.getDataSourceProperties(); + try { + var token = getAccessTokenUsingRefreshToken(props.getProperty("host"), + props.getProperty("client_id"), props.getProperty("client_secret"), + props.getProperty("refresh_token")); + props.setProperty("token", token); + dataSource.setDataSourceProperties(props); + LOGGER.info("New access token has been obtained"); + } catch (IOException e) { + LOGGER.error("Failed to obtain a fresh accessToken:" + e); + } + }; + } + + public static Properties buildAuthProperties(JsonNode config) { + Properties properties = new Properties(); + try { + var credentials = config.get("credentials"); + properties.setProperty("client_id", credentials.get("client_id").asText()); + properties.setProperty("client_secret", credentials.get("client_secret").asText()); + properties.setProperty("refresh_token", credentials.get("refresh_token").asText()); + properties.setProperty("host", config.get("host").asText()); + properties.put("authenticator", "oauth"); + properties.put("account", config.get("host").asText()); + + String accessToken = getAccessTokenUsingRefreshToken( + config.get("host").asText(), credentials.get("client_id").asText(), + credentials.get("client_secret").asText(), credentials.get("refresh_token").asText()); + + properties.put("token", accessToken); + } catch (IOException e) { + LOGGER.error("Request access token was failed with error" + e.getMessage()); + } + return properties; + } + + private static void populateUsernamePasswordConfig(HikariConfig hikariConfig, JsonNode config) { + hikariConfig.setUsername(config.get("username").asText()); + hikariConfig.setPassword(config.get("password").asText()); + } + +} diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java index b404e4fc3b3b..33fe4f434671 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSource.java @@ -4,14 +4,25 @@ package io.airbyte.integrations.source.snowflake; +import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.OAUTH_METHOD; +import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.UNRECOGNIZED; +import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.USERNAME_PASSWORD_METHOD; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.StreamingJdbcDatabase; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import java.io.IOException; import java.sql.JDBCType; +import java.sql.SQLException; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import javax.sql.DataSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,54 +30,79 @@ public class SnowflakeSource extends AbstractJdbcSource implements Sou private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSource.class); public static final String DRIVER_CLASS = "net.snowflake.client.jdbc.SnowflakeDriver"; + public static final ScheduledExecutorService SCHEDULED_EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); public SnowflakeSource() { - super(DRIVER_CLASS, new SnowflakeJdbcStreamingQueryConfiguration(), new SnowflakeSourceOperations()); + super(DRIVER_CLASS, new SnowflakeJdbcStreamingQueryConfiguration(), + new SnowflakeSourceOperations()); } public static void main(final String[] args) throws Exception { final Source source = new SnowflakeSource(); LOGGER.info("starting source: {}", SnowflakeSource.class); new IntegrationRunner(source).run(args); + SCHEDULED_EXECUTOR_SERVICE.shutdownNow(); LOGGER.info("completed source: {}", SnowflakeSource.class); } + @Override + public JdbcDatabase createDatabase(JsonNode config) throws SQLException { + final DataSource dataSource = SnowflakeDataSourceUtils.createDataSource(config); + var database = new StreamingJdbcDatabase(dataSource, new SnowflakeSourceOperations(), + new SnowflakeJdbcStreamingQueryConfiguration()); + quoteString = database.getMetaData().getIdentifierQuoteString(); + return database; + } + @Override public JsonNode toDatabaseConfig(final JsonNode config) { + final String jdbcUrl = SnowflakeDataSourceUtils.buildJDBCUrl(config); - final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?", - config.get("host").asText())); + if (config.has("credentials")) { + JsonNode credentials = config.get("credentials"); + final String authType = + credentials.has("auth_type") ? credentials.get("auth_type").asText() : UNRECOGNIZED; + return switch (authType) { + case OAUTH_METHOD -> buildOAuthConfig(config, jdbcUrl); + case USERNAME_PASSWORD_METHOD -> buildUsernamePasswordConfig(config.get("credentials"), + jdbcUrl); + default -> throw new IllegalArgumentException("Unrecognized auth type: " + authType); + }; + } else { + return buildUsernamePasswordConfig(config, jdbcUrl); + } + } - // Add required properties - jdbcUrl.append(String.format("role=%s&warehouse=%s&database=%s&schema=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s", - config.get("role").asText(), - config.get("warehouse").asText(), - config.get("database").asText(), - config.get("schema").asText(), - // Needed for JDK17 - see - // https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow - "JSON", - true)); + @Override + public Set getExcludedInternalNameSpaces() { + return Set.of( + "INFORMATION_SCHEMA"); + } - // https://docs.snowflake.com/en/user-guide/jdbc-configure.html#jdbc-driver-connection-string - if (config.has("jdbc_url_params")) { - jdbcUrl.append("&").append(config.get("jdbc_url_params").asText()); + private JsonNode buildOAuthConfig(JsonNode config, String jdbcUrl) { + final String accessToken; + var credentials = config.get("credentials"); + try { + accessToken = SnowflakeDataSourceUtils.getAccessTokenUsingRefreshToken( + config.get("host").asText(), credentials.get("client_id").asText(), + credentials.get("client_secret").asText(), credentials.get("refresh_token").asText()); + } catch (IOException e) { + throw new RuntimeException(e); } + final ImmutableMap.Builder configBuilder = ImmutableMap.builder() + .put("connection_properties", + String.join(";", "authenticator=oauth", "token=" + accessToken)) + .put("jdbc_url", jdbcUrl); + return Jsons.jsonNode(configBuilder.build()); + } - LOGGER.info(jdbcUrl.toString()); - + private JsonNode buildUsernamePasswordConfig(JsonNode config, String jdbcUrl) { final ImmutableMap.Builder configBuilder = ImmutableMap.builder() .put("username", config.get("username").asText()) .put("password", config.get("password").asText()) - .put("jdbc_url", jdbcUrl.toString()); - + .put("jdbc_url", jdbcUrl); + LOGGER.info(jdbcUrl); return Jsons.jsonNode(configBuilder.build()); } - @Override - public Set getExcludedInternalNameSpaces() { - return Set.of( - "INFORMATION_SCHEMA"); - } - } diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/resources/spec.json b/airbyte-integrations/connectors/source-snowflake/src/main/resources/spec.json index 95b989811537..689926366c68 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-snowflake/src/main/resources/spec.json @@ -4,71 +4,183 @@ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Snowflake Source Spec", "type": "object", - "required": [ - "host", - "role", - "warehouse", - "database", - "schema", - "username", - "password" - ], - "additionalProperties": false, + "required": ["host", "role", "warehouse", "database", "schema"], + "additionalProperties": true, "properties": { + "credentials": { + "title": "Authorization Method", + "type": "object", + "oneOf": [ + { + "type": "object", + "title": "OAuth2.0", + "order": 0, + "required": ["client_id", "client_secret", "auth_type"], + "properties": { + "auth_type": { + "type": "string", + "const": "OAuth", + "default": "OAuth", + "order": 0 + }, + "client_id": { + "type": "string", + "title": "Client ID", + "description": "The Client ID of your Snowflake developer application.", + "airbyte_secret": true, + "order": 1 + }, + "client_secret": { + "type": "string", + "title": "Client Secret", + "description": "The Client Secret of your Snowflake developer application.", + "airbyte_secret": true, + "order": 2 + }, + "access_token": { + "type": "string", + "title": "Access Token", + "description": "Access Token for making authenticated requests.", + "airbyte_secret": true, + "order": 3 + }, + "refresh_token": { + "type": "string", + "title": "Refresh Token", + "description": "Refresh Token for making authenticated requests.", + "airbyte_secret": true, + "order": 4 + } + } + }, + { + "title": "Username and Password", + "type": "object", + "required": ["username", "password", "auth_type"], + "order": 1, + "properties": { + "auth_type": { + "type": "string", + "const": "username/password", + "default": "username/password", + "order": 0 + }, + "username": { + "description": "The username you created to allow Airbyte to access the database.", + "examples": ["AIRBYTE_USER"], + "type": "string", + "title": "Username", + "order": 1 + }, + "password": { + "description": "The password associated with the username.", + "type": "string", + "airbyte_secret": true, + "title": "Password", + "order": 2 + } + } + } + ], + "order": 0 + }, "host": { "description": "The host domain of the snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com).", "examples": ["accountname.us-east-2.aws.snowflakecomputing.com"], "type": "string", "title": "Account Name", - "order": 0 + "order": 1 }, "role": { "description": "The role you created for Airbyte to access Snowflake.", "examples": ["AIRBYTE_ROLE"], "type": "string", "title": "Role", - "order": 1 + "order": 2 }, "warehouse": { "description": "The warehouse you created for Airbyte to access data.", "examples": ["AIRBYTE_WAREHOUSE"], "type": "string", "title": "Warehouse", - "order": 2 + "order": 3 }, "database": { "description": "The database you created for Airbyte to access data.", "examples": ["AIRBYTE_DATABASE"], "type": "string", "title": "Database", - "order": 3 + "order": 4 }, "schema": { "description": "The source Snowflake schema tables.", "examples": ["AIRBYTE_SCHEMA"], "type": "string", "title": "Schema", - "order": 4 - }, - "username": { - "description": "The username you created to allow Airbyte to access the database.", - "examples": ["AIRBYTE_USER"], - "type": "string", - "title": "Username", "order": 5 }, - "password": { - "description": "The password associated with the username.", - "type": "string", - "airbyte_secret": true, - "title": "Password", - "order": 6 - }, "jdbc_url_params": { "description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).", "title": "JDBC URL Params", "type": "string", - "order": 7 + "order": 6 + } + } + }, + "advanced_auth": { + "auth_flow_type": "oauth2.0", + "predicate_key": ["credentials", "auth_type"], + "predicate_value": "OAuth", + "oauth_config_specification": { + "oauth_user_input_from_connector_config_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "host": { + "type": "string", + "path_in_connector_config": ["host"] + } + } + }, + "complete_oauth_output_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "access_token": { + "type": "string", + "path_in_connector_config": ["credentials", "access_token"] + }, + "refresh_token": { + "type": "string", + "path_in_connector_config": ["credentials", "refresh_token"] + } + } + }, + "complete_oauth_server_input_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "client_id": { + "type": "string" + }, + "client_secret": { + "type": "string" + } + } + }, + "complete_oauth_server_output_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "client_id": { + "type": "string", + "path_in_connector_config": ["credentials", "client_id"] + }, + "client_secret": { + "type": "string", + "path_in_connector_config": ["credentials", "client_secret"] + } + } } } } diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java index 011c4aad414b..9c81721ebc70 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java @@ -4,19 +4,25 @@ package io.airbyte.integrations.io.airbyte.integration_tests.sources; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableSet; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.snowflake.SnowflakeSource; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import java.math.BigDecimal; import java.nio.file.Path; import java.sql.JDBCType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; class SnowflakeJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { @@ -26,10 +32,6 @@ class SnowflakeJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { static void init() { snConfig = Jsons .deserialize(IOs.readFile(Path.of("secrets/config.json"))); - } - - @BeforeEach - public void setup() throws Exception { // due to case sensitiveness in SnowflakeDB SCHEMA_NAME = "JDBC_INTEGRATION_TEST1"; SCHEMA_NAME2 = "JDBC_INTEGRATION_TEST2"; @@ -49,7 +51,10 @@ public void setup() throws Exception { ID_VALUE_3 = new BigDecimal(3); ID_VALUE_4 = new BigDecimal(4); ID_VALUE_5 = new BigDecimal(5); + } + @BeforeEach + public void setup() throws Exception { super.setup(); } @@ -79,4 +84,11 @@ public AbstractJdbcSource getJdbcSource() { return new SnowflakeSource(); } + @Test + void testCheckFailure() throws Exception { + ((ObjectNode) config.get("credentials")).put("password", "fake"); + final AirbyteConnectionStatus actual = source.check(config); + assertEquals(Status.FAILED, actual.getStatus()); + } + } diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAcceptanceTest.java index 9f7047ea306c..9b676083f03e 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAcceptanceTest.java @@ -4,7 +4,10 @@ package io.airbyte.integrations.io.airbyte.integration_tests.sources; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.Lists; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; @@ -26,6 +29,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Test; public class SnowflakeSourceAcceptanceTest extends SourceAcceptanceTest { @@ -35,8 +39,8 @@ public class SnowflakeSourceAcceptanceTest extends SourceAcceptanceTest { private static final String STREAM_NAME2 = "ID_AND_NAME2"; // config which refers to the schema that the test is being run in. - private JsonNode config; - private JdbcDatabase database; + protected JsonNode config; + protected JdbcDatabase database; @Override protected String getImageName() { @@ -90,17 +94,7 @@ protected JsonNode getState() { // for each test we create a new schema in the database. run the test in there and then remove it. @Override protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { - config = Jsons.clone(getStaticConfig()); - database = Databases.createJdbcDatabase( - config.get("username").asText(), - config.get("password").asText(), - String.format("jdbc:snowflake://%s/", - config.get("host").asText()), - SnowflakeSource.DRIVER_CLASS, - Map.of("role", config.get("role").asText(), - "warehouse", config.get("warehouse").asText(), - "database", config.get("database").asText())); - + database = setupDataBase(); final String createSchemaQuery = String.format("CREATE SCHEMA IF NOT EXISTS %s", SCHEMA_NAME); final String createTableQuery1 = String .format("CREATE OR REPLACE TABLE %s.%s (ID INTEGER, NAME VARCHAR(200))", SCHEMA_NAME, @@ -130,4 +124,30 @@ protected void tearDown(final TestDestinationEnv testEnv) throws Exception { database.close(); } + protected JdbcDatabase setupDataBase() { + config = Jsons.clone(getStaticConfig()); + return Databases.createJdbcDatabase( + config.get("credentials").get("username").asText(), + config.get("credentials").get("password").asText(), + String.format("jdbc:snowflake://%s/", + config.get("host").asText()), + SnowflakeSource.DRIVER_CLASS, + Map.of("role", config.get("role").asText(), + "warehouse", config.get("warehouse").asText(), + "database", config.get("database").asText())); + } + + @Test + public void testBackwardCompatibilityAfterAddingOAuth() throws Exception { + final JsonNode deprecatedStyleConfig = Jsons.clone(config); + final JsonNode password = deprecatedStyleConfig.get("credentials").get("password"); + final JsonNode username = deprecatedStyleConfig.get("credentials").get("username"); + + ((ObjectNode) deprecatedStyleConfig).remove("credentials"); + ((ObjectNode) deprecatedStyleConfig).set("password", password); + ((ObjectNode) deprecatedStyleConfig).set("username", username); + + assertEquals("SUCCEEDED", runCheckAndGetStatusAsString(deprecatedStyleConfig).toUpperCase()); + } + } diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAuthAcceptanceTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAuthAcceptanceTest.java new file mode 100644 index 000000000000..bdcc57e9e08c --- /dev/null +++ b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceAuthAcceptanceTest.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import com.zaxxer.hikari.HikariDataSource; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.jdbc.StreamingJdbcDatabase; +import io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils; +import io.airbyte.integrations.source.snowflake.SnowflakeJdbcStreamingQueryConfiguration; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Properties; +import javax.sql.DataSource; + +public class SnowflakeSourceAuthAcceptanceTest extends SnowflakeSourceAcceptanceTest { + + @Override + protected JdbcDatabase setupDataBase() { + config = getStaticConfig(); + final DataSource dataSource = createDataSource(getStaticConfig()); + return new StreamingJdbcDatabase(dataSource, + JdbcUtils.getDefaultSourceOperations(), + new SnowflakeJdbcStreamingQueryConfiguration()); + } + + private HikariDataSource createDataSource(final JsonNode config) { + HikariDataSource dataSource = new HikariDataSource(); + Properties properties = new Properties(); + + final StringBuilder jdbcUrl = new StringBuilder( + String.format("jdbc:snowflake://%s/?", config.get("host").asText())); + jdbcUrl.append(String.format( + "role=%s&warehouse=%s&database=%s&schema=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s", + config.get("role").asText(), + config.get("warehouse").asText(), + config.get("database").asText(), + config.get("schema").asText(), + // Needed for JDK17 - see + // https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow + "JSON", + true)); + if (config.has("jdbc_url_params")) { + jdbcUrl.append(config.get("jdbc_url_params").asText()); + } + + var credentials = config.get("credentials"); + try { + properties.setProperty("client_id", credentials.get("client_id").asText()); + properties.setProperty("client_secret", credentials.get("client_secret").asText()); + properties.setProperty("refresh_token", credentials.get("refresh_token").asText()); + properties.setProperty("host", config.get("host").asText()); + var accessToken = SnowflakeDataSourceUtils.getAccessTokenUsingRefreshToken( + config.get("host").asText(), credentials.get("client_id").asText(), + credentials.get("client_secret").asText(), credentials.get("refresh_token").asText()); + properties.put("authenticator", "oauth"); + properties.put("token", accessToken); + } catch (IOException e) { + throw new RuntimeException(e); + } + + properties.put("warehouse", config.get("warehouse").asText()); + properties.put("account", config.get("host").asText()); + properties.put("role", config.get("role").asText()); + // allows queries to contain any number of statements + properties.put("MULTI_STATEMENT_COUNT", "0"); + // https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application + // identify airbyte traffic to snowflake to enable partnership & optimization opportunities + properties.put("dataSource.application", "airbyte"); + // Needed for JDK17 - see + // https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow + properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON"); + + dataSource.setDriverClassName("net.snowflake.client.jdbc.SnowflakeDriver"); + dataSource.setJdbcUrl(jdbcUrl.toString()); + dataSource.setDataSourceProperties(properties); + return dataSource; + } + + JsonNode getStaticConfig() { + return Jsons + .deserialize(IOs.readFile(Path.of("secrets/config_auth.json"))); + } + + @Override + public void testBackwardCompatibilityAfterAddingOAuth() throws Exception { + // this test case is not valid for OAuth method + } +} diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceDatatypeTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceDatatypeTest.java index bf739e15246b..df49c9884d3f 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeSourceDatatypeTest.java @@ -51,8 +51,8 @@ protected Database setupDatabase() throws Exception { private Database getDatabase() { return Databases.createDatabase( - config.get("username").asText(), - config.get("password").asText(), + config.get("credentials").get("username").asText(), + config.get("credentials").get("password").asText(), String.format("jdbc:snowflake://%s/", config.get("host").asText()), SnowflakeSource.DRIVER_CLASS, diff --git a/airbyte-integrations/connectors/source-snowflake/src/test/java/io/airbyte/integrations/source/snowflake/SnowflakeDataSourceUtilsTest.java b/airbyte-integrations/connectors/source-snowflake/src/test/java/io/airbyte/integrations/source/snowflake/SnowflakeDataSourceUtilsTest.java new file mode 100644 index 000000000000..bf7080d82b0a --- /dev/null +++ b/airbyte-integrations/connectors/source-snowflake/src/test/java/io/airbyte/integrations/source/snowflake/SnowflakeDataSourceUtilsTest.java @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.snowflake; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import org.junit.jupiter.api.Test; + +class SnowflakeDataSourceUtilsTest { + + private final String config = """ + { + "host": "host", + "role": "role", + "schema": "SOURCE_SCHEMA", + "database": "DATABASE", + "warehouse": "WAREHOUSE", + "credentials": { + "auth_type": "OAuth", + "client_id": "someid", + "access_token": "**********", + "client_secret": "clientSecret", + "refresh_token": "token" + } + } + """; + private final String expectedJdbcUrl = + "jdbc:snowflake://host/?role=role&warehouse=WAREHOUSE&database=DATABASE&schema=SOURCE_SCHEMA&JDBC_QUERY_RESULT_FORMAT=JSON&CLIENT_SESSION_KEEP_ALIVE=true"; + + @Test + void testBuildJDBCUrl() { + JsonNode expectedConfig = Jsons.deserialize(config); + + String jdbcURL = SnowflakeDataSourceUtils.buildJDBCUrl(expectedConfig); + + assertEquals(expectedJdbcUrl, jdbcURL); + } + + @Test + void testBuildJDBCUrlWithParams() { + JsonNode expectedConfig = Jsons.deserialize(config); + String params = "someParameter1¶m2=someParameter2"; + ((ObjectNode) expectedConfig).put("jdbc_url_params", params); + + String jdbcURL = SnowflakeDataSourceUtils.buildJDBCUrl(expectedConfig); + + assertEquals(expectedJdbcUrl + "&" + params, jdbcURL); + } + +} diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java index 9fb4056af3f5..06723d5b0ef3 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java @@ -63,6 +63,7 @@ public OAuthImplementationFactory(final ConfigRepository configRepository, final .put("airbyte/source-shopify", new ShopifyOAuthFlow(configRepository, httpClient)) .put("airbyte/source-tiktok-marketing", new TikTokMarketingOAuthFlow(configRepository, httpClient)) .put("airbyte/destination-snowflake", new DestinationSnowflakeOAuthFlow(configRepository, httpClient)) + .put("airbyte/source-snowflake", new SourceSnowflakeOAuthFlow(configRepository, httpClient)) .build(); } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/SourceSnowflakeOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/SourceSnowflakeOAuthFlow.java new file mode 100644 index 000000000000..d9c976cf5ea3 --- /dev/null +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/SourceSnowflakeOAuthFlow.java @@ -0,0 +1,144 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.oauth.flows; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.oauth.BaseOAuth2Flow; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.function.Supplier; +import org.apache.http.client.utils.URIBuilder; + +public class SourceSnowflakeOAuthFlow extends BaseOAuth2Flow { + + private static final String AUTHORIZE_URL = "https://%s/oauth/authorize"; + private static final String ACCESS_TOKEN_URL = "https://%s/oauth/token-request"; + + public SourceSnowflakeOAuthFlow(ConfigRepository configRepository, HttpClient httpClient) { + super(configRepository, httpClient); + } + + @VisibleForTesting + public SourceSnowflakeOAuthFlow(ConfigRepository configRepository, HttpClient httpClient, final Supplier stateSupplier) { + super(configRepository, httpClient, stateSupplier); + } + + @Override + protected String formatConsentUrl(UUID definitionId, + String clientId, + String redirectUrl, + JsonNode inputOAuthConfiguration) + throws IOException { + try { + return new URIBuilder(String.format(AUTHORIZE_URL, extractUrl(inputOAuthConfiguration))) + .addParameter("client_id", clientId) + .addParameter("redirect_uri", redirectUrl) + .addParameter("response_type", "code") + .addParameter("state", getState()) + .build().toString(); + } catch (final URISyntaxException e) { + throw new IOException("Failed to format Consent URL for OAuth flow", e); + } + } + + @Override + protected String getAccessTokenUrl(JsonNode inputOAuthConfiguration) { + return String.format(ACCESS_TOKEN_URL, extractUrl(inputOAuthConfiguration)); + } + + @Override + protected String extractCodeParameter(Map queryParams) throws IOException { + return super.extractCodeParameter(queryParams); + } + + @Override + protected Map getAccessTokenQueryParameters(String clientId, + String clientSecret, + String authCode, + String redirectUrl) { + return ImmutableMap.builder() + // required + .put("grant_type", "authorization_code") + .put("code", authCode) + .put("redirect_uri", redirectUrl) + .build(); + } + + @Override + protected Map completeOAuthFlow(final String clientId, + final String clientSecret, + final String authCode, + final String redirectUrl, + final JsonNode inputOAuthConfiguration, + final JsonNode oAuthParamConfig) + throws IOException { + final var accessTokenUrl = getAccessTokenUrl(inputOAuthConfiguration); + final byte[] authorization = Base64.getEncoder() + .encode((clientId + ":" + clientSecret).getBytes(StandardCharsets.UTF_8)); + final HttpRequest request = HttpRequest.newBuilder() + .POST(HttpRequest.BodyPublishers + .ofString(tokenReqContentType.getConverter().apply( + getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl)))) + .uri(URI.create(accessTokenUrl)) + .header("Content-Type", tokenReqContentType.getContentType()) + .header("Accept", "application/json") + .header("Authorization", "Basic " + new String(authorization, StandardCharsets.UTF_8)) + .build(); + try { + final HttpResponse response = httpClient.send(request, + HttpResponse.BodyHandlers.ofString()); + + return extractOAuthOutput(Jsons.deserialize(response.body()), accessTokenUrl); + } catch (final InterruptedException e) { + throw new IOException("Failed to complete OAuth flow", e); + } + } + + @Override + protected Map extractOAuthOutput(JsonNode data, String accessTokenUrl) + throws IOException { + final Map result = new HashMap<>(); + // access_token is valid for only 10 minutes + if (data.has("access_token")) { + result.put("access_token", data.get("access_token").asText()); + } else { + throw new IOException(String.format("Missing 'access_token' in query params from %s", + accessTokenUrl)); + } + + if (data.has("refresh_token")) { + result.put("refresh_token", data.get("refresh_token").asText()); + } else { + throw new IOException(String.format("Missing 'refresh_token' in query params from %s", + accessTokenUrl)); + } + if (data.has("username")) { + result.put("username", data.get("username").asText()); + } else { + throw new IOException(String.format("Missing 'username' in query params from %s", + accessTokenUrl)); + } + return result; + } + + private String extractUrl(JsonNode inputOAuthConfiguration) { + var url = inputOAuthConfiguration.get("host"); + return url == null ? "snowflakecomputing.com" : url.asText(); + } + +} diff --git a/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/SnowflakeOAuthFlowTest.java b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/SnowflakeOAuthFlowTest.java new file mode 100644 index 000000000000..e982170807f0 --- /dev/null +++ b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/SnowflakeOAuthFlowTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.oauth.flows; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.json.Jsons; +import io.airbyte.oauth.BaseOAuthFlow; +import io.airbyte.oauth.MoreOAuthParameters; +import java.util.Map; +import org.junit.jupiter.api.Test; + +public class SnowflakeOAuthFlowTest extends BaseOAuthFlowTest { + + @Override + protected BaseOAuthFlow getOAuthFlow() { + return new SourceSnowflakeOAuthFlow(getConfigRepository(), getHttpClient(), this::getConstantState); + } + + @Override + protected String getExpectedConsentUrl() { + return "https://account.aws.snowflakecomputing.com/oauth/authorize?client_id=test_client_id&redirect_uri=https%3A%2F%2Fairbyte.io&response_type=code&state=state"; + } + + @Override + protected Map getExpectedOutput() { + return Map.of( + "access_token", "access_token_response", + "refresh_token", "refresh_token_response", + "username", "username"); + } + + @Override + protected JsonNode getCompleteOAuthOutputSpecification() { + return getJsonSchema(Map.of("access_token", Map.of("type", "string"), "refresh_token", Map.of("type", "string"))); + } + + @Override + protected Map getExpectedFilteredOutput() { + return Map.of( + "access_token", "access_token_response", + "refresh_token", "refresh_token_response", + "client_id", MoreOAuthParameters.SECRET_MASK); + } + + protected JsonNode getOAuthParamConfig() { + return Jsons.jsonNode(ImmutableMap.builder() + .put("client_id", "test_client_id") + .put("client_secret", "test_client_secret") + .build()); + } + + @Override + protected JsonNode getInputOAuthConfiguration() { + return Jsons.jsonNode(ImmutableMap.builder() + .put("host", "account.aws.snowflakecomputing.com") + .build()); + } + + protected JsonNode getUserInputFromConnectorConfigSpecification() { + return getJsonSchema(Map.of("host", Map.of("type", "string"))); + } + + @Test + @Override + public void testGetSourceConsentUrlEmptyOAuthSpec() {} + + @Test + @Override + public void testGetDestinationConsentUrlEmptyOAuthSpec() {} + + @Test + @Override + public void testDeprecatedCompleteDestinationOAuth() {} + + @Test + @Override + public void testDeprecatedCompleteSourceOAuth() {} + +} diff --git a/docs/integrations/sources/snowflake.md b/docs/integrations/sources/snowflake.md index 0c9a15e483ba..f500081c8b2a 100644 --- a/docs/integrations/sources/snowflake.md +++ b/docs/integrations/sources/snowflake.md @@ -72,10 +72,38 @@ You can limit this grant down to specific schemas instead of the whole database. Your database user should now be ready for use with Airbyte. +###Authentication +#### There are 2 way ways of oauth supported: login\pass and oauth2. + +### Login and Password +| Field | Description | +|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| [Host](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html) | The host domain of the snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com). Example: `accountname.us-east-2.aws.snowflakecomputing.com` | +| [Role](https://docs.snowflake.com/en/user-guide/security-access-control-overview.html#roles) | The role you created in Step 1 for Airbyte to access Snowflake. Example: `AIRBYTE_ROLE` | +| [Warehouse](https://docs.snowflake.com/en/user-guide/warehouses-overview.html#overview-of-warehouses) | The warehouse you created in Step 1 for Airbyte to sync data into. Example: `AIRBYTE_WAREHOUSE` | +| [Database](https://docs.snowflake.com/en/sql-reference/ddl-database.html#database-schema-share-ddl) | The database you created in Step 1 for Airbyte to sync data into. Example: `AIRBYTE_DATABASE` | +| [Schema](https://docs.snowflake.com/en/sql-reference/ddl-database.html#database-schema-share-ddl) | The default schema used as the target schema for all statements issued from the connection that do not explicitly specify a schema name. | +| Username | The username you created in Step 2 to allow Airbyte to access the database. Example: `AIRBYTE_USER` | +| Password | The password associated with the username. | +| [JDBC URL Params](https://docs.snowflake.com/en/user-guide/jdbc-parameters.html) (Optional) | Additional properties to pass to the JDBC URL string when connecting to the database formatted as `key=value` pairs separated by the symbol `&`. Example: `key1=value1&key2=value2&key3=value3` | + + +### OAuth 2.0 +Field | Description | +|---|---| +| [Host](https://docs.snowflake.com/en/user-guide/admin-account-identifier.html) | The host domain of the snowflake instance (must include the account, region, cloud environment, and end with snowflakecomputing.com). Example: `accountname.us-east-2.aws.snowflakecomputing.com` | +| [Role](https://docs.snowflake.com/en/user-guide/security-access-control-overview.html#roles) | The role you created in Step 1 for Airbyte to access Snowflake. Example: `AIRBYTE_ROLE` | +| [Warehouse](https://docs.snowflake.com/en/user-guide/warehouses-overview.html#overview-of-warehouses) | The warehouse you created in Step 1 for Airbyte to sync data into. Example: `AIRBYTE_WAREHOUSE` | +| [Database](https://docs.snowflake.com/en/sql-reference/ddl-database.html#database-schema-share-ddl) | The database you created in Step 1 for Airbyte to sync data into. Example: `AIRBYTE_DATABASE` | +| [Schema](https://docs.snowflake.com/en/sql-reference/ddl-database.html#database-schema-share-ddl) | The default schema used as the target schema for all statements issued from the connection that do not explicitly specify a schema name. | +| OAuth2 | The Login name and password to obtain auth token. | +| [JDBC URL Params](https://docs.snowflake.com/en/user-guide/jdbc-parameters.html) (Optional) | Additional properties to pass to the JDBC URL string when connecting to the database formatted as `key=value` pairs separated by the symbol `&`. Example: `key1=value1&key2=value2&key3=value3` | + ## Changelog | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.11 | 2022-04-27 | [10953](https://github.com/airbytehq/airbyte/pull/10953) | Implement OAuth flow | | 0.1.9 | 2022-02-21 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Fixed cursor for old connectors that use non-microsecond format. Now connectors work with both formats | | 0.1.8 | 2022-02-18 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Updated timestamp transformation with microseconds | | 0.1.7 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |