Skip to content

Commit

Permalink
Populate and read from SyncStats table (#16476)
Browse files Browse the repository at this point in the history
- Populate sync stats table when job is complete
- Method to read from sync stats table
  • Loading branch information
alovew authored Sep 9, 2022
1 parent 40c8447 commit 3fc6730
Show file tree
Hide file tree
Showing 8 changed files with 134 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.3.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.4.001", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.jobs.migrations;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Jobs-database migration that alters the {@code attempt_id} column of the {@code sync_stats}
 * table to a non-nullable {@code BIGINT}.
 */
public class V0_40_4_001__ChangeSyncStatsForeignKey extends BaseJavaMigration {

  private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_4_001__ChangeSyncStatsForeignKey.class);

  /**
   * Runs the migration on the connection supplied by Flyway.
   *
   * @param context Flyway migration context providing the JDBC connection
   * @throws Exception if the ALTER TABLE statement fails
   */
  @Override
  public void migrate(final Context context) throws Exception {
    LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

    // Warning: do not use any jOOQ generated code in a migration. The generated
    // code changes as the database schema evolves, so an old migration that
    // depends on it may stop compiling. Plain table/column names are used instead.
    final DSLContext dsl = DSL.using(context.getConnection());

    // Change sync_stats.attempt_id to BIGINT NOT NULL.
    dsl.alterTable("sync_stats")
        .alter("attempt_id")
        .set(SQLDataType.BIGINT.nullable(false))
        .execute();
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ create table "public"."jobs"(
);
create table "public"."sync_stats"(
"id" uuid not null,
"attempt_id" int4 not null,
"attempt_id" int8 not null,
"records_emitted" int8 null,
"bytes_emitted" int8 null,
"source_state_messages_emitted" int8 null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.scheduler.persistence;

import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
Expand All @@ -23,6 +24,7 @@
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.SyncStats;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
Expand Down Expand Up @@ -61,6 +63,7 @@
import org.jooq.JSONB;
import org.jooq.Named;
import org.jooq.Record;
import org.jooq.RecordMapper;
import org.jooq.Result;
import org.jooq.Sequence;
import org.jooq.Table;
Expand Down Expand Up @@ -305,14 +308,37 @@ public Optional<String> getAttemptTemporalWorkflowId(final long jobId, final int
}

@Override
public <T> void writeOutput(final long jobId, final int attemptNumber, final T output) throws IOException {
public <T> void writeOutput(final long jobId, final int attemptNumber, final T output, final SyncStats syncStats) throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
jobDatabase.transaction(
ctx -> ctx.update(ATTEMPTS)
.set(ATTEMPTS.OUTPUT, JSONB.valueOf(Jsons.serialize(output)))
.set(ATTEMPTS.UPDATED_AT, now)
.where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber))
.execute());
jobDatabase.transaction(ctx -> {
ctx.update(ATTEMPTS)
.set(ATTEMPTS.OUTPUT, JSONB.valueOf(Jsons.serialize(output)))
.set(ATTEMPTS.UPDATED_AT, now)
.where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber))
.execute();
final Optional<Record> record =
ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
attemptNumber).stream().findFirst();
final Long attemptId = record.get().get("id", Long.class);

ctx.insertInto(SYNC_STATS)
.set(SYNC_STATS.ID, UUID.randomUUID())
.set(SYNC_STATS.UPDATED_AT, now)
.set(SYNC_STATS.CREATED_AT, now)
.set(SYNC_STATS.ATTEMPT_ID, attemptId)
.set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
.set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted())
.set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted())
.set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted())
.set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted())
.set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted())
.set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted())
.execute();
return null;
});

}

@Override
Expand All @@ -327,6 +353,26 @@ public void writeAttemptFailureSummary(final long jobId, final int attemptNumber
.execute());
}

/**
 * Reads every {@code sync_stats} row stored for the given attempt.
 *
 * @param attemptId value matched against the {@code attempt_id} column
 * @return the matching rows mapped to {@link SyncStats}; empty when none match
 * @throws IOException if the query fails
 */
