Skip to content

Commit

Permalink
Asana oAuth backend. (airbytehq#7049)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmytro authored and schlattk committed Jan 4, 2022
1 parent 4ed0540 commit 99af76f
Show file tree
Hide file tree
Showing 7 changed files with 352 additions and 39 deletions.
27 changes: 21 additions & 6 deletions airbyte-oauth/src/main/java/io/airbyte/oauth/BaseOAuthFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.oauth;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
Expand Down Expand Up @@ -116,23 +117,37 @@ private Map<String, Object> completeOAuthFlow(final String clientId, final Strin
}
}

/**
* Query parameters to provide the access token url with.
*/
protected Map<String, String> getAccessTokenQueryParameters(String clientId, String clientSecret, String authCode, String redirectUrl) {
return ImmutableMap.<String, String>builder()
// required
.put("client_id", clientId)
.put("redirect_uri", redirectUrl)
.put("client_secret", clientSecret)
.put("code", authCode)
.build();
}

/**
* Once the user is redirected after getting their consent, the API should redirect them to a
* specific redirection URL along with query parameters. This function should parse and extract the
* code from these query parameters in order to continue the OAuth Flow.
*/
protected abstract String extractCodeParameter(Map<String, Object> queryParams) throws IOException;
protected String extractCodeParameter(Map<String, Object> queryParams) throws IOException {
if (queryParams.containsKey("code")) {
return (String) queryParams.get("code");
} else {
throw new IOException("Undefined 'code' from consent redirected url.");
}
}

/**
* Returns the URL where to retrieve the access token from.
*/
protected abstract String getAccessTokenUrl();

/**
* Query parameters to provide the access token url with.
*/
protected abstract Map<String, String> getAccessTokenQueryParameters(String clientId, String clientSecret, String authCode, String redirectUrl);

