Skip to content

Commit

Permalink
Normalization Summaries table and read/write methods (airbytehq#16655)
Browse files Browse the repository at this point in the history
* Add migration to create normalization summaries table
- read/write methods for normalization summary
  • Loading branch information
alovew authored and jhammarstedt committed Oct 31, 2022
1 parent 33f4af8 commit 74fbe0a
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.4.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.4.002", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.jobs.migrations;

import static org.jooq.impl.DSL.currentOffsetDateTime;
import static org.jooq.impl.DSL.foreignKey;
import static org.jooq.impl.DSL.primaryKey;

import java.time.OffsetDateTime;
import java.util.UUID;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.JSONB;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Jobs-database migration that creates the {@code normalization_summaries} table together with an
 * index on its {@code attempt_id} column.
 *
 * <p>
 * Both DDL statements use their {@code IF NOT EXISTS} variants so that running the migration against
 * a database where the table and/or the index are already present does not fail.
 */
public class V0_40_4_002__CreateNormalizationSummaries extends BaseJavaMigration {

  private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_4_002__CreateNormalizationSummaries.class);

  /**
   * Entry point invoked by Flyway; creates the table and its index on the supplied connection.
   *
   * @param context Flyway migration context providing the JDBC connection to run against
   * @throws Exception if executing the DDL fails
   */
  @Override
  public void migrate(final Context context) throws Exception {
    LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

    // Warning: please do not use any jOOQ generated code to write a migration.
    // As database schema changes, the generated jOOQ code can be deprecated. So
    // old migration may not compile if there is any generated code.
    final DSLContext ctx = DSL.using(context.getConnection());
    createNormalizationSummariesTable(ctx);
  }

  /**
   * Creates {@code normalization_summaries} (primary key {@code id}, foreign key {@code attempt_id}
   * referencing {@code attempts.id} with cascading delete) and the
   * {@code normalization_summary_attempt_id_idx} index on {@code attempt_id}.
   *
   * @param ctx jOOQ context bound to the migration's connection
   */
  private void createNormalizationSummariesTable(final DSLContext ctx) {
    final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
    final Field<Long> attemptId = DSL.field("attempt_id", SQLDataType.BIGINT.nullable(false));
    final Field<OffsetDateTime> startTime = DSL.field("start_time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(true));
    final Field<OffsetDateTime> endTime = DSL.field("end_time", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(true));
    final Field<JSONB> failures = DSL.field("failures", SQLDataType.JSONB.nullable(true));
    final Field<OffsetDateTime> createdAt =
        DSL.field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));
    final Field<OffsetDateTime> updatedAt =
        DSL.field("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false).defaultValue(currentOffsetDateTime()));

    ctx.createTableIfNotExists("normalization_summaries")
        .columns(id, attemptId, startTime, endTime, failures, createdAt, updatedAt)
        .constraints(primaryKey(id), foreignKey(attemptId).references("attempts", "id").onDeleteCascade())
        .execute();

    // Must be IF NOT EXISTS as well: a plain CREATE INDEX would fail whenever the table creation
    // above was skipped because the table (and its index) already existed.
    ctx.createIndexIfNotExists("normalization_summary_attempt_id_idx").on("normalization_summaries", "attempt_id").execute();
  }

}
17 changes: 17 additions & 0 deletions airbyte-db/db-lib/src/main/resources/jobs_database/schema_dump.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ create table "public"."jobs"(
constraint "jobs_pkey"
primary key ("id")
);
create table "public"."normalization_summaries"(
"id" uuid not null,
"attempt_id" int8 not null,
"start_time" timestamptz(35) null,
"end_time" timestamptz(35) null,
"failures" jsonb null,
"created_at" timestamptz(35) not null default null,
"updated_at" timestamptz(35) not null default null,
constraint "normalization_summaries_pkey"
primary key ("id")
);
create table "public"."sync_stats"(
"id" uuid not null,
"attempt_id" int8 not null,
Expand All @@ -66,6 +77,10 @@ create table "public"."sync_stats"(
constraint "sync_stats_pkey"
primary key ("id")
);
alter table "public"."normalization_summaries"
add constraint "normalization_summaries_attempt_id_fkey"
foreign key ("attempt_id")
references "public"."attempts" ("id");
alter table "public"."sync_stats"
add constraint "sync_stats_attempt_id_fkey"
foreign key ("attempt_id")
Expand All @@ -81,5 +96,7 @@ create unique index "job_attempt_idx" on "public"."attempts"(
create index "jobs_config_type_idx" on "public"."jobs"("config_type" asc);
create unique index "jobs_pkey" on "public"."jobs"("id" asc);
create index "jobs_scope_idx" on "public"."jobs"("scope" asc);
create unique index "normalization_summaries_pkey" on "public"."normalization_summaries"("id" asc);
create index "normalization_summary_attempt_id_idx" on "public"."normalization_summaries"("attempt_id" asc);
create index "attempt_id_idx" on "public"."sync_stats"("attempt_id" asc);
create unique index "sync_stats_pkey" on "public"."sync_stats"("id" asc);
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@

import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.NORMALIZATION_SUMMARIES;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -22,9 +25,11 @@
import io.airbyte.commons.text.Sqls;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
Expand Down Expand Up @@ -310,7 +315,12 @@ public Optional<String> getAttemptTemporalWorkflowId(final long jobId, final int
}

@Override
public <T> void writeOutput(final long jobId, final int attemptNumber, final T output, final SyncStats syncStats) throws IOException {
public <T> void writeOutput(final long jobId,
final int attemptNumber,
final T output,
final SyncStats syncStats,
final NormalizationSummary normalizationSummary)
throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
jobDatabase.transaction(ctx -> {
ctx.update(ATTEMPTS)
Expand Down Expand Up @@ -338,6 +348,19 @@ public <T> void writeOutput(final long jobId, final int attemptNumber, final T o
.set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted())
.set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted())
.execute();

if (normalizationSummary != null) {
ctx.insertInto(NORMALIZATION_SUMMARIES)
.set(NORMALIZATION_SUMMARIES.ID, UUID.randomUUID())
.set(NORMALIZATION_SUMMARIES.UPDATED_AT, now)
.set(NORMALIZATION_SUMMARIES.CREATED_AT, now)
.set(NORMALIZATION_SUMMARIES.ATTEMPT_ID, attemptId)
.set(NORMALIZATION_SUMMARIES.START_TIME,
OffsetDateTime.ofInstant(Instant.ofEpochMilli(normalizationSummary.getStartTime()), ZoneOffset.UTC))
.set(NORMALIZATION_SUMMARIES.END_TIME, OffsetDateTime.ofInstant(Instant.ofEpochMilli(normalizationSummary.getEndTime()), ZoneOffset.UTC))
.set(NORMALIZATION_SUMMARIES.FAILURES, JSONB.valueOf(Jsons.serialize(normalizationSummary.getFailures())))
.execute();
}
return null;
});

Expand All @@ -364,6 +387,15 @@ public List<SyncStats> getSyncStats(final Long attemptId) throws IOException {
.toList());
}

/**
 * Reads every {@code normalization_summaries} row whose {@code attempt_id} equals the given id and
 * maps each row to a {@link NormalizationSummary}.
 *
 * @param attemptId value compared against {@code normalization_summaries.attempt_id}
 * @return an unmodifiable list of the mapped summaries; empty when no row matches
 * @throws IOException declared by the persistence interface; presumably raised by the database
 *         wrapper when the query fails (confirm against {@code jobDatabase.query})
 */
@Override
public List<NormalizationSummary> getNormalizationSummary(final Long attemptId) throws IOException {
  // JsonProcessingException is intentionally not declared: it is a subclass of IOException and the
  // record mapper rethrows it unchecked, so it can never escape this method as a checked exception.
  return jobDatabase
      .query(ctx -> ctx.select(DSL.asterisk())
          .from(NORMALIZATION_SUMMARIES)
          .where(NORMALIZATION_SUMMARIES.ATTEMPT_ID.eq(attemptId))
          .fetch(getNormalizationSummaryRecordMapper())
          .stream()
          .toList());
}

private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
return record -> new SyncStats().withBytesEmitted(record.get(SYNC_STATS.BYTES_EMITTED)).withRecordsEmitted(record.get(SYNC_STATS.RECORDS_EMITTED))
.withSourceStateMessagesEmitted(record.get(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED))
Expand All @@ -375,6 +407,24 @@ private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
.withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED));
}

