From 721c904e5a911c655b5bc1a20dcd4ed4f6cbb5bf Mon Sep 17 00:00:00 2001 From: Malik Diarra Date: Tue, 8 Feb 2022 15:39:36 -0800 Subject: [PATCH] Create new tables for catalog storage --- .../airbyte/bootloader/BootloaderAppTest.java | 2 +- ...baseConfigPersistenceE2EReadWriteTest.java | 3 +- ...DatabaseConfigPersistenceLoadDataTest.java | 3 +- .../DatabaseConfigPersistenceTest.java | 3 +- ...istenceUpdateConnectorDefinitionsTest.java | 3 +- ...0_35_26_001__PersistDiscoveredCatalog.java | 96 +++++++++++++++ .../configs_database/schema_dump.txt | 35 ++++++ ..._26_001__PersistDiscoveredCatalogTest.java | 116 ++++++++++++++++++ 8 files changed, 256 insertions(+), 5 deletions(-) create mode 100644 airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_26_001__PersistDiscoveredCatalog.java create mode 100644 airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_26_001__PersistDiscoveredCatalogTest.java diff --git a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java index 6a6525d358b1..8045a5f6dd0d 100644 --- a/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java +++ b/airbyte-bootloader/src/test/java/io/airbyte/bootloader/BootloaderAppTest.java @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception { mockedConfigs.getConfigDatabaseUrl()) .getAndInitialize(); val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName()); - assertEquals("0.35.15.001", configsMigrator.getLatestMigration().getVersion().getVersion()); + assertEquals("0.35.26.001", configsMigrator.getLatestMigration().getVersion().getVersion()); val jobsPersistence = new DefaultJobPersistence(jobDatabase); assertEquals(version, jobsPersistence.getVersion().get()); diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java index 3459d380c791..6239421cd8fd 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceE2EReadWriteTest.java @@ -43,7 +43,8 @@ public void setup() throws Exception { final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator); MigrationDevHelper.runLastMigration(devDatabaseMigrator); database.query(ctx -> ctx - .execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace")); + .execute( + "TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace")); } @AfterEach diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java index cd875741fbca..7fdf97a1003e 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceLoadDataTest.java @@ -54,7 +54,8 @@ public static void setup() throws Exception { final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator); MigrationDevHelper.runLastMigration(devDatabaseMigrator); database.query(ctx -> ctx - .execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace")); + .execute( + "TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace")); } @AfterAll diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java index 85f242300fb4..e27c8f1fe10c 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceTest.java @@ -57,7 +57,8 @@ public void setup() throws Exception { final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator); MigrationDevHelper.runLastMigration(devDatabaseMigrator); database.query(ctx -> ctx - .execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace")); + .execute( + "TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace")); } @AfterEach diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java index 52fe88d3a7ec..72f193d830af 100644 --- a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/DatabaseConfigPersistenceUpdateConnectorDefinitionsTest.java @@ -54,7 +54,8 @@ public static void tearDown() throws Exception { @BeforeEach public void resetDatabase() throws SQLException { database.query(ctx -> ctx - .execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace")); + .execute( + "TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace")); } @Test diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_26_001__PersistDiscoveredCatalog.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_26_001__PersistDiscoveredCatalog.java new file mode 100644 index 000000000000..1b7164b253b4 --- /dev/null +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_35_26_001__PersistDiscoveredCatalog.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import static org.jooq.impl.DSL.constraint; +import static org.jooq.impl.DSL.foreignKey; +import static org.jooq.impl.DSL.primaryKey; + +import com.google.common.annotations.VisibleForTesting; +import java.time.OffsetDateTime; +import java.util.UUID; +import org.flywaydb.core.api.migration.BaseJavaMigration; +import org.flywaydb.core.api.migration.Context; +import org.jooq.DSLContext; +import org.jooq.Field; +import org.jooq.JSONB; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class V0_35_26_001__PersistDiscoveredCatalog extends BaseJavaMigration { + + private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_26_001__PersistDiscoveredCatalog.class); + + @Override + public void migrate(final Context context) throws Exception { + LOGGER.info("Running migration: {}", this.getClass().getSimpleName()); + + // Warning: please do not use any jOOQ generated code to write a migration. + // As database schema changes, the generated jOOQ code can be deprecated. So + // old migration may not compile if there is any generated code. + final DSLContext ctx = DSL.using(context.getConnection()); + migrate(ctx); + } + + @VisibleForTesting + public static void migrate(final DSLContext ctx) { + createActorCatalog(ctx); + createCatalogFetchEvent(ctx); + addConnectionTableForeignKey(ctx); + } + + private static void createActorCatalog(final DSLContext ctx) { + final Field id = DSL.field("id", SQLDataType.UUID.nullable(false)); + final Field catalog = DSL.field("catalog", SQLDataType.JSONB.nullable(false)); + final Field catalogHash = DSL.field("catalog_hash", SQLDataType.VARCHAR(60).nullable(false)); + final Field createdAt = DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false)); + ctx.createTableIfNotExists("actor_catalog") + .columns(id, + catalog, + catalogHash, + createdAt) + .constraints(primaryKey(id)) + .execute(); + LOGGER.info("actor_catalog table created"); + ctx.createIndexIfNotExists("actor_catalog_catalog_hash_id_idx").on("actor_catalog", "catalog_hash").execute(); + } + + private static void createCatalogFetchEvent(final DSLContext ctx) { + final Field id = DSL.field("id", SQLDataType.UUID.nullable(false)); + final Field actorCatalogId = DSL.field("actor_catalog_id", SQLDataType.UUID.nullable(false)); + final Field actorId = DSL.field("actor_id", SQLDataType.UUID.nullable(false)); + final Field configHash = DSL.field("config_hash", SQLDataType.VARCHAR(60).nullable(false)); + final Field actorVersion = DSL.field("actor_version", SQLDataType.VARCHAR(60).nullable(false)); + + ctx.createTableIfNotExists("actor_catalog_fetch_event") + .columns(id, + actorCatalogId, + actorId, + configHash, + actorVersion) + .constraints(primaryKey(id), + foreignKey(actorCatalogId).references("actor_catalog", "id").onDeleteCascade(), + foreignKey(actorId).references("actor", "id").onDeleteCascade()) + .execute(); + LOGGER.info("actor_catalog_fetch_event table created"); + ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_id_idx").on("actor_catalog_fetch_event", "actor_id").execute(); + ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_catalog_id_idx").on("actor_catalog_fetch_event", "actor_catalog_id").execute(); + } + + private static void addConnectionTableForeignKey(final DSLContext ctx) { + final Field sourceCatalogId = DSL.field("source_catalog_id", SQLDataType.UUID.nullable(true)); + ctx.alterTable("connection") + .addIfNotExists(sourceCatalogId).execute(); + ctx.alterTable("connection") + .dropConstraintIfExists("connection_actor_catalog_id_fk"); + ctx.alterTable("connection") + .add(constraint("connection_actor_catalog_id_fk").foreignKey(sourceCatalogId) + .references("actor_catalog", "id").onDeleteCascade()) + .execute(); + } + +} diff --git a/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt b/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt index db7705569f17..1d56e6a65a0b 100644 --- a/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt +++ b/airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt @@ -15,6 +15,23 @@ create table "public"."actor"( constraint "actor_pkey" primary key ("id") ); +create table "public"."actor_catalog"( + "id" uuid not null, + "catalog" jsonb not null, + "catalog_hash" varchar(60) not null, + "created_at" timestamptz(35) not null, + constraint "actor_catalog_pkey" + primary key ("id") +); +create table "public"."actor_catalog_fetch_event"( + "id" uuid not null, + "actor_catalog_id" uuid not null, + "actor_id" uuid not null, + "config_hash" varchar(60) not null, + "actor_version" varchar(60) not null, + constraint "actor_catalog_fetch_event_pkey" + primary key ("id") +); create table "public"."actor_definition"( "id" uuid not null, "name" varchar(256) not null, @@ -73,6 +90,7 @@ create table "public"."connection"( "resource_requirements" jsonb null, "created_at" timestamptz(35) not null default null, "updated_at" timestamptz(35) not null default null, + "source_catalog_id" uuid null, constraint "connection_pkey" primary key ("id") ); @@ -142,10 +160,22 @@ alter table "public"."actor" add constraint "actor_workspace_id_fkey" foreign key ("workspace_id") references "public"."workspace" ("id"); +alter table "public"."actor_catalog_fetch_event" + add constraint "actor_catalog_fetch_event_actor_catalog_id_fkey" + foreign key ("actor_catalog_id") + references "public"."actor_catalog" ("id"); +alter table "public"."actor_catalog_fetch_event" + add constraint "actor_catalog_fetch_event_actor_id_fkey" + foreign key ("actor_id") + references "public"."actor" ("id"); alter table "public"."actor_oauth_parameter" add constraint "actor_oauth_parameter_actor_definition_id_fkey" foreign key ("actor_definition_id") references "public"."actor_definition" ("id"); +alter table "public"."connection" + add constraint "connection_actor_catalog_id_fk" + foreign key ("source_catalog_id") + references "public"."actor_catalog" ("id"); alter table "public"."connection" add constraint "connection_destination_id_fkey" foreign key ("destination_id") @@ -172,6 +202,11 @@ alter table "public"."state" references "public"."connection" ("id"); create index "actor_actor_definition_id_idx" on "public"."actor"("actor_definition_id" asc); create unique index "actor_pkey" on "public"."actor"("id" asc); +create index "actor_catalog_catalog_hash_id_idx" on "public"."actor_catalog"("catalog_hash" asc); +create unique index "actor_catalog_pkey" on "public"."actor_catalog"("id" asc); +create index "actor_catalog_fetch_event_actor_catalog_id_idx" on "public"."actor_catalog_fetch_event"("actor_catalog_id" asc); +create index "actor_catalog_fetch_event_actor_id_idx" on "public"."actor_catalog_fetch_event"("actor_id" asc); +create unique index "actor_catalog_fetch_event_pkey" on "public"."actor_catalog_fetch_event"("id" asc); create unique index "actor_definition_pkey" on "public"."actor_definition"("id" asc); create unique index "actor_oauth_parameter_pkey" on "public"."actor_oauth_parameter"("id" asc); create unique index "airbyte_configs_migrations_pk" on "public"."airbyte_configs_migrations"("installed_rank" asc); diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_26_001__PersistDiscoveredCatalogTest.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_26_001__PersistDiscoveredCatalogTest.java new file mode 100644 index 000000000000..d7d0b5c2ba1c --- /dev/null +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_35_26_001__PersistDiscoveredCatalogTest.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.instance.configs.migrations; + +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest; +import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType; +import java.io.IOException; +import java.sql.SQLException; +import java.time.OffsetDateTime; +import java.util.UUID; +import org.jooq.DSLContext; +import org.jooq.JSONB; +import org.jooq.impl.DSL; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class V0_35_26_001__PersistDiscoveredCatalogTest extends AbstractConfigsDatabaseTest { + + @Test + public void test() throws SQLException, IOException { + + final Database database = getDatabase(); + final DSLContext context = DSL.using(database.getDataSource().getConnection()); + V0_32_8_001__AirbyteConfigDatabaseDenormalization.migrate(context); + V0_35_26_001__PersistDiscoveredCatalog.migrate(context); + assertCanInsertData(context); + } + + private void assertCanInsertData(final DSLContext ctx) { + Assertions.assertDoesNotThrow(() -> { + final UUID catalogId = UUID.randomUUID(); + final UUID actorId = UUID.randomUUID(); + final UUID actorDefinitionId = UUID.randomUUID(); + final UUID workspaceId = UUID.randomUUID(); + + ctx.insertInto(DSL.table("workspace")) + .columns( + DSL.field("id"), + DSL.field("name"), + DSL.field("slug"), + DSL.field("initial_setup_complete")) + .values( + workspaceId, + "default", + "default", + true) + .execute(); + ctx.insertInto(DSL.table("actor_definition")) + .columns( + DSL.field("id"), + DSL.field("name"), + DSL.field("docker_repository"), + DSL.field("docker_image_tag"), + DSL.field("actor_type"), + DSL.field("spec")) + .values( + actorDefinitionId, + "name", + "repo", + "1.0.0", + ActorType.source, + JSONB.valueOf("{}")) + .execute(); + ctx.insertInto(DSL.table("actor")) + .columns( + DSL.field("id"), + DSL.field("workspace_id"), + DSL.field("actor_definition_id"), + DSL.field("name"), + DSL.field("configuration"), + DSL.field("actor_type"), + DSL.field("created_at"), + DSL.field("updated_at")) + .values( + actorId, + workspaceId, + actorDefinitionId, + "some actor", + JSONB.valueOf("{}"), + ActorType.source, + OffsetDateTime.now(), + OffsetDateTime.now()) + .execute(); + ctx.insertInto(DSL.table("actor_catalog")) + .columns( + DSL.field("id"), + DSL.field("catalog"), + DSL.field("catalog_hash"), + DSL.field("created_at")) + .values( + catalogId, + JSONB.valueOf("{}"), + "", + OffsetDateTime.now()) + .execute(); + ctx.insertInto(DSL.table("actor_catalog_fetch_event")) + .columns( + DSL.field("id"), + DSL.field("actor_catalog_id"), + DSL.field("actor_id"), + DSL.field("config_hash"), + DSL.field("actor_version")) + .values( + UUID.randomUUID(), + catalogId, + actorId, + "", + "2.0.1") + .execute(); + }); + } + +}