Skip to content

Commit

Permalink
Add a config for instance wide oauth parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristopheDuong committed Sep 1, 2021
1 parent 1f7edaa commit e17bd6c
Show file tree
Hide file tree
Showing 13 changed files with 344 additions and 8 deletions.
42 changes: 42 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,48 @@ paths:
$ref: "#/components/schemas/WebBackendConnectionRead"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/sources/create:
post:
tags:
- web_backend
summary: Create a source, optionally injecting extra oauth parameters if necessary
operationId: webBackendCreateSource
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SourceCreate"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/SourceRead"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/destinations/create:
post:
tags:
- web_backend
summary: Create a destination, optionally injecting extra oauth parameters if necessary
operationId: webBackendCreateDestination
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/DestinationCreate"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/DestinationRead"
"422":
$ref: "#/components/responses/InvalidInputResponse"
/v1/web_backend/sources/recreate:
post:
tags:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ public enum ConfigSchema implements AirbyteConfig {
StandardSyncOperation.class,
standardSyncOperation -> standardSyncOperation.getOperationId().toString(),
"operationId"),

SOURCE_OAUTH_PARAM("SourceOAuthParameter.yaml", SourceOAuthParameter.class,
sourceOAuthParameter -> sourceOAuthParameter.getOauthParameterId().toString(),
"oauthParameterId"),
DESTINATION_OAUTH_PARAM("DestinationOAuthParameter.yaml", DestinationOAuthParameter.class,
destinationOAuthParameter -> destinationOAuthParameter.getOauthParameterId().toString(),
"oauthParameterId"),

STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml", StandardSyncSummary.class),

// worker
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/DestinationOAuthParameter.yaml
title: DestinationOAuthParameter
description: OAuth parameters used when connecting to destination
type: object
required:
- oauthParameterId
- destinationDefinitionId
- configuration
additionalProperties: false
properties:
oauthParameterId:
type: string
format: uuid
destinationDefinitionId:
type: string
format: uuid
workspaceId:
type: string
format: uuid
configuration:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/SourceOAuthParameter.yaml
title: SourceOAuthParameter
description: OAuth parameters used when connecting to source
type: object
required:
- oauthParameterId
- sourceDefinitionId
- configuration
additionalProperties: false
properties:
oauthParameterId:
type: string
format: uuid
sourceDefinitionId:
type: string
format: uuid
workspaceId:
type: string
format: uuid
configuration:
description: Integration specific blob. Must be a valid JSON string.
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
Expand Down Expand Up @@ -211,6 +213,33 @@ public List<StandardSyncOperation> listStandardSyncOperations() throws IOExcepti
return persistence.listConfigs(ConfigSchema.STANDARD_SYNC_OPERATION, StandardSyncOperation.class);
}

public SourceOAuthParameter getSourceOAuthParams(final UUID SourceOAuthParameterId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameterId.toString(), SourceOAuthParameter.class);
}

public void writeSourceOAuthParam(final SourceOAuthParameter SourceOAuthParameter) throws JsonValidationException, IOException {
persistence.writeConfig(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameter.getOauthParameterId().toString(), SourceOAuthParameter);
}

public List<SourceOAuthParameter> listSourceOAuthParam() throws JsonValidationException, IOException {
return persistence.listConfigs(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameter.class);
}

public DestinationOAuthParameter getDestinationOAuthParams(final UUID destinationOAuthParameterId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.DESTINATION_OAUTH_PARAM, destinationOAuthParameterId.toString(), DestinationOAuthParameter.class);
}

public void writeDestinationOAuthParam(final DestinationOAuthParameter destinationOAuthParameter) throws JsonValidationException, IOException {
persistence.writeConfig(ConfigSchema.DESTINATION_OAUTH_PARAM, destinationOAuthParameter.getOauthParameterId().toString(),
destinationOAuthParameter);
}

public List<DestinationOAuthParameter> listDestinationOAuthParam() throws JsonValidationException, IOException {
return persistence.listConfigs(ConfigSchema.DESTINATION_OAUTH_PARAM, DestinationOAuthParameter.class);
}

public <T> void replaceAllConfigs(final Map<AirbyteConfig, Stream<T>> configs, final boolean dryRun) throws IOException {
persistence.replaceAllConfigs(configs, dryRun);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,14 @@ public ConfigurationApi(final ConfigRepository configRepository,
jobHistoryHandler,
schedulerHandler,
operationsHandler);
webBackendSourceHandler = new WebBackendSourceHandler(sourceHandler, schedulerHandler, workspaceHelper);
webBackendDestinationHandler = new WebBackendDestinationHandler(destinationHandler, schedulerHandler, workspaceHelper);
oAuthHandler = new OAuthHandler(configRepository);
webBackendSourceHandler = new WebBackendSourceHandler(sourceHandler, schedulerHandler, workspaceHelper, oAuthHandler);
webBackendDestinationHandler = new WebBackendDestinationHandler(destinationHandler, schedulerHandler, workspaceHelper, oAuthHandler);
healthCheckHandler = new HealthCheckHandler(configRepository);
archiveHandler = new ArchiveHandler(configs.getAirbyteVersion(), configRepository, jobPersistence, workspaceHelper, archiveTtlManager);
logsHandler = new LogsHandler();
openApiConfigHandler = new OpenApiConfigHandler();
dbMigrationHandler = new DbMigrationHandler(configsDatabase, jobsDatabase);
oAuthHandler = new OAuthHandler();
this.configs = configs;
}

