Skip to content

Commit

Permalink
Fix migration query
Browse files Browse the repository at this point in the history
  • Loading branch information
tuliren committed Oct 25, 2021
1 parent 9e0dc1e commit aa20d11
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,29 +126,28 @@ static Optional<Database> getJobsDatabase(final Configs configs) {
@VisibleForTesting
static Set<StandardSyncState> getStandardSyncStates(final Database jobsDatabase) throws SQLException {
final Table<?> jobsTable = DSL.table("jobs");
final Field<Long> jobIdField = DSL.field("jobs.id", SQLDataType.BIGINT);
final Field<String> syncIdField = DSL.field("jobs.scope", SQLDataType.VARCHAR);
final Field<Long> jobId = DSL.field("jobs.id", SQLDataType.BIGINT);
final Field<String> connectionId = DSL.field("jobs.scope", SQLDataType.VARCHAR);

final Table<?> attemptsTable = DSL.table("attempts");
final Field<Long> attemptJobIdField = DSL.field("attempts.job_id", SQLDataType.BIGINT);
final Field<Integer> attemptNumberField = DSL.field("attempts.attempt_number", SQLDataType.INTEGER);
final Field<Long> attemptJobId = DSL.field("attempts.job_id", SQLDataType.BIGINT);
final Field<Integer> attemptNumber = DSL.field("attempts.attempt_number", SQLDataType.INTEGER);
final Field<OffsetDateTime> attemptCreatedAt = DSL.field("attempts.created_at", SQLDataType.TIMESTAMPWITHTIMEZONE);

// output schema: JobOutput.yaml
// sync schema: StandardSyncOutput.yaml
// state schema: State.yaml, e.g. { "state": { "cursor": 1000 } }
final Field<JSONB> attemptStateField = DSL.field("attempts.output -> 'sync' -> 'state'", SQLDataType.JSONB);
final Field<JSONB> attemptState = DSL.field("attempts.output -> 'sync' -> 'state'", SQLDataType.JSONB);

return jobsDatabase.query(ctx -> ctx
.select(syncIdField, attemptStateField)
.select(connectionId, attemptState)
.distinctOn(connectionId)
.from(attemptsTable)
.innerJoin(jobsTable)
.on(jobIdField.eq(attemptJobIdField))
.where(DSL.row(attemptJobIdField, attemptNumberField).in(
// for each job id, find the last attempt with a state
DSL.select(attemptJobIdField, DSL.max(attemptNumberField))
.from(attemptsTable)
.where(attemptStateField.isNotNull())
.groupBy(attemptJobIdField)))
.on(jobId.eq(attemptJobId))
.where(attemptState.isNotNull())
// this query assumes that an attempt with larger created_at field is always a newer attempt
.orderBy(connectionId, attemptCreatedAt.desc())
.fetch()
.stream()
.map(r -> getStandardSyncState(UUID.fromString(r.value1()), Jsons.deserialize(r.value2().data(), State.class))))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.table;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -56,23 +57,27 @@
class V0_30_22_001__Store_last_sync_state_test extends AbstractConfigsDatabaseTest {

private static final ObjectMapper OBJECT_MAPPER = MoreMappers.initMapper();
private static final OffsetDateTime TIMESTAMP = OffsetDateTime.now();

private static final Table<?> JOBS_TABLE = table("jobs");
private static final Field<Long> JOB_ID_FIELD = field("id", SQLDataType.BIGINT);
private static final Field<String> JOB_SCOPE_FIELD = field("scope", SQLDataType.VARCHAR);
private static final Field<OffsetDateTime> JOB_CREATED_AT_FIELD = field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE);

private static final Table<?> ATTEMPTS_TABLE = table("attempts");
private static final Field<Long> ATTEMPT_ID_FIELD = field("id", SQLDataType.BIGINT);
private static final Field<Long> ATTEMPT_JOB_ID_FIELD = field("job_id", SQLDataType.BIGINT);
private static final Field<Integer> ATTEMPT_NUMBER_FIELD = field("attempt_number", SQLDataType.INTEGER);
private static final Field<JSONB> ATTEMPT_OUTPUT_FIELD = field("output", SQLDataType.JSONB);
private static final Field<OffsetDateTime> ATTEMPT_CREATED_AT_FIELD = field("created_at", SQLDataType.TIMESTAMPWITHTIMEZONE);

private static final UUID CONNECTION_1_ID = UUID.randomUUID();
private static final UUID CONNECTION_2_ID = UUID.randomUUID();
private static final UUID CONNECTION_3_ID = UUID.randomUUID();

private static final State CONNECTION_2_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 2222 } }", State.class);
private static final State CONNECTION_3_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": 3333 } }", State.class);
private static final State CONNECTION_OLD_STATE = Jsons.deserialize("{ \"state\": { \"cursor\": -1 } }", State.class);

