From fc8caa271f13473b6c94e1a18b2be7f2a6414272 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 29 Dec 2022 17:05:35 -0800 Subject: [PATCH 1/8] Rename job id to attemptNumber so it is clear this refers to the attempt number and not the id of the attempt row. Also change the type from long to int as this makes more sense. --- .../job/DefaultJobPersistence.java | 2 +- .../persistence/job/models/Attempt.java | 16 ++-- .../job/DefaultJobPersistenceTest.java | 88 +++++++++++-------- .../persistence/job/models/AttemptTest.java | 2 +- .../persistence/job/models/JobTest.java | 2 +- .../server/converters/JobConverter.java | 2 +- .../server/handlers/JobHistoryHandler.java | 41 ++++++++- .../server/converters/JobConverterTest.java | 6 +- .../handlers/JobHistoryHandlerTest.java | 6 +- ...obCreationAndStatusUpdateActivityImpl.java | 5 +- 10 files changed, 107 insertions(+), 63 deletions(-) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 1c5dd8253cdb..af872b3b0d34 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -849,7 +849,7 @@ private static Job getJobFromRecord(final Record record) { private static Attempt getAttemptFromRecord(final Record record) { return new Attempt( - record.get(ATTEMPT_NUMBER, Long.class), + record.get(ATTEMPT_NUMBER, int.class), record.get(JOB_ID, Long.class), Path.of(record.get("log_path", String.class)), record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class), diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java index 110deaecab7b..a3dc08b076d2 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java @@ -13,7 +13,7 @@ public class Attempt { - private final long id; + private final int attemptNumber; private final long jobId; private final JobOutput output; private final AttemptStatus status; @@ -23,7 +23,7 @@ public class Attempt { private final long createdAtInSecond; private final Long endedAtInSecond; - public Attempt(final long id, + public Attempt(final int attemptNumber, final long jobId, final Path logPath, final @Nullable JobOutput output, @@ -32,7 +32,7 @@ public Attempt(final long id, final long createdAtInSecond, final long updatedAtInSecond, final @Nullable Long endedAtInSecond) { - this.id = id; + this.attemptNumber = attemptNumber; this.jobId = jobId; this.output = output; this.status = status; @@ -43,8 +43,8 @@ public Attempt(final long id, this.endedAtInSecond = endedAtInSecond; } - public long getId() { - return id; + public int getAttemptNumber() { + return attemptNumber; } public long getJobId() { @@ -92,7 +92,7 @@ public boolean equals(final Object o) { return false; } final Attempt attempt = (Attempt) o; - return id == attempt.id && + return attemptNumber == attempt.attemptNumber && jobId == attempt.jobId && updatedAtInSecond == attempt.updatedAtInSecond && createdAtInSecond == attempt.createdAtInSecond && @@ -105,13 +105,13 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond); + return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond); } @Override public String toString() { return "Attempt{" + - "id=" + id + + "id=" + attemptNumber + ", jobId=" + jobId + ", output=" + output + ", status=" + status + diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index 0b48aaf036c4..b186c4241131 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -14,6 +14,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -142,7 +143,7 @@ static void dbDown() { container.close(); } - private static Attempt createAttempt(final long id, final long jobId, final AttemptStatus status, final Path logPath) { + private static Attempt createAttempt(final int id, final long jobId, final AttemptStatus status, final Path logPath) { return new Attempt( id, jobId, @@ -155,7 +156,7 @@ private static Attempt createAttempt(final long id, final long jobId, final Atte NOW.getEpochSecond()); } - private static Attempt createUnfinishedAttempt(final long id, final long jobId, final AttemptStatus status, final Path logPath) { + private static Attempt createUnfinishedAttempt(final int id, final long jobId, final AttemptStatus status, final Path logPath) { return new Attempt( id, jobId, @@ -238,7 +239,7 @@ void testCompleteAttemptFailed() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.INCOMPLETE, - Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -256,7 +257,7 @@ void testCompleteAttemptSuccess() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -326,8 +327,8 @@ void testWriteAttemptFailureSummary() throws IOException { } @Nested - @DisplayName("Test writing in progress stats") - class WriteStats { + @DisplayName("Stats Related Tests") + class Stats { @Test @DisplayName("Writing stats the first time should only write record and bytes information correctly") @@ -455,6 +456,18 @@ void testWriteNullNamespace() throws IOException { assertEquals(streamStats, actStreamStats); } + @Test + @DisplayName("Writing multiple stats a stream with null namespace should write correctly without exceptions") + void testGetStatsNoResult() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); + + final AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber); + assertNull(stats.combinedStats()); + assertEquals(0, stats.perStreamStats().size()); + + } + } @Test @@ -471,8 +484,8 @@ void testGetLastSyncJobWithMultipleAttempts() throws IOException { SYNC_JOB_CONFIG, JobStatus.INCOMPLETE, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1L, jobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(Optional.of(expected), actual); @@ -514,15 +527,14 @@ void testExportImport() throws IOException, SQLException { jobPersistence.importDatabase("test", outputStreams); final List actualList = jobPersistence.listJobs(SPEC_JOB_CONFIG.getConfigType(), CONNECTION_ID.toString(), 9999, 0); - final Job actual = actualList.get(0); final Job expected = createJob( jobId, SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1L, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), NOW.getEpochSecond()); assertEquals(1, actualList.size()); @@ -557,8 +569,8 @@ void testListJobsWithTimestamp() throws IOException { assertEquals(jobs.size(), 1); assertEquals(jobs.get(0).getId(), syncJobId); assertEquals(jobs.get(0).getAttempts().size(), 2); - assertEquals(jobs.get(0).getAttempts().get(0).getId(), 0); - assertEquals(jobs.get(0).getAttempts().get(1).getId(), 1); + assertEquals(jobs.get(0).getAttempts().get(0).getAttemptNumber(), 0); + assertEquals(jobs.get(0).getAttempts().get(1).getAttemptNumber(), 1); final Path syncJobThirdAttemptLogPath = LOG_PATH.resolve("3"); final int syncJobAttemptNumber2 = jobPersistence.createAttempt(syncJobId, syncJobThirdAttemptLogPath); @@ -578,12 +590,12 @@ void testListJobsWithTimestamp() throws IOException { assertEquals(secondQueryJobs.size(), 2); assertEquals(secondQueryJobs.get(0).getId(), syncJobId); assertEquals(secondQueryJobs.get(0).getAttempts().size(), 1); - assertEquals(secondQueryJobs.get(0).getAttempts().get(0).getId(), 2); + assertEquals(secondQueryJobs.get(0).getAttempts().get(0).getAttemptNumber(), 2); assertEquals(secondQueryJobs.get(1).getId(), newSyncJobId); assertEquals(secondQueryJobs.get(1).getAttempts().size(), 2); - assertEquals(secondQueryJobs.get(1).getAttempts().get(0).getId(), 0); - assertEquals(secondQueryJobs.get(1).getAttempts().get(1).getId(), 1); + assertEquals(secondQueryJobs.get(1).getAttempts().get(0).getAttemptNumber(), 0); + assertEquals(secondQueryJobs.get(1).getAttempts().get(1).getAttemptNumber(), 1); Long maxEndedAtTimestampAfterSecondQuery = -1L; for (final Job c : secondQueryJobs) { @@ -628,35 +640,35 @@ void testListAttemptsWithJobInfo() throws IOException { assertEquals(6, allAttempts.size()); assertEquals(job1, allAttempts.get(0).getJobInfo().getId()); - assertEquals(job1Attempt1, allAttempts.get(0).getAttempt().getId()); + assertEquals(job1Attempt1, allAttempts.get(0).getAttempt().getAttemptNumber()); assertEquals(job2, allAttempts.get(1).getJobInfo().getId()); - assertEquals(job2Attempt1, allAttempts.get(1).getAttempt().getId()); + assertEquals(job2Attempt1, allAttempts.get(1).getAttempt().getAttemptNumber()); assertEquals(job2, allAttempts.get(2).getJobInfo().getId()); - assertEquals(job2Attempt2, allAttempts.get(2).getAttempt().getId()); + assertEquals(job2Attempt2, allAttempts.get(2).getAttempt().getAttemptNumber()); assertEquals(job1, allAttempts.get(3).getJobInfo().getId()); - assertEquals(job1Attempt2, allAttempts.get(3).getAttempt().getId()); + assertEquals(job1Attempt2, allAttempts.get(3).getAttempt().getAttemptNumber()); assertEquals(job1, allAttempts.get(4).getJobInfo().getId()); - assertEquals(job1Attempt3, allAttempts.get(4).getAttempt().getId()); + assertEquals(job1Attempt3, allAttempts.get(4).getAttempt().getAttemptNumber()); assertEquals(job2, allAttempts.get(5).getJobInfo().getId()); - assertEquals(job2Attempt3, allAttempts.get(5).getAttempt().getId()); + assertEquals(job2Attempt3, allAttempts.get(5).getAttempt().getAttemptNumber()); final List attemptsAfterTimestamp = jobPersistence.listAttemptsWithJobInfo(ConfigType.SYNC, Instant.ofEpochSecond(allAttempts.get(2).getAttempt().getEndedAtInSecond().orElseThrow())); assertEquals(3, attemptsAfterTimestamp.size()); assertEquals(job1, attemptsAfterTimestamp.get(0).getJobInfo().getId()); - assertEquals(job1Attempt2, attemptsAfterTimestamp.get(0).getAttempt().getId()); + assertEquals(job1Attempt2, attemptsAfterTimestamp.get(0).getAttempt().getAttemptNumber()); assertEquals(job1, attemptsAfterTimestamp.get(1).getJobInfo().getId()); - assertEquals(job1Attempt3, attemptsAfterTimestamp.get(1).getAttempt().getId()); + assertEquals(job1Attempt3, attemptsAfterTimestamp.get(1).getAttempt().getAttemptNumber()); assertEquals(job2, attemptsAfterTimestamp.get(2).getJobInfo().getId()); - assertEquals(job2Attempt3, attemptsAfterTimestamp.get(2).getAttempt().getId()); + assertEquals(job2Attempt3, attemptsAfterTimestamp.get(2).getAttempt().getAttemptNumber()); } private static Supplier incrementingSecondSupplier(final Instant startTime) { @@ -887,7 +899,7 @@ void testCreateAttempt() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.RUNNING, - Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)), + Lists.newArrayList(createUnfinishedAttempt(0, jobId, AttemptStatus.RUNNING, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -901,12 +913,12 @@ void testCreateAttemptAttemptId() throws IOException { final Job jobAfterOneAttempts = jobPersistence.getJob(jobId); assertEquals(0, attemptNumber1); - assertEquals(0, jobAfterOneAttempts.getAttempts().get(0).getId()); + assertEquals(0, jobAfterOneAttempts.getAttempts().get(0).getAttemptNumber()); final int attemptNumber2 = jobPersistence.createAttempt(jobId, LOG_PATH); final Job jobAfterTwoAttempts = jobPersistence.getJob(jobId); assertEquals(1, attemptNumber2); - assertEquals(Sets.newHashSet(0L, 1L), jobAfterTwoAttempts.getAttempts().stream().map(Attempt::getId).collect(Collectors.toSet())); + assertEquals(Sets.newHashSet(0, 1), jobAfterTwoAttempts.getAttempts().stream().map(Attempt::getAttemptNumber).collect(Collectors.toSet())); } @Test @@ -922,7 +934,7 @@ void testCreateAttemptWhileAttemptAlreadyRunning() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.RUNNING, - Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)), + Lists.newArrayList(createUnfinishedAttempt(0, jobId, AttemptStatus.RUNNING, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -941,7 +953,7 @@ void testCreateAttemptTerminal() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -1336,8 +1348,8 @@ void testGetNextJobWithMultipleAttempts() throws IOException { SPEC_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1L, jobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(Optional.of(expected), actual); @@ -1562,8 +1574,8 @@ void testListJobsWithMultipleAttempts() throws IOException { SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1L, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), NOW.getEpochSecond()); assertEquals(1, actualList.size()); @@ -1682,7 +1694,7 @@ void testListJobsWithStatus() throws IOException { SPEC_JOB_CONFIG, JobStatus.INCOMPLETE, Lists.newArrayList( - createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(1, actualList.size()); @@ -1720,7 +1732,7 @@ void testListJobsWithStatusAndConfigType() throws IOException, InterruptedExcept SPEC_JOB_CONFIG, JobStatus.INCOMPLETE, Lists.newArrayList( - createAttempt(0L, failedSpecJobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0, failedSpecJobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond(), SPEC_SCOPE); @@ -1757,12 +1769,12 @@ void testListJobsWithStatusesAndConfigTypesForConnection() throws IOException, I Set.of(ConfigType.SYNC, ConfigType.CHECK_CONNECTION_DESTINATION), Set.of(JobStatus.PENDING, JobStatus.SUCCEEDED)); final Job expectedDesiredJob1 = createJob(desiredJobId1, SYNC_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0L, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond(), desiredConnectionId.toString()); final Job expectedDesiredJob2 = createJob(desiredJobId2, SYNC_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString()); final Job expectedDesiredJob3 = createJob(desiredJobId3, CHECK_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0L, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond(), desiredConnectionId.toString()); final Job expectedDesiredJob4 = createJob(desiredJobId4, CHECK_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString()); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java index 0913a29ca734..badc1ac68d70 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java @@ -19,7 +19,7 @@ void testIsAttemptInTerminalState() { } private static Attempt attemptWithStatus(final AttemptStatus attemptStatus) { - return new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null); + return new Attempt(1, 1L, null, null, attemptStatus, null, 0L, 0L, null); } } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java index 3e10fa003d36..4cdb4f15403d 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java @@ -68,7 +68,7 @@ void testGetLastFailedAttempt() { final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED); assertTrue(job.getLastFailedAttempt().isPresent()); - assertEquals(2, job.getLastFailedAttempt().get().getId()); + assertEquals(2, job.getLastFailedAttempt().get().getAttemptNumber()); } @Test diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 52c28f3640f1..f478e85b90c0 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -137,7 +137,7 @@ public AttemptInfoRead getAttemptInfoRead(final Attempt attempt) { public static AttemptRead getAttemptRead(final Attempt attempt) { return new AttemptRead() - .id(attempt.getId()) + .id((long) attempt.getAttemptNumber()) .status(Enums.convertTo(attempt.getStatus(), AttemptStatus.class)) .bytesSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats .map(JobOutput::getSync) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index f61476a18600..4d93b86d089d 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -6,6 +6,9 @@ import com.google.common.base.Preconditions; import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; +import io.airbyte.api.model.generated.AttemptRead; +import io.airbyte.api.model.generated.AttemptStats; +import io.airbyte.api.model.generated.AttemptStreamStats; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.model.generated.DestinationDefinitionRead; @@ -120,14 +123,44 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept final Long totalJobCount = jobPersistence.getJobCount(configTypes, configId); - final List jobReads = jobs - .stream() - .map(JobConverter::getJobWithAttemptsRead) - .collect(Collectors.toList()); + final List jobReads = jobs.stream().map(JobConverter::getJobWithAttemptsRead).collect(Collectors.toList()); + hydrateWithStats(jobReads); return new JobReadList().jobs(jobReads).totalJobCount(totalJobCount); } + private void hydrateWithStats(final List jobReads) throws IOException { + for (final JobWithAttemptsRead jwar : jobReads) { + for (final AttemptRead a : jwar.getAttempts()) { + // make sure the attempt id is correct + System.out.println(a.getId().intValue()); + final var attemptStats = jobPersistence.getAttemptStats(jwar.getJob().getId(), a.getId().intValue()); + + final var combinedStats = attemptStats.combinedStats(); + if (combinedStats == null) { + a.setTotalStats(new AttemptStats()); + continue; + } + + a.getTotalStats() + .estimatedBytes(combinedStats.getEstimatedBytes()) + .estimatedRecords(combinedStats.getEstimatedRecords()) + .bytesEmitted(combinedStats.getBytesEmitted()) + .recordsEmitted(combinedStats.getRecordsEmitted()); + + final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats() + .streamName(s.getStreamName()) + .stats(new AttemptStats() + .bytesEmitted(s.getStats().getBytesEmitted()) + .recordsEmitted(s.getStats().getRecordsEmitted()) + .estimatedBytes(s.getStats().getEstimatedBytes()) + .estimatedRecords(s.getStats().getEstimatedRecords()))) + .collect(Collectors.toList()); + a.setStreamStats(streamStats); + } + } + } + public JobInfoRead getJobInfo(final JobIdRequestBody jobIdRequestBody) throws IOException { final Job job = jobPersistence.getJob(jobIdRequestBody.getId()); return jobConverter.getJobInfoRead(job); diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java index fe1d084d92ce..c61232df5e08 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java @@ -68,7 +68,7 @@ class JobConverterTest { private static final long JOB_ID = 100L; - private static final long ATTEMPT_ID = 1002L; + private static final Integer ATTEMPT_ID = 1002; private static final String JOB_CONFIG_ID = "123"; private static final JobStatus JOB_STATUS = JobStatus.RUNNING; private static final AttemptStatus ATTEMPT_STATUS = AttemptStatus.RUNNING; @@ -124,7 +124,7 @@ class JobConverterTest { .updatedAt(CREATED_AT)) .attempts(Lists.newArrayList(new AttemptInfoRead() .attempt(new AttemptRead() - .id(ATTEMPT_ID) + .id((long) ATTEMPT_ID) .status(io.airbyte.api.model.generated.AttemptStatus.RUNNING) .recordsSynced(RECORDS_EMITTED) .bytesSynced(BYTES_EMITTED) @@ -195,7 +195,7 @@ public void setUp() { when(job.getCreatedAtInSecond()).thenReturn(CREATED_AT); when(job.getUpdatedAtInSecond()).thenReturn(CREATED_AT); when(job.getAttempts()).thenReturn(Lists.newArrayList(attempt)); - when(attempt.getId()).thenReturn(ATTEMPT_ID); + when(attempt.getAttemptNumber()).thenReturn(ATTEMPT_ID); when(attempt.getStatus()).thenReturn(ATTEMPT_STATUS); when(attempt.getOutput()).thenReturn(Optional.of(JOB_OUTPUT)); when(attempt.getLogPath()).thenReturn(LOG_PATH); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index cb68ad2b8205..9bb889b23788 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -77,7 +77,7 @@ class JobHistoryHandlerTest { private static final long JOB_ID = 100L; - private static final long ATTEMPT_ID = 1002L; + private static final int ATTEMPT_ID = 1002; private static final String JOB_CONFIG_ID = "ef296385-6796-413f-ac1b-49c4caba3f2b"; private static final JobStatus JOB_STATUS = JobStatus.SUCCEEDED; private static final JobConfig.ConfigType CONFIG_TYPE = JobConfig.ConfigType.CHECK_CONNECTION_SOURCE; @@ -134,7 +134,7 @@ private static List toAttemptInfoList(final List attem private static AttemptRead toAttemptRead(final Attempt a) { return new AttemptRead() - .id(a.getId()) + .id((long) a.getAttemptNumber()) .status(Enums.convertTo(a.getStatus(), io.airbyte.api.model.generated.AttemptStatus.class)) .createdAt(a.getCreatedAtInSecond()) .updatedAt(a.getUpdatedAtInSecond()) @@ -146,7 +146,7 @@ private static Attempt createAttempt(final long jobId, final long timestamps, fi } @BeforeEach - void setUp() throws IOException, JsonValidationException, ConfigNotFoundException { + void setUp() { testJobAttempt = createAttempt(JOB_ID, CREATED_AT, AttemptStatus.SUCCEEDED); testJob = new Job(JOB_ID, JOB_CONFIG.getConfigType(), JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(testJobAttempt), JOB_STATUS, null, CREATED_AT, CREATED_AT); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 39b089f8fef2..37a9f7647629 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -431,7 +431,7 @@ private boolean checkActiveJobPreviousAttempt(final Job activeJob, final int att if (activeJob.getAttempts().size() > minAttemptSize) { final Optional optionalAttempt = activeJob.getAttempts().stream() - .filter(attempt -> attempt.getId() == (attemptId - 1)).findFirst(); + .filter(attempt -> attempt.getAttemptNumber() == (attemptId - 1)).findFirst(); result = optionalAttempt.isPresent() && optionalAttempt.get().getStatus().equals(FAILED); } @@ -451,8 +451,7 @@ private void failNonTerminalJobs(final UUID connectionId) { continue; } - // the Attempt object 'id' is actually the value of the attempt_number column in the db - final int attemptNumber = (int) attempt.getId(); + final int attemptNumber = attempt.getAttemptNumber(); log.info("Failing non-terminal attempt {} for non-terminal job {}", attemptNumber, jobId); jobPersistence.failAttempt(jobId, attemptNumber); jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber, From cb8e9405ff5387ef90c1440412bd657a9feaa949 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Thu, 29 Dec 2022 17:10:03 -0800 Subject: [PATCH 2/8] Clean up test: rename ATTEMPT_ID to ATTEMPT_NUMBER. --- .../server/converters/JobConverterTest.java | 6 ++-- .../handlers/JobHistoryHandlerTest.java | 28 +++++++------------ 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java index c61232df5e08..f73ec6e62aad 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java @@ -68,7 +68,7 @@ class JobConverterTest { private static final long JOB_ID = 100L; - private static final Integer ATTEMPT_ID = 1002; + private static final Integer ATTEMPT_NUMBER = 0; private static final String JOB_CONFIG_ID = "123"; private static final JobStatus JOB_STATUS = JobStatus.RUNNING; private static final AttemptStatus ATTEMPT_STATUS = AttemptStatus.RUNNING; @@ -124,7 +124,7 @@ class JobConverterTest { .updatedAt(CREATED_AT)) .attempts(Lists.newArrayList(new AttemptInfoRead() .attempt(new AttemptRead() - .id((long) ATTEMPT_ID) + .id((long) ATTEMPT_NUMBER) .status(io.airbyte.api.model.generated.AttemptStatus.RUNNING) .recordsSynced(RECORDS_EMITTED) .bytesSynced(BYTES_EMITTED) @@ -195,7 +195,7 @@ public void setUp() { when(job.getCreatedAtInSecond()).thenReturn(CREATED_AT); when(job.getUpdatedAtInSecond()).thenReturn(CREATED_AT); when(job.getAttempts()).thenReturn(Lists.newArrayList(attempt)); - when(attempt.getAttemptNumber()).thenReturn(ATTEMPT_ID); + when(attempt.getAttemptNumber()).thenReturn(ATTEMPT_NUMBER); when(attempt.getStatus()).thenReturn(ATTEMPT_STATUS); when(attempt.getOutput()).thenReturn(Optional.of(JOB_OUTPUT)); when(attempt.getLogPath()).thenReturn(LOG_PATH); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index 9bb889b23788..d475f5096b40 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -77,7 +77,7 @@ class JobHistoryHandlerTest { private static final long JOB_ID = 100L; - private static final int ATTEMPT_ID = 1002; + private static final int ATTEMPT_NUMBER = 0; private static final String JOB_CONFIG_ID = "ef296385-6796-413f-ac1b-49c4caba3f2b"; private static final JobStatus JOB_STATUS = JobStatus.SUCCEEDED; private static final JobConfig.ConfigType CONFIG_TYPE = JobConfig.ConfigType.CHECK_CONNECTION_SOURCE; @@ -89,17 +89,9 @@ class JobHistoryHandlerTest { private static final LogRead EMPTY_LOG_READ = new LogRead().logLines(new ArrayList<>()); private static final long CREATED_AT = System.currentTimeMillis() / 1000; - private SourceRead sourceRead; - private ConnectionRead connectionRead; - private DestinationRead destinationRead; private ConnectionsHandler connectionsHandler; private SourceHandler sourceHandler; private DestinationHandler destinationHandler; - private SourceDefinitionsHandler sourceDefinitionsHandler; - private DestinationDefinitionsHandler destinationDefinitionsHandler; - private StandardDestinationDefinition standardDestinationDefinition; - private StandardSourceDefinition standardSourceDefinition; - private AirbyteVersion airbyteVersion; private Job testJob; private Attempt testJobAttempt; private JobPersistence jobPersistence; @@ -142,7 +134,7 @@ private static AttemptRead toAttemptRead(final Attempt a) { } private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) { - return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, status, null, timestamps, timestamps, timestamps); + return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, timestamps, timestamps, timestamps); } @BeforeEach @@ -153,11 +145,11 @@ void setUp() { connectionsHandler = mock(ConnectionsHandler.class); sourceHandler = mock(SourceHandler.class); - sourceDefinitionsHandler = mock(SourceDefinitionsHandler.class); destinationHandler = mock(DestinationHandler.class); - destinationDefinitionsHandler = mock(DestinationDefinitionsHandler.class); - airbyteVersion = mock(AirbyteVersion.class); jobPersistence = mock(JobPersistence.class); + final SourceDefinitionsHandler sourceDefinitionsHandler = mock(SourceDefinitionsHandler.class); + final DestinationDefinitionsHandler destinationDefinitionsHandler = mock(DestinationDefinitionsHandler.class); + final AirbyteVersion airbyteVersion = mock(AirbyteVersion.class); jobHistoryHandler = new JobHistoryHandler(jobPersistence, WorkerEnvironment.DOCKER, LogConfigs.EMPTY, connectionsHandler, sourceHandler, sourceDefinitionsHandler, destinationHandler, destinationDefinitionsHandler, airbyteVersion); } @@ -305,16 +297,16 @@ void testGetJobInfoLight() throws IOException { @Test @DisplayName("Should return the right info to debug this job") void testGetDebugJobInfo() throws IOException, JsonValidationException, ConfigNotFoundException, URISyntaxException { - standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); + StandardSourceDefinition standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); - sourceRead = SourceHelpers.getSourceRead(source, standardSourceDefinition); + SourceRead sourceRead = SourceHelpers.getSourceRead(source, standardSourceDefinition); - standardDestinationDefinition = DestinationDefinitionHelpers.generateDestination(); + StandardDestinationDefinition standardDestinationDefinition = DestinationDefinitionHelpers.generateDestination(); final DestinationConnection destination = DestinationHelpers.generateDestination(UUID.randomUUID()); - destinationRead = DestinationHelpers.getDestinationRead(destination, standardDestinationDefinition); + DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, standardDestinationDefinition); final StandardSync standardSync = ConnectionHelpers.generateSyncWithSourceId(source.getSourceId()); - connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); + ConnectionRead connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); when(connectionsHandler.getConnection(UUID.fromString(testJob.getScope()))).thenReturn(connectionRead); final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody(); From e6381a7162479bcf5686aad04fd9542e79a120b3 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 30 Dec 2022 10:27:11 -0800 Subject: [PATCH 3/8] Modify test to include returning stats. --- .../server/handlers/JobHistoryHandler.java | 9 ++--- .../handlers/JobHistoryHandlerTest.java | 36 ++++++++++++++++--- 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index 4d93b86d089d..82cad9c484a8 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -132,13 +132,13 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept private void hydrateWithStats(final List jobReads) throws IOException { for (final JobWithAttemptsRead jwar : jobReads) { for (final AttemptRead a : jwar.getAttempts()) { - // make sure the attempt id is correct - System.out.println(a.getId().intValue()); - final var attemptStats = jobPersistence.getAttemptStats(jwar.getJob().getId(), a.getId().intValue()); + a.setTotalStats(new AttemptStats()); + final var attemptStats = jobPersistence.getAttemptStats(jwar.getJob().getId(), a.getId().intValue()); final var combinedStats = attemptStats.combinedStats(); if (combinedStats == null) { - a.setTotalStats(new AttemptStats()); + // If overall stats are missing, assume stream stats are also missing, since overall stats are + // easier to produce than stream stats. continue; } @@ -150,6 +150,7 @@ private void hydrateWithStats(final List jobReads) throws I final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats() .streamName(s.getStreamName()) + .streamNamespace(s.getStreamNamespace()) .stats(new AttemptStats() .bytesEmitted(s.getStats().getBytesEmitted()) .recordsEmitted(s.getStats().getRecordsEmitted()) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index d475f5096b40..c4800b7e5d7b 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -6,6 +6,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -14,6 +16,7 @@ import io.airbyte.api.model.generated.AttemptNormalizationStatusRead; import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; import io.airbyte.api.model.generated.AttemptRead; +import io.airbyte.api.model.generated.AttemptStreamStats; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationIdRequestBody; import io.airbyte.api.model.generated.DestinationRead; @@ -42,9 +45,12 @@ import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; +import io.airbyte.config.StreamSyncStats; +import io.airbyte.config.SyncStats; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.JobPersistence.AttemptStats; import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.AttemptNormalizationStatus; import io.airbyte.persistence.job.models.AttemptStatus; @@ -158,6 +164,21 @@ void setUp() { @DisplayName("When listing jobs") class ListJobs { + private static final AttemptStats ATTEMPT_STATS = new AttemptStats(new SyncStats().withBytesEmitted(10L).withRecordsEmitted(10L), + List.of( + new StreamSyncStats().withStreamNamespace("ns1").withStreamName("stream1") + .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)), + new StreamSyncStats().withStreamName("stream2") + .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)))); + + private static final io.airbyte.api.model.generated.AttemptStats ATTEMPT_STATS_API = new io.airbyte.api.model.generated.AttemptStats() + .bytesEmitted(10L).recordsEmitted(10L); + + private static final List ATTEMPT_STREAM_STATS = List.of( + new AttemptStreamStats().streamNamespace("ns1").streamName("stream1") + .stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L)), + new AttemptStreamStats().streamName("stream2").stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L))); + @Test @DisplayName("Should return jobs with/without attempts in descending order") void testListJobs() throws IOException { @@ -174,6 +195,7 @@ void testListJobs() throws IOException { when(jobPersistence.listJobs(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, pagesize, rowOffset)) .thenReturn(List.of(latestJobNoAttempt, successfulJob)); when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L); + when(jobPersistence.getAttemptStats(anyLong(), anyInt())).thenReturn(ATTEMPT_STATS); final var requestBody = new JobListRequestBody() .configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API)) @@ -181,8 +203,8 @@ void testListJobs() throws IOException { .pagination(new Pagination().pageSize(pagesize).rowOffset(rowOffset)); final var jobReadList = jobHistoryHandler.listJobsFor(requestBody); - final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(toAttemptRead( - testJobAttempt))); + final var expAttemptRead = toAttemptRead(testJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS); + final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(expAttemptRead)); final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)).attempts(Collections.emptyList()); final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead)).totalJobCount(2L); @@ -215,6 +237,7 @@ void testListJobsFor() throws IOException { when(jobPersistence.listJobs(configTypes, JOB_CONFIG_ID, pagesize, rowOffset)).thenReturn(List.of(latestJob, secondJob, firstJob)); when(jobPersistence.getJobCount(configTypes, JOB_CONFIG_ID)).thenReturn(3L); + when(jobPersistence.getAttemptStats(anyLong(), anyInt())).thenReturn(ATTEMPT_STATS); final JobListRequestBody requestBody = new JobListRequestBody() .configTypes(List.of(CONFIG_TYPE_FOR_API, JobConfigType.SYNC, JobConfigType.DISCOVER_SCHEMA)) @@ -223,9 +246,11 @@ void testListJobsFor() throws IOException { final JobReadList jobReadList = jobHistoryHandler.listJobsFor(requestBody); final var firstJobWithAttemptRead = - new JobWithAttemptsRead().job(toJobInfo(firstJob)).attempts(ImmutableList.of(toAttemptRead(testJobAttempt))); + new JobWithAttemptsRead().job(toJobInfo(firstJob)) + .attempts(ImmutableList.of(toAttemptRead(testJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS))); final var secondJobWithAttemptRead = - new JobWithAttemptsRead().job(toJobInfo(secondJob)).attempts(ImmutableList.of(toAttemptRead(secondJobAttempt))); + new JobWithAttemptsRead().job(toJobInfo(secondJob)) + .attempts(ImmutableList.of(toAttemptRead(secondJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS))); final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJob)).attempts(Collections.emptyList()); final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, secondJobWithAttemptRead, firstJobWithAttemptRead)).totalJobCount(3L); @@ -249,6 +274,7 @@ void testListJobsIncludingJobId() throws IOException { when(jobPersistence.listJobsIncludingId(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, jobId2, pagesize)) .thenReturn(List.of(latestJobNoAttempt, successfulJob)); when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L); + when(jobPersistence.getAttemptStats(anyLong(), anyInt())).thenReturn(ATTEMPT_STATS); final var requestBody = new JobListRequestBody() .configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API)) @@ -258,7 +284,7 @@ void testListJobsIncludingJobId() throws IOException { final var jobReadList = jobHistoryHandler.listJobsFor(requestBody); final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(toAttemptRead( - testJobAttempt))); + testJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS))); final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)).attempts(Collections.emptyList()); final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead)).totalJobCount(2L); From b753d9e960eed6f3695988023ae14ccfb282c2e7 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 30 Dec 2022 11:13:21 -0800 Subject: [PATCH 4/8] Add tests for the debug job info route. --- .../server/handlers/JobHistoryHandler.java | 70 ++++++++++--------- .../handlers/JobHistoryHandlerTest.java | 45 ++++++------ 2 files changed, 61 insertions(+), 54 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index 82cad9c484a8..6ad01c0d7062 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -5,6 +5,7 @@ package io.airbyte.server.handlers; import com.google.common.base.Preconditions; +import io.airbyte.api.model.generated.AttemptInfoRead; import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; import io.airbyte.api.model.generated.AttemptRead; import io.airbyte.api.model.generated.AttemptStats; @@ -121,45 +122,44 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept (request.getPagination() != null && request.getPagination().getRowOffset() != null) ? request.getPagination().getRowOffset() : 0); } - final Long totalJobCount = jobPersistence.getJobCount(configTypes, configId); - final List jobReads = jobs.stream().map(JobConverter::getJobWithAttemptsRead).collect(Collectors.toList()); - hydrateWithStats(jobReads); + for (final JobWithAttemptsRead jwar : jobReads) { + for (final AttemptRead a : jwar.getAttempts()) { + hydrateWithStats(jwar.getJob().getId(), a); + } + } + final Long totalJobCount = jobPersistence.getJobCount(configTypes, configId); return new JobReadList().jobs(jobReads).totalJobCount(totalJobCount); } - private void hydrateWithStats(final List jobReads) throws IOException { - for (final JobWithAttemptsRead jwar : jobReads) { - for (final AttemptRead a : jwar.getAttempts()) { - a.setTotalStats(new AttemptStats()); - - final var attemptStats = jobPersistence.getAttemptStats(jwar.getJob().getId(), a.getId().intValue()); - final var combinedStats = attemptStats.combinedStats(); - if (combinedStats == null) { - // If overall stats are missing, assume stream stats are also missing, since overall stats are - // easier to produce than stream stats. - continue; - } - - a.getTotalStats() - .estimatedBytes(combinedStats.getEstimatedBytes()) - .estimatedRecords(combinedStats.getEstimatedRecords()) - .bytesEmitted(combinedStats.getBytesEmitted()) - .recordsEmitted(combinedStats.getRecordsEmitted()); - - final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats() - .streamName(s.getStreamName()) - .streamNamespace(s.getStreamNamespace()) - .stats(new AttemptStats() - .bytesEmitted(s.getStats().getBytesEmitted()) - .recordsEmitted(s.getStats().getRecordsEmitted()) - .estimatedBytes(s.getStats().getEstimatedBytes()) - .estimatedRecords(s.getStats().getEstimatedRecords()))) - .collect(Collectors.toList()); - a.setStreamStats(streamStats); + private void hydrateWithStats(final long jobId, final AttemptRead a) throws IOException { + a.setTotalStats(new AttemptStats()); + + final var attemptStats = jobPersistence.getAttemptStats(jobId, a.getId().intValue()); + final var combinedStats = attemptStats.combinedStats(); + if (combinedStats == null) { + // If overall stats are missing, assume stream stats are also missing, since overall stats are + // easier to produce than stream stats. Exit early. + return; } - } + + a.getTotalStats() + .estimatedBytes(combinedStats.getEstimatedBytes()) + .estimatedRecords(combinedStats.getEstimatedRecords()) + .bytesEmitted(combinedStats.getBytesEmitted()) + .recordsEmitted(combinedStats.getRecordsEmitted()); + + final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats() + .streamName(s.getStreamName()) + .streamNamespace(s.getStreamNamespace()) + .stats(new AttemptStats() + .bytesEmitted(s.getStats().getBytesEmitted()) + .recordsEmitted(s.getStats().getRecordsEmitted()) + .estimatedBytes(s.getStats().getEstimatedBytes()) + .estimatedRecords(s.getStats().getEstimatedRecords()))) + .collect(Collectors.toList()); + a.setStreamStats(streamStats); } public JobInfoRead getJobInfo(final JobIdRequestBody jobIdRequestBody) throws IOException { @@ -177,6 +177,10 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) final Job job = jobPersistence.getJob(jobIdRequestBody.getId()); final JobInfoRead jobinfoRead = jobConverter.getJobInfoRead(job); + for (final AttemptInfoRead a : jobinfoRead.getAttempts()) { + hydrateWithStats(job.getId(), a.getAttempt()); + } + final JobDebugInfoRead jobDebugInfoRead = buildJobDebugInfoRead(jobinfoRead); if (temporalClient != null) { final UUID connectionId = UUID.fromString(job.getScope()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index c4800b7e5d7b..250153a4178e 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -95,6 +95,21 @@ class JobHistoryHandlerTest { private static final LogRead EMPTY_LOG_READ = new LogRead().logLines(new ArrayList<>()); private static final long CREATED_AT = System.currentTimeMillis() / 1000; + private static final AttemptStats ATTEMPT_STATS = new AttemptStats(new SyncStats().withBytesEmitted(10L).withRecordsEmitted(10L), + List.of( + new StreamSyncStats().withStreamNamespace("ns1").withStreamName("stream1") + .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)), + new StreamSyncStats().withStreamName("stream2") + .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)))); + + private static final io.airbyte.api.model.generated.AttemptStats ATTEMPT_STATS_API = new io.airbyte.api.model.generated.AttemptStats() + .bytesEmitted(10L).recordsEmitted(10L); + + private static final List ATTEMPT_STREAM_STATS = List.of( + new AttemptStreamStats().streamNamespace("ns1").streamName("stream1") + .stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L)), + new AttemptStreamStats().streamName("stream2").stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L))); + private ConnectionsHandler connectionsHandler; private SourceHandler sourceHandler; private DestinationHandler destinationHandler; @@ -164,21 +179,6 @@ void setUp() { @DisplayName("When listing jobs") class ListJobs { - private static final AttemptStats ATTEMPT_STATS = new AttemptStats(new SyncStats().withBytesEmitted(10L).withRecordsEmitted(10L), - List.of( - new StreamSyncStats().withStreamNamespace("ns1").withStreamName("stream1") - .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)), - new StreamSyncStats().withStreamName("stream2") - .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)))); - - private static final io.airbyte.api.model.generated.AttemptStats ATTEMPT_STATS_API = new io.airbyte.api.model.generated.AttemptStats() - .bytesEmitted(10L).recordsEmitted(10L); - - private static final List ATTEMPT_STREAM_STATS = List.of( - new AttemptStreamStats().streamNamespace("ns1").streamName("stream1") - .stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L)), - new AttemptStreamStats().streamName("stream2").stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L))); - @Test @DisplayName("Should return jobs with/without attempts in descending order") void testListJobs() throws IOException { @@ -323,16 +323,16 @@ void testGetJobInfoLight() throws IOException { @Test @DisplayName("Should return the right info to debug this job") void testGetDebugJobInfo() throws IOException, JsonValidationException, ConfigNotFoundException, URISyntaxException { - StandardSourceDefinition standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); + final StandardSourceDefinition standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); - SourceRead sourceRead = SourceHelpers.getSourceRead(source, standardSourceDefinition); + final SourceRead sourceRead = SourceHelpers.getSourceRead(source, standardSourceDefinition); - StandardDestinationDefinition standardDestinationDefinition = DestinationDefinitionHelpers.generateDestination(); + final StandardDestinationDefinition standardDestinationDefinition = DestinationDefinitionHelpers.generateDestination(); final DestinationConnection destination = DestinationHelpers.generateDestination(UUID.randomUUID()); - DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, standardDestinationDefinition); + final DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, standardDestinationDefinition); final StandardSync standardSync = ConnectionHelpers.generateSyncWithSourceId(source.getSourceId()); - ConnectionRead connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); + final ConnectionRead connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); when(connectionsHandler.getConnection(UUID.fromString(testJob.getScope()))).thenReturn(connectionRead); final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody(); @@ -343,10 +343,13 @@ void testGetDebugJobInfo() throws IOException, JsonValidationException, ConfigNo destinationIdRequestBody.setDestinationId(connectionRead.getDestinationId()); when(destinationHandler.getDestination(destinationIdRequestBody)).thenReturn(destinationRead); when(jobPersistence.getJob(JOB_ID)).thenReturn(testJob); + when(jobPersistence.getAttemptStats(anyLong(), anyInt())).thenReturn(ATTEMPT_STATS); final JobIdRequestBody requestBody = new JobIdRequestBody().id(JOB_ID); final JobDebugInfoRead jobDebugInfoActual = jobHistoryHandler.getJobDebugInfo(requestBody); - final JobDebugInfoRead exp = new JobDebugInfoRead().job(toDebugJobInfo(testJob)).attempts(toAttemptInfoList(ImmutableList.of(testJobAttempt))); + final List attemptInfoReads = toAttemptInfoList(ImmutableList.of(testJobAttempt)); + attemptInfoReads.forEach(read -> read.getAttempt().totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS)); + final JobDebugInfoRead exp = new JobDebugInfoRead().job(toDebugJobInfo(testJob)).attempts(attemptInfoReads); assertEquals(exp, jobDebugInfoActual); } From 1d498d70e1e403bb9a8383eee0ab9e5ba4368b3b Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Fri, 30 Dec 2022 11:18:12 -0800 Subject: [PATCH 5/8] Add comment.; --- .../server/handlers/JobHistoryHandler.java | 58 +++++++++++-------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index 6ad01c0d7062..1cd05f6e1d5a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -133,33 +133,41 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept return new JobReadList().jobs(jobReads).totalJobCount(totalJobCount); } + /** + * Retrieve stats for a given job id and attempt number and hydrate the api model with the retrieved + * information. + * + * @param jobId the job the attempt belongs to. Used as an index to retrieve stats. + * @param a the attempt to hydrate stats for. + * @throws IOException + */ private void hydrateWithStats(final long jobId, final AttemptRead a) throws IOException { - a.setTotalStats(new AttemptStats()); - - final var attemptStats = jobPersistence.getAttemptStats(jobId, a.getId().intValue()); - final var combinedStats = attemptStats.combinedStats(); - if (combinedStats == null) { - // If overall stats are missing, assume stream stats are also missing, since overall stats are - // easier to produce than stream stats. Exit early. - return; - } + a.setTotalStats(new AttemptStats()); + + final var attemptStats = jobPersistence.getAttemptStats(jobId, a.getId().intValue()); + final var combinedStats = attemptStats.combinedStats(); + if (combinedStats == null) { + // If overall stats are missing, assume stream stats are also missing, since overall stats are + // easier to produce than stream stats. Exit early. + return; + } - a.getTotalStats() - .estimatedBytes(combinedStats.getEstimatedBytes()) - .estimatedRecords(combinedStats.getEstimatedRecords()) - .bytesEmitted(combinedStats.getBytesEmitted()) - .recordsEmitted(combinedStats.getRecordsEmitted()); - - final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats() - .streamName(s.getStreamName()) - .streamNamespace(s.getStreamNamespace()) - .stats(new AttemptStats() - .bytesEmitted(s.getStats().getBytesEmitted()) - .recordsEmitted(s.getStats().getRecordsEmitted()) - .estimatedBytes(s.getStats().getEstimatedBytes()) - .estimatedRecords(s.getStats().getEstimatedRecords()))) - .collect(Collectors.toList()); - a.setStreamStats(streamStats); + a.getTotalStats() + .estimatedBytes(combinedStats.getEstimatedBytes()) + .estimatedRecords(combinedStats.getEstimatedRecords()) + .bytesEmitted(combinedStats.getBytesEmitted()) + .recordsEmitted(combinedStats.getRecordsEmitted()); + + final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats() + .streamName(s.getStreamName()) + .streamNamespace(s.getStreamNamespace()) + .stats(new AttemptStats() + .bytesEmitted(s.getStats().getBytesEmitted()) + .recordsEmitted(s.getStats().getRecordsEmitted()) + .estimatedBytes(s.getStats().getEstimatedBytes()) + .estimatedRecords(s.getStats().getEstimatedRecords()))) + .collect(Collectors.toList()); + a.setStreamStats(streamStats); } public JobInfoRead getJobInfo(final JobIdRequestBody jobIdRequestBody) throws IOException { From 408575c1b6d0c77177c5cbb67aa102c9f598ef11 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 18:12:09 -0800 Subject: [PATCH 6/8] Checkpoint: Implement persistence method to retrieve stats for a list of jobs. --- .../job/DefaultJobPersistence.java | 51 +++++++++++++++++++ .../persistence/job/JobPersistence.java | 4 ++ .../job/DefaultJobPersistenceTest.java | 27 ++++++++++ 3 files changed, 82 insertions(+) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index af872b3b0d34..7059ff9eea9d 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.collect.UnmodifiableIterator; import io.airbyte.commons.enums.Enums; @@ -72,6 +73,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; import org.jooq.DSLContext; import org.jooq.Field; import org.jooq.InsertValuesStepN; @@ -511,6 +513,55 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t }); } + @Override + public Map getAttemptStats(final List jobIds) throws IOException { + final var jobIdsStr = StringUtils.join(jobIds, ','); + return jobDatabase.query(ctx -> { + final var attemptStats = new HashMap(); + final var syncResults = ctx.fetch( + "SELECT atmpt.attempt_number, atmpt.job_id," + + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " + + "FROM sync_stats stats " + + "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id " + + "WHERE job_id IN ( " + jobIdsStr + ");"); + syncResults.forEach(r -> { + final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); + final var syncStats = new SyncStats() + .withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED)) + .withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED)) + .withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS)) + .withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES)); + attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList())); + }); + + final var streamResults = ctx.fetch( + "SELECT atmpt.attempt_number, atmpt.job_id, " + + "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " + + "FROM stream_stats stats " + + "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id " + + "WHERE attempt_id IN " + + "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));"); + + streamResults.forEach(r -> { + final var streamSyncStats = new StreamSyncStats() + .withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE)) + .withStreamName(r.get(STREAM_STATS.STREAM_NAME)) + .withStats(new SyncStats() + .withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED)) + .withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED)) + .withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS)) + .withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES))); + + final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); + if (!attemptStats.containsKey(key)) { + throw new RuntimeException("there are stream stats without sync stats entries suggesting the database is in a bad state"); + } + attemptStats.get(key).perStreamStats().add(streamSyncStats); + }); + return attemptStats; + }); + } + @Override public List getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException { return jobDatabase diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index 895382c40b20..e3ed5f169f0b 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -49,6 +49,8 @@ public interface JobPersistence { */ record AttemptStats(SyncStats combinedStats, List perStreamStats) {} + record JobAttemptPair(long id, int attemptNumber) {} + /** * Retrieve the combined and per stream stats for a single attempt. * @@ -57,6 +59,8 @@ record AttemptStats(SyncStats combinedStats, List perStreamStat */ AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException; + Map getAttemptStats(List jobIds) throws IOException; + List getNormalizationSummary(long jobId, int attemptNumber) throws IOException; Job getJob(long jobId) throws IOException; diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index b186c4241131..6ec6821f0525 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -468,6 +468,33 @@ void testGetStatsNoResult() throws IOException { } + @Test + @DisplayName("Retrieving all attempts stats for a job should return the right information") + void testGetMultipleStats() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int attemptNumberOne = jobPersistence.createAttempt(jobId, LOG_PATH); + + // First write for first attempt. + var streamStats = List.of( + new StreamSyncStats().withStreamName("name1") + .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobId, attemptNumberOne, 1000, 1000, 1000, 1000, streamStats); + + // Second write for first attempt. + when(timeSupplier.get()).thenReturn(Instant.now()); + streamStats = List.of( + new StreamSyncStats().withStreamName("name1") + .withStats(new SyncStats().withBytesEmitted(1000L).withRecordsEmitted(1000L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); + jobPersistence.writeStats(jobId, attemptNumberOne, 2000, 2000, 2000, 2000, streamStats); + jobPersistence.failAttempt(jobId, attemptNumberOne); + + final int attemptNumberTwo = jobPersistence.createAttempt(jobId, LOG_PATH); + jobPersistence.writeStats(jobId, attemptNumberTwo, 1000, 1000, 1000, 1000, streamStats); + + final var stats = jobPersistence.getAttemptStats(List.of(jobId)); + + } + } @Test From 8aea8211a2d60222bb8ca7e76f67bd2678d5dd7b Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 21:06:18 -0800 Subject: [PATCH 7/8] Implement batch stats read and tests. --- .../job/DefaultJobPersistence.java | 99 +++++++++++-------- .../persistence/job/JobPersistence.java | 11 +++ .../job/DefaultJobPersistenceTest.java | 47 ++++++--- 3 files changed, 105 insertions(+), 52 deletions(-) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index 7059ff9eea9d..d7485ee70644 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -517,51 +517,68 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t public Map getAttemptStats(final List jobIds) throws IOException { final var jobIdsStr = StringUtils.join(jobIds, ','); return jobDatabase.query(ctx -> { - final var attemptStats = new HashMap(); - final var syncResults = ctx.fetch( - "SELECT atmpt.attempt_number, atmpt.job_id," - + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " - + "FROM sync_stats stats " - + "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id " - + "WHERE job_id IN ( " + jobIdsStr + ");"); - syncResults.forEach(r -> { - final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); - final var syncStats = new SyncStats() - .withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED)) - .withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED)) - .withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS)) - .withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES)); - attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList())); - }); - - final var streamResults = ctx.fetch( - "SELECT atmpt.attempt_number, atmpt.job_id, " - + "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " - + "FROM stream_stats stats " - + "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id " - + "WHERE attempt_id IN " - + "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));"); - - streamResults.forEach(r -> { - final var streamSyncStats = new StreamSyncStats() - .withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE)) - .withStreamName(r.get(STREAM_STATS.STREAM_NAME)) - .withStats(new SyncStats() - .withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED)) - .withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED)) - .withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS)) - .withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES))); - - final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); - if (!attemptStats.containsKey(key)) { - throw new RuntimeException("there are stream stats without sync stats entries suggesting the database is in a bad state"); - } - attemptStats.get(key).perStreamStats().add(streamSyncStats); - }); + // Instead of one massive join query, separate this query into two queries for better readability + // for now. + // We can combine the queries at a later date if this still proves to be not efficient enough. + final Map attemptStats = hydrateSyncStats(jobIdsStr, ctx); + hydrateStreamStats(jobIdsStr, ctx, attemptStats); return attemptStats; }); } + private static Map hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) { + final var attemptStats = new HashMap(); + final var syncResults = ctx.fetch( + "SELECT atmpt.attempt_number, atmpt.job_id," + + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " + + "FROM sync_stats stats " + + "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id " + + "WHERE job_id IN ( " + jobIdsStr + ");"); + syncResults.forEach(r -> { + final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); + final var syncStats = new SyncStats() + .withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED)) + .withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED)) + .withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS)) + .withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES)); + attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList())); + }); + return attemptStats; + } + + /** + * This method needed to be called after + * {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)} as it assumes hydrateSyncStats + * has prepopulated the map. + */ + private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map attemptStats) { + final var streamResults = ctx.fetch( + "SELECT atmpt.attempt_number, atmpt.job_id, " + + "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " + + "FROM stream_stats stats " + + "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id " + + "WHERE attempt_id IN " + + "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));"); + + streamResults.forEach(r -> { + final var streamSyncStats = new StreamSyncStats() + .withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE)) + .withStreamName(r.get(STREAM_STATS.STREAM_NAME)) + .withStats(new SyncStats() + .withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED)) + .withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED)) + .withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS)) + .withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES))); + + final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); + if (!attemptStats.containsKey(key)) { + LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", key); + return; + } + attemptStats.get(key).perStreamStats().add(streamSyncStats); + }); + } + @Override public List getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException { return jobDatabase diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index e3ed5f169f0b..da7b3a98474e 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -59,6 +59,17 @@ record JobAttemptPair(long id, int attemptNumber) {} */ AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException; + /** + * Alternative method to retrieve combined and per stream stats per attempt for a list of jobs to + * avoid overloading the database with too many queries. + *

+ * This implementation is intended to utilise complex joins under the hood to reduce the potential + * N+1 database pattern. + * + * @param jobIds + * @return + * @throws IOException + */ Map getAttemptStats(List jobIds) throws IOException; List getNormalizationSummary(long jobId, int attemptNumber) throws IOException; diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index 6ec6821f0525..bae6a9435222 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -49,6 +49,7 @@ import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.db.instance.test.TestDatabaseProviders; import io.airbyte.persistence.job.JobPersistence.AttemptStats; +import io.airbyte.persistence.job.JobPersistence.JobAttemptPair; import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.AttemptStatus; import io.airbyte.persistence.job.models.AttemptWithJobInfo; @@ -471,27 +472,51 @@ void testGetStatsNoResult() throws IOException { @Test @DisplayName("Retrieving all attempts stats for a job should return the right information") void testGetMultipleStats() throws IOException { - final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); - final int attemptNumberOne = jobPersistence.createAttempt(jobId, LOG_PATH); + final long jobOneId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int jobOneAttemptNumberOne = jobPersistence.createAttempt(jobOneId, LOG_PATH); // First write for first attempt. var streamStats = List.of( new StreamSyncStats().withStreamName("name1") .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); - jobPersistence.writeStats(jobId, attemptNumberOne, 1000, 1000, 1000, 1000, streamStats); + jobPersistence.writeStats(jobOneId, jobOneAttemptNumberOne, 1000, 1000, 1000, 1000, streamStats); - // Second write for first attempt. + // Second write for first attempt. This is the record that should be returned. when(timeSupplier.get()).thenReturn(Instant.now()); streamStats = List.of( new StreamSyncStats().withStreamName("name1") .withStats(new SyncStats().withBytesEmitted(1000L).withRecordsEmitted(1000L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); - jobPersistence.writeStats(jobId, attemptNumberOne, 2000, 2000, 2000, 2000, streamStats); - jobPersistence.failAttempt(jobId, attemptNumberOne); - - final int attemptNumberTwo = jobPersistence.createAttempt(jobId, LOG_PATH); - jobPersistence.writeStats(jobId, attemptNumberTwo, 1000, 1000, 1000, 1000, streamStats); - - final var stats = jobPersistence.getAttemptStats(List.of(jobId)); + jobPersistence.writeStats(jobOneId, jobOneAttemptNumberOne, 2000, 2000, 2000, 2000, streamStats); + jobPersistence.failAttempt(jobOneId, jobOneAttemptNumberOne); + + // Second attempt for first job. + final int jobOneAttemptNumberTwo = jobPersistence.createAttempt(jobOneId, LOG_PATH); + jobPersistence.writeStats(jobOneId, jobOneAttemptNumberTwo, 1000, 1000, 1000, 1000, streamStats); + + // First attempt for second job. + final long jobTwoId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); + final int jobTwoAttemptNumberOne = jobPersistence.createAttempt(jobTwoId, LOG_PATH); + jobPersistence.writeStats(jobTwoId, jobTwoAttemptNumberOne, 1000, 1000, 1000, 1000, streamStats); + + final var stats = jobPersistence.getAttemptStats(List.of(jobOneId, jobTwoId)); + final var exp = Map.of( + new JobAttemptPair(jobOneId, jobOneAttemptNumberOne), + new AttemptStats( + new SyncStats().withRecordsEmitted(2000L).withBytesEmitted(2000L).withEstimatedBytes(2000L).withEstimatedRecords(2000L), + List.of(new StreamSyncStats().withStreamName("name1").withStats( + new SyncStats().withEstimatedBytes(10000L).withEstimatedRecords(2000L).withBytesEmitted(1000L).withRecordsEmitted(1000L)))), + new JobAttemptPair(jobOneId, jobOneAttemptNumberTwo), + new AttemptStats( + new SyncStats().withRecordsEmitted(1000L).withBytesEmitted(1000L).withEstimatedBytes(1000L).withEstimatedRecords(1000L), + List.of(new StreamSyncStats().withStreamName("name1").withStats( + new SyncStats().withEstimatedBytes(10000L).withEstimatedRecords(2000L).withBytesEmitted(1000L).withRecordsEmitted(1000L)))), + new JobAttemptPair(jobTwoId, jobTwoAttemptNumberOne), + new AttemptStats( + new SyncStats().withRecordsEmitted(1000L).withBytesEmitted(1000L).withEstimatedBytes(1000L).withEstimatedRecords(1000L), + List.of(new StreamSyncStats().withStreamName("name1").withStats( + new SyncStats().withEstimatedBytes(10000L).withEstimatedRecords(2000L).withBytesEmitted(1000L).withRecordsEmitted(1000L))))); + + assertEquals(exp, stats); } From 61473b7c08bf36d9aa9290d5605eb96f322b96c3 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Wed, 4 Jan 2023 21:41:11 -0800 Subject: [PATCH 8/8] Update tests. --- .../server/handlers/JobHistoryHandler.java | 21 +++++++++++++++---- .../handlers/JobHistoryHandlerTest.java | 15 ++++++++++--- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index 1cd05f6e1d5a..752cb2e446ce 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -37,6 +37,7 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.JobPersistence.JobAttemptPair; import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobStatus; import io.airbyte.server.converters.JobConverter; @@ -45,11 +46,14 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class JobHistoryHandler { private final ConnectionsHandler connectionsHandler; @@ -123,9 +127,17 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept } final List jobReads = jobs.stream().map(JobConverter::getJobWithAttemptsRead).collect(Collectors.toList()); + final var jobIds = jobReads.stream().map(r -> r.getJob().getId()).toList(); + final Map stats = jobPersistence.getAttemptStats(jobIds); for (final JobWithAttemptsRead jwar : jobReads) { for (final AttemptRead a : jwar.getAttempts()) { - hydrateWithStats(jwar.getJob().getId(), a); + final var stat = stats.get(new JobAttemptPair(jwar.getJob().getId(), a.getId().intValue())); + if (stat == null) { + log.error("Missing stats for job {} attempt {}", jwar.getJob().getId(), a.getId().intValue()); + continue; + } + + hydrateWithStats(a, stat); } } @@ -141,10 +153,9 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept * @param a the attempt to hydrate stats for. * @throws IOException */ - private void hydrateWithStats(final long jobId, final AttemptRead a) throws IOException { + private void hydrateWithStats(final AttemptRead a, final JobPersistence.AttemptStats attemptStats) throws IOException { a.setTotalStats(new AttemptStats()); - final var attemptStats = jobPersistence.getAttemptStats(jobId, a.getId().intValue()); final var combinedStats = attemptStats.combinedStats(); if (combinedStats == null) { // If overall stats are missing, assume stream stats are also missing, since overall stats are @@ -186,7 +197,9 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) final JobInfoRead jobinfoRead = jobConverter.getJobInfoRead(job); for (final AttemptInfoRead a : jobinfoRead.getAttempts()) { - hydrateWithStats(job.getId(), a.getAttempt()); + final int attemptNumber = a.getAttempt().getId().intValue(); + final var attemptStats = jobPersistence.getAttemptStats(job.getId(), attemptNumber); + hydrateWithStats(a.getAttempt(), attemptStats); } final JobDebugInfoRead jobDebugInfoRead = buildJobDebugInfoRead(jobinfoRead); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index 250153a4178e..105e2fbb9c17 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -51,6 +51,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.persistence.job.JobPersistence; import io.airbyte.persistence.job.JobPersistence.AttemptStats; +import io.airbyte.persistence.job.JobPersistence.JobAttemptPair; import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.AttemptNormalizationStatus; import io.airbyte.persistence.job.models.AttemptStatus; @@ -69,6 +70,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -195,7 +197,9 @@ void testListJobs() throws IOException { when(jobPersistence.listJobs(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, pagesize, rowOffset)) .thenReturn(List.of(latestJobNoAttempt, successfulJob)); when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L); - when(jobPersistence.getAttemptStats(anyLong(), anyInt())).thenReturn(ATTEMPT_STATS); + when(jobPersistence.getAttemptStats(List.of(200L, 100L))).thenReturn(Map.of( + new JobAttemptPair(100, 0), ATTEMPT_STATS, + new JobAttemptPair(jobId2, 0), ATTEMPT_STATS)); final var requestBody = new JobListRequestBody() .configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API)) @@ -237,7 +241,10 @@ void testListJobsFor() throws IOException { when(jobPersistence.listJobs(configTypes, JOB_CONFIG_ID, pagesize, rowOffset)).thenReturn(List.of(latestJob, secondJob, firstJob)); when(jobPersistence.getJobCount(configTypes, JOB_CONFIG_ID)).thenReturn(3L); - when(jobPersistence.getAttemptStats(anyLong(), anyInt())).thenReturn(ATTEMPT_STATS); + when(jobPersistence.getAttemptStats(List.of(300L, 200L, 100L))).thenReturn(Map.of( + new JobAttemptPair(100, 0), ATTEMPT_STATS, + new JobAttemptPair(secondJobId, 0), ATTEMPT_STATS, + new JobAttemptPair(latestJobId, 0), ATTEMPT_STATS)); final JobListRequestBody requestBody = new JobListRequestBody() .configTypes(List.of(CONFIG_TYPE_FOR_API, JobConfigType.SYNC, JobConfigType.DISCOVER_SCHEMA)) @@ -274,7 +281,9 @@ void testListJobsIncludingJobId() throws IOException { when(jobPersistence.listJobsIncludingId(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, jobId2, pagesize)) .thenReturn(List.of(latestJobNoAttempt, successfulJob)); when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L); - when(jobPersistence.getAttemptStats(anyLong(), anyInt())).thenReturn(ATTEMPT_STATS); + when(jobPersistence.getAttemptStats(List.of(200L, 100L))).thenReturn(Map.of( + new JobAttemptPair(100, 0), ATTEMPT_STATS, + new JobAttemptPair(jobId2, 0), ATTEMPT_STATS)); final var requestBody = new JobListRequestBody() .configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API))