From aa20d11f138e14b7ac49914c3b8388c74845e5fd Mon Sep 17 00:00:00 2001 From: Liren Tu Date: Mon, 25 Oct 2021 10:23:01 -0700 Subject: [PATCH] Fix migration query --- .../V0_30_22_001__Store_last_sync_state.java | 25 +++--- ...30_22_001__Store_last_sync_state_test.java | 80 +++++++++++++------ 2 files changed, 67 insertions(+), 38 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java index b4ea92ab8c26..79567bb34c5c 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state.java @@ -126,29 +126,28 @@ static Optional getJobsDatabase(final Configs configs) { @VisibleForTesting static Set getStandardSyncStates(final Database jobsDatabase) throws SQLException { final Table jobsTable = DSL.table("jobs"); - final Field jobIdField = DSL.field("jobs.id", SQLDataType.BIGINT); - final Field syncIdField = DSL.field("jobs.scope", SQLDataType.VARCHAR); + final Field jobId = DSL.field("jobs.id", SQLDataType.BIGINT); + final Field connectionId = DSL.field("jobs.scope", SQLDataType.VARCHAR); final Table attemptsTable = DSL.table("attempts"); - final Field attemptJobIdField = DSL.field("attempts.job_id", SQLDataType.BIGINT); - final Field attemptNumberField = DSL.field("attempts.attempt_number", SQLDataType.INTEGER); + final Field attemptJobId = DSL.field("attempts.job_id", SQLDataType.BIGINT); + final Field attemptNumber = DSL.field("attempts.attempt_number", SQLDataType.INTEGER); + final Field attemptCreatedAt = DSL.field("attempts.created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); // output schema: JobOutput.yaml // sync schema: StandardSyncOutput.yaml // state schema: State.yaml, e.g. { "state": { "cursor": 1000 } } - final Field attemptStateField = DSL.field("attempts.output -> 'sync' -> 'state'", SQLDataType.JSONB); + final Field attemptState = DSL.field("attempts.output -> 'sync' -> 'state'", SQLDataType.JSONB); return jobsDatabase.query(ctx -> ctx - .select(syncIdField, attemptStateField) + .select(connectionId, attemptState) + .distinctOn(connectionId) .from(attemptsTable) .innerJoin(jobsTable) - .on(jobIdField.eq(attemptJobIdField)) - .where(DSL.row(attemptJobIdField, attemptNumberField).in( - // for each job id, find the last attempt with a state - DSL.select(attemptJobIdField, DSL.max(attemptNumberField)) - .from(attemptsTable) - .where(attemptStateField.isNotNull()) - .groupBy(attemptJobIdField))) + .on(jobId.eq(attemptJobId)) + .where(attemptState.isNotNull()) + // this query assumes that an attempt with larger created_at field is always a newer attempt + .orderBy(connectionId, attemptCreatedAt.desc()) .fetch() .stream() .map(r -> getStandardSyncState(UUID.fromString(r.value1()), Jsons.deserialize(r.value2().data(), State.class)))) diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java index c9758cb38834..e7fbd7e371a7 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/instance/configs/migrations/V0_30_22_001__Store_last_sync_state_test.java @@ -14,6 +14,7 @@ import static org.jooq.impl.DSL.field; import static org.jooq.impl.DSL.table; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,16 +57,19 @@ class V0_30_22_001__Store_last_sync_state_test extends AbstractConfigsDatabaseTest { private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper(); + private static final OffsetDateTime TIMESTAMP = OffsetDateTime.now(); private static final Table JOBS_TABLE = table("jobs"); private static final Field JOB_ID_FIELD = field("id", SQLDataType.BIGINT); private static final Field JOB_SCOPE_FIELD = field("scope", SQLDataType.VARCHAR); + private static final Field JOB_CREATED_AT_FIELD = field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); private static final Table ATTEMPTS_TABLE = table("attempts"); private static final Field ATTEMPT_ID_FIELD = field("id", SQLDataType.BIGINT); private static final Field ATTEMPT_JOB_ID_FIELD = field("job_id", SQLDataType.BIGINT); private static final Field ATTEMPT_NUMBER_FIELD = field("attempt_number", SQLDataType.INTEGER); private static final Field ATTEMPT_OUTPUT_FIELD = field("output", SQLDataType.JSONB); + private static final Field ATTEMPT_CREATED_AT_FIELD = field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE); private static final UUID CONNECTION_1_ID = UUID.randomUUID(); private static final UUID CONNECTION_2_ID = UUID.randomUUID(); @@ -73,6 +77,7 @@ class V0_30_22_001__Store_last_sync_state_test extends AbstractConfigsDatabaseTe private static final State CONNECTION_2_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 2222 } }", State.class); private static final State CONNECTION_3_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 3333 } }", State.class); + private static final State CONNECTION_OLD_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": -1 } }", State.class); private static final StandardSyncState STD_CONNECTION_STATE_2 = getStandardSyncState(CONNECTION_2_ID, CONNECTION_2_STATE); private static final StandardSyncState STD_CONNECTION_STATE_3 = getStandardSyncState(CONNECTION_3_ID, CONNECTION_3_STATE); @@ -106,24 +111,34 @@ public void testGetJobsDatabase() { @Test @Order(20) - public void testGetSyncToStateMap() throws Exception { + public void testGetStandardSyncStates() throws Exception { jobDatabase.query(ctx -> { - // Create three jobs for three standard syncs. - // The first job has no attempt. - createJob(ctx, CONNECTION_1_ID); - - // The second job has one attempt. - final long job2 = createJob(ctx, CONNECTION_2_ID); - createAttempt(ctx, job2, 1, createAttemptOutput(CONNECTION_2_STATE)); - - // The third job has multiple attempts. The third attempt has the latest state. - final long job3 = createJob(ctx, CONNECTION_3_ID); - final State attempt31State = new State().withState(Jsons.deserialize("{\"cursor\": 31 }")); - createAttempt(ctx, job3, 1, createAttemptOutput(attempt31State)); - createAttempt(ctx, job3, 2, null); - createAttempt(ctx, job3, 3, createAttemptOutput(CONNECTION_3_STATE)); - createAttempt(ctx, job3, 4, null); - createAttempt(ctx, job3, 5, null); + // Connection 1 has 1 job, no attempt. + // This is to test that connection without no state is not returned. + createJob(ctx, CONNECTION_1_ID, 30); + + // Connection 2 has two jobs, each has one attempt. + // This is to test that only the state from the latest job is returned. + final long job21 = createJob(ctx, CONNECTION_2_ID, 10); + final long job22 = createJob(ctx, CONNECTION_2_ID, 20); + assertNotEquals(job21, job22); + createAttempt(ctx, job21, 1, createAttemptOutput(CONNECTION_OLD_STATE), 11); + createAttempt(ctx, job22, 1, createAttemptOutput(CONNECTION_2_STATE), 21); + + // Connection 3 has two jobs. + // The first job has multiple attempts. Its third attempt has the latest state. + // The second job has two attempts with no state. + // This is to test that only the state from the latest attempt is returned. + final long job31 = createJob(ctx, CONNECTION_3_ID, 5); + final long job32 = createJob(ctx, CONNECTION_3_ID, 15); + assertNotEquals(job31, job32); + createAttempt(ctx, job31, 1, createAttemptOutput(CONNECTION_OLD_STATE), 6); + createAttempt(ctx, job31, 2, null, 7); + createAttempt(ctx, job31, 3, createAttemptOutput(CONNECTION_3_STATE), 8); + createAttempt(ctx, job31, 4, null, 9); + createAttempt(ctx, job31, 5, null, 10); + createAttempt(ctx, job32, 1, null, 20); + createAttempt(ctx, job32, 2, null, 25); assertEquals(STD_CONNECTION_STATES, V0_30_22_001__Store_last_sync_state.getStandardSyncStates(jobDatabase)); @@ -197,24 +212,39 @@ public Connection getConnection() { } /** - * Create a job record whose scope equals to the passed in standard sync id, and return the job id. + * Create a job record whose scope equals to the passed in connection id, and return the job id. + * + * @param creationOffset Set the creation timestamp to {@code TIMESTAMP} + this passed in offset. */ - private static long createJob(final DSLContext ctx, final UUID standardSyncId) { - final int insertCount = ctx.insertInto(JOBS_TABLE, JOB_SCOPE_FIELD) - .values(standardSyncId.toString()) + private static long createJob(final DSLContext ctx, final UUID connectionId, final long creationOffset) { + final int insertCount = ctx.insertInto(JOBS_TABLE) + .set(JOB_SCOPE_FIELD, connectionId.toString()) + .set(JOB_CREATED_AT_FIELD, TIMESTAMP.plusDays(creationOffset)) .execute(); assertEquals(1, insertCount); return ctx.select(JOB_ID_FIELD) .from(JOBS_TABLE) - .where(JOB_SCOPE_FIELD.eq(standardSyncId.toString())) + .where(JOB_SCOPE_FIELD.eq(connectionId.toString())) + .orderBy(JOB_CREATED_AT_FIELD.desc()) + .limit(1) .fetchOne() .get(JOB_ID_FIELD); } - private static void createAttempt(final DSLContext ctx, final long jobId, final int attemptNumber, final JobOutput attemptOutput) { - final int insertCount = ctx.insertInto(ATTEMPTS_TABLE, ATTEMPT_JOB_ID_FIELD, ATTEMPT_NUMBER_FIELD, ATTEMPT_OUTPUT_FIELD) - .values(jobId, attemptNumber, JSONB.valueOf(Jsons.serialize(attemptOutput))) + /** + * @param creationOffset Set the creation timestamp to {@code TIMESTAMP} + this passed in offset. + */ + private static void createAttempt(final DSLContext ctx, + final long jobId, + final int attemptNumber, + final JobOutput attemptOutput, + final long creationOffset) { + final int insertCount = ctx.insertInto(ATTEMPTS_TABLE) + .set(ATTEMPT_JOB_ID_FIELD, jobId) + .set(ATTEMPT_NUMBER_FIELD, attemptNumber) + .set(ATTEMPT_OUTPUT_FIELD, JSONB.valueOf(Jsons.serialize(attemptOutput))) + .set(ATTEMPT_CREATED_AT_FIELD, TIMESTAMP.plusDays(creationOffset)) .execute(); assertEquals(1, insertCount);