@Override
public List<SyncStats> getSyncStats(final Long attemptId) throws IOException {
  return jobDatabase.query(ctx -> {
    final List<SyncStats> mappedRows = ctx
        .select(DSL.asterisk())
        .from(DSL.table("sync_stats"))
        .where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
        .fetch(getSyncStatsRecordMapper());
    // Copy through a stream so the returned list is the same kind the callers already receive.
    return mappedRows.stream().toList();
  });
}

/**
 * Builds the mapper that turns a {@code sync_stats} record into a {@link SyncStats} object by
 * copying each stats column into the corresponding field.
 *
 * @return mapper from a jOOQ record to {@link SyncStats}
 */
private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
  return row -> {
    final SyncStats stats = new SyncStats();
    return stats
        .withBytesEmitted(row.get(SYNC_STATS.BYTES_EMITTED))
        .withRecordsEmitted(row.get(SYNC_STATS.RECORDS_EMITTED))
        .withSourceStateMessagesEmitted(row.get(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED))
        .withDestinationStateMessagesEmitted(row.get(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED))
        .withRecordsCommitted(row.get(SYNC_STATS.RECORDS_COMMITTED))
        .withMeanSecondsBeforeSourceStateMessageEmitted(row.get(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED))
        .withMaxSecondsBeforeSourceStateMessageEmitted(row.get(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED))
        .withMeanSecondsBetweenStateMessageEmittedandCommitted(row.get(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED))
        .withMaxSecondsBetweenStateMessageEmittedandCommitted(row.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED));
  };
}

