From bc1f795701b38f7dfeb6b32d4e49ccec39581588 Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Thu, 14 Oct 2021 12:37:03 +0300 Subject: [PATCH] Asana oAuth backend. --- .../java/io/airbyte/oauth/BaseOAuthFlow.java | 27 ++- .../oauth/OAuthImplementationFactory.java | 2 + .../airbyte/oauth/flows/AsanaOAuthFlow.java | 75 ++++++++ .../flows/FacebookMarketingOAuthFlow.java | 20 -- .../oauth/flows/google/GoogleOAuthFlow.java | 9 - .../AsanaOAuthFlowIntegrationTest.java | 173 ++++++++++++++++++ .../oauth/flows/AsanaOAuthFlowTest.java | 79 ++++++++ 7 files changed, 350 insertions(+), 35 deletions(-) create mode 100644 airbyte-oauth/src/main/java/io/airbyte/oauth/flows/AsanaOAuthFlow.java create mode 100644 airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/AsanaOAuthFlowIntegrationTest.java create mode 100644 airbyte-oauth/src/test/java/io/airbyte/oauth/flows/AsanaOAuthFlowTest.java diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthFlow.java index 039b12e96fb0..4445e6f609f3 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthFlow.java @@ -5,6 +5,7 @@ package io.airbyte.oauth; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; @@ -110,23 +111,37 @@ private Map completeOAuthFlow(String clientId, String clientSecr } } + /** + * Query parameters to provide the access token url with. + */ + protected Map getAccessTokenQueryParameters(String clientId, String clientSecret, String authCode, String redirectUrl) { + return ImmutableMap.builder() + // required + .put("client_id", clientId) + .put("redirect_uri", redirectUrl) + .put("client_secret", clientSecret) + .put("code", authCode) + .build(); + } + /** * Once the user is redirected after getting their consent, the API should redirect them to a * specific redirection URL along with query parameters. This function should parse and extract the * code from these query parameters in order to continue the OAuth Flow. */ - protected abstract String extractCodeParameter(Map queryParams) throws IOException; + protected String extractCodeParameter(Map queryParams) throws IOException { + if (queryParams.containsKey("code")) { + return (String) queryParams.get("code"); + } else { + throw new IOException("Undefined 'code' from consent redirected url."); + } + } /** * Returns the URL where to retrieve the access token from. */ protected abstract String getAccessTokenUrl(); - /** - * Query parameters to provide the access token url with. - */ - protected abstract Map getAccessTokenQueryParameters(String clientId, String clientSecret, String authCode, String redirectUrl); - /** * Once the auth code is exchange for a refresh token, the oauth flow implementation can extract and * returns the values of fields to be used in the connector's configurations. diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java index 22658cf07027..d2d81ad0186d 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/OAuthImplementationFactory.java @@ -8,6 +8,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.oauth.flows.FacebookMarketingOAuthFlow; import io.airbyte.oauth.flows.TrelloOAuthFlow; +import io.airbyte.oauth.flows.AsanaOAuthFlow; import io.airbyte.oauth.flows.google.GoogleAdsOAuthFlow; import io.airbyte.oauth.flows.google.GoogleAnalyticsOAuthFlow; import io.airbyte.oauth.flows.google.GoogleSearchConsoleOAuthFlow; @@ -24,6 +25,7 @@ public OAuthImplementationFactory(ConfigRepository configRepository) { .put("airbyte/source-google-analytics-v4", new GoogleAnalyticsOAuthFlow(configRepository)) .put("airbyte/source-google-search-console", new GoogleSearchConsoleOAuthFlow(configRepository)) .put("airbyte/source-trello", new TrelloOAuthFlow(configRepository)) + .put("airbyte/source-asana", new AsanaOAuthFlow(configRepository)) .build(); } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/AsanaOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/AsanaOAuthFlow.java new file mode 100644 index 000000000000..5cba63f6d001 --- /dev/null +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/AsanaOAuthFlow.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.oauth.flows; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import io.airbyte.oauth.BaseOAuthFlow; +import io.airbyte.config.persistence.ConfigRepository; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.net.URISyntaxException; +import java.net.http.HttpClient; +import java.util.function.Supplier; +import org.apache.http.client.utils.URIBuilder; + +/** + * Following docs from https://developers.asana.com/docs/oauth + */ +public class AsanaOAuthFlow extends BaseOAuthFlow { + + private static final String AUTHORIZE_URL = "https://app.asana.com/-/oauth_authorize"; + private static final String ACCESS_TOKEN_URL = "https://app.asana.com/-/oauth_token"; + + public AsanaOAuthFlow(ConfigRepository configRepository) { + super(configRepository); + } + + @VisibleForTesting + AsanaOAuthFlow(ConfigRepository configRepository, HttpClient httpClient, Supplier stateSupplier) { + super(configRepository, httpClient, stateSupplier); + } + + @Override + protected String formatConsentUrl(UUID definitionId, String clientId, String redirectUrl) throws IOException{ + try { + return new URIBuilder(AUTHORIZE_URL) + .addParameter("client_id", clientId) + .addParameter("redirect_uri", redirectUrl) + .addParameter("response_type", "code") + .addParameter("state", getState()) + .build().toString(); + } catch (URISyntaxException e) { + throw new IOException("Failed to format Consent URL for OAuth flow", e); + } + } + + + @Override + protected String getAccessTokenUrl() { + return ACCESS_TOKEN_URL; + } + + protected Map getAccessTokenQueryParameters(String clientId, String clientSecret, String authCode, String redirectUrl) { + return ImmutableMap.builder() + .putAll(super.getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl)) + .put("grant_type", "authorization_code") + .build(); + } + + @Override + protected Map extractRefreshToken(JsonNode data) throws IOException { + System.out.println(Jsons.serialize(data)); + if (data.has("refresh_token")) { + final String refreshToken = data.get("refresh_token").asText(); + return Map.of("credentials", Map.of("refresh_token", refreshToken)); + } else { + throw new IOException(String.format("Missing 'refresh_token' in query params from %s", ACCESS_TOKEN_URL)); + } + } +} diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/FacebookMarketingOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/FacebookMarketingOAuthFlow.java index c4af0df41bb3..8a6a747aa5b6 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/FacebookMarketingOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/FacebookMarketingOAuthFlow.java @@ -6,7 +6,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.oauth.BaseOAuthFlow; import java.io.IOException; @@ -54,30 +53,11 @@ protected String formatConsentUrl(UUID definitionId, String clientId, String red } } - @Override - protected String extractCodeParameter(Map queryParams) throws IOException { - if (queryParams.containsKey("code")) { - return (String) queryParams.get("code"); - } else { - throw new IOException("Undefined 'code' from consent redirected url."); - } - } - @Override protected String getAccessTokenUrl() { return ACCESS_TOKEN_URL; } - @Override - protected Map getAccessTokenQueryParameters(String clientId, String clientSecret, String authCode, String redirectUrl) { - return ImmutableMap.builder() - // required - .put("client_id", clientId) - .put("redirect_uri", redirectUrl) - .put("client_secret", clientSecret) - .put("code", authCode) - .build(); - } @Override protected Map extractRefreshToken(JsonNode data) throws IOException { diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleOAuthFlow.java index 7ca08141b747..5818a6713066 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/google/GoogleOAuthFlow.java @@ -63,15 +63,6 @@ protected String formatConsentUrl(UUID definitionId, String clientId, String red */ protected abstract String getScope(); - @Override - protected String extractCodeParameter(Map queryParams) throws IOException { - if (queryParams.containsKey("code")) { - return (String) queryParams.get("code"); - } else { - throw new IOException("Undefined 'code' from consent redirected url."); - } - } - @Override protected String getAccessTokenUrl() { return ACCESS_TOKEN_URL; diff --git a/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/AsanaOAuthFlowIntegrationTest.java b/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/AsanaOAuthFlowIntegrationTest.java new file mode 100644 index 000000000000..4f393a68ac8c --- /dev/null +++ b/airbyte-oauth/src/test-integration/java/io.airbyte.oauth.flows/AsanaOAuthFlowIntegrationTest.java @@ -0,0 +1,173 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.oauth.flows; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.SourceOAuthParameter; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AsanaOAuthFlowIntegrationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(AsanaOAuthFlowIntegrationTest.class); + private static final String REDIRECT_URL = "http://localhost:8000/code"; + private static final Path CREDENTIALS_PATH = Path.of("secrets/asana.json"); + + private ConfigRepository configRepository; + private AsanaOAuthFlow asanaOAuthFlow; + private HttpServer server; + private ServerHandler serverHandler; + + @BeforeEach + public void setup() throws IOException { + if (!Files.exists(CREDENTIALS_PATH)) { + throw new IllegalStateException( + "Must provide path to a oauth credentials file."); + } + configRepository = mock(ConfigRepository.class); + asanaOAuthFlow = new AsanaOAuthFlow(configRepository); + + server = HttpServer.create(new InetSocketAddress(8000), 0); + server.setExecutor(null); // creates a default executor + server.start(); + serverHandler = new ServerHandler("code"); + server.createContext("/code", serverHandler); + } + + @AfterEach + void tearDown() { + server.stop(1); + } + + @Test + public void testFullAsanaOAuthFlow() throws InterruptedException, ConfigNotFoundException, IOException, JsonValidationException { + int limit = 20; + final UUID workspaceId = UUID.randomUUID(); + final UUID definitionId = UUID.randomUUID(); + final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH)); + final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString); + final String clientId = credentialsJson.get("client_id").asText(); + when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withSourceDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.jsonNode(ImmutableMap.builder() + .put("client_id", clientId) + .put("client_secret", credentialsJson.get("client_secret").asText()) + .build())))); + final String url = asanaOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL); + LOGGER.info("Waiting for user consent at: {}", url); + // TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing + // access... + while (!serverHandler.isSucceeded() && limit > 0) { + Thread.sleep(1000); + limit -= 1; + } + assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time"); + final Map params = asanaOAuthFlow.completeSourceOAuth(workspaceId, definitionId, + Map.of("code", serverHandler.getParamValue()), REDIRECT_URL); + LOGGER.info("Response from completing OAuth Flow is: {}", params.toString()); + assertTrue(params.containsKey("credentials")); + final Map creds = (Map)params.get("credentials"); + assertTrue(creds.containsKey("refresh_token")); + assertTrue(creds.get("refresh_token").toString().length() > 0); + } + + static class ServerHandler implements HttpHandler { + + final private String expectedParam; + private Map responseQuery; + private String paramValue; + private boolean succeeded; + + public ServerHandler(String expectedParam) { + this.expectedParam = expectedParam; + this.paramValue = ""; + this.succeeded = false; + } + + public boolean isSucceeded() { + return succeeded; + } + + public String getParamValue() { + return paramValue; + } + + public Map getResponseQuery() { + return responseQuery; + } + + @Override + public void handle(HttpExchange t) { + final String query = t.getRequestURI().getQuery(); + LOGGER.info("Received query: '{}'", query); + final Map data; + try { + data = deserialize(query); + final String response; + if (data != null && data.containsKey(expectedParam)) { + paramValue = data.get(expectedParam); + response = String.format("Successfully extracted %s:\n'%s'\nTest should be continuing the OAuth Flow to retrieve the refresh_token...", + expectedParam, paramValue); + responseQuery = data; + LOGGER.info(response); + t.sendResponseHeaders(200, response.length()); + succeeded = true; + } else { + response = String.format("Unable to parse query params from redirected url: %s", query); + t.sendResponseHeaders(500, response.length()); + } + final OutputStream os = t.getResponseBody(); + os.write(response.getBytes()); + os.close(); + } catch (RuntimeException | IOException e) { + LOGGER.error("Failed to parse from body {}", query, e); + } + } + + private static Map deserialize(String query) { + if (query == null) { + return null; + } + final Map result = new HashMap<>(); + for (String param : query.split("&")) { + String[] entry = param.split("="); + if (entry.length > 1) { + result.put(entry[0], entry[1]); + } else { + result.put(entry[0], ""); + } + } + return result; + } + + } + +} diff --git a/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/AsanaOAuthFlowTest.java b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/AsanaOAuthFlowTest.java new file mode 100644 index 000000000000..f2f42bc222c6 --- /dev/null +++ b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/AsanaOAuthFlowTest.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.oauth.flows; + +import static org.mockito.ArgumentMatchers.any; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import io.airbyte.validation.json.JsonValidationException; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.SourceOAuthParameter; +import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigRepository; +import java.net.http.HttpClient; +import java.net.http.HttpResponse; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import java.net.URLEncoder; + +public class AsanaOAuthFlowTest { + private UUID workspaceId; + private UUID definitionId; + private ConfigRepository configRepository; + private AsanaOAuthFlow asanaoAuthFlow; + private HttpClient httpClient; + + private static final String REDIRECT_URL = "https://airbyte.io"; + + private static String getConstantState() { + return "state"; + } + + @BeforeEach + public void setup() throws IOException, JsonValidationException { + workspaceId = UUID.randomUUID(); + definitionId = UUID.randomUUID(); + configRepository = mock(ConfigRepository.class); + httpClient = mock(HttpClient.class); + when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter() + .withOauthParameterId(UUID.randomUUID()) + .withSourceDefinitionId(definitionId) + .withWorkspaceId(workspaceId) + .withConfiguration(Jsons.jsonNode(ImmutableMap.builder() + .put("client_id", "test_client_id") + .put("client_secret", "test_client_secret") + .build())))); + asanaoAuthFlow = new AsanaOAuthFlow(configRepository, httpClient, AsanaOAuthFlowTest::getConstantState); + + } + + @Test + public void testGetSourceConcentUrl() throws IOException, InterruptedException, ConfigNotFoundException { + final String concentUrl = + asanaoAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL); + assertEquals(concentUrl, "https://app.asana.com/-/oauth_authorize?client_id=test_client_id&redirect_uri=https%3A%2F%2Fairbyte.io&response_type=code&state=state"); + } + + @Test + public void testCompleteSourceOAuth() throws IOException, JsonValidationException, InterruptedException, ConfigNotFoundException { + + Map returnedCredentials = Map.of("refresh_token", "refresh_token_response"); + final HttpResponse response = mock(HttpResponse.class); + when(response.body()).thenReturn(Jsons.serialize(returnedCredentials)); + when(httpClient.send(any(), any())).thenReturn(response); + final Map queryParams = Map.of("code", "test_code"); + final Map actualQueryParams = + asanaoAuthFlow.completeSourceOAuth(workspaceId, definitionId, queryParams, REDIRECT_URL); + assertEquals(Jsons.serialize(Map.of("credentials", returnedCredentials)), Jsons.serialize(actualQueryParams)); + } + +}