/**
* Once the auth code is exchange for a refresh token, the oauth flow implementation can extract and
* returns the values of fields to be used in the connector's configurations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.common.collect.ImmutableMap;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.oauth.flows.AsanaOAuthFlow;
import io.airbyte.oauth.flows.FacebookMarketingOAuthFlow;
import io.airbyte.oauth.flows.TrelloOAuthFlow;
import io.airbyte.oauth.flows.google.GoogleAdsOAuthFlow;
Expand All @@ -19,6 +20,7 @@ public class OAuthImplementationFactory {

public OAuthImplementationFactory(final ConfigRepository configRepository) {
OAUTH_FLOW_MAPPING = ImmutableMap.<String, OAuthFlowImplementation>builder()
.put("airbyte/source-asana", new AsanaOAuthFlow(configRepository))
.put("airbyte/source-facebook-marketing", new FacebookMarketingOAuthFlow(configRepository))
.put("airbyte/source-google-ads", new GoogleAdsOAuthFlow(configRepository))
.put("airbyte/source-google-analytics-v4", new GoogleAnalyticsOAuthFlow(configRepository))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.oauth.flows;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.oauth.BaseOAuthFlow;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;
import org.apache.http.client.utils.URIBuilder;

/**
* Following docs from https://developers.asana.com/docs/oauth
*/
public class AsanaOAuthFlow extends BaseOAuthFlow {

private static final String AUTHORIZE_URL = "https://app.asana.com/-/oauth_authorize";
private static final String ACCESS_TOKEN_URL = "https://app.asana.com/-/oauth_token";

public AsanaOAuthFlow(ConfigRepository configRepository) {
super(configRepository);
}

@VisibleForTesting
AsanaOAuthFlow(ConfigRepository configRepository, HttpClient httpClient, Supplier<String> stateSupplier) {
super(configRepository, httpClient, stateSupplier);
}

@Override
protected String formatConsentUrl(UUID definitionId, String clientId, String redirectUrl) throws IOException {
try {
return new URIBuilder(AUTHORIZE_URL)
.addParameter("client_id", clientId)
.addParameter("redirect_uri", redirectUrl)
.addParameter("response_type", "code")
.addParameter("state", getState())
.build().toString();
} catch (URISyntaxException e) {
throw new IOException("Failed to format Consent URL for OAuth flow", e);
}
}

@Override
protected String getAccessTokenUrl() {
return ACCESS_TOKEN_URL;
}

@Override
protected Map<String, String> getAccessTokenQueryParameters(String clientId, String clientSecret, String authCode, String redirectUrl) {
return ImmutableMap.<String, String>builder()
.putAll(super.getAccessTokenQueryParameters(clientId, clientSecret, authCode, redirectUrl))
.put("grant_type", "authorization_code")
.build();
}

@Override
protected Map<String, Object> extractRefreshToken(JsonNode data) throws IOException {
System.out.println(Jsons.serialize(data));
if (data.has("refresh_token")) {
final String refreshToken = data.get("refresh_token").asText();
return Map.of("credentials", Map.of("refresh_token", refreshToken));
} else {
throw new IOException(String.format("Missing 'refresh_token' in query params from %s", ACCESS_TOKEN_URL));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.oauth.BaseOAuthFlow;
import java.io.IOException;
Expand Down Expand Up @@ -54,34 +53,11 @@ protected String formatConsentUrl(final UUID definitionId, final String clientId
}
}

@Override
protected String extractCodeParameter(final Map<String, Object> queryParams) throws IOException {
if (queryParams.containsKey("code")) {
return (String) queryParams.get("code");
} else {
throw new IOException("Undefined 'code' from consent redirected url.");
}
}

@Override
protected String getAccessTokenUrl() {
return ACCESS_TOKEN_URL;
}

@Override
protected Map<String, String> getAccessTokenQueryParameters(final String clientId,
final String clientSecret,
final String authCode,
final String redirectUrl) {
return ImmutableMap.<String, String>builder()
// required
.put("client_id", clientId)
.put("redirect_uri", redirectUrl)
.put("client_secret", clientSecret)
.put("code", authCode)
.build();
}

@Override
protected Map<String, Object> extractRefreshToken(final JsonNode data) throws IOException {
// Facebook does not have refresh token but calls it "long lived access token" instead:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,6 @@ protected String formatConsentUrl(final UUID definitionId, final String clientId
*/
protected abstract String getScope();

@Override
protected String extractCodeParameter(final Map<String, Object> queryParams) throws IOException {
if (queryParams.containsKey("code")) {
return (String) queryParams.get("code");
} else {
throw new IOException("Undefined 'code' from consent redirected url.");
}
}

@Override
protected String getAccessTokenUrl() {
return ACCESS_TOKEN_URL;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.oauth.flows;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AsanaOAuthFlowIntegrationTest {

private static final Logger LOGGER = LoggerFactory.getLogger(AsanaOAuthFlowIntegrationTest.class);
private static final String REDIRECT_URL = "http://localhost:8000/code";
private static final Path CREDENTIALS_PATH = Path.of("secrets/asana.json");

private ConfigRepository configRepository;
private AsanaOAuthFlow asanaOAuthFlow;
private HttpServer server;
private ServerHandler serverHandler;

@BeforeEach
public void setup() throws IOException {
if (!Files.exists(CREDENTIALS_PATH)) {
throw new IllegalStateException(
"Must provide path to a oauth credentials file.");
}
configRepository = mock(ConfigRepository.class);
asanaOAuthFlow = new AsanaOAuthFlow(configRepository);

server = HttpServer.create(new InetSocketAddress(8000), 0);
server.setExecutor(null); // creates a default executor
server.start();
serverHandler = new ServerHandler("code");
server.createContext("/code", serverHandler);
}

@AfterEach
void tearDown() {
server.stop(1);
}

@Test
public void testFullAsanaOAuthFlow() throws InterruptedException, ConfigNotFoundException, IOException, JsonValidationException {
int limit = 20;
final UUID workspaceId = UUID.randomUUID();
final UUID definitionId = UUID.randomUUID();
final String fullConfigAsString = new String(Files.readAllBytes(CREDENTIALS_PATH));
final JsonNode credentialsJson = Jsons.deserialize(fullConfigAsString);
final String clientId = credentialsJson.get("client_id").asText();
when(configRepository.listSourceOAuthParam()).thenReturn(List.of(new SourceOAuthParameter()
.withOauthParameterId(UUID.randomUUID())
.withSourceDefinitionId(definitionId)
.withWorkspaceId(workspaceId)
.withConfiguration(Jsons.jsonNode(ImmutableMap.builder()
.put("client_id", clientId)
.put("client_secret", credentialsJson.get("client_secret").asText())
.build()))));
final String url = asanaOAuthFlow.getSourceConsentUrl(workspaceId, definitionId, REDIRECT_URL);
LOGGER.info("Waiting for user consent at: {}", url);
// TODO: To automate, start a selenium job to navigate to the Consent URL and click on allowing
// access...
while (!serverHandler.isSucceeded() && limit > 0) {
Thread.sleep(1000);
limit -= 1;
}
assertTrue(serverHandler.isSucceeded(), "Failed to get User consent on time");
final Map<String, Object> params = asanaOAuthFlow.completeSourceOAuth(workspaceId, definitionId,
Map.of("code", serverHandler.getParamValue()), REDIRECT_URL);
LOGGER.info("Response from completing OAuth Flow is: {}", params.toString());
assertTrue(params.containsKey("credentials"));
final Map creds = (Map) params.get("credentials");
assertTrue(creds.containsKey("refresh_token"));
assertTrue(creds.get("refresh_token").toString().length() > 0);
}

static class ServerHandler implements HttpHandler {

final private String expectedParam;
private Map responseQuery;
private String paramValue;
private boolean succeeded;

public ServerHandler(String expectedParam) {
this.expectedParam = expectedParam;
this.paramValue = "";
this.succeeded = false;
}

public boolean isSucceeded() {
return succeeded;
}

public String getParamValue() {
return paramValue;
}

public Map getResponseQuery() {
return responseQuery;
}

@Override
public void handle(HttpExchange t) {
final String query = t.getRequestURI().getQuery();
LOGGER.info("Received query: '{}'", query);
final Map<String, String> data;
try {
data = deserialize(query);
final String response;
if (data != null && data.containsKey(expectedParam)) {
paramValue = data.get(expectedParam);
response = String.format("Successfully extracted %s:\n'%s'\nTest should be continuing the OAuth Flow to retrieve the refresh_token...",
expectedParam, paramValue);
responseQuery = data;
LOGGER.info(response);
t.sendResponseHeaders(200, response.length());
succeeded = true;
} else {
response = String.format("Unable to parse query params from redirected url: %s", query);
t.sendResponseHeaders(500, response.length());
}
final OutputStream os = t.getResponseBody();
os.write(response.getBytes());
os.close();
} catch (RuntimeException | IOException e) {
LOGGER.error("Failed to parse from body {}", query, e);
}
}

private static Map<String, String> deserialize(String query) {
if (query == null) {
return null;
}
final Map<String, String> result = new HashMap<>();
for (String param : query.split("&")) {
String[] entry = param.split("=");
if (entry.length > 1) {
result.put(entry[0], entry[1]);
} else {
result.put(entry[0], "");
}
}
return result;
}

}

}
Loading

0 comments on commit 99af76f

Please sign in to comment.