/**
 * Builds the mapper that converts a {@code normalization_summaries} row into a
 * {@link NormalizationSummary}.
 *
 * <p>
 * {@code start_time} and {@code end_time} are converted to epoch milliseconds. Both columns are
 * nullable, so a SQL NULL is mapped to a {@code null} time instead of failing with a
 * {@link NullPointerException}. A SQL NULL in {@code failures} is mapped to {@code null} failures;
 * any other value is parsed by {@code deserializeFailureReasons}.
 *
 * @return a mapper that throws {@link RuntimeException} (wrapping the Jackson error) when the stored
 *         failures JSON cannot be parsed
 */
private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper() {
  return record -> {
    final OffsetDateTime startTime = record.get(NORMALIZATION_SUMMARIES.START_TIME);
    final OffsetDateTime endTime = record.get(NORMALIZATION_SUMMARIES.END_TIME);
    final boolean failuresPresent = record.get(NORMALIZATION_SUMMARIES.FAILURES, String.class) != null;
    try {
      return new NormalizationSummary()
          .withStartTime(startTime == null ? null : startTime.toInstant().toEpochMilli())
          .withEndTime(endTime == null ? null : endTime.toInstant().toEpochMilli())
          .withFailures(failuresPresent ? deserializeFailureReasons(record) : null);
    } catch (final JsonProcessingException e) {
      // RecordMapper#map cannot throw checked exceptions, so rethrow unchecked and keep the cause.
      throw new RuntimeException("Unable to deserialize failures of a normalization summary record", e);
    }
  };
}

