Skip to content

Commit

Permalink
Add normalization to destination definition and actor definition table (
Browse files Browse the repository at this point in the history
#18300)

* updated StandardDestinationDefinition.yaml, added normalization and tags to the destination_definition.yaml and added information about normalization and DBT to the ACTOR_DEFINITION table

* updated docs

* updated BootloaderAppTest.java for new migration

* updated schema dump

* Update normalization version and fix bigquery

* Use varchar 255

* Update migration version to the latest

* Update normalized table schema file and add comment

* Revert "Use varchar 255"

This reverts commit e182466.

* Use varchar 255

* Add unit test for migration

* Format code

Co-authored-by: Liren Tu <tuliren@gmail.com>
  • Loading branch information
andriikorotkov and tuliren authored Nov 4, 2022
1 parent 15143f7 commit 350d544
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void testBootloaderAppBlankDb() throws Exception {
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
// to show that you meant to make a new migration to the prod database
assertEquals("0.40.18.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.18.002", configsMigrator.getLatestMigration().getVersion().getVersion());

val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(VERSION_0330_ALPHA, jobsPersistence.getVersion().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,10 @@ properties:
protocolVersion:
type: string
description: the Airbyte Protocol version supported by the connector
normalizationRepository:
type: string
normalizationTag:
type: string
supportsDbt:
type: boolean
default: false
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ static void writeStandardDestinationDefinition(final List<StandardDestinationDef
standardDestinationDefinition.getResourceRequirements() == null ? null
: JSONB.valueOf(Jsons.serialize(standardDestinationDefinition.getResourceRequirements())))
.set(Tables.ACTOR_DEFINITION.UPDATED_AT, timestamp)
.set(Tables.ACTOR_DEFINITION.NORMALIZATION_REPOSITORY, standardDestinationDefinition.getNormalizationRepository())
.set(Tables.ACTOR_DEFINITION.NORMALIZATION_TAG, standardDestinationDefinition.getNormalizationTag())
.set(Tables.ACTOR_DEFINITION.SUPPORTS_DBT, standardDestinationDefinition.getSupportsDbt())
.where(Tables.ACTOR_DEFINITION.ID.eq(standardDestinationDefinition.getDestinationDefinitionId()))
.execute();

Expand Down Expand Up @@ -159,6 +162,9 @@ static void writeStandardDestinationDefinition(final List<StandardDestinationDef
: JSONB.valueOf(Jsons.serialize(standardDestinationDefinition.getResourceRequirements())))
.set(Tables.ACTOR_DEFINITION.CREATED_AT, timestamp)
.set(Tables.ACTOR_DEFINITION.UPDATED_AT, timestamp)
.set(Tables.ACTOR_DEFINITION.NORMALIZATION_REPOSITORY, standardDestinationDefinition.getNormalizationRepository())
.set(Tables.ACTOR_DEFINITION.NORMALIZATION_TAG, standardDestinationDefinition.getNormalizationTag())
.set(Tables.ACTOR_DEFINITION.SUPPORTS_DBT, standardDestinationDefinition.getSupportsDbt())
.execute();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
dockerImageTag: 1.2.5
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
normalizationRepository: airbyte/normalization
normalizationTag: 0.2.24
supportsDbt: true
resourceRequirements:
jobSpecific:
- jobType: sync
Expand All @@ -43,6 +46,8 @@
dockerImageTag: 1.2.5
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
normalizationRepository: airbyte/normalization
normalizationTag: 0.2.24
resourceRequirements:
jobSpecific:
- jobType: sync
Expand Down Expand Up @@ -70,6 +75,9 @@
dockerImageTag: 0.2.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/clickhouse
releaseStage: alpha
normalizationRepository: airbyte/normalization-clickhouse
normalizationTag: 0.2.24
supportsDbt: true
- name: Cloudflare R2
destinationDefinitionId: 0fb07be9-7c3b-4336-850d-5efc006152ee
dockerRepository: airbyte/destination-r2
Expand Down Expand Up @@ -185,6 +193,9 @@
documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql
icon: mssql.svg
releaseStage: alpha
normalizationRepository: airbyte/normalization-mssql
normalizationTag: 0.2.24
supportsDbt: true
- name: MeiliSearch
destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
dockerRepository: airbyte/destination-meilisearch
Expand All @@ -206,20 +217,29 @@
documentationUrl: https://docs.airbyte.com/integrations/destinations/mysql
icon: mysql.svg
releaseStage: alpha
normalizationRepository: airbyte/normalization-mysql
normalizationTag: 0.2.24
supportsDbt: true
- name: Oracle
destinationDefinitionId: 3986776d-2319-4de9-8af8-db14c0996e72
dockerRepository: airbyte/destination-oracle
dockerImageTag: 0.1.19
documentationUrl: https://docs.airbyte.com/integrations/destinations/oracle
icon: oracle.svg
releaseStage: alpha
normalizationRepository: airbyte/normalization-oracle
normalizationTag: 0.2.24
supportsDbt: true
- name: Postgres
destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.3.26
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
icon: postgresql.svg
releaseStage: alpha
normalizationRepository: airbyte/normalization-postgres
normalizationTag: 0.2.24
supportsDbt: true
- name: Pulsar
destinationDefinitionId: 2340cbba-358e-11ec-8d3d-0242ac130203
dockerRepository: airbyte/destination-pulsar
Expand Down Expand Up @@ -247,6 +267,9 @@
dockerImageTag: 0.3.51
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
icon: redshift.svg
normalizationRepository: airbyte/normalization-redshift
normalizationTag: 0.2.24
supportsDbt: true
resourceRequirements:
jobSpecific:
- jobType: sync
Expand Down Expand Up @@ -286,6 +309,9 @@
dockerImageTag: 0.4.38
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationRepository: airbyte/normalization-snowflake
normalizationTag: 0.2.24
supportsDbt: true
resourceRequirements:
jobSpecific:
- jobType: sync
Expand Down Expand Up @@ -335,6 +361,9 @@
documentationUrl: https://docs.airbyte.com/integrations/destinations/tidb
icon: tidb.svg
releaseStage: alpha
normalizationRepository: airbyte/normalization-tidb
normalizationTag: 0.2.24
supportsDbt: true
- name: Typesense
destinationDefinitionId: 36be8dc6-9851-49af-b776-9d4c30e4ab6a
dockerRepository: airbyte/destination-typesense
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_40_18_002__AddActorDefinitionNormalizationAndDbtColumns extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_18_002__AddActorDefinitionNormalizationAndDbtColumns.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());
final DSLContext ctx = DSL.using(context.getConnection());
addNormalizationRepositoryColumn(ctx);
addNormalizationTagColumn(ctx);
addSupportsDbtColumn(ctx);
}

static void addNormalizationRepositoryColumn(final DSLContext ctx) {
ctx.alterTable("actor_definition")
.addColumnIfNotExists(DSL.field(
"normalization_repository",
SQLDataType.VARCHAR(255).nullable(true)))
.execute();
}

static void addNormalizationTagColumn(final DSLContext ctx) {
ctx.alterTable("actor_definition")
.addColumnIfNotExists(DSL.field(
"normalization_tag",
SQLDataType.VARCHAR(255).nullable(true)))
.execute();
}

static void addSupportsDbtColumn(final DSLContext ctx) {
ctx.alterTable("actor_definition")
.addColumnIfNotExists(DSL.field("supports_dbt",
SQLDataType.BOOLEAN.nullable(true)))
.execute();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ public class V0_40_3_002__RemoveActorForeignKeyFromOauthParamsTable extends Base
@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
final DSLContext ctx = DSL.using(context.getConnection());
removeActorDefinitionForeignKey(ctx);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
// The file represents the schema of each of the config table
// The file represents the schema of each of the config table after the normalization.
// It is created in https://github.com/airbytehq/airbyte/pull/8563.
// This schema is maintained manually by copying / pasting the output from Postgres' \d+ command.
// It is not consumed programmatically, and can be outdated.

enum_schema | enum_name | enum_value
-------------+---------------------------+------------------------------
Expand Down Expand Up @@ -68,23 +71,26 @@ Referenced by:


Table "public.actor_definition"
Column | Type | Collation | Nullable | Default
-----------------------+--------------------------+-----------+----------+-------------------
id | uuid | | not null |
name | character varying(256) | | not null |
docker_repository | character varying(256) | | not null |
docker_image_tag | character varying(256) | | not null |
documentation_url | character varying(256) | | |
icon | character varying(256) | | |
actor_type | actor_type | | not null |
source_type | source_type | | |
spec | jsonb | | not null |
created_at | timestamp with time zone | | not null | CURRENT_TIMESTAMP
updated_at | timestamp with time zone | | not null | CURRENT_TIMESTAMP
tombstone | boolean | | not null | false
release_stage | release_stage | | |
release_date | date | | |
resource_requirements | jsonb | | |
Column | Type | Collation | Nullable | Default
----------------------------+--------------------------+-----------+----------+-------------------
id | uuid | | not null |
name | character varying(256) | | not null |
docker_repository | character varying(256) | | not null |
docker_image_tag | character varying(256) | | not null |
documentation_url | character varying(256) | | |
icon | character varying(256) | | |
actor_type | actor_type | | not null |
source_type | source_type | | |
spec | jsonb | | not null |
created_at | timestamp with time zone | | not null | CURRENT_TIMESTAMP
updated_at | timestamp with time zone | | not null | CURRENT_TIMESTAMP
tombstone | boolean | | not null | false
release_stage | release_stage | | |
release_date | date | | |
resource_requirements | jsonb | | |
normalization_repository | character varying(255) | | |
normalization_tag | character varying(255) | | |
supports_dbt | boolean | | |
Indexes:
"actor_definition_pkey" PRIMARY KEY, btree (id)
Referenced by:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ create table "public"."actor_definition"(
"public" bool not null default false,
"custom" bool not null default false,
"protocol_version" varchar(255) null,
"normalization_repository" varchar(255) null,
"normalization_tag" varchar(255) null,
"supports_dbt" bool null,
constraint "actor_definition_pkey"
primary key ("id")
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.configs.migrations;

import static org.junit.jupiter.api.Assertions.*;

import io.airbyte.db.factory.FlywayFactory;
import io.airbyte.db.instance.configs.AbstractConfigsDatabaseTest;
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.development.DevDatabaseMigrator;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class V0_40_18_002__AddActorDefinitionNormalizationAndDbtColumnsTest extends AbstractConfigsDatabaseTest {

@BeforeEach
void beforeEach() {
final Flyway flyway =
FlywayFactory.create(dataSource, "V0_40_18_001__AddInvalidProtocolFlagToConnections", ConfigsDatabaseMigrator.DB_IDENTIFIER,
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
final ConfigsDatabaseMigrator configsDbMigrator = new ConfigsDatabaseMigrator(database, flyway);

final BaseJavaMigration previousMigration = new V0_40_18_001__AddInvalidProtocolFlagToConnections();
final DevDatabaseMigrator devConfigsDbMigrator = new DevDatabaseMigrator(configsDbMigrator, previousMigration.getVersion());
devConfigsDbMigrator.createBaseline();
}

@Test
void test() throws Exception {
final DSLContext context = getDslContext();
assertFalse(columnExists(context, "normalization_repository"));
assertFalse(columnExists(context, "normalization_tag"));
assertFalse(columnExists(context, "supports_dbt"));
V0_40_18_002__AddActorDefinitionNormalizationAndDbtColumns.addNormalizationRepositoryColumn(context);
assertTrue(columnExists(context, "normalization_repository"));
V0_40_18_002__AddActorDefinitionNormalizationAndDbtColumns.addNormalizationTagColumn(context);
assertTrue(columnExists(context, "normalization_tag"));
V0_40_18_002__AddActorDefinitionNormalizationAndDbtColumns.addSupportsDbtColumn(context);
assertTrue(columnExists(context, "supports_dbt"));
}

static boolean columnExists(final DSLContext ctx, final String columnName) {
return ctx.fetchExists(DSL.select()
.from("information_schema.columns")
.where(DSL.field("table_name").eq("actor_definition")
.and(DSL.field("column_name").eq(columnName))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.io.IOException;
import java.sql.SQLException;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -28,7 +29,7 @@ void beforeEach() {
ConfigsDatabaseMigrator.MIGRATION_FILE_LOCATION);
final ConfigsDatabaseMigrator configsDbMigrator = new ConfigsDatabaseMigrator(database, flyway);

final V0_40_3_001__AddProtocolVersionToActorDefinition previousMigration = new V0_40_3_001__AddProtocolVersionToActorDefinition();
final BaseJavaMigration previousMigration = new V0_40_3_001__AddProtocolVersionToActorDefinition();
final DevDatabaseMigrator devConfigsDbMigrator = new DevDatabaseMigrator(configsDbMigrator, previousMigration.getVersion());
devConfigsDbMigrator.createBaseline();
}
Expand Down

0 comments on commit 350d544

Please sign in to comment.