Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress Bar Read APIs #20937

Merged
merged 15 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ private static Job getJobFromRecord(final Record record) {

private static Attempt getAttemptFromRecord(final Record record) {
return new Attempt(
record.get(ATTEMPT_NUMBER, Long.class),
record.get(ATTEMPT_NUMBER, int.class),
record.get(JOB_ID, Long.class),
Path.of(record.get("log_path", String.class)),
record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public class Attempt {

private final long id;
private final int attemptNumber;
private final long jobId;
private final JobOutput output;
private final AttemptStatus status;
Expand All @@ -23,7 +23,7 @@ public class Attempt {
private final long createdAtInSecond;
private final Long endedAtInSecond;

public Attempt(final long id,
public Attempt(final int attemptNumber,
final long jobId,
final Path logPath,
final @Nullable JobOutput output,
Expand All @@ -32,7 +32,7 @@ public Attempt(final long id,
final long createdAtInSecond,
final long updatedAtInSecond,
final @Nullable Long endedAtInSecond) {
this.id = id;
this.attemptNumber = attemptNumber;
this.jobId = jobId;
this.output = output;
this.status = status;
Expand All @@ -43,8 +43,8 @@ public Attempt(final long id,
this.endedAtInSecond = endedAtInSecond;
}

public long getId() {
return id;
public int getAttemptNumber() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed this variable name to better reflect what it actually is

return attemptNumber;
}

public long getJobId() {
Expand Down Expand Up @@ -92,7 +92,7 @@ public boolean equals(final Object o) {
return false;
}
final Attempt attempt = (Attempt) o;
return id == attempt.id &&
return attemptNumber == attempt.attemptNumber &&
jobId == attempt.jobId &&
updatedAtInSecond == attempt.updatedAtInSecond &&
createdAtInSecond == attempt.createdAtInSecond &&
Expand All @@ -105,13 +105,13 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
}

@Override
public String toString() {
return "Attempt{" +
"id=" + id +
"id=" + attemptNumber +
", jobId=" + jobId +
", output=" + output +
", status=" + status +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -142,7 +143,7 @@ static void dbDown() {
container.close();
}

private static Attempt createAttempt(final long id, final long jobId, final AttemptStatus status, final Path logPath) {
private static Attempt createAttempt(final int id, final long jobId, final AttemptStatus status, final Path logPath) {
return new Attempt(
id,
jobId,
Expand All @@ -155,7 +156,7 @@ private static Attempt createAttempt(final long id, final long jobId, final Atte
NOW.getEpochSecond());
}

private static Attempt createUnfinishedAttempt(final long id, final long jobId, final AttemptStatus status, final Path logPath) {
private static Attempt createUnfinishedAttempt(final int id, final long jobId, final AttemptStatus status, final Path logPath) {
return new Attempt(
id,
jobId,
Expand Down Expand Up @@ -238,7 +239,7 @@ void testCompleteAttemptFailed() throws IOException {
jobId,
SPEC_JOB_CONFIG,
JobStatus.INCOMPLETE,
Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH)),
Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH)),
NOW.getEpochSecond());
assertEquals(expected, actual);
}
Expand All @@ -256,7 +257,7 @@ void testCompleteAttemptSuccess() throws IOException {
jobId,
SPEC_JOB_CONFIG,
JobStatus.SUCCEEDED,
Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)),
Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)),
NOW.getEpochSecond());
assertEquals(expected, actual);
}
Expand Down Expand Up @@ -326,8 +327,8 @@ void testWriteAttemptFailureSummary() throws IOException {
}

@Nested
@DisplayName("Test writing in progress stats")
class WriteStats {
@DisplayName("Stats Related Tests")
class Stats {

@Test
@DisplayName("Writing stats the first time should only write record and bytes information correctly")
Expand Down Expand Up @@ -455,6 +456,18 @@ void testWriteNullNamespace() throws IOException {
assertEquals(streamStats, actStreamStats);
}

@Test
@DisplayName("Writing multiple stats a stream with null namespace should write correctly without exceptions")
void testGetStatsNoResult() throws IOException {
final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH);

final AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber);
assertNull(stats.combinedStats());
assertEquals(0, stats.perStreamStats().size());

}

}

@Test
Expand All @@ -471,8 +484,8 @@ void testGetLastSyncJobWithMultipleAttempts() throws IOException {
SYNC_JOB_CONFIG,
JobStatus.INCOMPLETE,
Lists.newArrayList(
createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH),
createAttempt(1L, jobId, AttemptStatus.FAILED, LOG_PATH)),
createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH),
createAttempt(1, jobId, AttemptStatus.FAILED, LOG_PATH)),
NOW.getEpochSecond());

