Skip to content

Commit

Permalink
Create new tables for catalog storage
Browse files Browse the repository at this point in the history
  • Loading branch information
malikdiarra committed Feb 10, 2022
1 parent 5bc8ec2 commit 721c904
Show file tree
Hide file tree
Showing 8 changed files with 256 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void testBootloaderAppBlankDb() throws Exception {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.15.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.26.001", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public void setup() throws Exception {
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
database.query(ctx -> ctx
.execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
.execute(
"TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public static void setup() throws Exception {
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
database.query(ctx -> ctx
.execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
.execute(
"TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void setup() throws Exception {
final DevDatabaseMigrator devDatabaseMigrator = new DevDatabaseMigrator(configsDatabaseMigrator);
MigrationDevHelper.runLastMigration(devDatabaseMigrator);
database.query(ctx -> ctx
.execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
.execute(
"TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public static void tearDown() throws Exception {
@BeforeEach
public void resetDatabase() throws SQLException {
database.query(ctx -> ctx
.execute("TRUNCATE TABLE state, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
.execute(
"TRUNCATE TABLE state, actor_catalog, actor_catalog_fetch_event, connection_operation, connection, operation, actor_oauth_parameter, actor, actor_definition, workspace"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import static org.jooq.impl.DSL.constraint;
import static org.jooq.impl.DSL.foreignKey;
import static org.jooq.impl.DSL.primaryKey;

import com.google.common.annotations.VisibleForTesting;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_35_26_001__PersistDiscoveredCatalog extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_26_001__PersistDiscoveredCatalog.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
migrate(ctx);
}

@VisibleForTesting
public static void migrate(final DSLContext ctx) {
createActorCatalog(ctx);
createCatalogFetchEvent(ctx);
addConnectionTableForeignKey(ctx);
}

private static void createActorCatalog(final DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<JSONB> catalog = DSL.field("catalog", SQLDataType.JSONB.nullable(false));
final Field<String> catalogHash = DSL.field("catalog_hash", SQLDataType.VARCHAR(60).nullable(false));
final Field<OffsetDateTime> createdAt = DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false));
ctx.createTableIfNotExists("actor_catalog")
.columns(id,
catalog,
catalogHash,
createdAt)
.constraints(primaryKey(id))
.execute();
LOGGER.info("actor_catalog table created");
ctx.createIndexIfNotExists("actor_catalog_catalog_hash_id_idx").on("actor_catalog", "catalog_hash").execute();
}

private static void createCatalogFetchEvent(final DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<UUID> actorCatalogId = DSL.field("actor_catalog_id", SQLDataType.UUID.nullable(false));
final Field<UUID> actorId = DSL.field("actor_id", SQLDataType.UUID.nullable(false));
final Field<String> configHash = DSL.field("config_hash", SQLDataType.VARCHAR(60).nullable(false));
final Field<String> actorVersion = DSL.field("actor_version", SQLDataType.VARCHAR(60).nullable(false));

ctx.createTableIfNotExists("actor_catalog_fetch_event")
.columns(id,
actorCatalogId,
actorId,
configHash,
actorVersion)
.constraints(primaryKey(id),
foreignKey(actorCatalogId).references("actor_catalog", "id").onDeleteCascade(),
foreignKey(actorId).references("actor", "id").onDeleteCascade())
.execute();
LOGGER.info("actor_catalog_fetch_event table created");
ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_id_idx").on("actor_catalog_fetch_event", "actor_id").execute();
ctx.createIndexIfNotExists("actor_catalog_fetch_event_actor_catalog_id_idx").on("actor_catalog_fetch_event", "actor_catalog_id").execute();
}

private static void addConnectionTableForeignKey(final DSLContext ctx) {
final Field<UUID> sourceCatalogId = DSL.field("source_catalog_id", SQLDataType.UUID.nullable(true));
ctx.alterTable("connection")
.addIfNotExists(sourceCatalogId).execute();
ctx.alterTable("connection")
.dropConstraintIfExists("connection_actor_catalog_id_fk");
ctx.alterTable("connection")
.add(constraint("connection_actor_catalog_id_fk").foreignKey(sourceCatalogId)
.references("actor_catalog", "id").onDeleteCascade())
.execute();
}

}
35 changes: 35 additions & 0 deletions airbyte-db/lib/src/main/resources/configs_database/schema_dump.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,23 @@ create table "public"."actor"(
constraint "actor_pkey"
primary key ("id")
);
create table "public"."actor_catalog"(
"id" uuid not null,
"catalog" jsonb not null,
"catalog_hash" varchar(60) not null,
"created_at" timestamptz(35) not null,
constraint "actor_catalog_pkey"
primary key ("id")
);
create table "public"."actor_catalog_fetch_event"(
"id" uuid not null,
"actor_catalog_id" uuid not null,
"actor_id" uuid not null,
"config_hash" varchar(60) not null,
"actor_version" varchar(60) not null,
constraint "actor_catalog_fetch_event_pkey"
primary key ("id")
);
create table "public"."actor_definition"(
"id" uuid not null,
"name" varchar(256) not null,
Expand Down Expand Up @@ -73,6 +90,7 @@ create table "public"."connection"(
"resource_requirements" jsonb null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
"source_catalog_id" uuid null,
constraint "connection_pkey"
primary key ("id")
);
Expand Down Expand Up @@ -142,10 +160,22 @@ alter table "public"."actor"
add constraint "actor_workspace_id_fkey"
foreign key ("workspace_id")
references "public"."workspace" ("id");
alter table "public"."actor_catalog_fetch_event"
add constraint "actor_catalog_fetch_event_actor_catalog_id_fkey"
foreign key ("actor_catalog_id")
references "public"."actor_catalog" ("id");
alter table "public"."actor_catalog_fetch_event"
add constraint "actor_catalog_fetch_event_actor_id_fkey"
foreign key ("actor_id")
references "public"."actor" ("id");
alter table "public"."actor_oauth_parameter"
add constraint "actor_oauth_parameter_actor_definition_id_fkey"
foreign key ("actor_definition_id")
references "public"."actor_definition" ("id");
alter table "public"."connection"
add constraint "connection_actor_catalog_id_fk"
foreign key ("source_catalog_id")
references "public"."actor_catalog" ("id");
alter table "public"."connection"
add constraint "connection_destination_id_fkey"
foreign key ("destination_id")
Expand All @@ -172,6 +202,11 @@ alter table "public"."state"
references "public"."connection" ("id");
create index "actor_actor_definition_id_idx" on "public"."actor"("actor_definition_id" asc);
create unique index "actor_pkey" on "public"."actor"("id" asc);
create index "actor_catalog_catalog_hash_id_idx" on "public"."actor_catalog"("catalog_hash" asc);
create unique index "actor_catalog_pkey" on "public"."actor_catalog"("id" asc);
create index "actor_catalog_fetch_event_actor_catalog_id_idx" on "public"."actor_catalog_fetch_event"("actor_catalog_id" asc);
create index "actor_catalog_fetch_event_actor_id_idx" on "public"."actor_catalog_fetch_event"("actor_id" asc);
create unique index "actor_catalog_fetch_event_pkey" on "public"."actor_catalog_fetch_event"("id" asc);
create unique index "actor_definition_pkey" on "public"."actor_definition"("id" asc);
create unique index "actor_oauth_parameter_pkey" on "public"."actor_oauth_parameter"("id" asc);
create unique index "airbyte_configs_migrations_pk" on "public"."airbyte_configs_migrations"("installed_rank" asc);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import io.airbyte.db.Database;
import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest;
import io.airbyte.db.instance.configs.migrations.V0_32_8_001__AirbyteConfigDatabaseDenormalization.ActorType;
import java.io.IOException;
import java.sql.SQLException;
import java.time.OffsetDateTime;
import java.util.UUID;
import org.jooq.DSLContext;
import org.jooq.JSONB;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class V0_35_26_001__PersistDiscoveredCatalogTest extends AbstractConfigsDatabaseTest {

@Test
public void test() throws SQLException, IOException {

final Database database = getDatabase();
final DSLContext context = DSL.using(database.getDataSource().getConnection());
V0_32_8_001__AirbyteConfigDatabaseDenormalization.migrate(context);
V0_35_26_001__PersistDiscoveredCatalog.migrate(context);
assertCanInsertData(context);
}

private void assertCanInsertData(final DSLContext ctx) {
Assertions.assertDoesNotThrow(() -> {
final UUID catalogId = UUID.randomUUID();
final UUID actorId = UUID.randomUUID();
final UUID actorDefinitionId = UUID.randomUUID();
final UUID workspaceId = UUID.randomUUID();

ctx.insertInto(DSL.table("workspace"))
.columns(
DSL.field("id"),
DSL.field("name"),
DSL.field("slug"),
DSL.field("initial_setup_complete"))
.values(
workspaceId,
"default",
"default",
true)
.execute();
ctx.insertInto(DSL.table("actor_definition"))
.columns(
DSL.field("id"),
DSL.field("name"),
DSL.field("docker_repository"),
DSL.field("docker_image_tag"),
DSL.field("actor_type"),
DSL.field("spec"))
.values(
actorDefinitionId,
"name",
"repo",
"1.0.0",
ActorType.source,
JSONB.valueOf("{}"))
.execute();
ctx.insertInto(DSL.table("actor"))
.columns(
DSL.field("id"),
DSL.field("workspace_id"),
DSL.field("actor_definition_id"),
DSL.field("name"),
DSL.field("configuration"),
DSL.field("actor_type"),
DSL.field("created_at"),
DSL.field("updated_at"))
.values(
actorId,
workspaceId,
actorDefinitionId,
"some actor",
JSONB.valueOf("{}"),
ActorType.source,
OffsetDateTime.now(),
OffsetDateTime.now())
.execute();
ctx.insertInto(DSL.table("actor_catalog"))
.columns(
DSL.field("id"),
DSL.field("catalog"),
DSL.field("catalog_hash"),
DSL.field("created_at"))
.values(
catalogId,
JSONB.valueOf("{}"),
"",
OffsetDateTime.now())
.execute();
ctx.insertInto(DSL.table("actor_catalog_fetch_event"))
.columns(
DSL.field("id"),
DSL.field("actor_catalog_id"),
DSL.field("actor_id"),
DSL.field("config_hash"),
DSL.field("actor_version"))
.values(
UUID.randomUUID(),
catalogId,
actorId,
"",
"2.0.1")
.execute();
});
}

}

0 comments on commit 721c904

Please sign in to comment.