Sync stats migration (#16285)
alovew authored Sep 7, 2022
1 parent bc0d7cc commit 4ef54ad
Showing 5 changed files with 93 additions and 6 deletions.
@@ -128,7 +128,7 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.35.62.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.3.001", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
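
(The updated assertion follows Flyway's naming convention for Java migrations: the class name prefix V0_40_3_001 maps to version 0.40.3.001, with underscores becoming dots. A minimal sketch using Flyway's MigrationVersion API, not part of this diff:)

import org.flywaydb.core.api.MigrationVersion;

// Flyway derives "0.40.3.001" from the V0_40_3_001__CreateSyncStats class name;
// the equivalent version object can be built directly:
final MigrationVersion version = MigrationVersion.fromVersion("0.40.3.001");
System.out.println(version.getVersion()); // prints 0.40.3.001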
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.jobs.migrations;

import static org.jooq.impl.DSL.currentOffsetDateTime;
import static org.jooq.impl.DSL.foreignKey;
import static org.jooq.impl.DSL.primaryKey;

import java.time.OffsetDateTime;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_40_3_001__CreateSyncStats extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_3_001__CreateSyncStats.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());
final DSLContext ctx = DSL.using(context.getConnection());
createSyncStatsTable(ctx);
}

private static void createSyncStatsTable(final DSLContext ctx) {
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<Integer> attemptId = DSL.field("attempt_id", SQLDataType.INTEGER.nullable(false));
final Field<Long> recordsEmitted = DSL.field("records_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> bytesEmitted = DSL.field("bytes_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> sourceStateMessagesEmitted = DSL.field("source_state_messages_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> destinationStateMessagesEmitted = DSL.field("destination_state_messages_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> recordsCommitted = DSL.field("records_committed", SQLDataType.BIGINT.nullable(true));
final Field<Long> meanSecondsBeforeSourceStateMessageEmitted =
DSL.field("mean_seconds_before_source_state_message_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> maxSecondsBeforeSourceStateMessageEmitted =
DSL.field("max_seconds_before_source_state_message_emitted", SQLDataType.BIGINT.nullable(true));
final Field<Long> meanSecondsBetweenStateMessageEmittedAndCommitted =
DSL.field("mean_seconds_between_state_message_emitted_and_committed", SQLDataType.BIGINT.nullable(true));
final Field<Long> maxSecondsBetweenStateMessageEmittedAndCommitted =
DSL.field("max_seconds_between_state_message_emitted_and_committed", SQLDataType.BIGINT.nullable(true));
final Field<OffsetDateTime> createdAt =
DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));
final Field<OffsetDateTime> updatedAt =
DSL.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));

ctx.createTableIfNotExists("sync_stats")
.columns(id, attemptId, recordsEmitted, bytesEmitted, sourceStateMessagesEmitted, destinationStateMessagesEmitted, recordsCommitted,
meanSecondsBeforeSourceStateMessageEmitted, maxSecondsBeforeSourceStateMessageEmitted, meanSecondsBetweenStateMessageEmittedAndCommitted,
maxSecondsBetweenStateMessageEmittedAndCommitted, createdAt, updatedAt)
.constraints(primaryKey(id), foreignKey(attemptId).references("attempts", "id").onDeleteCascade())
.execute();

ctx.createIndex("attempt_id_idx").on("sync_stats", "attempt_id").execute();
}

}
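
Once this migration has run, rows can be written using the same jOOQ field names the migration defines. A minimal sketch, not part of this commit (the DSLContext ctx and the attempt_id value are assumed to be supplied by the caller, and attempt_id must reference an existing attempts.id):

final UUID syncStatsId = UUID.randomUUID();
ctx.insertInto(DSL.table("sync_stats"))
    .set(DSL.field("id", SQLDataType.UUID), syncStatsId)
    .set(DSL.field("attempt_id", SQLDataType.INTEGER), 1)
    .set(DSL.field("records_emitted", SQLDataType.BIGINT), 100L)
    .set(DSL.field("bytes_emitted", SQLDataType.BIGINT), 2048L)
    .execute();
// created_at and updated_at fall back to their current_timestamp defaults.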
23 changes: 23 additions & 0 deletions airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt
@@ -49,6 +49,27 @@ create table "public"."jobs"(
constraint "jobs_pkey"
primary key ("id")
);
create table "public"."sync_stats"(
"id" uuid not null,
"attempt_id" int4 not null,
"records_emitted" int8 null,
"bytes_emitted" int8 null,
"source_state_messages_emitted" int8 null,
"destination_state_messages_emitted" int8 null,
"records_committed" int8 null,
"mean_seconds_before_source_state_message_emitted" int8 null,
"max_seconds_before_source_state_message_emitted" int8 null,
"mean_seconds_between_state_message_emitted_and_committed" int8 null,
"max_seconds_between_state_message_emitted_and_committed" int8 null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
constraint "sync_stats_pkey"
primary key ("id")
);
alter table "public"."sync_stats"
add constraint "sync_stats_attempt_id_fkey"
foreign key ("attempt_id")
references "public"."attempts" ("id") on delete cascade;
create unique index "airbyte_jobs_migrations_pk" on "public"."airbyte_jobs_migrations"("installed_rank" asc);
create index "airbyte_jobs_migrations_s_idx" on "public"."airbyte_jobs_migrations"("success" asc);
create unique index "airbyte_metadata_pkey" on "public"."airbyte_metadata"("key" asc);
@@ -60,3 +81,5 @@ create unique index "job_attempt_idx" on "public"."attempts"(
create index "jobs_config_type_idx" on "public"."jobs"("config_type" asc);
create unique index "jobs_pkey" on "public"."jobs"("id" asc);
create index "jobs_scope_idx" on "public"."jobs"("scope" asc);
create index "attempt_id_idx" on "public"."sync_stats"("attempt_id" asc);
create unique index "sync_stats_pkey" on "public"."sync_stats"("id" asc);
@@ -118,7 +118,7 @@ public DefaultJobPersistence(final Database jobDatabase) {
this(jobDatabase, Instant::now, 30, 500, 10);
}

- private static String jobSelectAndJoin(String jobsSubquery) {
+ private static String jobSelectAndJoin(final String jobsSubquery) {
return "SELECT\n"
+ "jobs.id AS job_id,\n"
+ "jobs.config_type AS config_type,\n"
@@ -802,7 +802,7 @@ private static void truncateTable(final DSLContext ctx, final String schema, fin
final Table<Record> backupTableSql = getTable(backupSchema, tableName);
ctx.dropTableIfExists(backupTableSql).execute();
ctx.createTable(backupTableSql).as(DSL.select(DSL.asterisk()).from(tableSql)).withData().execute();
- ctx.truncateTable(tableSql).restartIdentity().execute();
+ ctx.truncateTable(tableSql).restartIdentity().cascade().execute();
}

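The added cascade() call is what the new table makes necessary: sync_stats now holds a foreign key into attempts, and Postgres refuses to truncate a table that is referenced by a foreign key unless CASCADE is specified. A minimal sketch of the behavior, not part of this commit:

// Fails once sync_stats exists, with:
// ERROR: cannot truncate a table referenced in a foreign key constraint
ctx.truncateTable(DSL.table("attempts")).execute();

// Succeeds, and empties sync_stats as well via the foreign key:
ctx.truncateTable(DSL.table("attempts")).restartIdentity().cascade().execute();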
@@ -198,9 +198,9 @@ void tearDown() throws Exception {

private void resetDb() throws SQLException {
// todo (cgardens) - truncate whole db.
- jobDatabase.query(ctx -> ctx.truncateTable(JOBS).execute());
- jobDatabase.query(ctx -> ctx.truncateTable(ATTEMPTS).execute());
- jobDatabase.query(ctx -> ctx.truncateTable(AIRBYTE_METADATA).execute());
+ jobDatabase.query(ctx -> ctx.truncateTable(JOBS).cascade().execute());
+ jobDatabase.query(ctx -> ctx.truncateTable(ATTEMPTS).cascade().execute());
+ jobDatabase.query(ctx -> ctx.truncateTable(AIRBYTE_METADATA).cascade().execute());
}

private Result<Record> getJobRecord(final long jobId) throws SQLException {