assertEquals(Optional.of(expected), actual);
Expand Down Expand Up @@ -514,15 +527,14 @@ void testExportImport() throws IOException, SQLException {
jobPersistence.importDatabase("test", outputStreams);

final List<Job> actualList = jobPersistence.listJobs(SPEC_JOB_CONFIG.getConfigType(), CONNECTION_ID.toString(), 9999, 0);

final Job actual = actualList.get(0);
final Job expected = createJob(
jobId,
SPEC_JOB_CONFIG,
JobStatus.SUCCEEDED,
Lists.newArrayList(
createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH),
createAttempt(1L, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)),
createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH),
createAttempt(1, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)),
NOW.getEpochSecond());

assertEquals(1, actualList.size());
Expand Down Expand Up @@ -557,8 +569,8 @@ void testListJobsWithTimestamp() throws IOException {
assertEquals(jobs.size(), 1);
assertEquals(jobs.get(0).getId(), syncJobId);
assertEquals(jobs.get(0).getAttempts().size(), 2);
assertEquals(jobs.get(0).getAttempts().get(0).getId(), 0);
assertEquals(jobs.get(0).getAttempts().get(1).getId(), 1);
assertEquals(jobs.get(0).getAttempts().get(0).getAttemptNumber(), 0);
assertEquals(jobs.get(0).getAttempts().get(1).getAttemptNumber(), 1);

final Path syncJobThirdAttemptLogPath = LOG_PATH.resolve("3");
final int syncJobAttemptNumber2 = jobPersistence.createAttempt(syncJobId, syncJobThirdAttemptLogPath);
Expand All @@ -578,12 +590,12 @@ void testListJobsWithTimestamp() throws IOException {
assertEquals(secondQueryJobs.size(), 2);
assertEquals(secondQueryJobs.get(0).getId(), syncJobId);
assertEquals(secondQueryJobs.get(0).getAttempts().size(), 1);
assertEquals(secondQueryJobs.get(0).getAttempts().get(0).getId(), 2);
assertEquals(secondQueryJobs.get(0).getAttempts().get(0).getAttemptNumber(), 2);

assertEquals(secondQueryJobs.get(1).getId(), newSyncJobId);
assertEquals(secondQueryJobs.get(1).getAttempts().size(), 2);
assertEquals(secondQueryJobs.get(1).getAttempts().get(0).getId(), 0);
assertEquals(secondQueryJobs.get(1).getAttempts().get(1).getId(), 1);
assertEquals(secondQueryJobs.get(1).getAttempts().get(0).getAttemptNumber(), 0);
assertEquals(secondQueryJobs.get(1).getAttempts().get(1).getAttemptNumber(), 1);

Long maxEndedAtTimestampAfterSecondQuery = -1L;
for (final Job c : secondQueryJobs) {
Expand Down Expand Up @@ -628,35 +640,35 @@ void testListAttemptsWithJobInfo() throws IOException {
assertEquals(6, allAttempts.size());

assertEquals(job1, allAttempts.get(0).getJobInfo().getId());
assertEquals(job1Attempt1, allAttempts.get(0).getAttempt().getId());
assertEquals(job1Attempt1, allAttempts.get(0).getAttempt().getAttemptNumber());

assertEquals(job2, allAttempts.get(1).getJobInfo().getId());
assertEquals(job2Attempt1, allAttempts.get(1).getAttempt().getId());
assertEquals(job2Attempt1, allAttempts.get(1).getAttempt().getAttemptNumber());

assertEquals(job2, allAttempts.get(2).getJobInfo().getId());
assertEquals(job2Attempt2, allAttempts.get(2).getAttempt().getId());
assertEquals(job2Attempt2, allAttempts.get(2).getAttempt().getAttemptNumber());

assertEquals(job1, allAttempts.get(3).getJobInfo().getId());
assertEquals(job1Attempt2, allAttempts.get(3).getAttempt().getId());
assertEquals(job1Attempt2, allAttempts.get(3).getAttempt().getAttemptNumber());

assertEquals(job1, allAttempts.get(4).getJobInfo().getId());
assertEquals(job1Attempt3, allAttempts.get(4).getAttempt().getId());
assertEquals(job1Attempt3, allAttempts.get(4).getAttempt().getAttemptNumber());

assertEquals(job2, allAttempts.get(5).getJobInfo().getId());
assertEquals(job2Attempt3, allAttempts.get(5).getAttempt().getId());
assertEquals(job2Attempt3, allAttempts.get(5).getAttempt().getAttemptNumber());

final List<AttemptWithJobInfo> attemptsAfterTimestamp = jobPersistence.listAttemptsWithJobInfo(ConfigType.SYNC,
Instant.ofEpochSecond(allAttempts.get(2).getAttempt().getEndedAtInSecond().orElseThrow()));
assertEquals(3, attemptsAfterTimestamp.size());

assertEquals(job1, attemptsAfterTimestamp.get(0).getJobInfo().getId());
assertEquals(job1Attempt2, attemptsAfterTimestamp.get(0).getAttempt().getId());
assertEquals(job1Attempt2, attemptsAfterTimestamp.get(0).getAttempt().getAttemptNumber());

assertEquals(job1, attemptsAfterTimestamp.get(1).getJobInfo().getId());
assertEquals(job1Attempt3, attemptsAfterTimestamp.get(1).getAttempt().getId());
assertEquals(job1Attempt3, attemptsAfterTimestamp.get(1).getAttempt().getAttemptNumber());

assertEquals(job2, attemptsAfterTimestamp.get(2).getJobInfo().getId());
assertEquals(job2Attempt3, attemptsAfterTimestamp.get(2).getAttempt().getId());
assertEquals(job2Attempt3, attemptsAfterTimestamp.get(2).getAttempt().getAttemptNumber());
}

private static Supplier<Instant> incrementingSecondSupplier(final Instant startTime) {
Expand Down Expand Up @@ -887,7 +899,7 @@ void testCreateAttempt() throws IOException {
jobId,
SPEC_JOB_CONFIG,
JobStatus.RUNNING,
Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)),
Lists.newArrayList(createUnfinishedAttempt(0, jobId, AttemptStatus.RUNNING, LOG_PATH)),
NOW.getEpochSecond());
assertEquals(expected, actual);
}
Expand All @@ -901,12 +913,12 @@ void testCreateAttemptAttemptId() throws IOException {

final Job jobAfterOneAttempts = jobPersistence.getJob(jobId);
assertEquals(0, attemptNumber1);
assertEquals(0, jobAfterOneAttempts.getAttempts().get(0).getId());
assertEquals(0, jobAfterOneAttempts.getAttempts().get(0).getAttemptNumber());

final int attemptNumber2 = jobPersistence.createAttempt(jobId, LOG_PATH);
final Job jobAfterTwoAttempts = jobPersistence.getJob(jobId);
assertEquals(1, attemptNumber2);
assertEquals(Sets.newHashSet(0L, 1L), jobAfterTwoAttempts.getAttempts().stream().map(Attempt::getId).collect(Collectors.toSet()));
assertEquals(Sets.newHashSet(0, 1), jobAfterTwoAttempts.getAttempts().stream().map(Attempt::getAttemptNumber).collect(Collectors.toSet()));
}

@Test
Expand All @@ -922,7 +934,7 @@ void testCreateAttemptWhileAttemptAlreadyRunning() throws IOException {
jobId,
SPEC_JOB_CONFIG,
JobStatus.RUNNING,
Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)),
Lists.newArrayList(createUnfinishedAttempt(0, jobId, AttemptStatus.RUNNING, LOG_PATH)),
NOW.getEpochSecond());
assertEquals(expected, actual);
}
Expand All @@ -941,7 +953,7 @@ void testCreateAttemptTerminal() throws IOException {
jobId,
SPEC_JOB_CONFIG,
JobStatus.SUCCEEDED,
Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)),
Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)),
NOW.getEpochSecond());
assertEquals(expected, actual);
}
Expand Down Expand Up @@ -1336,8 +1348,8 @@ void testGetNextJobWithMultipleAttempts() throws IOException {
SPEC_JOB_CONFIG,
JobStatus.PENDING,
Lists.newArrayList(
createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH),
createAttempt(1L, jobId, AttemptStatus.FAILED, LOG_PATH)),
createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH),
createAttempt(1, jobId, AttemptStatus.FAILED, LOG_PATH)),
NOW.getEpochSecond());