private static final StandardSyncState STD_CONNECTION_STATE_2 = getStandardSyncState(CONNECTION_2_ID, CONNECTION_2_STATE);
private static final StandardSyncState STD_CONNECTION_STATE_3 = getStandardSyncState(CONNECTION_3_ID, CONNECTION_3_STATE);
Expand Down Expand Up @@ -106,24 +111,34 @@ public void testGetJobsDatabase() {

@Test
@Order(20)
public void testGetSyncToStateMap() throws Exception {
public void testGetStandardSyncStates() throws Exception {
jobDatabase.query(ctx -> {
// Create three jobs for three standard syncs.
// The first job has no attempt.
createJob(ctx, CONNECTION_1_ID);

// The second job has one attempt.
final long job2 = createJob(ctx, CONNECTION_2_ID);
createAttempt(ctx, job2, 1, createAttemptOutput(CONNECTION_2_STATE));

// The third job has multiple attempts. The third attempt has the latest state.
final long job3 = createJob(ctx, CONNECTION_3_ID);
final State attempt31State = new State().withState(Jsons.deserialize("{\"cursor\": 31 }"));
createAttempt(ctx, job3, 1, createAttemptOutput(attempt31State));
createAttempt(ctx, job3, 2, null);
createAttempt(ctx, job3, 3, createAttemptOutput(CONNECTION_3_STATE));
createAttempt(ctx, job3, 4, null);
createAttempt(ctx, job3, 5, null);
// Connection 1 has 1 job, no attempt.
// This is to test that connection without no state is not returned.
createJob(ctx, CONNECTION_1_ID, 30);

// Connection 2 has two jobs, each has one attempt.
// This is to test that only the state from the latest job is returned.
final long job21 = createJob(ctx, CONNECTION_2_ID, 10);
final long job22 = createJob(ctx, CONNECTION_2_ID, 20);
assertNotEquals(job21, job22);
createAttempt(ctx, job21, 1, createAttemptOutput(CONNECTION_OLD_STATE), 11);
createAttempt(ctx, job22, 1, createAttemptOutput(CONNECTION_2_STATE), 21);

// Connection 3 has two jobs.
// The first job has multiple attempts. Its third attempt has the latest state.
// The second job has two attempts with no state.
// This is to test that only the state from the latest attempt is returned.
final long job31 = createJob(ctx, CONNECTION_3_ID, 5);
final long job32 = createJob(ctx, CONNECTION_3_ID, 15);
assertNotEquals(job31, job32);
createAttempt(ctx, job31, 1, createAttemptOutput(CONNECTION_OLD_STATE), 6);
createAttempt(ctx, job31, 2, null, 7);
createAttempt(ctx, job31, 3, createAttemptOutput(CONNECTION_3_STATE), 8);
createAttempt(ctx, job31, 4, null, 9);
createAttempt(ctx, job31, 5, null, 10);
createAttempt(ctx, job32, 1, null, 20);
createAttempt(ctx, job32, 2, null, 25);

assertEquals(STD_CONNECTION_STATES, V0_30_22_001__Store_last_sync_state.getStandardSyncStates(jobDatabase));

Expand Down Expand Up @@ -197,24 +212,39 @@ public Connection getConnection() {
}

/**
* Create a job record whose scope equals to the passed in standard sync id, and return the job id.
* Create a job record whose scope equals to the passed in connection id, and return the job id.
*
* @param creationOffset Set the creation timestamp to {@code TIMESTAMP} + this passed in offset.
*/
private static long createJob(final DSLContext ctx, final UUID standardSyncId) {
final int insertCount = ctx.insertInto(JOBS_TABLE, JOB_SCOPE_FIELD)
.values(standardSyncId.toString())
private static long createJob(final DSLContext ctx, final UUID connectionId, final long creationOffset) {
final int insertCount = ctx.insertInto(JOBS_TABLE)
.set(JOB_SCOPE_FIELD, connectionId.toString())
.set(JOB_CREATED_AT_FIELD, TIMESTAMP.plusDays(creationOffset))
.execute();
assertEquals(1, insertCount);

return ctx.select(JOB_ID_FIELD)
.from(JOBS_TABLE)
.where(JOB_SCOPE_FIELD.eq(standardSyncId.toString()))
.where(JOB_SCOPE_FIELD.eq(connectionId.toString()))
.orderBy(JOB_CREATED_AT_FIELD.desc())
.limit(1)
.fetchOne()
.get(JOB_ID_FIELD);
}

private static void createAttempt(final DSLContext ctx, final long jobId, final int attemptNumber, final JobOutput attemptOutput) {
final int insertCount = ctx.insertInto(ATTEMPTS_TABLE, ATTEMPT_JOB_ID_FIELD, ATTEMPT_NUMBER_FIELD, ATTEMPT_OUTPUT_FIELD)
.values(jobId, attemptNumber, JSONB.valueOf(Jsons.serialize(attemptOutput)))
/**
* @param creationOffset Set the creation timestamp to {@code TIMESTAMP} + this passed in offset.
*/
private static void createAttempt(final DSLContext ctx,
final long jobId,
final int attemptNumber,
final JobOutput attemptOutput,
final long creationOffset) {
final int insertCount = ctx.insertInto(ATTEMPTS_TABLE)
.set(ATTEMPT_JOB_ID_FIELD, jobId)
.set(ATTEMPT_NUMBER_FIELD, attemptNumber)
.set(ATTEMPT_OUTPUT_FIELD, JSONB.valueOf(Jsons.serialize(attemptOutput)))
.set(ATTEMPT_CREATED_AT_FIELD, TIMESTAMP.plusDays(creationOffset))
.execute();
assertEquals(1, insertCount);

Expand Down

0 comments on commit aa20d11

Please sign in to comment.