Rebase on master
tuliren committed Oct 20, 2021
1 parent 56f6c13 commit 7b65523
Showing 4 changed files with 92 additions and 26 deletions.
Changed file 1 of 4
@@ -32,7 +32,9 @@
import org.slf4j.LoggerFactory;

/**
* Create a new table to store the latest job state for each standard sync. Issue:
* Create a new table to store the latest job state for each standard sync.
* <li>Column sync_id: the connectionId in StandardSync</li>
* <li>Column state: a json node representing a State object</li>
*/
public class V0_29_21_001__Store_last_sync_state extends BaseJavaMigration {

@@ -55,7 +57,7 @@ public V0_29_21_001__Store_last_sync_state() {
}

@VisibleForTesting
V0_29_21_001__Store_last_sync_state(Configs configs) {
V0_29_21_001__Store_last_sync_state(final Configs configs) {
this.configs = configs;
}

@@ -87,7 +89,7 @@ static void createTable(final DSLContext ctx) {

@VisibleForTesting
static void copyData(final DSLContext ctx, final Map<String, JsonNode> syncToStateMap, final OffsetDateTime timestamp) {
for (Map.Entry<String, JsonNode> entry : syncToStateMap.entrySet()) {
for (final Map.Entry<String, JsonNode> entry : syncToStateMap.entrySet()) {
ctx.insertInto(SYNC_STATE_TABLE)
.set(COLUMN_SYNC_ID, UUID.fromString(entry.getKey()))
.set(COLUMN_STATE, JSONB.valueOf(Jsons.serialize(entry.getValue())))
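A hedged aside on the table shape: the Javadoc above names two columns, sync_id (the connectionId in StandardSync) and state (a JSON representation of a State object). The migration's actual createTable body is outside this hunk, so the jOOQ sketch below is illustrative only; the created_at/updated_at columns and the primary key on sync_id are assumptions inferred from how the table is written to later in this commit.

import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;

// Hypothetical sketch only; not the migration's actual DDL.
class SyncStateTableSketch {

  static void createSyncStateTable(final DSLContext ctx) {
    ctx.createTableIfNotExists("sync_state")
        // sync_id: the connectionId in StandardSync
        .column("sync_id", SQLDataType.UUID.nullable(false))
        // state: a json node representing a State object
        .column("state", SQLDataType.JSONB.nullable(false))
        // timestamp columns are assumed from the SYNC_STATE.CREATED_AT / UPDATED_AT usage later in this commit
        .column("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false))
        .column("updated_at", SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false))
        .constraints(DSL.primaryKey("sync_id"))
        .execute();
  }
}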
Changed file 2 of 4
@@ -66,6 +66,8 @@ class V0_29_21_001__Store_last_sync_state_test extends AbstractConfigsDatabaseTe
private static final UUID SYNC_1_ID = UUID.randomUUID();
private static final UUID SYNC_2_ID = UUID.randomUUID();
private static final UUID SYNC_3_ID = UUID.randomUUID();
// these are State objects; see State.yaml for the schema
// we cannot construct the POJO directly because State is defined in a downstream module
private static final JsonNode SYNC_2_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 2222 } }");
private static final JsonNode SYNC_3_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 3333 } }");

@@ -104,12 +106,12 @@ public void testGetSyncToStateMap() throws Exception {
createJob(ctx, SYNC_1_ID);

// The second job has one attempt.
long job2 = createJob(ctx, SYNC_2_ID);
final long job2 = createJob(ctx, SYNC_2_ID);
createAttempt(ctx, job2, 1, createAttemptOutput(SYNC_2_STATE));

// The third job has multiple attempts. The third attempt has the latest state.
long job3 = createJob(ctx, SYNC_3_ID);
JsonNode attempt31State = Jsons.deserialize("{ \"state\": { \"cursor\": 31 } }");
final long job3 = createJob(ctx, SYNC_3_ID);
final JsonNode attempt31State = Jsons.deserialize("{ \"state\": { \"cursor\": 31 } }");
createAttempt(ctx, job3, 1, createAttemptOutput(attempt31State));
createAttempt(ctx, job3, 2, null);
createAttempt(ctx, job3, 3, createAttemptOutput(SYNC_3_STATE));
@@ -205,7 +207,7 @@ public Configuration getConfiguration() {
public Connection getConnection() {
try {
return database.getDataSource().getConnection();
} catch (SQLException e) {
} catch (final SQLException e) {
throw new RuntimeException(e);
}
}
@@ -221,7 +223,7 @@ public Connection getConnection() {
/**
* Create a job record whose scope equals the passed-in standard sync id, and return the job id.
*/
private static long createJob(DSLContext ctx, UUID standardSyncId) {
private static long createJob(final DSLContext ctx, final UUID standardSyncId) {
final int insertCount = ctx.insertInto(JOBS_TABLE, JOB_SCOPE_FIELD)
.values(standardSyncId.toString())
.execute();
@@ -234,7 +236,7 @@ private static long createJob(DSLContext ctx, UUID standardSyncId) {
.get(JOB_ID_FIELD);
}

private static void createAttempt(DSLContext ctx, long jobId, int attemptNumber, JsonNode attemptOutput) {
private static void createAttempt(final DSLContext ctx, final long jobId, final int attemptNumber, final JsonNode attemptOutput) {
final int insertCount = ctx.insertInto(ATTEMPTS_TABLE, ATTEMPT_JOB_ID_FIELD, ATTEMPT_NUMBER_FIELD, ATTEMPT_OUTPUT_FIELD)
.values(jobId, attemptNumber, JSONB.valueOf(Jsons.serialize(attemptOutput)))
.execute();
@@ -252,7 +254,7 @@ private static void createAttempt(DSLContext ctx, long jobId, int attemptNumber,
*
* @param state The state object within a StandardSyncOutput.
*/
private static JsonNode createAttemptOutput(JsonNode state) {
private static JsonNode createAttemptOutput(final JsonNode state) {
final ObjectNode standardSyncOutput = OBJECT_MAPPER.createObjectNode()
.set("state", state);
return OBJECT_MAPPER.createObjectNode()
@@ -280,7 +282,7 @@ private static void checkTable(final DSLContext ctx, final boolean tableExists)

private static void checkSyncStates(final DSLContext ctx,
final Map<String, JsonNode> expectedSyncStates,
@Nullable OffsetDateTime expectedTimestamp) {
@Nullable final OffsetDateTime expectedTimestamp) {
for (final Map.Entry<String, JsonNode> entry : expectedSyncStates.entrySet()) {
final var record = ctx
.select(V0_29_21_001__Store_last_sync_state.COLUMN_STATE,
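Given the scenario built above — sync 1 with no attempts, sync 2 with a single attempt carrying SYNC_2_STATE, and sync 3 whose latest attempt (number 3) carries SYNC_3_STATE — the sync-to-state mapping the migration is expected to produce would look roughly like the following. This is a sketch of what the setup implies, not the test's actual assertion (that part of the hunk is not shown), and it reuses the constants declared earlier in this file.

// Expected result of the migration's sync-to-state lookup for the scenario above (illustrative only).
final Map<String, JsonNode> expectedSyncToStateMap = Map.of(
    SYNC_2_ID.toString(), SYNC_2_STATE,  // the only attempt with an output
    SYNC_3_ID.toString(), SYNC_3_STATE); // attempt 3 wins over attempt 1; attempt 2 had no output
// SYNC_1_ID has no attempts, so it does not appear in the map.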
Changed file 3 of 4
@@ -4,6 +4,10 @@

package io.airbyte.scheduler.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.SYNC_STATE;
import static io.airbyte.db.instance.jobs.jooq.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.Tables.JOBS;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -21,7 +25,6 @@
import io.airbyte.config.State;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.configs.jooq.tables.SyncState;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
@@ -32,6 +35,7 @@
import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
@@ -318,15 +322,66 @@ public Optional<String> getAttemptTemporalWorkflowId(final long jobId, final int

@Override
public <T> void writeOutput(final long jobId, final int attemptNumber, final T output) throws IOException {
final LocalDateTime now = LocalDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
writeOutputToAttemptTable(jobId, attemptNumber, output, now);
writeOutputToSyncStateTable(jobId, output, now);
}

jobDatabase.query(
ctx -> ctx.execute(
"UPDATE attempts SET output = CAST(? as JSONB), updated_at = ? WHERE job_id = ? AND attempt_number = ?",
Jsons.serialize(output),
now,
jobId,
attemptNumber));
private <T> void writeOutputToAttemptTable(final long jobId,
final int attemptNumber,
final T output,
final OffsetDateTime now)
throws IOException {
jobDatabase.transaction(
ctx -> ctx.update(ATTEMPTS)
.set(ATTEMPTS.OUTPUT, JSONB.valueOf(Jsons.serialize(output)))
.set(ATTEMPTS.UPDATED_AT, now)
.where(ATTEMPTS.JOB_ID.eq(jobId), ATTEMPTS.ATTEMPT_NUMBER.eq(attemptNumber))
.execute());
}

private <T> void writeOutputToSyncStateTable(final long jobId, final T output, final OffsetDateTime now) throws IOException {
if (!(output instanceof JobOutput)) {
return;
}
final JobOutput jobOutput = (JobOutput) output;
if (jobOutput.getSync() == null) {
return;
}

final Record1<String> jobConnectionId = jobDatabase.query(ctx -> ctx
.select(JOBS.SCOPE)
.from(JOBS)
.where(JOBS.ID.eq(jobId))
.fetchAny());
final State syncState = jobOutput.getSync().getState();

if (jobConnectionId == null) {
LOGGER.error("No job can be found for id {}", jobId);
return;
}

final UUID connectionId = UUID.fromString(jobConnectionId.value1());
configDatabase.transaction(
ctx -> {
final boolean hasExistingRecord = ctx.fetchExists(SYNC_STATE, SYNC_STATE.SYNC_ID.eq(connectionId));
if (hasExistingRecord) {
LOGGER.info("Updating connection {} state", connectionId);
return ctx.update(SYNC_STATE)
.set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(syncState)))
.set(SYNC_STATE.UPDATED_AT, now)
.where(SYNC_STATE.SYNC_ID.eq(connectionId))
.execute();
} else {
LOGGER.info("Inserting new state for connection {}", connectionId);
return ctx.insertInto(SYNC_STATE)
.set(SYNC_STATE.SYNC_ID, UUID.fromString(jobConnectionId.value1()))
.set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(syncState)))
.set(SYNC_STATE.CREATED_AT, now)
.set(SYNC_STATE.UPDATED_AT, now)
.execute();
}
});
}

@Override
@@ -396,9 +451,9 @@ public Optional<Job> getLastReplicationJob(final UUID connectionId) throws IOExc
@Override
public Optional<State> getCurrentState(final UUID connectionId) throws IOException {
return configDatabase.query(ctx -> {
final Record1<JSONB> record = ctx.select(SyncState.SYNC_STATE.STATE)
.from(SyncState.SYNC_STATE)
.where(SyncState.SYNC_STATE.SYNC_ID.eq(connectionId))
final Record1<JSONB> record = ctx.select(SYNC_STATE.STATE)
.from(SYNC_STATE)
.where(SYNC_STATE.SYNC_ID.eq(connectionId))
.fetchAny();
if (record == null) {
return Optional.empty();
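A note on writeOutputToSyncStateTable: it checks for an existing sync_state row and then either updates or inserts, all inside one configDatabase.transaction. If sync_id carries a unique or primary-key constraint (an assumption here; the table's DDL is not part of this hunk), the same write could also be expressed as a single upsert through jOOQ's ON CONFLICT support. A hedged sketch reusing the names from the method above:

// Hypothetical single-statement alternative; assumes a unique constraint on sync_state.sync_id.
configDatabase.transaction(ctx -> ctx.insertInto(SYNC_STATE)
    .set(SYNC_STATE.SYNC_ID, connectionId)
    .set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(syncState)))
    .set(SYNC_STATE.CREATED_AT, now)
    .set(SYNC_STATE.UPDATED_AT, now)
    .onConflict(SYNC_STATE.SYNC_ID)
    .doUpdate()
    .set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(syncState)))
    .set(SYNC_STATE.UPDATED_AT, now)
    .execute());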
Changed file 4 of 4
@@ -4,6 +4,10 @@

package io.airbyte.scheduler.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.SYNC_STATE;
import static io.airbyte.db.instance.jobs.jooq.Tables.AIRBYTE_METADATA;
import static io.airbyte.db.instance.jobs.jooq.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.Tables.JOBS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -175,9 +179,12 @@ void tearDown() throws Exception {

private void resetDb() throws SQLException {
// todo (cgardens) - truncate whole db.
jobDatabase.query(ctx -> ctx.execute("TRUNCATE TABLE jobs"));
jobDatabase.query(ctx -> ctx.execute("TRUNCATE TABLE attempts"));
jobDatabase.query(ctx -> ctx.execute("TRUNCATE TABLE airbyte_metadata"));
jobDatabase.query(ctx -> ctx.truncateTable(JOBS).execute());
jobDatabase.query(ctx -> ctx.truncateTable(ATTEMPTS).execute());
jobDatabase.query(ctx -> ctx.truncateTable(AIRBYTE_METADATA).execute());
// the airbyte_configs table cannot be truncated because the config database
// is considered ready for consumption only when there are records in this table
configDatabase.query(ctx -> ctx.truncateTable(SYNC_STATE).execute());
}

private Result<Record> getJobRecord(final long jobId) throws SQLException {
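Taken together, the new write path and getCurrentState round-trip a connection's state: writing a JobOutput whose sync section carries a State should make that state readable by connection id. A hedged usage sketch — the with... builders on JobOutput, StandardSyncOutput, and State are assumed from the generated config model, and jobPersistence/jobId/attemptNumber/connectionId stand in for the objects a test would set up:

// Illustrative round trip; not a verbatim test from this commit.
final State state = new State().withState(Jsons.deserialize("{ \"cursor\": 3333 }"));
final JobOutput jobOutput = new JobOutput().withSync(new StandardSyncOutput().withState(state));

jobPersistence.writeOutput(jobId, attemptNumber, jobOutput);

// writeOutput stores the state in sync_state keyed by the job's connection (scope) id,
// so the latest state should now be readable:
assertEquals(Optional.of(state), jobPersistence.getCurrentState(connectionId));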