assertEquals(Optional.of(expected), actual);
Expand Down Expand Up @@ -1562,8 +1574,8 @@ void testListJobsWithMultipleAttempts() throws IOException {
SPEC_JOB_CONFIG,
JobStatus.SUCCEEDED,
Lists.newArrayList(
createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH),
createAttempt(1L, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)),
createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH),
createAttempt(1, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)),
NOW.getEpochSecond());

assertEquals(1, actualList.size());
Expand Down Expand Up @@ -1682,7 +1694,7 @@ void testListJobsWithStatus() throws IOException {
SPEC_JOB_CONFIG,
JobStatus.INCOMPLETE,
Lists.newArrayList(
createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH)),
createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH)),
NOW.getEpochSecond());

assertEquals(1, actualList.size());
Expand Down Expand Up @@ -1720,7 +1732,7 @@ void testListJobsWithStatusAndConfigType() throws IOException, InterruptedExcept
SPEC_JOB_CONFIG,
JobStatus.INCOMPLETE,
Lists.newArrayList(
createAttempt(0L, failedSpecJobId, AttemptStatus.FAILED, LOG_PATH)),
createAttempt(0, failedSpecJobId, AttemptStatus.FAILED, LOG_PATH)),
NOW.getEpochSecond(),
SPEC_SCOPE);