@Override
public Job getJob(final long jobId) throws IOException {
return jobDatabase.query(ctx -> getJob(ctx, jobId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.SyncStats;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.scheduler.models.AttemptWithJobInfo;
import io.airbyte.scheduler.models.Job;
Expand All @@ -29,6 +30,8 @@
*/
public interface JobPersistence {

List<SyncStats> getSyncStats(Long attemptId) throws IOException;

Job getJob(long jobId) throws IOException;

//
Expand Down Expand Up @@ -125,7 +128,7 @@ public interface JobPersistence {
* StandardSyncOutput#state in the configs database by calling
* ConfigRepository#updateConnectionState, which takes care of persisting the connection state.
*/
<T> void writeOutput(long jobId, int attemptNumber, T output) throws IOException;
<T> void writeOutput(long jobId, int attemptNumber, T output, SyncStats syncStats) throws IOException;

/**
* Writes a summary of all failures that occurred during the attempt.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.AIRBYTE_METADATA;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand All @@ -30,6 +31,9 @@
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.SyncStats;
import io.airbyte.db.Database;
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DataSourceFactory;
Expand Down Expand Up @@ -201,6 +205,7 @@ private void resetDb() throws SQLException {
jobDatabase.query(ctx -> ctx.truncateTable(JOBS).cascade().execute());
jobDatabase.query(ctx -> ctx.truncateTable(ATTEMPTS).cascade().execute());
jobDatabase.query(ctx -> ctx.truncateTable(AIRBYTE_METADATA).cascade().execute());
jobDatabase.query(ctx -> ctx.truncateTable(SYNC_STATS));
}

private Result<Record> getJobRecord(final long jobId) throws SQLException {
Expand Down Expand Up @@ -245,18 +250,44 @@ void testCompleteAttemptSuccess() throws IOException {

// Writes an attempt output together with sync stats, then checks that the output can be read
// back from the job and that the stats can be read back through getSyncStats.
@Test
@DisplayName("Should be able to read what is written")
void testWriteOutput() throws IOException {
void testWriteOutput() throws IOException, SQLException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);
// Snapshot of the job before the write, used below to compare the attempt's updated-at time.
final Job created = jobPersistence.getJob(jobId);
final JobOutput jobOutput = new JobOutput().withOutputType(JobOutput.OutputType.DISCOVER_CATALOG);
// Every stats field gets a distinct-enough value so each one is asserted individually below.
final SyncStats syncStats =
new SyncStats().withBytesEmitted(100L).withRecordsEmitted(9L).withRecordsCommitted(10L).withDestinationStateMessagesEmitted(1L)
.withSourceStateMessagesEmitted(4L).withMaxSecondsBeforeSourceStateMessageEmitted(5L).withMeanSecondsBeforeSourceStateMessageEmitted(2L)
.withMaxSecondsBetweenStateMessageEmittedandCommitted(10L).withMeanSecondsBetweenStateMessageEmittedandCommitted(3L);
// The stats are nested in the output's sync summary and also passed to writeOutput explicitly.
final StandardSyncOutput standardSyncOutput =
new StandardSyncOutput().withStandardSyncSummary(new StandardSyncSummary().withTotalStats(syncStats));
final JobOutput jobOutput = new JobOutput().withOutputType(JobOutput.OutputType.DISCOVER_CATALOG).withSync(standardSyncOutput);

// Stub the time supplier with a fixed instant for the write.
when(timeSupplier.get()).thenReturn(Instant.ofEpochMilli(4242));
jobPersistence.writeOutput(jobId, attemptNumber, jobOutput);
jobPersistence.writeOutput(jobId, attemptNumber, jobOutput,
jobOutput.getSync().getStandardSyncSummary().getTotalStats());

final Job updated = jobPersistence.getJob(jobId);

// The stored output must round-trip, and the write must have changed the attempt's updated-at.
assertEquals(Optional.of(jobOutput), updated.getAttempts().get(0).getOutput());
assertNotEquals(created.getAttempts().get(0).getUpdatedAtInSecond(), updated.getAttempts().get(0).getUpdatedAtInSecond());

// getSyncStats takes the attempt's row id rather than (jobId, attemptNumber), so look it up.
final Optional<Record> record =
jobDatabase.query(ctx -> ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
attemptNumber).stream().findFirst());

final Long attemptId = record.get().get("id", Long.class);

// Each stored stats field must equal the value written above.
final SyncStats storedSyncStats = jobPersistence.getSyncStats(attemptId).stream().findFirst().get();
assertEquals(100L, storedSyncStats.getBytesEmitted());
assertEquals(9L, storedSyncStats.getRecordsEmitted());
assertEquals(10L, storedSyncStats.getRecordsCommitted());
assertEquals(4L, storedSyncStats.getSourceStateMessagesEmitted());
assertEquals(1L, storedSyncStats.getDestinationStateMessagesEmitted());
assertEquals(5L, storedSyncStats.getMaxSecondsBeforeSourceStateMessageEmitted());
assertEquals(2L, storedSyncStats.getMeanSecondsBeforeSourceStateMessageEmitted());
assertEquals(10L, storedSyncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted());
assertEquals(3L, storedSyncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted());

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.SyncStats;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -172,7 +173,8 @@ public void jobSuccess(final JobSuccessInput input) {

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
jobPersistence.writeOutput(jobId, attemptId, jobOutput);
final SyncStats syncStats = jobOutput.getSync().getStandardSyncSummary().getTotalStats();
jobPersistence.writeOutput(jobId, attemptId, jobOutput, syncStats);
} else {
log.warn("The job {} doesn't have any output for the attempt {}", jobId, attemptId);
}
Expand Down Expand Up @@ -230,7 +232,8 @@ public void attemptFailure(final AttemptFailureInput input) {

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
jobPersistence.writeOutput(jobId, attemptId, jobOutput);
final SyncStats syncStats = jobOutput.getSync().getStandardSyncSummary().getTotalStats();
jobPersistence.writeOutput(jobId, attemptId, jobOutput, syncStats);
}

emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ class Update {
void setJobSuccess() throws IOException {
jobCreationAndStatusUpdateActivity.jobSuccess(new JobSuccessInput(JOB_ID, ATTEMPT_ID, standardSyncOutput));

Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats());
Mockito.verify(mJobPersistence).succeedAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobNotifier).successJob(Mockito.any());
Mockito.verify(mJobtracker).trackSync(Mockito.any(), eq(JobState.SUCCEEDED));
Expand Down Expand Up @@ -339,7 +339,7 @@ void setAttemptFailure() throws IOException {
jobCreationAndStatusUpdateActivity.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, standardSyncOutput, failureSummary));

Mockito.verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
Mockito.verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput, jobOutput.getSync().getStandardSyncSummary().getTotalStats());
Mockito.verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, failureSummary);
}

Expand Down

0 comments on commit 3fc6730

Please sign in to comment.