Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove foreign key from actor_oauth_parameter table #9112

Merged
merged 4 commits into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -727,9 +727,9 @@ private static void populateConnectionOperation(final DSLContext ctx,
LOGGER.info("connection_operation table populated with " + connectionOperationRecords + " records");
}

private static <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConfig airbyteConfigType,
final Class<T> clazz,
final DSLContext ctx) {
static <T> List<ConfigWithMetadata<T>> listConfigsWithMetadata(final AirbyteConfig airbyteConfigType,
final Class<T> clazz,
final DSLContext ctx) {
final Field<String> configId = DSL.field("config_id", SQLDataType.VARCHAR(36).nullable(false));
final Field<String> configType = DSL.field("config_type", SQLDataType.VARCHAR(60).nullable(false));
final Field<OffsetDateTime> createdAt =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import static io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.actorDefinitionDoesNotExist;
import static io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.listConfigsWithMetadata;
import static org.jooq.impl.DSL.currentOffsetDateTime;
import static org.jooq.impl.DSL.select;
import static org.jooq.impl.DSL.table;

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.ConfigWithMetadata;
import io.airbyte.config.DestinationOAuthParameter;
import io.airbyte.config.SourceOAuthParameter;
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_35_1_001__RemoveForeignKeyFromActorOauth extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_1_001__RemoveForeignKeyFromActorOauth.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
dropForeignKeyConstraintFromActorOauthTable(ctx);
populateActorOauthParameter(ctx);
}

private static void dropForeignKeyConstraintFromActorOauthTable(final DSLContext ctx) {
ctx.alterTable("actor_oauth_parameter").dropForeignKey("actor_oauth_parameter_workspace_id_fkey").execute();
}

private static void populateActorOauthParameter(final DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<UUID> actorDefinitionId = DSL.field("actor_definition_id", SQLDataType.UUID.nullable(false));
final Field<JSONB> configuration = DSL.field("configuration", SQLDataType.JSONB.nullable(false));
final Field<UUID> workspaceId = DSL.field("workspace_id", SQLDataType.UUID.nullable(true));
final Field<ActorType> actorType = DSL.field("actor_type", SQLDataType.VARCHAR.asEnumDataType(ActorType.class).nullable(false));
final Field<OffsetDateTime> createdAt =
DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));
final Field<OffsetDateTime> updatedAt =
DSL.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));

final List<ConfigWithMetadata<SourceOAuthParameter>> sourceOauthParamsWithMetadata = listConfigsWithMetadata(
ConfigSchema.SOURCE_OAUTH_PARAM,
SourceOAuthParameter.class,
ctx);
long sourceOauthParamRecords = 0L;
for (final ConfigWithMetadata<SourceOAuthParameter> configWithMetadata : sourceOauthParamsWithMetadata) {
final SourceOAuthParameter sourceOAuthParameter = configWithMetadata.getConfig();
if (actorDefinitionDoesNotExist(sourceOAuthParameter.getSourceDefinitionId(), ctx)) {
LOGGER.warn(
"Skipping source oauth parameter " + sourceOAuthParameter.getSourceDefinitionId() + " because the specified source definition "
+ sourceOAuthParameter.getSourceDefinitionId()
+ " doesn't exist and violates foreign key constraint.");
continue;
} else if (actorOAuthParamExists(sourceOAuthParameter.getOauthParameterId(), ctx)) {
LOGGER.warn(
"Skipping source oauth parameter " + sourceOAuthParameter.getOauthParameterId()
+ " because the specified parameter already exists in the table.");
continue;
}
ctx.insertInto(DSL.table("actor_oauth_parameter"))
.set(id, sourceOAuthParameter.getOauthParameterId())
.set(workspaceId, sourceOAuthParameter.getWorkspaceId())
.set(actorDefinitionId, sourceOAuthParameter.getSourceDefinitionId())
.set(configuration, JSONB.valueOf(Jsons.serialize(sourceOAuthParameter.getConfiguration())))
.set(actorType, ActorType.source)
.set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
sourceOauthParamRecords++;
}

