Skip to content

Commit

Permalink
implement migration to create workspace_service_account table (#11943)
Browse files Browse the repository at this point in the history
* implement migration to create workspace_service_account table

* make all columns non nullable

* introduce persistence code for service account table (#11944)

* implement persistence code for workspace_service_account table

* update yaml

* implement secret handling for workspace_service_account table (#11946)

* implement secret handling for workspace_service_account table

* add new line to the mock json

* get rid of file

* address review comments

* update method name and add comment
  • Loading branch information
subodh1810 authored Apr 26, 2022
1 parent 0c12ad9 commit 367b863
Show file tree
Hide file tree
Showing 21 changed files with 743 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.59.004", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.65.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ public enum ConfigSchema implements AirbyteConfig {
standardWorkspace -> standardWorkspace.getWorkspaceId().toString(),
"workspaceId"),

WORKSPACE_SERVICE_ACCOUNT("WorkspaceServiceAccount.yaml",
WorkspaceServiceAccount.class,
workspaceServiceAccount -> workspaceServiceAccount.getWorkspaceId().toString(),
"workspaceId"),

// source
STANDARD_SOURCE_DEFINITION("StandardSourceDefinition.yaml",
StandardSourceDefinition.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/WorkspaceServiceAccount.yaml
title: WorkspaceServiceAccount
description: service account attached to a workspace
type: object
required:
- workspaceId
- serviceAccountId
- serviceAccountEmail
- jsonCredential
- hmacKey
additionalProperties: false
properties:
workspaceId:
type: string
format: uuid
serviceAccountId:
type: string
serviceAccountEmail:
type: string
jsonCredential:
# Ref : io.airbyte.config.persistence.MockData#workspaceServiceAccounts() for sample data
description: Represents the JSON key generated for the service account
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
hmacKey:
# Ref : io.airbyte.config.persistence.MockData#workspaceServiceAccounts() for sample data
description: Represents the secret and access id of generated HMAC key for the service account
type: object
existingJavaType: com.fasterxml.jackson.databind.JsonNode
1 change: 1 addition & 0 deletions airbyte-config/persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ dependencies {
implementation 'commons-io:commons-io:2.7'
implementation 'com.google.cloud:google-cloud-secretmanager:2.0.5'

testImplementation 'org.hamcrest:hamcrest-all:1.3'
testImplementation "org.testcontainers:postgresql:1.15.3"
testImplementation project(':airbyte-test-utils')
integrationTestJavaImplementation project(':airbyte-config:persistence')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.airbyte.config.StandardSyncState;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.configs.jooq.enums.ActorType;
Expand Down Expand Up @@ -970,4 +971,15 @@ private Condition includeTombstones(final Field<Boolean> tombstoneField, final b
}
}

public WorkspaceServiceAccount getWorkspaceServiceAccountNoSecrets(final UUID workspaceId)
throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.WORKSPACE_SERVICE_ACCOUNT, workspaceId.toString(), WorkspaceServiceAccount.class);
}

public void writeWorkspaceServiceAccountNoSecrets(final WorkspaceServiceAccount workspaceServiceAccount)
throws JsonValidationException, IOException {
persistence.writeConfig(ConfigSchema.WORKSPACE_SERVICE_ACCOUNT, workspaceServiceAccount.getWorkspaceId().toString(),
workspaceServiceAccount);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static io.airbyte.db.instance.configs.jooq.Tables.OPERATION;
import static io.airbyte.db.instance.configs.jooq.Tables.STATE;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE_SERVICE_ACCOUNT;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.select;

Expand Down Expand Up @@ -45,6 +46,7 @@
import io.airbyte.config.StandardSyncState;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.State;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
Expand Down Expand Up @@ -127,6 +129,8 @@ public <T> T getConfig(final AirbyteConfig configType, final String configId, fi
return (T) getActorCatalog(configId);
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
return (T) getActorCatalogFetchEvent(configId);
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
return (T) getWorkspaceServiceAccount(configId);
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand All @@ -138,6 +142,12 @@ private StandardWorkspace getStandardWorkspace(final String configId) throws IOE
return result.get(0).getConfig();
}

private WorkspaceServiceAccount getWorkspaceServiceAccount(final String configId) throws IOException, ConfigNotFoundException {
final List<ConfigWithMetadata<WorkspaceServiceAccount>> result = listWorkspaceServiceAccountWithMetadata(Optional.of(UUID.fromString(configId)));
validate(configId, result, ConfigSchema.WORKSPACE_SERVICE_ACCOUNT);
return result.get(0).getConfig();
}

private StandardSourceDefinition getStandardSourceDefinition(final String configId) throws IOException, ConfigNotFoundException {
final List<ConfigWithMetadata<StandardSourceDefinition>> result =
listStandardSourceDefinitionWithMetadata(Optional.of(UUID.fromString(configId)));
Expand Down Expand Up @@ -272,6 +282,8 @@ public <T> ConfigWithMetadata<T> getConfigWithMetadata(final AirbyteConfig confi
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listActorCatalogFetchEventWithMetadata(configIdOpt), configType);
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
return (ConfigWithMetadata<T>) validateAndReturn(configId, listWorkspaceServiceAccountWithMetadata(configIdOpt), configType);
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -304,6 +316,8 @@ public <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConf
listActorCatalogWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
listActorCatalogFetchEventWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
listWorkspaceServiceAccountWithMetadata().forEach(c -> configWithMetadata.add((ConfigWithMetadata<T>) c));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -337,6 +351,33 @@ private List<ConfigWithMetadata<StandardWorkspace>> listStandardWorkspaceWithMet
return standardWorkspaces;
}

private List<ConfigWithMetadata<WorkspaceServiceAccount>> listWorkspaceServiceAccountWithMetadata() throws IOException {
return listWorkspaceServiceAccountWithMetadata(Optional.empty());
}

private List<ConfigWithMetadata<WorkspaceServiceAccount>> listWorkspaceServiceAccountWithMetadata(final Optional<UUID> configId)
throws IOException {
final Result<Record> result = database.query(ctx -> {
final SelectJoinStep<Record> query = ctx.select(asterisk()).from(WORKSPACE_SERVICE_ACCOUNT);
if (configId.isPresent()) {
return query.where(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID.eq(configId.get())).fetch();
}
return query.fetch();
});

final List<ConfigWithMetadata<WorkspaceServiceAccount>> workspaceServiceAccounts = new ArrayList<>();
for (final Record record : result) {
final WorkspaceServiceAccount workspaceServiceAccount = DbConverter.buildWorkspaceServiceAccount(record);
workspaceServiceAccounts.add(new ConfigWithMetadata<>(
record.get(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID).toString(),
ConfigSchema.WORKSPACE_SERVICE_ACCOUNT.name(),
record.get(WORKSPACE_SERVICE_ACCOUNT.CREATED_AT).toInstant(),
record.get(WORKSPACE_SERVICE_ACCOUNT.UPDATED_AT).toInstant(),
workspaceServiceAccount));
}
return workspaceServiceAccounts;
}

private List<ConfigWithMetadata<StandardSourceDefinition>> listStandardSourceDefinitionWithMetadata() throws IOException {
return listStandardSourceDefinitionWithMetadata(Optional.empty());
}
Expand Down Expand Up @@ -697,6 +738,8 @@ public <T> void writeConfig(final AirbyteConfig configType, final String configI
writeActorCatalog(Collections.singletonList((ActorCatalog) config));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
writeActorCatalogFetchEvent(Collections.singletonList((ActorCatalogFetchEvent) config));
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
writeWorkspaceServiceAccount(Collections.singletonList((WorkspaceServiceAccount) config));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -758,6 +801,44 @@ private void writeStandardWorkspace(final List<StandardWorkspace> configs, final
});
}

private void writeWorkspaceServiceAccount(final List<WorkspaceServiceAccount> configs) throws IOException {
database.transaction(ctx -> {
writeWorkspaceServiceAccount(configs, ctx);
return null;
});
}

private void writeWorkspaceServiceAccount(final List<WorkspaceServiceAccount> configs, final DSLContext ctx) {
final OffsetDateTime timestamp = OffsetDateTime.now();
configs.forEach((workspaceServiceAccount) -> {
final boolean isExistingConfig = ctx.fetchExists(select()
.from(WORKSPACE_SERVICE_ACCOUNT)
.where(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID.eq(workspaceServiceAccount.getWorkspaceId())));

if (isExistingConfig) {
ctx.update(WORKSPACE_SERVICE_ACCOUNT)
.set(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID, workspaceServiceAccount.getWorkspaceId())
.set(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_ID, workspaceServiceAccount.getServiceAccountId())
.set(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_EMAIL, workspaceServiceAccount.getServiceAccountEmail())
.set(WORKSPACE_SERVICE_ACCOUNT.JSON_CREDENTIAL, JSONB.valueOf(Jsons.serialize(workspaceServiceAccount.getJsonCredential())))
.set(WORKSPACE_SERVICE_ACCOUNT.HMAC_KEY, JSONB.valueOf(Jsons.serialize(workspaceServiceAccount.getHmacKey())))
.set(WORKSPACE_SERVICE_ACCOUNT.UPDATED_AT, timestamp)
.where(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID.eq(workspaceServiceAccount.getWorkspaceId()))
.execute();
} else {
ctx.insertInto(WORKSPACE_SERVICE_ACCOUNT)
.set(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID, workspaceServiceAccount.getWorkspaceId())
.set(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_ID, workspaceServiceAccount.getServiceAccountId())
.set(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_EMAIL, workspaceServiceAccount.getServiceAccountEmail())
.set(WORKSPACE_SERVICE_ACCOUNT.JSON_CREDENTIAL, JSONB.valueOf(Jsons.serialize(workspaceServiceAccount.getJsonCredential())))
.set(WORKSPACE_SERVICE_ACCOUNT.HMAC_KEY, JSONB.valueOf(Jsons.serialize(workspaceServiceAccount.getHmacKey())))
.set(WORKSPACE_SERVICE_ACCOUNT.CREATED_AT, timestamp)
.set(WORKSPACE_SERVICE_ACCOUNT.UPDATED_AT, timestamp)
.execute();
}
});
}

private void writeStandardSourceDefinition(final List<StandardSourceDefinition> configs) throws IOException {
database.transaction(ctx -> {
ConfigWriter.writeStandardSourceDefinition(configs, ctx);
Expand Down Expand Up @@ -1190,6 +1271,8 @@ public <T> void writeConfigs(final AirbyteConfig configType, final Map<String, T
writeActorCatalog(configs.values().stream().map(c -> (ActorCatalog) c).collect(Collectors.toList()));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
writeActorCatalogFetchEvent(configs.values().stream().map(c -> (ActorCatalogFetchEvent) c).collect(Collectors.toList()));
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
writeWorkspaceServiceAccount(configs.values().stream().map(c -> (WorkspaceServiceAccount) c).collect(Collectors.toList()));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -1221,6 +1304,8 @@ public void deleteConfig(final AirbyteConfig configType, final String configId)
deleteConfig(ACTOR_CATALOG, ACTOR_CATALOG.ID, UUID.fromString(configId));
} else if (configType == ConfigSchema.ACTOR_CATALOG_FETCH_EVENT) {
deleteConfig(ACTOR_CATALOG_FETCH_EVENT, ACTOR_CATALOG_FETCH_EVENT.ID, UUID.fromString(configId));
} else if (configType == ConfigSchema.WORKSPACE_SERVICE_ACCOUNT) {
deleteConfig(WORKSPACE_SERVICE_ACCOUNT, WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID, UUID.fromString(configId));
} else {
throw new IllegalArgumentException("Unknown Config Type " + configType);
}
Expand Down Expand Up @@ -1278,6 +1363,7 @@ public void replaceAllConfigs(final Map<AirbyteConfig, Stream<?>> configs, final
ctx.truncate(STATE).restartIdentity().cascade().execute();
ctx.truncate(ACTOR_CATALOG).restartIdentity().cascade().execute();
ctx.truncate(ACTOR_CATALOG_FETCH_EVENT).restartIdentity().cascade().execute();
ctx.truncate(WORKSPACE_SERVICE_ACCOUNT).restartIdentity().cascade().execute();

if (configs.containsKey(ConfigSchema.STANDARD_WORKSPACE)) {
configs.get(ConfigSchema.STANDARD_WORKSPACE).map(c -> (StandardWorkspace) c)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_OAUTH_PARAMETER;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE_SERVICE_ACCOUNT;

import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
Expand All @@ -26,6 +27,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.IOException;
Expand Down Expand Up @@ -147,4 +149,15 @@ public static ActorCatalog buildActorCatalog(final Record record) {
.withCatalogHash(record.get(ACTOR_CATALOG.CATALOG_HASH));
}

public static WorkspaceServiceAccount buildWorkspaceServiceAccount(final Record record) {
return new WorkspaceServiceAccount()
.withWorkspaceId(record.get(WORKSPACE_SERVICE_ACCOUNT.WORKSPACE_ID))
.withServiceAccountId(record.get(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_ID))
.withServiceAccountEmail(record.get(WORKSPACE_SERVICE_ACCOUNT.SERVICE_ACCOUNT_EMAIL))
.withJsonCredential(record.get(WORKSPACE_SERVICE_ACCOUNT.JSON_CREDENTIAL) == null ? null
: Jsons.deserialize(record.get(WORKSPACE_SERVICE_ACCOUNT.JSON_CREDENTIAL).data()))
.withHmacKey(record.get(WORKSPACE_SERVICE_ACCOUNT.HMAC_KEY) == null ? null
: Jsons.deserialize(record.get(WORKSPACE_SERVICE_ACCOUNT.HMAC_KEY).data()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.WorkspaceServiceAccount;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -96,4 +97,18 @@ private void hydrateValuesIfKeyPresent(final String key, final Map<String, Strea
}
}

public WorkspaceServiceAccount getWorkspaceServiceAccountWithSecrets(final UUID workspaceId)
throws JsonValidationException, ConfigNotFoundException, IOException {
final WorkspaceServiceAccount workspaceServiceAccount = configRepository.getWorkspaceServiceAccountNoSecrets(workspaceId);

final JsonNode jsonCredential =
workspaceServiceAccount.getJsonCredential() != null ? secretsHydrator.hydrateSecretCoordinate(workspaceServiceAccount.getJsonCredential())
: null;

final JsonNode hmacKey =
workspaceServiceAccount.getHmacKey() != null ? secretsHydrator.hydrateSecretCoordinate(workspaceServiceAccount.getHmacKey()) : null;

return Jsons.clone(workspaceServiceAccount).withJsonCredential(jsonCredential).withHmacKey(hmacKey);
}

}
Loading

0 comments on commit 367b863

Please sign in to comment.