Create new tables for catalog storage #10226

Merged · 1 commit · Feb 11, 2022
@@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
- assertEquals("0.35.15.001", configsMigrator.getLatestMigration().getVersion().getVersion());
+ assertEquals("0.35.26.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
@@ -43,7 +43,8 @@ public void setup() throws Exception {
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
database.query(ctx -> ctx
- .execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
+ .execute(
Contributor: is there some helper we should write for this? It seems like we have the same truncate query in a bunch of different tests and have to update it in a bunch of places.

Contributor (Author): Definitely agree with that. I haven't seen any existing helper function for that. I'll add one in BaseDatabaseConfigPersistenceTest.

Contributor (Author): I have that ready in a separate commit.

+ "TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
}

@AfterEach
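Following up on the thread above, a shared helper in BaseDatabaseConfigPersistenceTest would keep the truncate list in one place. A minimal sketch, assuming the base class holds a static Database handle the way the tests in this diff do (the helper name truncateAllTables is hypothetical; the actual follow-up commit may differ):

import io.airbyte.db.Database;
import java.sql.SQLException;

public abstract class BaseDatabaseConfigPersistenceTest {

  protected static Database database;

  // Hypothetical shared helper: new tables such as actor_catalog and
  // actor_catalog_fetch_event only need to be added to this list once,
  // instead of in every test's setup method.
  protected static void truncateAllTables() throws SQLException {
    database.query(ctx -> ctx
        .execute("TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, "
            + "connection_operation, connection, operation, actor_oauth_parameter, "
            + "actor, actor_definition, workspace"));
  }

}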
@@ -54,7 +54,8 @@ public static void setup() throws Exception {
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
database.query(ctx -> ctx
- .execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
+ .execute(
+ "TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
}

@AfterAll
@@ -57,7 +57,8 @@ public void setup() throws Exception {
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
database.query(ctx -> ctx
- .execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
+ .execute(
+ "TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
}

@AfterEach
@@ -54,7 +54,8 @@ public static void tearDown() throws Exception {
@BeforeEach
public void resetDatabase() throws SQLException {
database.query(ctx -> ctx
- .execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
+ .execute(
+ "TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
}

@Test
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import static org.jooq.impl.DSL.constraint;
import static org.jooq.impl.DSL.foreignKey;
import static org.jooq.impl.DSL.primaryKey;

import com.google.common.annotations.VisibleForTesting;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_35_26_001__PersistDiscoveredCatalog extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_26_001__PersistDiscoveredCatalog.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As the database schema changes, the generated jOOQ code can become outdated,
// so old migrations may no longer compile if they reference any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
migrate(ctx);
}

@VisibleForTesting
public static void migrate(final DSLContext ctx) {
createActorCatalog(ctx);
createCatalogFetchEvent(ctx);
addConnectionTableForeignKey(ctx);
}

private static void createActorCatalog(final DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<JSONB> catalog = DSL.field("catalog", SQLDataType.JSONB.nullable(false));
final Field<String> catalogHash = DSL.field("catalog_hash", SQLDataType.VARCHAR(32).nullable(false));
final Field<OffsetDateTime> createdAt = DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false));
ctx.createTableIfNotExists("actor_catalog")
.columns(id,
catalog,
catalogHash,
createdAt)
.constraints(primaryKey(id))
.execute();
LOGGER.info("actor_catalog table created");
ctx.createIndexIfNotExists("actor_catalog_catalog_hash_id_idx").on("actor_catalog", "catalog_hash").execute();
}

private static void createCatalogFetchEvent(final DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<UUID> actorCatalogId = DSL.field("actor_catalog_id", SQLDataType.UUID.nullable(false));
final Field<UUID> actorId = DSL.field("actor_id", SQLDataType.UUID.nullable(false));
final Field<String> configHash = DSL.field("config_hash", SQLDataType.VARCHAR(32).nullable(false));
final Field<String> actorVersion = DSL.field("actor_version", SQLDataType.VARCHAR(256).nullable(false));

ctx.createTableIfNotExists("actor_catalog_fetch_event")
.columns(id,
actorCatalogId,
actorId,
configHash,
actorVersion)
.constraints(primaryKey(id),
foreignKey(actorCatalogId).references("actor_catalog", "id").onDeleteCascade(),
foreignKey(actorId).references("actor", "id").onDeleteCascade())
.execute();
LOGGER.info("actor_catalog_fetch_event table created");
ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_id_idx").on("actor_catalog_fetch_event", "actor_id").execute();
ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_catalog_id_idx").on("actor_catalog_fetch_event", "actor_catalog_id").execute();
}

private static void addConnectionTableForeignKey(final DSLContext ctx) {
final Field<UUID> sourceCatalogId = DSL.field("source_catalog_id", SQLDataType.UUID.nullable(true));
ctx.alterTable("connection")
.addIfNotExists(sourceCatalogId).execute();
// Drop the constraint if it already exists so the migration can be re-run safely,
// then recreate it. Note: dropConstraintIfExists only builds the query; it must
// be executed explicitly.
ctx.alterTable("connection")
.dropConstraintIfExists("connection_actor_catalog_id_fk")
.execute();
ctx.alterTable("connection")
.add(constraint("connection_actor_catalog_id_fk").foreignKey(sourceCatalogId)
.references("actor_catalog", "id").onDeleteCascade())
.execute();
}

}
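For context, Flyway derives the migration version from the class name, so V0_35_26_001__PersistDiscoveredCatalog registers as version 0.35.26.001, which is why the expected latest migration in the bootloader test above moves from 0.35.15.001 to 0.35.26.001. A minimal sketch of how such a versioned Java migration gets applied, assuming a plain Flyway setup (Airbyte wires this up through ConfigsDatabaseMigrator, so the configuration below is illustrative only):

import javax.sql.DataSource;
import org.flywaydb.core.Flyway;

public final class RunConfigsMigrations {

  // Illustrative sketch: Flyway scans the migrations package for
  // BaseJavaMigration subclasses and orders them by the version encoded
  // in each class name.
  public static void run(final DataSource dataSource) {
    final Flyway flyway = Flyway.configure()
        .dataSource(dataSource)
        .locations("classpath:io/airbyte/db/instance/configs/migrations")
        .load();
    flyway.migrate(); // applies V0_35_26_001__PersistDiscoveredCatalog if pending
  }

}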
35 changes: 35 additions & 0 deletions airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt
@@ -15,6 +15,23 @@ create table "public"."actor"(
constraint "actor_pkey"
primary key ("id")
);
+ create table "public"."actor_catalog"(
+ "id" uuid not null,
+ "catalog" jsonb not null,
+ "catalog_hash" varchar(32) not null,
+ "created_at" timestamptz(35) not null,
+ constraint "actor_catalog_pkey"
+ primary key ("id")
+ );
+ create table "public"."actor_catalog_fetch_event"(
+ "id" uuid not null,
+ "actor_catalog_id" uuid not null,
Contributor: I think it would make sense to add an index on actor_catalog_id. I think one of the queries we'll want to run frequently is when we have seen this catalog returned.

Contributor (Author): Done

+ "actor_id" uuid not null,
Contributor: I think it would make sense to do an index on actor_id. I think the most common query we will do on this table is to get the record for an actor, or potentially all records for the actor.

Contributor (Author): Done
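As an aside, the two lookups motivating these indexes might look like the following jOOQ sketch (hypothetical helper code, not part of this PR; it assumes a DSLContext connected to the configs database):

import java.util.UUID;
import org.jooq.DSLContext;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.impl.DSL;

public final class FetchEventQueries {

  // "When have we seen this catalog returned?"
  // Served by actor_catalog_fetch_event_actor_catalog_id_idx.
  public static Result<Record> eventsForCatalog(final DSLContext ctx, final UUID catalogId) {
    return ctx.select()
        .from(DSL.table("actor_catalog_fetch_event"))
        .where(DSL.field("actor_catalog_id", UUID.class).eq(catalogId))
        .fetch();
  }

  // "Get all catalog fetch events recorded for an actor."
  // Served by actor_catalog_fetch_event_actor_id_idx.
  public static Result<Record> eventsForActor(final DSLContext ctx, final UUID actorId) {
    return ctx.select()
        .from(DSL.table("actor_catalog_fetch_event"))
        .where(DSL.field("actor_id", UUID.class).eq(actorId))
        .fetch();
  }

}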

+ "config_hash" varchar(32) not null,
+ "actor_version" varchar(256) not null,
+ constraint "actor_catalog_fetch_event_pkey"
+ primary key ("id")
+ );
create table "public"."actor_definition"(
"id" uuid not null,
"name" varchar(256) not null,
@@ -73,6 +90,7 @@ create table "public"."connection"(
"resource_requirements" jsonb null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
+ "source_catalog_id" uuid null,
constraint "connection_pkey"
primary key ("id")
);
@@ -142,10 +160,22 @@ alter table "public"."actor"
add constraint "actor_workspace_id_fkey"
foreign key ("workspace_id")
references "public"."workspace" ("id");
+ alter table "public"."actor_catalog_fetch_event"
+ add constraint "actor_catalog_fetch_event_actor_catalog_id_fkey"
+ foreign key ("actor_catalog_id")
+ references "public"."actor_catalog" ("id");
+ alter table "public"."actor_catalog_fetch_event"
+ add constraint "actor_catalog_fetch_event_actor_id_fkey"
+ foreign key ("actor_id")
+ references "public"."actor" ("id");
alter table "public"."actor_oauth_parameter"
add constraint "actor_oauth_parameter_actor_definition_id_fkey"
foreign key ("actor_definition_id")
references "public"."actor_definition" ("id");
+ alter table "public"."connection"
+ add constraint "connection_actor_catalog_id_fk"
+ foreign key ("source_catalog_id")
+ references "public"."actor_catalog" ("id");
alter table "public"."connection"
add constraint "connection_destination_id_fkey"
foreign key ("destination_id")
@@ -172,6 +202,11 @@ alter table "public"."state"
references "public"."connection" ("id");
create index "actor_actor_definition_id_idx" on "public"."actor"("actor_definition_id" asc);
create unique index "actor_pkey" on "public"."actor"("id" asc);
+ create index "actor_catalog_catalog_hash_id_idx" on "public"."actor_catalog"("catalog_hash" asc);
+ create unique index "actor_catalog_pkey" on "public"."actor_catalog"("id" asc);
+ create index "actor_catalog_fetch_event_actor_catalog_id_idx" on "public"."actor_catalog_fetch_event"("actor_catalog_id" asc);
+ create index "actor_catalog_fetch_event_actor_id_idx" on "public"."actor_catalog_fetch_event"("actor_id" asc);
+ create unique index "actor_catalog_fetch_event_pkey" on "public"."actor_catalog_fetch_event"("id" asc);
create unique index "actor_definition_pkey" on "public"."actor_definition"("id" asc);
create unique index "actor_oauth_parameter_pkey" on "public"."actor_oauth_parameter"("id" asc);
create unique index "airbyte_configs_migrations_pk" on "public"."airbyte_configs_migrations"("installed_rank" asc);
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest;
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType;
import java.io.IOException;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.jooq.DSLContext;
import org.jooq.JSONB;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class V0_35_26_001__PersistDiscoveredCatalogTest extends AbstractConfigsDatabaseTest {

@Test
public void test() throws SQLException, IOException {

final Database database = getDatabase();
final DSLContext context = DSL.using(database.getDataSource().getConnection());
V0_32_8_001__AirbyteConfigDatabaseDenormalization.migrate(context);
V0_35_26_001__PersistDiscoveredCatalog.migrate(context);
assertCanInsertData(context);
}

private void assertCanInsertData(final DSLContext ctx) {
Assertions.assertDoesNotThrow(() -> {
final UUID catalogId = UUID.randomUUID();
final UUID actorId = UUID.randomUUID();
final UUID actorDefinitionId = UUID.randomUUID();
final UUID workspaceId = UUID.randomUUID();

// Insert rows in foreign-key order: workspace and actor_definition first,
// then actor (which references both), then actor_catalog, and finally
// actor_catalog_fetch_event (which references actor and actor_catalog).
ctx.insertInto(DSL.table("workspace"))
.columns(
DSL.field("id"),
DSL.field("name"),
DSL.field("slug"),
DSL.field("initial_setup_complete"))
.values(
workspaceId,
"default",
"default",
true)
.execute();
ctx.insertInto(DSL.table("actor_definition"))
.columns(
DSL.field("id"),
DSL.field("name"),
DSL.field("docker_repository"),
DSL.field("docker_image_tag"),
DSL.field("actor_type"),
DSL.field("spec"))
.values(
actorDefinitionId,
"name",
"repo",
"1.0.0",
ActorType.source,
JSONB.valueOf("{}"))
.execute();
ctx.insertInto(DSL.table("actor"))
.columns(
DSL.field("id"),
DSL.field("workspace_id"),
DSL.field("actor_definition_id"),
DSL.field("name"),
DSL.field("configuration"),
DSL.field("actor_type"),
DSL.field("created_at"),
DSL.field("updated_at"))
.values(
actorId,
workspaceId,
actorDefinitionId,
"some actor",
JSONB.valueOf("{}"),
ActorType.source,
OffsetDateTime.now(),
OffsetDateTime.now())
.execute();
ctx.insertInto(DSL.table("actor_catalog"))
.columns(
DSL.field("id"),
DSL.field("catalog"),
DSL.field("catalog_hash"),
DSL.field("created_at"))
.values(
catalogId,
JSONB.valueOf("{}"),
"",
OffsetDateTime.now())
.execute();
ctx.insertInto(DSL.table("actor_catalog_fetch_event"))
.columns(
DSL.field("id"),
DSL.field("actor_catalog_id"),
DSL.field("actor_id"),
DSL.field("config_hash"),
DSL.field("actor_version"))
.values(
UUID.randomUUID(),
catalogId,
actorId,
"",
"2.0.1")
.execute();
});
}

}