LOGGER.info("actor_oauth_parameter table populated with " + sourceOauthParamRecords + " source oauth params records");

final List<ConfigWithMetadata<DestinationOAuthParameter>> destinationOauthParamsWithMetadata = listConfigsWithMetadata(
ConfigSchema.DESTINATION_OAUTH_PARAM,
DestinationOAuthParameter.class,
ctx);
long destinationOauthParamRecords = 0L;
for (final ConfigWithMetadata<DestinationOAuthParameter> configWithMetadata : destinationOauthParamsWithMetadata) {
final DestinationOAuthParameter destinationOAuthParameter = configWithMetadata.getConfig();
if (actorDefinitionDoesNotExist(destinationOAuthParameter.getDestinationDefinitionId(), ctx)) {
LOGGER.warn(
"Skipping destination oauth parameter " + destinationOAuthParameter.getOauthParameterId()
+ " because the specified destination definition "
+ destinationOAuthParameter.getDestinationDefinitionId()
+ " doesn't exist and violates foreign key constraint.");
continue;
} else if (actorOAuthParamExists(destinationOAuthParameter.getOauthParameterId(), ctx)) {
LOGGER.warn(
"Skipping destination oauth parameter " + destinationOAuthParameter.getOauthParameterId()
+ " because the specified parameter already exists in the table.");
continue;
}
ctx.insertInto(DSL.table("actor_oauth_parameter"))
.set(id, destinationOAuthParameter.getOauthParameterId())
.set(workspaceId, destinationOAuthParameter.getWorkspaceId())
.set(actorDefinitionId, destinationOAuthParameter.getDestinationDefinitionId())
.set(configuration, JSONB.valueOf(Jsons.serialize(destinationOAuthParameter.getConfiguration())))
.set(actorType, ActorType.destination)
.set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC))
.set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC))
.execute();
destinationOauthParamRecords++;
}

LOGGER.info("actor_oauth_parameter table populated with " + destinationOauthParamRecords + " destination oauth params records");
}

static boolean actorOAuthParamExists(UUID oauthParamId, DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
return ctx.fetchExists(select()
.from(table("actor_oauth_parameter"))
.where(id.eq(oauthParamId)));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
Indexes:
"workspace_pkey" PRIMARY KEY, btree (id)
Referenced by:
TABLE "actor_oauth_parameter" CONSTRAINT "actor_oauth_parameter_workspace_id_fkey" FOREIGN KEY (workspace_id) REFERENCES workspace(id) ON DELETE CASCADE
TABLE "actor" CONSTRAINT "actor_workspace_id_fkey" FOREIGN KEY (workspace_id) REFERENCES workspace(id) ON DELETE CASCADE
TABLE "operation" CONSTRAINT "operation_workspace_id_fkey" FOREIGN KEY (workspace_id) REFERENCES workspace(id) ON DELETE CASCADE

Expand Down Expand Up @@ -126,8 +125,6 @@ Indexes:
"actor_oauth_parameter_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
"actor_oauth_parameter_actor_definition_id_fkey" FOREIGN KEY (actor_definition_id) REFERENCES actor_definition(id) ON DELETE CASCADE
"actor_oauth_parameter_workspace_id_fkey" FOREIGN KEY (workspace_id) REFERENCES workspace(id) ON DELETE CASCADE




Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ alter table "public"."actor_oauth_parameter"
add constraint "actor_oauth_parameter_actor_definition_id_fkey"
foreign key ("actor_definition_id")
references "public"."actor_definition" ("id");
alter table "public"."actor_oauth_parameter"
add constraint "actor_oauth_parameter_workspace_id_fkey"
foreign key ("workspace_id")
references "public"."workspace" ("id");
alter table "public"."connection"
add constraint "connection_destination_id_fkey"
foreign key ("destination_id")
Expand Down