/**
 * Parses the {@code failures} column of a {@code normalization_summaries} record into a list of
 * {@link FailureReason}.
 *
 * @param record record containing the {@code normalization_summaries.failures} column
 * @return an unmodifiable list of the parsed failure reasons, or {@code null} when the column holds
 *         the JSON literal {@code null} (or no value at all)
 * @throws JsonProcessingException if the stored text is not a valid JSON array of failure reasons
 */
private static List<FailureReason> deserializeFailureReasons(final Record record) throws JsonProcessingException {
  final ObjectMapper mapper = new ObjectMapper();
  final FailureReason[] reasons = mapper.readValue(String.valueOf(record.get(NORMALIZATION_SUMMARIES.FAILURES)), FailureReason[].class);
  // Jackson yields null for the JSON literal `null` (which is also what String.valueOf produces for
  // a missing value); List.of(null array) would throw a NullPointerException, so report "no failures".
  return reasons == null ? null : List.of(reasons);
}

@Override
public Job getJob(final long jobId) throws IOException {
return jobDatabase.query(ctx -> getJob(ctx, jobId));
Expand Down Expand Up @@ -438,7 +488,7 @@ public List<Job> listJobsIncludingId(final Set<ConfigType> configTypes, final St
.fetchOne().into(int.class));

// calculate the multiple of `pagesize` that includes the target job
int pageSizeThatIncludesJob = (countIncludingJob / pagesize + 1) * pagesize;
final int pageSizeThatIncludesJob = (countIncludingJob / pagesize + 1) * pagesize;
return listJobs(configTypes, connectionId, pageSizeThatIncludesJob, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.scheduler.models.AttemptWithJobInfo;
Expand All @@ -32,6 +33,8 @@ public interface JobPersistence {

List<SyncStats> getSyncStats(Long attemptId) throws IOException;

List<NormalizationSummary> getNormalizationSummary(Long attemptId) throws IOException;

Job getJob(long jobId) throws IOException;

//
Expand Down Expand Up @@ -128,7 +131,7 @@ public interface JobPersistence {
* StandardSyncOutput#state in the configs database by calling
* ConfigRepository#updateConnectionState, which takes care of persisting the connection state.
*/
<T> void writeOutput(long jobId, int attemptNumber, T output, SyncStats syncStats) throws IOException;
<T> void writeOutput(long jobId, int attemptNumber, T output, SyncStats syncStats, NormalizationSummary normalizationSummary) throws IOException;

/**
* Writes a summary of all failures that occurred during the attempt.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.FailureReason.FailureType;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.SyncStats;
Expand Down Expand Up @@ -258,13 +260,21 @@ void testWriteOutput() throws IOException, SQLException {
new SyncStats().withBytesEmitted(100L).withRecordsEmitted(9L).withRecordsCommitted(10L).withDestinationStateMessagesEmitted(1L)
.withSourceStateMessagesEmitted(4L).withMaxSecondsBeforeSourceStateMessageEmitted(5L).withMeanSecondsBeforeSourceStateMessageEmitted(2L)
.withMaxSecondsBetweenStateMessageEmittedandCommitted(10L).withMeanSecondsBetweenStateMessageEmittedandCommitted(3L);
final FailureReason failureReason1 = new FailureReason().withFailureOrigin(FailureOrigin.DESTINATION).withFailureType(FailureType.SYSTEM_ERROR)
.withExternalMessage("There was a normalization error");
final FailureReason failureReason2 = new FailureReason().withFailureOrigin(FailureOrigin.SOURCE).withFailureType(FailureType.CONFIG_ERROR)
.withExternalMessage("There was another normalization error");

final NormalizationSummary normalizationSummary =
new NormalizationSummary().withStartTime(10L).withEndTime(500L).withFailures(List.of(failureReason1, failureReason2));
final StandardSyncOutput standardSyncOutput =
new StandardSyncOutput().withStandardSyncSummary(new StandardSyncSummary().withTotalStats(syncStats));
new StandardSyncOutput().withStandardSyncSummary(new StandardSyncSummary().withTotalStats(syncStats))
.withNormalizationSummary(normalizationSummary);
final JobOutput jobOutput = new JobOutput().withOutputType(JobOutput.OutputType.DISCOVER_CATALOG).withSync(standardSyncOutput);

when(timeSupplier.get()).thenReturn(Instant.ofEpochMilli(4242));
jobPersistence.writeOutput(jobId, attemptNumber, jobOutput,
jobOutput.getSync().getStandardSyncSummary().getTotalStats());
jobOutput.getSync().getStandardSyncSummary().getTotalStats(), jobOutput.getSync().getNormalizationSummary());

final Job updated = jobPersistence.getJob(jobId);

Expand All @@ -288,6 +298,10 @@ void testWriteOutput() throws IOException, SQLException {
assertEquals(10L, storedSyncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted());
assertEquals(3L, storedSyncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted());

final NormalizationSummary storedNormalizationSummary = jobPersistence.getNormalizationSummary(attemptId).stream().findFirst().get();
assertEquals(10L, storedNormalizationSummary.getStartTime());
assertEquals(500L, storedNormalizationSummary.getEndTime());
assertEquals(List.of(failureReason1, failureReason2), storedNormalizationSummary.getFailures());
}

@Test
Expand Down Expand Up @@ -1036,7 +1050,7 @@ class GetJobCount {
@Test
@DisplayName("Should return the total job count for the connection")
void testGetJobCount() throws IOException {
int numJobsToCreate = 10;
final int numJobsToCreate = 10;
for (int i = 0; i < numJobsToCreate; i++) {
jobPersistence.enqueueJob(CONNECTION_ID.toString(), SPEC_JOB_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.airbyte.config.FailureReason;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
Expand Down Expand Up @@ -193,7 +194,8 @@ public void jobSuccess(final JobSuccessInput input) {
if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
final SyncStats syncStats = jobOutput.getSync().getStandardSyncSummary().getTotalStats();
jobPersistence.writeOutput(jobId, attemptId, jobOutput, syncStats);
final NormalizationSummary normalizationSummary = jobOutput.getSync().getNormalizationSummary();
jobPersistence.writeOutput(jobId, attemptId, jobOutput, syncStats, normalizationSummary);
} else {
log.warn("The job {} doesn't have any output for the attempt {}", jobId, attemptId);
}
Expand Down Expand Up @@ -252,7 +254,8 @@ public void attemptFailure(final AttemptFailureInput input) {
if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
final SyncStats syncStats = jobOutput.getSync().getStandardSyncSummary().getTotalStats();
jobPersistence.writeOutput(jobId, attemptId, jobOutput, syncStats);
final NormalizationSummary normalizationSummary = jobOutput.getSync().getNormalizationSummary();
jobPersistence.writeOutput(jobId, attemptId, jobOutput, syncStats, normalizationSummary);
}

emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ class Update {
void setJobSuccess() throws IOException {
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput));

Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats());
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats(),
jobOutput.getSync().getNormalizationSummary());
Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobNotifier).successJob(Mockito.any());
Mockito.verify(mJobtracker).trackSync(Mockito.any(), eq(JobState.SUCCEEDED));
Expand Down Expand Up @@ -339,7 +340,8 @@ void setAttemptFailure() throws IOException {
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, standardSyncOutput, failureSummary));

Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats());
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats(),
jobOutput.getSync().getNormalizationSummary());
Mockito.verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, failureSummary);
}

Expand Down

0 comments on commit 74fbe0a

Please sign in to comment.