Expand Down Expand Up @@ -570,6 +570,16 @@ public WebBackendConnectionReadList webBackendListConnectionsForWorkspace(final
return execute(() -> webBackendConnectionsHandler.webBackendListConnectionsForWorkspace(workspaceIdRequestBody));
}

@Override
public SourceRead webBackendCreateSource(SourceCreate sourceCreate) {
return execute(() -> webBackendSourceHandler.webBackendCreateSource(sourceCreate));
}

@Override
public DestinationRead webBackendCreateDestination(DestinationCreate destinationCreate) {
return execute(() -> webBackendDestinationHandler.webBackendCreateDestination(destinationCreate));
}

@Override
public DestinationRead webBackendRecreateDestination(final DestinationRecreate destinationRecreate) {
return execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,32 @@

package io.airbyte.server.handlers;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.api.model.CompleteDestinationOAuthRequest;
import io.airbyte.api.model.CompleteSourceOauthRequest;
import io.airbyte.api.model.DestinationOauthConsentRequest;
import io.airbyte.api.model.OAuthConsentRead;
import io.airbyte.api.model.SourceOauthConsentRequest;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.server.errors.ApplicationErrorKnownException;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
import java.util.UUID;

public class OAuthHandler {

public static final String OAUTH_PROPERTY_NAME = "oauth";
private final ConfigRepository configRepository;

public OAuthHandler(final ConfigRepository configRepository) {
this.configRepository = configRepository;
}

public OAuthConsentRead getSourceOAuthConsent(SourceOauthConsentRequest sourceDefinitionIdRequestBody) {
// TODO: Implement OAuth module to be called here https://github.com/airbytehq/airbyte/issues/5641
throw new ApplicationErrorKnownException("Source connector does not supports OAuth yet.");
Expand All @@ -54,4 +70,28 @@ public Map<String, Object> completeDestinationOAuth(CompleteDestinationOAuthRequ
throw new ApplicationErrorKnownException("Destination connector does not supports OAuth yet.");
}

public JsonNode injectSourceOAuthParameters(UUID sourceDefinitionId, UUID workspaceId, JsonNode sourceConnectorConfig)
throws JsonValidationException, IOException {
configRepository.listSourceOAuthParam().stream()
.filter(p -> sourceDefinitionId.equals(p.getSourceDefinitionId()))
.filter(p -> p.getWorkspaceId() == null || workspaceId.equals(p.getWorkspaceId()))
// we prefer params specific to a workspace before global ones
.min(Comparator.nullsLast(Comparator.comparing(SourceOAuthParameter::getWorkspaceId))
.thenComparing(SourceOAuthParameter::getOauthParameterId))
.ifPresent(oAuthParameter -> ((ObjectNode) sourceConnectorConfig).set(OAUTH_PROPERTY_NAME, oAuthParameter.getConfiguration()));
return sourceConnectorConfig;
}

public JsonNode injectDestinationOAuthParameters(UUID destinationDefinitionId, UUID workspaceId, JsonNode destinationConnectorConfig)
throws JsonValidationException, IOException {
configRepository.listDestinationOAuthParam().stream()
.filter(p -> destinationDefinitionId.equals(p.getDestinationDefinitionId()))
.filter(p -> p.getWorkspaceId() == null || workspaceId.equals(p.getWorkspaceId()))
// we prefer params specific to a workspace before global ones
.min(Comparator.nullsLast(Comparator.comparing(DestinationOAuthParameter::getWorkspaceId))
.thenComparing(DestinationOAuthParameter::getOauthParameterId))
.ifPresent(oAuthParameter -> ((ObjectNode) destinationConnectorConfig).set(OAUTH_PROPERTY_NAME, oAuthParameter.getConfiguration()));
return destinationConnectorConfig;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(ConnectionRead co

private SourceRead getSourceRead(ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException {
final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(connectionRead.getSourceId());
return sourceHandler.getSource(sourceIdRequestBody);
SourceRead sourceRead = sourceHandler.getSource(sourceIdRequestBody);

return sourceRead;
}

private DestinationRead getDestinationRead(ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,18 @@ public class WebBackendDestinationHandler {

private final DestinationHandler destinationHandler;

private final OAuthHandler oAuthHandler;
private final SchedulerHandler schedulerHandler;
private final WorkspaceHelper workspaceHelper;

public WebBackendDestinationHandler(final DestinationHandler destinationHandler,
final SchedulerHandler schedulerHandler,
final WorkspaceHelper workspaceHelper) {
final WorkspaceHelper workspaceHelper,
final OAuthHandler oAuthHandler) {
this.destinationHandler = destinationHandler;
this.schedulerHandler = schedulerHandler;
this.workspaceHelper = workspaceHelper;
this.oAuthHandler = oAuthHandler;
}

public DestinationRead webBackendRecreateDestinationAndCheck(DestinationRecreate destinationRecreate)
Expand Down Expand Up @@ -92,4 +95,13 @@ public DestinationRead webBackendRecreateDestinationAndCheck(DestinationRecreate
throw new ConnectFailureKnownException("Unable to connect to destination");
}

public DestinationRead webBackendCreateDestination(DestinationCreate destinationCreate)
throws JsonValidationException, ConfigNotFoundException, IOException {
destinationCreate.connectionConfiguration(oAuthHandler.injectDestinationOAuthParameters(
destinationCreate.getDestinationDefinitionId(),
destinationCreate.getWorkspaceId(),
destinationCreate.getConnectionConfiguration()));
return destinationHandler.createDestination(destinationCreate);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,18 @@ public class WebBackendSourceHandler {

private final SourceHandler sourceHandler;

private final OAuthHandler oAuthHandler;
private final SchedulerHandler schedulerHandler;
private final WorkspaceHelper workspaceHelper;

public WebBackendSourceHandler(final SourceHandler sourceHandler, final SchedulerHandler schedulerHandler, final WorkspaceHelper workspaceHelper) {
public WebBackendSourceHandler(final SourceHandler sourceHandler,
final SchedulerHandler schedulerHandler,
final WorkspaceHelper workspaceHelper,
final OAuthHandler oAuthHandler) {
this.sourceHandler = sourceHandler;
this.schedulerHandler = schedulerHandler;
this.workspaceHelper = workspaceHelper;
this.oAuthHandler = oAuthHandler;
}

public SourceRead webBackendRecreateSourceAndCheck(SourceRecreate sourceRecreate)
Expand Down Expand Up @@ -86,4 +91,12 @@ public SourceRead webBackendRecreateSourceAndCheck(SourceRecreate sourceRecreate
throw new ConnectFailureKnownException("Unable to connect to source");
}

public SourceRead webBackendCreateSource(SourceCreate sourceCreate) throws JsonValidationException, ConfigNotFoundException, IOException {
sourceCreate.connectionConfiguration(oAuthHandler.injectSourceOAuthParameters(
sourceCreate.getSourceDefinitionId(),
sourceCreate.getWorkspaceId(),
sourceCreate.getConnectionConfiguration()));
return sourceHandler.createSource(sourceCreate);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class WebBackendDestinationHandlerTest {
private WebBackendDestinationHandler wbDestinationHandler;

private DestinationHandler destinationHandler;
private OAuthHandler oAuthHandler;
private SchedulerHandler schedulerHandler;
private WorkspaceHelper workspaceHelper;

Expand All @@ -66,7 +67,8 @@ public void setup() throws IOException {
destinationHandler = mock(DestinationHandler.class);
schedulerHandler = mock(SchedulerHandler.class);
workspaceHelper = mock(WorkspaceHelper.class);
wbDestinationHandler = new WebBackendDestinationHandler(destinationHandler, schedulerHandler, workspaceHelper);
oAuthHandler = mock(OAuthHandler.class);
wbDestinationHandler = new WebBackendDestinationHandler(destinationHandler, schedulerHandler, workspaceHelper, oAuthHandler);

final StandardDestinationDefinition standardDestinationDefinition = DestinationDefinitionHelpers.generateDestination();
DestinationConnection destination =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class WebBackendSourceHandlerTest {

private WebBackendSourceHandler wbSourceHandler;

private OAuthHandler oAuthHandler;
private SourceHandler sourceHandler;
private SchedulerHandler schedulerHandler;
private WorkspaceHelper workspaceHelper;
Expand All @@ -63,10 +64,11 @@ public class WebBackendSourceHandlerTest {

@BeforeEach
public void setup() throws IOException {
oAuthHandler = mock(OAuthHandler.class);
sourceHandler = mock(SourceHandler.class);
schedulerHandler = mock(SchedulerHandler.class);
workspaceHelper = mock(WorkspaceHelper.class);
wbSourceHandler = new WebBackendSourceHandler(sourceHandler, schedulerHandler, workspaceHelper);
wbSourceHandler = new WebBackendSourceHandler(sourceHandler, schedulerHandler, workspaceHelper, oAuthHandler);

final StandardSourceDefinition standardSourceDefinition = SourceDefinitionHelpers.generateSource();
SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID());
Expand Down
Loading

0 comments on commit e17bd6c

Please sign in to comment.