diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index 76ca4bdf0de5..9f3da9177372 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -76,7 +76,7 @@ void testBootloaderAppBlankDb() throws Exception { mockedConfigs.getConfigDatabaseUrl()) .getAndInitialize(); val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName()); - assertEquals("0.32.8.001", configsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals("0.35.1.001", configsMigrator.getLatestMigration().getVersion().getVersion()); val jobsPersistence = new DefaultJobPersistence(jobDatabase); assertEquals(version, jobsPersistence.getVersion().get()); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java index 4466de916c0c..3459d380c791 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java @@ -5,6 +5,9 @@ package io.airbyte.config.persistence; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; import io.airbyte.config.ConfigSchema; @@ -26,7 +29,6 @@ import java.io.IOException; import java.util.List; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -67,28 +69,37 @@ public void test() throws JsonValidationException, IOException, ConfigNotFoundEx private void deletion() throws ConfigNotFoundException, IOException, JsonValidationException { // Deleting the workspace should delete everything except for definitions configPersistence.deleteConfig(ConfigSchema.STANDARD_WORKSPACE, MockData.standardWorkspace().getWorkspaceId().toString()); - Assertions.assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC_STATE, StandardSyncState.class).isEmpty()); - Assertions.assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class).isEmpty()); - Assertions.assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC_OPERATION, StandardSyncOperation.class).isEmpty()); - Assertions.assertTrue(configPersistence.listConfigs(ConfigSchema.DESTINATION_OAUTH_PARAM, DestinationOAuthParameter.class).isEmpty()); - Assertions.assertTrue(configPersistence.listConfigs(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameter.class).isEmpty()); - Assertions.assertTrue(configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, SourceConnection.class).isEmpty()); - Assertions.assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_WORKSPACE, StandardWorkspace.class).isEmpty()); - - Assertions.assertFalse(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class).isEmpty()); - Assertions - .assertFalse(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class).isEmpty()); + assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC_STATE, StandardSyncState.class).isEmpty()); + assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class).isEmpty()); + assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SYNC_OPERATION, StandardSyncOperation.class).isEmpty()); + assertTrue(configPersistence.listConfigs(ConfigSchema.DESTINATION_CONNECTION, SourceConnection.class).isEmpty()); + assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_WORKSPACE, StandardWorkspace.class).isEmpty()); + + assertFalse(configPersistence.listConfigs(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameter.class).isEmpty()); + assertFalse(configPersistence.listConfigs(ConfigSchema.DESTINATION_OAUTH_PARAM, DestinationOAuthParameter.class).isEmpty()); + assertFalse(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class).isEmpty()); + assertFalse(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class).isEmpty()); + + for (final SourceOAuthParameter sourceOAuthParameter : MockData.sourceOauthParameters()) { + configPersistence.deleteConfig(ConfigSchema.SOURCE_OAUTH_PARAM, sourceOAuthParameter.getOauthParameterId().toString()); + } + assertTrue(configPersistence.listConfigs(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameter.class).isEmpty()); + + for (final DestinationOAuthParameter destinationOAuthParameter : MockData.destinationOauthParameters()) { + configPersistence.deleteConfig(ConfigSchema.DESTINATION_OAUTH_PARAM, destinationOAuthParameter.getOauthParameterId().toString()); + } + assertTrue(configPersistence.listConfigs(ConfigSchema.DESTINATION_OAUTH_PARAM, DestinationOAuthParameter.class).isEmpty()); for (final StandardSourceDefinition standardSourceDefinition : MockData.standardSourceDefinitions()) { configPersistence.deleteConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, standardSourceDefinition.getSourceDefinitionId().toString()); } - Assertions.assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class).isEmpty()); + assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class).isEmpty()); for (final StandardDestinationDefinition standardDestinationDefinition : MockData.standardDestinationDefinitions()) { configPersistence .deleteConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, standardDestinationDefinition.getDestinationDefinitionId().toString()); } - Assertions.assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class).isEmpty()); + assertTrue(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class).isEmpty()); } private void standardSyncState() throws JsonValidationException, IOException, ConfigNotFoundException { @@ -99,11 +110,11 @@ private void standardSyncState() throws JsonValidationException, IOException, Co final StandardSyncState standardSyncStateFromDB = configPersistence.getConfig(ConfigSchema.STANDARD_SYNC_STATE, standardSyncState.getConnectionId().toString(), StandardSyncState.class); - Assertions.assertEquals(standardSyncState, standardSyncStateFromDB); + assertEquals(standardSyncState, standardSyncStateFromDB); } final List standardSyncStates = configPersistence .listConfigs(ConfigSchema.STANDARD_SYNC_STATE, StandardSyncState.class); - Assertions.assertEquals(MockData.standardSyncStates().size(), standardSyncStates.size()); + assertEquals(MockData.standardSyncStates().size(), standardSyncStates.size()); assertThat(MockData.standardSyncStates()).hasSameElementsAs(standardSyncStates); } @@ -115,11 +126,11 @@ private void standardSync() throws JsonValidationException, IOException, ConfigN final StandardSync standardSyncFromDB = configPersistence.getConfig(ConfigSchema.STANDARD_SYNC, standardSync.getConnectionId().toString(), StandardSync.class); - Assertions.assertEquals(standardSync, standardSyncFromDB); + assertEquals(standardSync, standardSyncFromDB); } final List standardSyncs = configPersistence .listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class); - Assertions.assertEquals(MockData.standardSyncs().size(), standardSyncs.size()); + assertEquals(MockData.standardSyncs().size(), standardSyncs.size()); assertThat(MockData.standardSyncs()).hasSameElementsAs(standardSyncs); } @@ -131,11 +142,11 @@ private void standardSyncOperation() throws JsonValidationException, IOException final StandardSyncOperation standardSyncOperationFromDB = configPersistence.getConfig(ConfigSchema.STANDARD_SYNC_OPERATION, standardSyncOperation.getOperationId().toString(), StandardSyncOperation.class); - Assertions.assertEquals(standardSyncOperation, standardSyncOperationFromDB); + assertEquals(standardSyncOperation, standardSyncOperationFromDB); } final List standardSyncOperations = configPersistence .listConfigs(ConfigSchema.STANDARD_SYNC_OPERATION, StandardSyncOperation.class); - Assertions.assertEquals(MockData.standardSyncOperations().size(), standardSyncOperations.size()); + assertEquals(MockData.standardSyncOperations().size(), standardSyncOperations.size()); assertThat(MockData.standardSyncOperations()).hasSameElementsAs(standardSyncOperations); } @@ -147,11 +158,11 @@ private void destinationOauthParam() throws JsonValidationException, IOException final DestinationOAuthParameter destinationOAuthParameterFromDB = configPersistence.getConfig(ConfigSchema.DESTINATION_OAUTH_PARAM, destinationOAuthParameter.getOauthParameterId().toString(), DestinationOAuthParameter.class); - Assertions.assertEquals(destinationOAuthParameter, destinationOAuthParameterFromDB); + assertEquals(destinationOAuthParameter, destinationOAuthParameterFromDB); } final List destinationOAuthParameters = configPersistence .listConfigs(ConfigSchema.DESTINATION_OAUTH_PARAM, DestinationOAuthParameter.class); - Assertions.assertEquals(MockData.destinationOauthParameters().size(), destinationOAuthParameters.size()); + assertEquals(MockData.destinationOauthParameters().size(), destinationOAuthParameters.size()); assertThat(MockData.destinationOauthParameters()).hasSameElementsAs(destinationOAuthParameters); } @@ -163,11 +174,11 @@ private void sourceOauthParam() throws JsonValidationException, IOException, Con final SourceOAuthParameter sourceOAuthParameterFromDB = configPersistence.getConfig(ConfigSchema.SOURCE_OAUTH_PARAM, sourceOAuthParameter.getOauthParameterId().toString(), SourceOAuthParameter.class); - Assertions.assertEquals(sourceOAuthParameter, sourceOAuthParameterFromDB); + assertEquals(sourceOAuthParameter, sourceOAuthParameterFromDB); } final List sourceOAuthParameters = configPersistence .listConfigs(ConfigSchema.SOURCE_OAUTH_PARAM, SourceOAuthParameter.class); - Assertions.assertEquals(MockData.sourceOauthParameters().size(), sourceOAuthParameters.size()); + assertEquals(MockData.sourceOauthParameters().size(), sourceOAuthParameters.size()); assertThat(MockData.sourceOauthParameters()).hasSameElementsAs(sourceOAuthParameters); } @@ -179,11 +190,11 @@ private void destinationConnection() throws JsonValidationException, IOException final DestinationConnection destinationConnectionFromDB = configPersistence.getConfig(ConfigSchema.DESTINATION_CONNECTION, destinationConnection.getDestinationId().toString(), DestinationConnection.class); - Assertions.assertEquals(destinationConnection, destinationConnectionFromDB); + assertEquals(destinationConnection, destinationConnectionFromDB); } final List destinationConnections = configPersistence .listConfigs(ConfigSchema.DESTINATION_CONNECTION, DestinationConnection.class); - Assertions.assertEquals(MockData.destinationConnections().size(), destinationConnections.size()); + assertEquals(MockData.destinationConnections().size(), destinationConnections.size()); assertThat(MockData.destinationConnections()).hasSameElementsAs(destinationConnections); } @@ -195,11 +206,11 @@ private void sourceConnection() throws JsonValidationException, IOException, Con final SourceConnection sourceConnectionFromDB = configPersistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceConnection.getSourceId().toString(), SourceConnection.class); - Assertions.assertEquals(sourceConnection, sourceConnectionFromDB); + assertEquals(sourceConnection, sourceConnectionFromDB); } final List sourceConnections = configPersistence .listConfigs(ConfigSchema.SOURCE_CONNECTION, SourceConnection.class); - Assertions.assertEquals(MockData.sourceConnections().size(), sourceConnections.size()); + assertEquals(MockData.sourceConnections().size(), sourceConnections.size()); assertThat(MockData.sourceConnections()).hasSameElementsAs(sourceConnections); } @@ -212,11 +223,11 @@ private void standardDestinationDefinition() throws JsonValidationException, IOE .getConfig(ConfigSchema.STANDARD_DESTINATION_DEFINITION, standardDestinationDefinition.getDestinationDefinitionId().toString(), StandardDestinationDefinition.class); - Assertions.assertEquals(standardDestinationDefinition, standardDestinationDefinitionFromDB); + assertEquals(standardDestinationDefinition, standardDestinationDefinitionFromDB); } final List standardDestinationDefinitions = configPersistence .listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class); - Assertions.assertEquals(MockData.standardDestinationDefinitions().size(), standardDestinationDefinitions.size()); + assertEquals(MockData.standardDestinationDefinitions().size(), standardDestinationDefinitions.size()); assertThat(MockData.standardDestinationDefinitions()).hasSameElementsAs(standardDestinationDefinitions); } @@ -228,11 +239,11 @@ private void standardSourceDefinition() throws JsonValidationException, IOExcept final StandardSourceDefinition standardSourceDefinitionFromDB = configPersistence.getConfig(ConfigSchema.STANDARD_SOURCE_DEFINITION, standardSourceDefinition.getSourceDefinitionId().toString(), StandardSourceDefinition.class); - Assertions.assertEquals(standardSourceDefinition, standardSourceDefinitionFromDB); + assertEquals(standardSourceDefinition, standardSourceDefinitionFromDB); } final List standardSourceDefinitions = configPersistence .listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class); - Assertions.assertEquals(MockData.standardSourceDefinitions().size(), standardSourceDefinitions.size()); + assertEquals(MockData.standardSourceDefinitions().size(), standardSourceDefinitions.size()); assertThat(MockData.standardSourceDefinitions()).hasSameElementsAs(standardSourceDefinitions); } @@ -244,9 +255,9 @@ private void standardWorkspace() throws JsonValidationException, IOException, Co MockData.standardWorkspace().getWorkspaceId().toString(), StandardWorkspace.class); final List standardWorkspaces = configPersistence.listConfigs(ConfigSchema.STANDARD_WORKSPACE, StandardWorkspace.class); - Assertions.assertEquals(MockData.standardWorkspace(), standardWorkspace); - Assertions.assertEquals(1, standardWorkspaces.size()); - Assertions.assertTrue(standardWorkspaces.contains(MockData.standardWorkspace())); + assertEquals(MockData.standardWorkspace(), standardWorkspace); + assertEquals(1, standardWorkspaces.size()); + assertTrue(standardWorkspaces.contains(MockData.standardWorkspace())); } } diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_32_8_001__AirbyteConfigDatabaseDenormalization.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_32_8_001__AirbyteConfigDatabaseDenormalization.java index 2cffb8606968..a2f6c15971f3 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_32_8_001__AirbyteConfigDatabaseDenormalization.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_32_8_001__AirbyteConfigDatabaseDenormalization.java @@ -727,9 +727,9 @@ private static void populateConnectionOperation(final DSLContext ctx, LOGGER.info("connection_operation table populated with " + connectionOperationRecords + " records"); } - private static List> listConfigsWithMetadata(final AirbyteConfig airbyteConfigType, - final Class clazz, - final DSLContext ctx) { + static List> listConfigsWithMetadata(final AirbyteConfig airbyteConfigType, + final Class clazz, + final DSLContext ctx) { final Field configId = DSL.field("config_id", SQLDataType.VARCHAR(36).nullable(false)); final Field configType = DSL.field("config_type", SQLDataType.VARCHAR(60).nullable(false)); final Field createdAt = diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_1_001__RemoveForeignKeyFromActorOauth.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_1_001__RemoveForeignKeyFromActorOauth.java new file mode 100644 index 000000000000..4a8ffcc2ad94 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_1_001__RemoveForeignKeyFromActorOauth.java @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import static io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.actorDefinitionDoesNotExist; +import static io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.listConfigsWithMetadata; +import static org.jooq.impl.DSL.currentOffsetDateTime; +import static org.jooq.impl.DSL.select; +import static org.jooq.impl.DSL.table; + +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ConfigSchema; +import io.airbyte.config.ConfigWithMetadata; +import io.airbyte.config.DestinationOAuthParameter; +import io.airbyte.config.SourceOAuthParameter; +import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.List; +import java.util.UUID; +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.JSONB; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V0_35_1_001__RemoveForeignKeyFromActorOauth extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_1_001__RemoveForeignKeyFromActorOauth.class); + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + // Warning: please do not use any jOOQ generated code to write a migration. + // As database schema changes, the generated jOOQ code can be deprecated. So + // old migration may not compile if there is any generated code. + final DSLContext ctx = DSL.using(context.getConnection()); + migrate(ctx); + } + + @VisibleForTesting + public static void migrate(DSLContext ctx) { + dropForeignKeyConstraintFromActorOauthTable(ctx); + populateActorOauthParameter(ctx); + } + + private static void dropForeignKeyConstraintFromActorOauthTable(final DSLContext ctx) { + ctx.alterTable("actor_oauth_parameter").dropForeignKey("actor_oauth_parameter_workspace_id_fkey").execute(); + LOGGER.info("actor_oauth_parameter_workspace_id_fkey constraint dropped"); + } + + private static void populateActorOauthParameter(final DSLContext ctx) { + final Field id = DSL.field("id", SQLDataType.UUID.nullable(false)); + final Field actorDefinitionId = DSL.field("actor_definition_id", SQLDataType.UUID.nullable(false)); + final Field configuration = DSL.field("configuration", SQLDataType.JSONB.nullable(false)); + final Field workspaceId = DSL.field("workspace_id", SQLDataType.UUID.nullable(true)); + final Field actorType = DSL.field("actor_type", SQLDataType.VARCHAR.asEnumDataType(ActorType.class).nullable(false)); + final Field createdAt = + DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime())); + final Field updatedAt = + DSL.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime())); + + final List> sourceOauthParamsWithMetadata = listConfigsWithMetadata( + ConfigSchema.SOURCE_OAUTH_PARAM, + SourceOAuthParameter.class, + ctx); + long sourceOauthParamRecords = 0L; + for (final ConfigWithMetadata configWithMetadata : sourceOauthParamsWithMetadata) { + final SourceOAuthParameter sourceOAuthParameter = configWithMetadata.getConfig(); + if (actorDefinitionDoesNotExist(sourceOAuthParameter.getSourceDefinitionId(), ctx)) { + LOGGER.warn( + "Skipping source oauth parameter " + sourceOAuthParameter.getSourceDefinitionId() + " because the specified source definition " + + sourceOAuthParameter.getSourceDefinitionId() + + " doesn't exist and violates foreign key constraint."); + continue; + } else if (actorOAuthParamExists(sourceOAuthParameter.getOauthParameterId(), ctx)) { + LOGGER.warn( + "Skipping source oauth parameter " + sourceOAuthParameter.getOauthParameterId() + + " because the specified parameter already exists in the table."); + continue; + } + ctx.insertInto(DSL.table("actor_oauth_parameter")) + .set(id, sourceOAuthParameter.getOauthParameterId()) + .set(workspaceId, sourceOAuthParameter.getWorkspaceId()) + .set(actorDefinitionId, sourceOAuthParameter.getSourceDefinitionId()) + .set(configuration, JSONB.valueOf(Jsons.serialize(sourceOAuthParameter.getConfiguration()))) + .set(actorType, ActorType.source) + .set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC)) + .set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC)) + .execute(); + sourceOauthParamRecords++; + } + + LOGGER.info("actor_oauth_parameter table populated with " + sourceOauthParamRecords + " source oauth params records"); + + final List> destinationOauthParamsWithMetadata = listConfigsWithMetadata( + ConfigSchema.DESTINATION_OAUTH_PARAM, + DestinationOAuthParameter.class, + ctx); + long destinationOauthParamRecords = 0L; + for (final ConfigWithMetadata configWithMetadata : destinationOauthParamsWithMetadata) { + final DestinationOAuthParameter destinationOAuthParameter = configWithMetadata.getConfig(); + if (actorDefinitionDoesNotExist(destinationOAuthParameter.getDestinationDefinitionId(), ctx)) { + LOGGER.warn( + "Skipping destination oauth parameter " + destinationOAuthParameter.getOauthParameterId() + + " because the specified destination definition " + + destinationOAuthParameter.getDestinationDefinitionId() + + " doesn't exist and violates foreign key constraint."); + continue; + } else if (actorOAuthParamExists(destinationOAuthParameter.getOauthParameterId(), ctx)) { + LOGGER.warn( + "Skipping destination oauth parameter " + destinationOAuthParameter.getOauthParameterId() + + " because the specified parameter already exists in the table."); + continue; + } + ctx.insertInto(DSL.table("actor_oauth_parameter")) + .set(id, destinationOAuthParameter.getOauthParameterId()) + .set(workspaceId, destinationOAuthParameter.getWorkspaceId()) + .set(actorDefinitionId, destinationOAuthParameter.getDestinationDefinitionId()) + .set(configuration, JSONB.valueOf(Jsons.serialize(destinationOAuthParameter.getConfiguration()))) + .set(actorType, ActorType.destination) + .set(createdAt, OffsetDateTime.ofInstant(configWithMetadata.getCreatedAt(), ZoneOffset.UTC)) + .set(updatedAt, OffsetDateTime.ofInstant(configWithMetadata.getUpdatedAt(), ZoneOffset.UTC)) + .execute(); + destinationOauthParamRecords++; + } + + LOGGER.info("actor_oauth_parameter table populated with " + destinationOauthParamRecords + " destination oauth params records"); + } + + static boolean actorOAuthParamExists(UUID oauthParamId, DSLContext ctx) { + final Field id = DSL.field("id", SQLDataType.UUID.nullable(false)); + return ctx.fetchExists(select() + .from(table("actor_oauth_parameter")) + .where(id.eq(oauthParamId))); + } + +} diff --git a/airbyte-db/lib/src/main/resources/configs_database/normalized_tables_schema.txt b/airbyte-db/lib/src/main/resources/configs_database/normalized_tables_schema.txt index ec44b1595f96..ed5429f5dcc8 100644 --- a/airbyte-db/lib/src/main/resources/configs_database/normalized_tables_schema.txt +++ b/airbyte-db/lib/src/main/resources/configs_database/normalized_tables_schema.txt @@ -57,7 +57,6 @@ Indexes: "workspace_pkey" PRIMARY KEY, btree (id) Referenced by: - TABLE "actor_oauth_parameter" CONSTRAINT "actor_oauth_parameter_workspace_id_fkey" FOREIGN KEY (workspace_id) REFERENCES workspace(id) ON DELETE CASCADE TABLE "actor" CONSTRAINT "actor_workspace_id_fkey" FOREIGN KEY (workspace_id) REFERENCES workspace(id) ON DELETE CASCADE TABLE "operation" CONSTRAINT "operation_workspace_id_fkey" FOREIGN KEY (workspace_id) REFERENCES workspace(id) ON DELETE CASCADE @@ -126,8 +125,6 @@ Indexes: "actor_oauth_parameter_pkey" PRIMARY KEY, btree (id) Foreign-key constraints: "actor_oauth_parameter_actor_definition_id_fkey" FOREIGN KEY (actor_definition_id) REFERENCES actor_definition(id) ON DELETE CASCADE - "actor_oauth_parameter_workspace_id_fkey" FOREIGN KEY (workspace_id) REFERENCES workspace(id) ON DELETE CASCADE - diff --git a/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt b/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt index 863d6c9654db..65adab659efb 100644 --- a/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt +++ b/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt @@ -153,10 +153,6 @@ alter table "public"."actor_oauth_parameter" add constraint "actor_oauth_parameter_actor_definition_id_fkey" foreign key ("actor_definition_id") references "public"."actor_definition" ("id"); -alter table "public"."actor_oauth_parameter" - add constraint "actor_oauth_parameter_workspace_id_fkey" - foreign key ("workspace_id") - references "public"."workspace" ("id"); alter table "public"."connection" add constraint "connection_destination_id_fkey" foreign key ("destination_id") diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/SetupForNormalizedTablesTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/SetupForNormalizedTablesTest.java index 97f1bff30381..43a35b7f5856 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/SetupForNormalizedTablesTest.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/SetupForNormalizedTablesTest.java @@ -272,7 +272,7 @@ public static List destinationConnections() { public static List sourceOauthParameters() { final SourceOAuthParameter sourceOAuthParameter1 = new SourceOAuthParameter() .withConfiguration(Jsons.jsonNode("'{\"name\":\"John\", \"age\":30, \"car\":null}'")) - .withWorkspaceId(WORKSPACE_ID) + .withWorkspaceId(null) .withSourceDefinitionId(SOURCE_DEFINITION_ID_1) .withOauthParameterId(SOURCE_OAUTH_PARAMETER_ID_1); final SourceOAuthParameter sourceOAuthParameter2 = new SourceOAuthParameter() @@ -286,7 +286,7 @@ public static List sourceOauthParameters() { public static List destinationOauthParameters() { final DestinationOAuthParameter destinationOAuthParameter1 = new DestinationOAuthParameter() .withConfiguration(Jsons.jsonNode("'{\"name\":\"John\", \"age\":30, \"car\":null}'")) - .withWorkspaceId(WORKSPACE_ID) + .withWorkspaceId(null) .withDestinationDefinitionId(DESTINATION_DEFINITION_ID_1) .withOauthParameterId(DESTINATION_OAUTH_PARAMETER_ID_1); final DestinationOAuthParameter destinationOAuthParameter2 = new DestinationOAuthParameter() diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_32_8_001__AirbyteConfigDatabaseDenormalization_Test.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_32_8_001__AirbyteConfigDatabaseDenormalization_Test.java index 48c7be19a0c0..d2020d279b19 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_32_8_001__AirbyteConfigDatabaseDenormalization_Test.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_32_8_001__AirbyteConfigDatabaseDenormalization_Test.java @@ -287,7 +287,7 @@ private void assertDataForSourceOauthParams(final DSLContext context) { .from(table("actor_oauth_parameter")) .where(actorType.eq(ActorType.source)) .fetch(); - final List expectedDefinitions = sourceOauthParameters(); + final List expectedDefinitions = sourceOauthParameters().stream().filter(c -> c.getWorkspaceId() != null).toList(); Assertions.assertEquals(expectedDefinitions.size(), sourceOauthParams.size()); for (final Record record : sourceOauthParams) { @@ -315,7 +315,8 @@ private void assertDataForDestinationOauthParams(final DSLContext context) { .from(table("actor_oauth_parameter")) .where(actorType.eq(ActorType.destination)) .fetch(); - final List expectedDefinitions = destinationOauthParameters(); + final List expectedDefinitions = + destinationOauthParameters().stream().filter(c -> c.getWorkspaceId() != null).toList(); Assertions.assertEquals(expectedDefinitions.size(), destinationOauthParams.size()); for (final Record record : destinationOauthParams) { diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_1_001__RemoveForeignKeyFromActorOauth_Test.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_1_001__RemoveForeignKeyFromActorOauth_Test.java new file mode 100644 index 000000000000..99fa6944a95d --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_1_001__RemoveForeignKeyFromActorOauth_Test.java @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import static io.airbyte.db.instance.configs.migrations.SetupForNormalizedTablesTest.destinationOauthParameters; +import static io.airbyte.db.instance.configs.migrations.SetupForNormalizedTablesTest.now; +import static io.airbyte.db.instance.configs.migrations.SetupForNormalizedTablesTest.sourceOauthParameters; +import static org.jooq.impl.DSL.asterisk; +import static org.jooq.impl.DSL.table; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.DestinationOAuthParameter; +import io.airbyte.config.SourceOAuthParameter; +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest; +import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType; +import java.io.IOException; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.util.List; +import java.util.UUID; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.JSONB; +import org.jooq.Record; +import org.jooq.Result; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class V0_35_1_001__RemoveForeignKeyFromActorOauth_Test extends AbstractConfigsDatabaseTest { + + @Test + public void testCompleteMigration() throws IOException, SQLException { + final Database database = getDatabase(); + final DSLContext context = DSL.using(database.getDataSource().getConnection()); + SetupForNormalizedTablesTest.setup(context); + + V0_32_8_001__AirbyteConfigDatabaseDenormalization.migrate(context); + V0_35_1_001__RemoveForeignKeyFromActorOauth.migrate(context); + assertDataForSourceOauthParams(context); + assertDataForDestinationOauthParams(context); + } + + private void assertDataForSourceOauthParams(final DSLContext context) { + final Field id = DSL.field("id", SQLDataType.UUID.nullable(false)); + final Field actorDefinitionId = DSL.field("actor_definition_id", SQLDataType.UUID.nullable(false)); + final Field configuration = DSL.field("configuration", SQLDataType.JSONB.nullable(false)); + final Field workspaceId = DSL.field("workspace_id", SQLDataType.UUID.nullable(true)); + final Field actorType = DSL.field("actor_type", SQLDataType.VARCHAR.asEnumDataType(ActorType.class).nullable(false)); + final Field createdAt = DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)); + final Field updatedAt = DSL.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)); + + final Result sourceOauthParams = context.select(asterisk()) + .from(table("actor_oauth_parameter")) + .where(actorType.eq(ActorType.source)) + .fetch(); + final List expectedDefinitions = sourceOauthParameters(); + Assertions.assertEquals(expectedDefinitions.size(), sourceOauthParams.size()); + + for (final Record record : sourceOauthParams) { + final SourceOAuthParameter sourceOAuthParameter = new SourceOAuthParameter() + .withOauthParameterId(record.get(id)) + .withConfiguration(Jsons.deserialize(record.get(configuration).data())) + .withWorkspaceId(record.get(workspaceId)) + .withSourceDefinitionId(record.get(actorDefinitionId)); + Assertions.assertTrue(expectedDefinitions.contains(sourceOAuthParameter)); + Assertions.assertEquals(now(), record.get(createdAt).toInstant()); + Assertions.assertEquals(now(), record.get(updatedAt).toInstant()); + } + } + + private void assertDataForDestinationOauthParams(final DSLContext context) { + final Field id = DSL.field("id", SQLDataType.UUID.nullable(false)); + final Field actorDefinitionId = DSL.field("actor_definition_id", SQLDataType.UUID.nullable(false)); + final Field configuration = DSL.field("configuration", SQLDataType.JSONB.nullable(false)); + final Field workspaceId = DSL.field("workspace_id", SQLDataType.UUID.nullable(true)); + final Field actorType = DSL.field("actor_type", SQLDataType.VARCHAR.asEnumDataType(ActorType.class).nullable(false)); + final Field createdAt = DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)); + final Field updatedAt = DSL.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)); + + final Result destinationOauthParams = context.select(asterisk()) + .from(table("actor_oauth_parameter")) + .where(actorType.eq(ActorType.destination)) + .fetch(); + final List expectedDefinitions = destinationOauthParameters(); + Assertions.assertEquals(expectedDefinitions.size(), destinationOauthParams.size()); + + for (final Record record : destinationOauthParams) { + final DestinationOAuthParameter destinationOAuthParameter = new DestinationOAuthParameter() + .withOauthParameterId(record.get(id)) + .withConfiguration(Jsons.deserialize(record.get(configuration).data())) + .withWorkspaceId(record.get(workspaceId)) + .withDestinationDefinitionId(record.get(actorDefinitionId)); + Assertions.assertTrue(expectedDefinitions.contains(destinationOAuthParameter)); + Assertions.assertEquals(now(), record.get(createdAt).toInstant()); + Assertions.assertEquals(now(), record.get(updatedAt).toInstant()); + } + } + +}