Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate queries on OAuth table to use direct sql #11370

Merged
merged 7 commits into from
Mar 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.59.003", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.59.004", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_CATALOG_FETCH_EVENT;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION_WORKSPACE_GRANT;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_OAUTH_PARAMETER;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION_OPERATION;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE;
Expand Down Expand Up @@ -50,7 +51,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
Expand All @@ -66,6 +66,7 @@
import org.jooq.Record1;
import org.jooq.Record2;
import org.jooq.Result;
import org.jooq.SelectJoinStep;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -607,14 +608,18 @@ public SourceOAuthParameter getSourceOAuthParams(final UUID SourceOAuthParameter
}

public Optional<SourceOAuthParameter> getSourceOAuthParamByDefinitionIdOptional(final UUID workspaceId, final UUID sourceDefinitionId)
throws JsonValidationException, IOException {
for (final SourceOAuthParameter oAuthParameter : listSourceOAuthParam()) {
if (sourceDefinitionId.equals(oAuthParameter.getSourceDefinitionId()) &&
Objects.equals(workspaceId, oAuthParameter.getWorkspaceId())) {
return Optional.of(oAuthParameter);
}
throws IOException {
final Result<Record> result = database.query(ctx -> {
final SelectJoinStep<Record> query = ctx.select(asterisk()).from(ACTOR_OAUTH_PARAMETER);
return query.where(ACTOR_OAUTH_PARAMETER.ACTOR_TYPE.eq(ActorType.source),
ACTOR_OAUTH_PARAMETER.WORKSPACE_ID.eq(workspaceId),
ACTOR_OAUTH_PARAMETER.ACTOR_DEFINITION_ID.eq(sourceDefinitionId)).fetch();
});

if (result.size() == 0) {
return Optional.empty();
}
return Optional.empty();
return Optional.of(DbConverter.buildSourceOAuthParameter(result.get(0)));
}

public void writeSourceOAuthParam(final SourceOAuthParameter SourceOAuthParameter) throws JsonValidationException, IOException {
Expand All @@ -632,14 +637,18 @@ public DestinationOAuthParameter getDestinationOAuthParams(final UUID destinatio

public Optional<DestinationOAuthParameter> getDestinationOAuthParamByDefinitionIdOptional(final UUID workspaceId,
final UUID destinationDefinitionId)
throws JsonValidationException, IOException {
for (final DestinationOAuthParameter oAuthParameter : listDestinationOAuthParam()) {
if (destinationDefinitionId.equals(oAuthParameter.getDestinationDefinitionId()) &&
Objects.equals(workspaceId, oAuthParameter.getWorkspaceId())) {
return Optional.of(oAuthParameter);
}
throws IOException {
final Result<Record> result = database.query(ctx -> {
final SelectJoinStep<Record> query = ctx.select(asterisk()).from(ACTOR_OAUTH_PARAMETER);
return query.where(ACTOR_OAUTH_PARAMETER.ACTOR_TYPE.eq(ActorType.destination),
ACTOR_OAUTH_PARAMETER.WORKSPACE_ID.eq(workspaceId),
ACTOR_OAUTH_PARAMETER.ACTOR_DEFINITION_ID.eq(destinationDefinitionId)).fetch();
});

if (result.size() == 0) {
return Optional.empty();
}
return Optional.empty();
return Optional.of(DbConverter.buildDestinationOAuthParameter(result.get(0)));
}

public void writeDestinationOAuthParam(final DestinationOAuthParameter destinationOAuthParameter) throws JsonValidationException, IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ private List<ConfigWithMetadata<SourceOAuthParameter>> listSourceOauthParamWithM

final List<ConfigWithMetadata<SourceOAuthParameter>> sourceOAuthParameters = new ArrayList<>();
for (final Record record : result) {
final SourceOAuthParameter sourceOAuthParameter = buildSourceOAuthParameter(record);
final SourceOAuthParameter sourceOAuthParameter = DbConverter.buildSourceOAuthParameter(record);
sourceOAuthParameters.add(new ConfigWithMetadata<>(
record.get(ACTOR_OAUTH_PARAMETER.ID).toString(),
ConfigSchema.SOURCE_OAUTH_PARAM.name(),
Expand All @@ -488,14 +488,6 @@ private List<ConfigWithMetadata<SourceOAuthParameter>> listSourceOauthParamWithM
return sourceOAuthParameters;
}

private SourceOAuthParameter buildSourceOAuthParameter(final Record record) {
return new SourceOAuthParameter()
.withOauthParameterId(record.get(ACTOR_OAUTH_PARAMETER.ID))
.withConfiguration(Jsons.deserialize(record.get(ACTOR_OAUTH_PARAMETER.CONFIGURATION).data()))
.withWorkspaceId(record.get(ACTOR_OAUTH_PARAMETER.WORKSPACE_ID))
.withSourceDefinitionId(record.get(ACTOR_OAUTH_PARAMETER.ACTOR_DEFINITION_ID));
}

private List<ConfigWithMetadata<DestinationOAuthParameter>> listDestinationOauthParamWithMetadata() throws IOException {
return listDestinationOauthParamWithMetadata(Optional.empty());
}
Expand All @@ -512,7 +504,7 @@ private List<ConfigWithMetadata<DestinationOAuthParameter>> listDestinationOauth

final List<ConfigWithMetadata<DestinationOAuthParameter>> destinationOAuthParameters = new ArrayList<>();
for (final Record record : result) {
final DestinationOAuthParameter destinationOAuthParameter = buildDestinationOAuthParameter(record);
final DestinationOAuthParameter destinationOAuthParameter = DbConverter.buildDestinationOAuthParameter(record);
destinationOAuthParameters.add(new ConfigWithMetadata<>(
record.get(ACTOR_OAUTH_PARAMETER.ID).toString(),
ConfigSchema.DESTINATION_OAUTH_PARAM.name(),
Expand All @@ -523,14 +515,6 @@ private List<ConfigWithMetadata<DestinationOAuthParameter>> listDestinationOauth
return destinationOAuthParameters;
}

private DestinationOAuthParameter buildDestinationOAuthParameter(final Record record) {
return new DestinationOAuthParameter()
.withOauthParameterId(record.get(ACTOR_OAUTH_PARAMETER.ID))
.withConfiguration(Jsons.deserialize(record.get(ACTOR_OAUTH_PARAMETER.CONFIGURATION).data()))
.withWorkspaceId(record.get(ACTOR_OAUTH_PARAMETER.WORKSPACE_ID))
.withDestinationDefinitionId(record.get(ACTOR_OAUTH_PARAMETER.ACTOR_DEFINITION_ID));
}

private List<ConfigWithMetadata<StandardSyncOperation>> listStandardSyncOperationWithMetadata() throws IOException {
return listStandardSyncOperationWithMetadata(Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@
package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_DEFINITION;
import static io.airbyte.db.instance.configs.jooq.Tables.ACTOR_OAUTH_PARAMETER;
import static io.airbyte.db.instance.configs.jooq.Tables.CONNECTION;
import static io.airbyte.db.instance.configs.jooq.Tables.WORKSPACE;

import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Notification;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.Schedule;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
Expand Down Expand Up @@ -119,4 +122,20 @@ public static StandardDestinationDefinition buildStandardDestinationDefinition(f
: Jsons.deserialize(record.get(ACTOR_DEFINITION.RESOURCE_REQUIREMENTS).data(), ActorDefinitionResourceRequirements.class));
}

public static DestinationOAuthParameter buildDestinationOAuthParameter(final Record record) {
return new DestinationOAuthParameter()
.withOauthParameterId(record.get(ACTOR_OAUTH_PARAMETER.ID))
.withConfiguration(Jsons.deserialize(record.get(ACTOR_OAUTH_PARAMETER.CONFIGURATION).data()))
.withWorkspaceId(record.get(ACTOR_OAUTH_PARAMETER.WORKSPACE_ID))
.withDestinationDefinitionId(record.get(ACTOR_OAUTH_PARAMETER.ACTOR_DEFINITION_ID));
}

public static SourceOAuthParameter buildSourceOAuthParameter(final Record record) {
return new SourceOAuthParameter()
.withOauthParameterId(record.get(ACTOR_OAUTH_PARAMETER.ID))
.withConfiguration(Jsons.deserialize(record.get(ACTOR_OAUTH_PARAMETER.CONFIGURATION).data()))
.withWorkspaceId(record.get(ACTOR_OAUTH_PARAMETER.WORKSPACE_ID))
.withSourceDefinitionId(record.get(ACTOR_OAUTH_PARAMETER.ACTOR_DEFINITION_ID));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSourceDefinition.SourceType;
Expand Down Expand Up @@ -100,6 +102,12 @@ void setup() throws IOException, JsonValidationException {
for (final StandardSync sync : MockData.standardSyncs()) {
configRepository.writeStandardSync(sync);
}
for (final SourceOAuthParameter oAuthParameter : MockData.sourceOauthParameters()) {
configRepository.writeSourceOAuthParam(oAuthParameter);
}
for (final DestinationOAuthParameter oAuthParameter : MockData.destinationOauthParameters()) {
configRepository.writeDestinationOAuthParam(oAuthParameter);
}
}

@AfterAll
Expand Down Expand Up @@ -292,4 +300,47 @@ public void testDestinationDefinitionGrants() throws IOException {
Map.entry(grantableDefinition2, false)));
}

@Test
public void testGetDestinationOAuthByDefinitionId() throws IOException {

final DestinationOAuthParameter destinationOAuthParameter = MockData.destinationOauthParameters().get(0);
final Optional<DestinationOAuthParameter> result = configRepository.getDestinationOAuthParamByDefinitionIdOptional(
destinationOAuthParameter.getWorkspaceId(), destinationOAuthParameter.getDestinationDefinitionId());
assertTrue(result.isPresent());
assertEquals(destinationOAuthParameter, result.get());
}

@Test
public void testMissingDestinationOAuthByDefinitionId() throws IOException {
final UUID missingId = UUID.fromString("fc59cfa0-06de-4c8b-850b-46d4cfb65629");
final DestinationOAuthParameter destinationOAuthParameter = MockData.destinationOauthParameters().get(0);
Optional<DestinationOAuthParameter> result =
configRepository.getDestinationOAuthParamByDefinitionIdOptional(destinationOAuthParameter.getWorkspaceId(), missingId);
assertFalse(result.isPresent());

result = configRepository.getDestinationOAuthParamByDefinitionIdOptional(missingId, destinationOAuthParameter.getDestinationDefinitionId());
assertFalse(result.isPresent());
}

@Test
public void testGetSourceOAuthByDefinitionId() throws IOException {
final SourceOAuthParameter sourceOAuthParameter = MockData.sourceOauthParameters().get(0);
final Optional<SourceOAuthParameter> result = configRepository.getSourceOAuthParamByDefinitionIdOptional(sourceOAuthParameter.getWorkspaceId(),
sourceOAuthParameter.getSourceDefinitionId());
assertTrue(result.isPresent());
assertEquals(sourceOAuthParameter, result.get());
}

@Test
public void testMissingSourceOAuthByDefinitionId() throws IOException {
final UUID missingId = UUID.fromString("fc59cfa0-06de-4c8b-850b-46d4cfb65629");
final SourceOAuthParameter sourceOAuthParameter = MockData.sourceOauthParameters().get(0);
Optional<SourceOAuthParameter> result =
configRepository.getSourceOAuthParamByDefinitionIdOptional(sourceOAuthParameter.getWorkspaceId(), missingId);
assertFalse(result.isPresent());

result = configRepository.getSourceOAuthParamByDefinitionIdOptional(missingId, sourceOAuthParameter.getSourceDefinitionId());
assertFalse(result.isPresent());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_35_59_004__AddOauthParamIndex extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_59_004__AddOauthParamIndex.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

final DSLContext ctx = DSL.using(context.getConnection());
ctx.createIndexIfNotExists("actor_oauth_parameter_workspace_definition_idx").on("actor_oauth_parameter", "workspace_id", "actor_definition_id")
.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ create unique index "actor_definition_workspace_gr_workspace_id_actor_definition
"actor_definition_id" asc
);
create unique index "actor_oauth_parameter_pkey" on "public"."actor_oauth_parameter"("id" asc);
create index "actor_oauth_parameter_workspace_definition_idx" on "public"."actor_oauth_parameter"(
"workspace_id" asc,
"actor_definition_id" asc
);
create unique index "airbyte_configs_migrations_pk" on "public"."airbyte_configs_migrations"("installed_rank" asc);
create index "airbyte_configs_migrations_s_idx" on "public"."airbyte_configs_migrations"("success" asc);
create index "connection_destination_id_idx" on "public"."connection"("destination_id" asc);
Expand Down