Expand Down Expand Up @@ -1757,12 +1769,12 @@ void testListJobsWithStatusesAndConfigTypesForConnection() throws IOException, I
Set.of(ConfigType.SYNC, ConfigType.CHECK_CONNECTION_DESTINATION), Set.of(JobStatus.PENDING, JobStatus.SUCCEEDED));

final Job expectedDesiredJob1 = createJob(desiredJobId1, SYNC_JOB_CONFIG, JobStatus.SUCCEEDED,
Lists.newArrayList(createAttempt(0L, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)),
Lists.newArrayList(createAttempt(0, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)),
NOW.getEpochSecond(), desiredConnectionId.toString());
final Job expectedDesiredJob2 =
createJob(desiredJobId2, SYNC_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString());
final Job expectedDesiredJob3 = createJob(desiredJobId3, CHECK_JOB_CONFIG, JobStatus.SUCCEEDED,
Lists.newArrayList(createAttempt(0L, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)),
Lists.newArrayList(createAttempt(0, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)),
NOW.getEpochSecond(), desiredConnectionId.toString());
final Job expectedDesiredJob4 =
createJob(desiredJobId4, CHECK_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ void testIsAttemptInTerminalState() {
}

private static Attempt attemptWithStatus(final AttemptStatus attemptStatus) {
return new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null);
return new Attempt(1, 1L, null, null, attemptStatus, null, 0L, 0L, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void testGetLastFailedAttempt() {

final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED);
assertTrue(job.getLastFailedAttempt().isPresent());
assertEquals(2, job.getLastFailedAttempt().get().getId());
assertEquals(2, job.getLastFailedAttempt().get().getAttemptNumber());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public AttemptInfoRead getAttemptInfoRead(final Attempt attempt) {

public static AttemptRead getAttemptRead(final Attempt attempt) {
return new AttemptRead()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will also change the api in a later PR to a better name. Waiting to sync up with the FE engineers first.

.id(attempt.getId())
.id((long) attempt.getAttemptNumber())
.status(Enums.convertTo(attempt.getStatus(), AttemptStatus.class))
.bytesSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats
.map(JobOutput::getSync)
Expand Down
Loading