From ef335e283f5e098eeae7db6e808f62e34b05b094 Mon Sep 17 00:00:00 2001 From: Topher Lubaway Date: Fri, 6 Jan 2023 10:09:31 -0600 Subject: [PATCH] Revert "Progress Bar Read APIs (#20937)" (#21115) Breaks when there is no config present https://github.com/airbytehq/airbyte/issues/21112 This reverts commit 3a2b0405c425562ba5fa8bfe3f5a4646c165591a. --- .../job/DefaultJobPersistence.java | 70 +-------- .../persistence/job/JobPersistence.java | 15 -- .../persistence/job/models/Attempt.java | 16 +- .../job/DefaultJobPersistenceTest.java | 140 +++++------------- .../persistence/job/models/AttemptTest.java | 2 +- .../persistence/job/models/JobTest.java | 2 +- .../server/converters/JobConverter.java | 2 +- .../server/handlers/JobHistoryHandler.java | 69 +-------- .../server/converters/JobConverterTest.java | 6 +- .../handlers/JobHistoryHandlerTest.java | 82 ++++------ ...obCreationAndStatusUpdateActivityImpl.java | 5 +- 11 files changed, 87 insertions(+), 322 deletions(-) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java index d7485ee70644..1c5dd8253cdb 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobPersistence.java @@ -17,7 +17,6 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.collect.UnmodifiableIterator; import io.airbyte.commons.enums.Enums; @@ -73,7 +72,6 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; -import org.apache.commons.lang3.StringUtils; import org.jooq.DSLContext; import org.jooq.Field; import org.jooq.InsertValuesStepN; @@ -513,72 +511,6 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t }); } - @Override - public Map getAttemptStats(final List jobIds) throws IOException { - final var jobIdsStr = StringUtils.join(jobIds, ','); - return jobDatabase.query(ctx -> { - // Instead of one massive join query, separate this query into two queries for better readability - // for now. - // We can combine the queries at a later date if this still proves to be not efficient enough. - final Map attemptStats = hydrateSyncStats(jobIdsStr, ctx); - hydrateStreamStats(jobIdsStr, ctx, attemptStats); - return attemptStats; - }); - } - - private static Map hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) { - final var attemptStats = new HashMap(); - final var syncResults = ctx.fetch( - "SELECT atmpt.attempt_number, atmpt.job_id," - + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " - + "FROM sync_stats stats " - + "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id " - + "WHERE job_id IN ( " + jobIdsStr + ");"); - syncResults.forEach(r -> { - final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); - final var syncStats = new SyncStats() - .withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED)) - .withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED)) - .withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS)) - .withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES)); - attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList())); - }); - return attemptStats; - } - - /** - * This method needed to be called after - * {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)} as it assumes hydrateSyncStats - * has prepopulated the map. - */ - private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map attemptStats) { - final var streamResults = ctx.fetch( - "SELECT atmpt.attempt_number, atmpt.job_id, " - + "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted " - + "FROM stream_stats stats " - + "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id " - + "WHERE attempt_id IN " - + "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));"); - - streamResults.forEach(r -> { - final var streamSyncStats = new StreamSyncStats() - .withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE)) - .withStreamName(r.get(STREAM_STATS.STREAM_NAME)) - .withStats(new SyncStats() - .withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED)) - .withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED)) - .withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS)) - .withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES))); - - final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER)); - if (!attemptStats.containsKey(key)) { - LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", key); - return; - } - attemptStats.get(key).perStreamStats().add(streamSyncStats); - }); - } - @Override public List getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException { return jobDatabase @@ -917,7 +849,7 @@ private static Job getJobFromRecord(final Record record) { private static Attempt getAttemptFromRecord(final Record record) { return new Attempt( - record.get(ATTEMPT_NUMBER, int.class), + record.get(ATTEMPT_NUMBER, Long.class), record.get(JOB_ID, Long.class), Path.of(record.get("log_path", String.class)), record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class), diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java index da7b3a98474e..895382c40b20 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/JobPersistence.java @@ -49,8 +49,6 @@ public interface JobPersistence { */ record AttemptStats(SyncStats combinedStats, List perStreamStats) {} - record JobAttemptPair(long id, int attemptNumber) {} - /** * Retrieve the combined and per stream stats for a single attempt. * @@ -59,19 +57,6 @@ record JobAttemptPair(long id, int attemptNumber) {} */ AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException; - /** - * Alternative method to retrieve combined and per stream stats per attempt for a list of jobs to - * avoid overloading the database with too many queries. - *

- * This implementation is intended to utilise complex joins under the hood to reduce the potential - * N+1 database pattern. - * - * @param jobIds - * @return - * @throws IOException - */ - Map getAttemptStats(List jobIds) throws IOException; - List getNormalizationSummary(long jobId, int attemptNumber) throws IOException; Job getJob(long jobId) throws IOException; diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java index a3dc08b076d2..110deaecab7b 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/models/Attempt.java @@ -13,7 +13,7 @@ public class Attempt { - private final int attemptNumber; + private final long id; private final long jobId; private final JobOutput output; private final AttemptStatus status; @@ -23,7 +23,7 @@ public class Attempt { private final long createdAtInSecond; private final Long endedAtInSecond; - public Attempt(final int attemptNumber, + public Attempt(final long id, final long jobId, final Path logPath, final @Nullable JobOutput output, @@ -32,7 +32,7 @@ public Attempt(final int attemptNumber, final long createdAtInSecond, final long updatedAtInSecond, final @Nullable Long endedAtInSecond) { - this.attemptNumber = attemptNumber; + this.id = id; this.jobId = jobId; this.output = output; this.status = status; @@ -43,8 +43,8 @@ public Attempt(final int attemptNumber, this.endedAtInSecond = endedAtInSecond; } - public int getAttemptNumber() { - return attemptNumber; + public long getId() { + return id; } public long getJobId() { @@ -92,7 +92,7 @@ public boolean equals(final Object o) { return false; } final Attempt attempt = (Attempt) o; - return attemptNumber == attempt.attemptNumber && + return id == attempt.id && jobId == attempt.jobId && updatedAtInSecond == attempt.updatedAtInSecond && createdAtInSecond == attempt.createdAtInSecond && @@ -105,13 +105,13 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond); + return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond); } @Override public String toString() { return "Attempt{" + - "id=" + attemptNumber + + "id=" + id + ", jobId=" + jobId + ", output=" + output + ", status=" + status + diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java index bae6a9435222..0b48aaf036c4 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobPersistenceTest.java @@ -14,7 +14,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -49,7 +48,6 @@ import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.db.instance.test.TestDatabaseProviders; import io.airbyte.persistence.job.JobPersistence.AttemptStats; -import io.airbyte.persistence.job.JobPersistence.JobAttemptPair; import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.AttemptStatus; import io.airbyte.persistence.job.models.AttemptWithJobInfo; @@ -144,7 +142,7 @@ static void dbDown() { container.close(); } - private static Attempt createAttempt(final int id, final long jobId, final AttemptStatus status, final Path logPath) { + private static Attempt createAttempt(final long id, final long jobId, final AttemptStatus status, final Path logPath) { return new Attempt( id, jobId, @@ -157,7 +155,7 @@ private static Attempt createAttempt(final int id, final long jobId, final Attem NOW.getEpochSecond()); } - private static Attempt createUnfinishedAttempt(final int id, final long jobId, final AttemptStatus status, final Path logPath) { + private static Attempt createUnfinishedAttempt(final long id, final long jobId, final AttemptStatus status, final Path logPath) { return new Attempt( id, jobId, @@ -240,7 +238,7 @@ void testCompleteAttemptFailed() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.INCOMPLETE, - Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH)), + Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -258,7 +256,7 @@ void testCompleteAttemptSuccess() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -328,8 +326,8 @@ void testWriteAttemptFailureSummary() throws IOException { } @Nested - @DisplayName("Stats Related Tests") - class Stats { + @DisplayName("Test writing in progress stats") + class WriteStats { @Test @DisplayName("Writing stats the first time should only write record and bytes information correctly") @@ -457,69 +455,6 @@ void testWriteNullNamespace() throws IOException { assertEquals(streamStats, actStreamStats); } - @Test - @DisplayName("Writing multiple stats a stream with null namespace should write correctly without exceptions") - void testGetStatsNoResult() throws IOException { - final long jobId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); - final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); - - final AttemptStats stats = jobPersistence.getAttemptStats(jobId, attemptNumber); - assertNull(stats.combinedStats()); - assertEquals(0, stats.perStreamStats().size()); - - } - - @Test - @DisplayName("Retrieving all attempts stats for a job should return the right information") - void testGetMultipleStats() throws IOException { - final long jobOneId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); - final int jobOneAttemptNumberOne = jobPersistence.createAttempt(jobOneId, LOG_PATH); - - // First write for first attempt. - var streamStats = List.of( - new StreamSyncStats().withStreamName("name1") - .withStats(new SyncStats().withBytesEmitted(500L).withRecordsEmitted(500L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); - jobPersistence.writeStats(jobOneId, jobOneAttemptNumberOne, 1000, 1000, 1000, 1000, streamStats); - - // Second write for first attempt. This is the record that should be returned. - when(timeSupplier.get()).thenReturn(Instant.now()); - streamStats = List.of( - new StreamSyncStats().withStreamName("name1") - .withStats(new SyncStats().withBytesEmitted(1000L).withRecordsEmitted(1000L).withEstimatedBytes(10000L).withEstimatedRecords(2000L))); - jobPersistence.writeStats(jobOneId, jobOneAttemptNumberOne, 2000, 2000, 2000, 2000, streamStats); - jobPersistence.failAttempt(jobOneId, jobOneAttemptNumberOne); - - // Second attempt for first job. - final int jobOneAttemptNumberTwo = jobPersistence.createAttempt(jobOneId, LOG_PATH); - jobPersistence.writeStats(jobOneId, jobOneAttemptNumberTwo, 1000, 1000, 1000, 1000, streamStats); - - // First attempt for second job. - final long jobTwoId = jobPersistence.enqueueJob(SCOPE, SPEC_JOB_CONFIG).orElseThrow(); - final int jobTwoAttemptNumberOne = jobPersistence.createAttempt(jobTwoId, LOG_PATH); - jobPersistence.writeStats(jobTwoId, jobTwoAttemptNumberOne, 1000, 1000, 1000, 1000, streamStats); - - final var stats = jobPersistence.getAttemptStats(List.of(jobOneId, jobTwoId)); - final var exp = Map.of( - new JobAttemptPair(jobOneId, jobOneAttemptNumberOne), - new AttemptStats( - new SyncStats().withRecordsEmitted(2000L).withBytesEmitted(2000L).withEstimatedBytes(2000L).withEstimatedRecords(2000L), - List.of(new StreamSyncStats().withStreamName("name1").withStats( - new SyncStats().withEstimatedBytes(10000L).withEstimatedRecords(2000L).withBytesEmitted(1000L).withRecordsEmitted(1000L)))), - new JobAttemptPair(jobOneId, jobOneAttemptNumberTwo), - new AttemptStats( - new SyncStats().withRecordsEmitted(1000L).withBytesEmitted(1000L).withEstimatedBytes(1000L).withEstimatedRecords(1000L), - List.of(new StreamSyncStats().withStreamName("name1").withStats( - new SyncStats().withEstimatedBytes(10000L).withEstimatedRecords(2000L).withBytesEmitted(1000L).withRecordsEmitted(1000L)))), - new JobAttemptPair(jobTwoId, jobTwoAttemptNumberOne), - new AttemptStats( - new SyncStats().withRecordsEmitted(1000L).withBytesEmitted(1000L).withEstimatedBytes(1000L).withEstimatedRecords(1000L), - List.of(new StreamSyncStats().withStreamName("name1").withStats( - new SyncStats().withEstimatedBytes(10000L).withEstimatedRecords(2000L).withBytesEmitted(1000L).withRecordsEmitted(1000L))))); - - assertEquals(exp, stats); - - } - } @Test @@ -536,8 +471,8 @@ void testGetLastSyncJobWithMultipleAttempts() throws IOException { SYNC_JOB_CONFIG, JobStatus.INCOMPLETE, Lists.newArrayList( - createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1, jobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1L, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(Optional.of(expected), actual); @@ -579,14 +514,15 @@ void testExportImport() throws IOException, SQLException { jobPersistence.importDatabase("test", outputStreams); final List actualList = jobPersistence.listJobs(SPEC_JOB_CONFIG.getConfigType(), CONNECTION_ID.toString(), 9999, 0); + final Job actual = actualList.get(0); final Job expected = createJob( jobId, SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, Lists.newArrayList( - createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), + createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1L, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), NOW.getEpochSecond()); assertEquals(1, actualList.size()); @@ -621,8 +557,8 @@ void testListJobsWithTimestamp() throws IOException { assertEquals(jobs.size(), 1); assertEquals(jobs.get(0).getId(), syncJobId); assertEquals(jobs.get(0).getAttempts().size(), 2); - assertEquals(jobs.get(0).getAttempts().get(0).getAttemptNumber(), 0); - assertEquals(jobs.get(0).getAttempts().get(1).getAttemptNumber(), 1); + assertEquals(jobs.get(0).getAttempts().get(0).getId(), 0); + assertEquals(jobs.get(0).getAttempts().get(1).getId(), 1); final Path syncJobThirdAttemptLogPath = LOG_PATH.resolve("3"); final int syncJobAttemptNumber2 = jobPersistence.createAttempt(syncJobId, syncJobThirdAttemptLogPath); @@ -642,12 +578,12 @@ void testListJobsWithTimestamp() throws IOException { assertEquals(secondQueryJobs.size(), 2); assertEquals(secondQueryJobs.get(0).getId(), syncJobId); assertEquals(secondQueryJobs.get(0).getAttempts().size(), 1); - assertEquals(secondQueryJobs.get(0).getAttempts().get(0).getAttemptNumber(), 2); + assertEquals(secondQueryJobs.get(0).getAttempts().get(0).getId(), 2); assertEquals(secondQueryJobs.get(1).getId(), newSyncJobId); assertEquals(secondQueryJobs.get(1).getAttempts().size(), 2); - assertEquals(secondQueryJobs.get(1).getAttempts().get(0).getAttemptNumber(), 0); - assertEquals(secondQueryJobs.get(1).getAttempts().get(1).getAttemptNumber(), 1); + assertEquals(secondQueryJobs.get(1).getAttempts().get(0).getId(), 0); + assertEquals(secondQueryJobs.get(1).getAttempts().get(1).getId(), 1); Long maxEndedAtTimestampAfterSecondQuery = -1L; for (final Job c : secondQueryJobs) { @@ -692,35 +628,35 @@ void testListAttemptsWithJobInfo() throws IOException { assertEquals(6, allAttempts.size()); assertEquals(job1, allAttempts.get(0).getJobInfo().getId()); - assertEquals(job1Attempt1, allAttempts.get(0).getAttempt().getAttemptNumber()); + assertEquals(job1Attempt1, allAttempts.get(0).getAttempt().getId()); assertEquals(job2, allAttempts.get(1).getJobInfo().getId()); - assertEquals(job2Attempt1, allAttempts.get(1).getAttempt().getAttemptNumber()); + assertEquals(job2Attempt1, allAttempts.get(1).getAttempt().getId()); assertEquals(job2, allAttempts.get(2).getJobInfo().getId()); - assertEquals(job2Attempt2, allAttempts.get(2).getAttempt().getAttemptNumber()); + assertEquals(job2Attempt2, allAttempts.get(2).getAttempt().getId()); assertEquals(job1, allAttempts.get(3).getJobInfo().getId()); - assertEquals(job1Attempt2, allAttempts.get(3).getAttempt().getAttemptNumber()); + assertEquals(job1Attempt2, allAttempts.get(3).getAttempt().getId()); assertEquals(job1, allAttempts.get(4).getJobInfo().getId()); - assertEquals(job1Attempt3, allAttempts.get(4).getAttempt().getAttemptNumber()); + assertEquals(job1Attempt3, allAttempts.get(4).getAttempt().getId()); assertEquals(job2, allAttempts.get(5).getJobInfo().getId()); - assertEquals(job2Attempt3, allAttempts.get(5).getAttempt().getAttemptNumber()); + assertEquals(job2Attempt3, allAttempts.get(5).getAttempt().getId()); final List attemptsAfterTimestamp = jobPersistence.listAttemptsWithJobInfo(ConfigType.SYNC, Instant.ofEpochSecond(allAttempts.get(2).getAttempt().getEndedAtInSecond().orElseThrow())); assertEquals(3, attemptsAfterTimestamp.size()); assertEquals(job1, attemptsAfterTimestamp.get(0).getJobInfo().getId()); - assertEquals(job1Attempt2, attemptsAfterTimestamp.get(0).getAttempt().getAttemptNumber()); + assertEquals(job1Attempt2, attemptsAfterTimestamp.get(0).getAttempt().getId()); assertEquals(job1, attemptsAfterTimestamp.get(1).getJobInfo().getId()); - assertEquals(job1Attempt3, attemptsAfterTimestamp.get(1).getAttempt().getAttemptNumber()); + assertEquals(job1Attempt3, attemptsAfterTimestamp.get(1).getAttempt().getId()); assertEquals(job2, attemptsAfterTimestamp.get(2).getJobInfo().getId()); - assertEquals(job2Attempt3, attemptsAfterTimestamp.get(2).getAttempt().getAttemptNumber()); + assertEquals(job2Attempt3, attemptsAfterTimestamp.get(2).getAttempt().getId()); } private static Supplier incrementingSecondSupplier(final Instant startTime) { @@ -951,7 +887,7 @@ void testCreateAttempt() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.RUNNING, - Lists.newArrayList(createUnfinishedAttempt(0, jobId, AttemptStatus.RUNNING, LOG_PATH)), + Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -965,12 +901,12 @@ void testCreateAttemptAttemptId() throws IOException { final Job jobAfterOneAttempts = jobPersistence.getJob(jobId); assertEquals(0, attemptNumber1); - assertEquals(0, jobAfterOneAttempts.getAttempts().get(0).getAttemptNumber()); + assertEquals(0, jobAfterOneAttempts.getAttempts().get(0).getId()); final int attemptNumber2 = jobPersistence.createAttempt(jobId, LOG_PATH); final Job jobAfterTwoAttempts = jobPersistence.getJob(jobId); assertEquals(1, attemptNumber2); - assertEquals(Sets.newHashSet(0, 1), jobAfterTwoAttempts.getAttempts().stream().map(Attempt::getAttemptNumber).collect(Collectors.toSet())); + assertEquals(Sets.newHashSet(0L, 1L), jobAfterTwoAttempts.getAttempts().stream().map(Attempt::getId).collect(Collectors.toSet())); } @Test @@ -986,7 +922,7 @@ void testCreateAttemptWhileAttemptAlreadyRunning() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.RUNNING, - Lists.newArrayList(createUnfinishedAttempt(0, jobId, AttemptStatus.RUNNING, LOG_PATH)), + Lists.newArrayList(createUnfinishedAttempt(0L, jobId, AttemptStatus.RUNNING, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -1005,7 +941,7 @@ void testCreateAttemptTerminal() throws IOException { jobId, SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0L, jobId, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(expected, actual); } @@ -1400,8 +1336,8 @@ void testGetNextJobWithMultipleAttempts() throws IOException { SPEC_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList( - createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1, jobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1L, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(Optional.of(expected), actual); @@ -1626,8 +1562,8 @@ void testListJobsWithMultipleAttempts() throws IOException { SPEC_JOB_CONFIG, JobStatus.SUCCEEDED, Lists.newArrayList( - createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH), - createAttempt(1, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), + createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH), + createAttempt(1L, jobId, AttemptStatus.SUCCEEDED, secondAttemptLogPath)), NOW.getEpochSecond()); assertEquals(1, actualList.size()); @@ -1746,7 +1682,7 @@ void testListJobsWithStatus() throws IOException { SPEC_JOB_CONFIG, JobStatus.INCOMPLETE, Lists.newArrayList( - createAttempt(0, jobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0L, jobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond()); assertEquals(1, actualList.size()); @@ -1784,7 +1720,7 @@ void testListJobsWithStatusAndConfigType() throws IOException, InterruptedExcept SPEC_JOB_CONFIG, JobStatus.INCOMPLETE, Lists.newArrayList( - createAttempt(0, failedSpecJobId, AttemptStatus.FAILED, LOG_PATH)), + createAttempt(0L, failedSpecJobId, AttemptStatus.FAILED, LOG_PATH)), NOW.getEpochSecond(), SPEC_SCOPE); @@ -1821,12 +1757,12 @@ void testListJobsWithStatusesAndConfigTypesForConnection() throws IOException, I Set.of(ConfigType.SYNC, ConfigType.CHECK_CONNECTION_DESTINATION), Set.of(JobStatus.PENDING, JobStatus.SUCCEEDED)); final Job expectedDesiredJob1 = createJob(desiredJobId1, SYNC_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0L, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond(), desiredConnectionId.toString()); final Job expectedDesiredJob2 = createJob(desiredJobId2, SYNC_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString()); final Job expectedDesiredJob3 = createJob(desiredJobId3, CHECK_JOB_CONFIG, JobStatus.SUCCEEDED, - Lists.newArrayList(createAttempt(0, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)), + Lists.newArrayList(createAttempt(0L, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)), NOW.getEpochSecond(), desiredConnectionId.toString()); final Job expectedDesiredJob4 = createJob(desiredJobId4, CHECK_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString()); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java index badc1ac68d70..0913a29ca734 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/AttemptTest.java @@ -19,7 +19,7 @@ void testIsAttemptInTerminalState() { } private static Attempt attemptWithStatus(final AttemptStatus attemptStatus) { - return new Attempt(1, 1L, null, null, attemptStatus, null, 0L, 0L, null); + return new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null); } } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java index 4cdb4f15403d..3e10fa003d36 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/models/JobTest.java @@ -68,7 +68,7 @@ void testGetLastFailedAttempt() { final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED); assertTrue(job.getLastFailedAttempt().isPresent()); - assertEquals(2, job.getLastFailedAttempt().get().getAttemptNumber()); + assertEquals(2, job.getLastFailedAttempt().get().getId()); } @Test diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index f478e85b90c0..52c28f3640f1 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -137,7 +137,7 @@ public AttemptInfoRead getAttemptInfoRead(final Attempt attempt) { public static AttemptRead getAttemptRead(final Attempt attempt) { return new AttemptRead() - .id((long) attempt.getAttemptNumber()) + .id(attempt.getId()) .status(Enums.convertTo(attempt.getStatus(), AttemptStatus.class)) .bytesSynced(attempt.getOutput() // TODO (parker) remove after frontend switches to totalStats .map(JobOutput::getSync) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index 752cb2e446ce..f61476a18600 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -5,11 +5,7 @@ package io.airbyte.server.handlers; import com.google.common.base.Preconditions; -import io.airbyte.api.model.generated.AttemptInfoRead; import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; -import io.airbyte.api.model.generated.AttemptRead; -import io.airbyte.api.model.generated.AttemptStats; -import io.airbyte.api.model.generated.AttemptStreamStats; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.model.generated.DestinationDefinitionRead; @@ -37,7 +33,6 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.JobPersistence.JobAttemptPair; import io.airbyte.persistence.job.models.Job; import io.airbyte.persistence.job.models.JobStatus; import io.airbyte.server.converters.JobConverter; @@ -46,14 +41,11 @@ import java.io.IOException; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; -@Slf4j public class JobHistoryHandler { private final ConnectionsHandler connectionsHandler; @@ -126,59 +118,14 @@ public JobReadList listJobsFor(final JobListRequestBody request) throws IOExcept (request.getPagination() != null && request.getPagination().getRowOffset() != null) ? request.getPagination().getRowOffset() : 0); } - final List jobReads = jobs.stream().map(JobConverter::getJobWithAttemptsRead).collect(Collectors.toList()); - final var jobIds = jobReads.stream().map(r -> r.getJob().getId()).toList(); - final Map stats = jobPersistence.getAttemptStats(jobIds); - for (final JobWithAttemptsRead jwar : jobReads) { - for (final AttemptRead a : jwar.getAttempts()) { - final var stat = stats.get(new JobAttemptPair(jwar.getJob().getId(), a.getId().intValue())); - if (stat == null) { - log.error("Missing stats for job {} attempt {}", jwar.getJob().getId(), a.getId().intValue()); - continue; - } - - hydrateWithStats(a, stat); - } - } - final Long totalJobCount = jobPersistence.getJobCount(configTypes, configId); - return new JobReadList().jobs(jobReads).totalJobCount(totalJobCount); - } - - /** - * Retrieve stats for a given job id and attempt number and hydrate the api model with the retrieved - * information. - * - * @param jobId the job the attempt belongs to. Used as an index to retrieve stats. - * @param a the attempt to hydrate stats for. - * @throws IOException - */ - private void hydrateWithStats(final AttemptRead a, final JobPersistence.AttemptStats attemptStats) throws IOException { - a.setTotalStats(new AttemptStats()); - - final var combinedStats = attemptStats.combinedStats(); - if (combinedStats == null) { - // If overall stats are missing, assume stream stats are also missing, since overall stats are - // easier to produce than stream stats. Exit early. - return; - } - - a.getTotalStats() - .estimatedBytes(combinedStats.getEstimatedBytes()) - .estimatedRecords(combinedStats.getEstimatedRecords()) - .bytesEmitted(combinedStats.getBytesEmitted()) - .recordsEmitted(combinedStats.getRecordsEmitted()); - final var streamStats = attemptStats.perStreamStats().stream().map(s -> new AttemptStreamStats() - .streamName(s.getStreamName()) - .streamNamespace(s.getStreamNamespace()) - .stats(new AttemptStats() - .bytesEmitted(s.getStats().getBytesEmitted()) - .recordsEmitted(s.getStats().getRecordsEmitted()) - .estimatedBytes(s.getStats().getEstimatedBytes()) - .estimatedRecords(s.getStats().getEstimatedRecords()))) + final List jobReads = jobs + .stream() + .map(JobConverter::getJobWithAttemptsRead) .collect(Collectors.toList()); - a.setStreamStats(streamStats); + + return new JobReadList().jobs(jobReads).totalJobCount(totalJobCount); } public JobInfoRead getJobInfo(final JobIdRequestBody jobIdRequestBody) throws IOException { @@ -196,12 +143,6 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) final Job job = jobPersistence.getJob(jobIdRequestBody.getId()); final JobInfoRead jobinfoRead = jobConverter.getJobInfoRead(job); - for (final AttemptInfoRead a : jobinfoRead.getAttempts()) { - final int attemptNumber = a.getAttempt().getId().intValue(); - final var attemptStats = jobPersistence.getAttemptStats(job.getId(), attemptNumber); - hydrateWithStats(a.getAttempt(), attemptStats); - } - final JobDebugInfoRead jobDebugInfoRead = buildJobDebugInfoRead(jobinfoRead); if (temporalClient != null) { final UUID connectionId = UUID.fromString(job.getScope()); diff --git a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java index f73ec6e62aad..fe1d084d92ce 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/converters/JobConverterTest.java @@ -68,7 +68,7 @@ class JobConverterTest { private static final long JOB_ID = 100L; - private static final Integer ATTEMPT_NUMBER = 0; + private static final long ATTEMPT_ID = 1002L; private static final String JOB_CONFIG_ID = "123"; private static final JobStatus JOB_STATUS = JobStatus.RUNNING; private static final AttemptStatus ATTEMPT_STATUS = AttemptStatus.RUNNING; @@ -124,7 +124,7 @@ class JobConverterTest { .updatedAt(CREATED_AT)) .attempts(Lists.newArrayList(new AttemptInfoRead() .attempt(new AttemptRead() - .id((long) ATTEMPT_NUMBER) + .id(ATTEMPT_ID) .status(io.airbyte.api.model.generated.AttemptStatus.RUNNING) .recordsSynced(RECORDS_EMITTED) .bytesSynced(BYTES_EMITTED) @@ -195,7 +195,7 @@ public void setUp() { when(job.getCreatedAtInSecond()).thenReturn(CREATED_AT); when(job.getUpdatedAtInSecond()).thenReturn(CREATED_AT); when(job.getAttempts()).thenReturn(Lists.newArrayList(attempt)); - when(attempt.getAttemptNumber()).thenReturn(ATTEMPT_NUMBER); + when(attempt.getId()).thenReturn(ATTEMPT_ID); when(attempt.getStatus()).thenReturn(ATTEMPT_STATUS); when(attempt.getOutput()).thenReturn(Optional.of(JOB_OUTPUT)); when(attempt.getLogPath()).thenReturn(LOG_PATH); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index 105e2fbb9c17..cb68ad2b8205 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -6,8 +6,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -16,7 +14,6 @@ import io.airbyte.api.model.generated.AttemptNormalizationStatusRead; import io.airbyte.api.model.generated.AttemptNormalizationStatusReadList; import io.airbyte.api.model.generated.AttemptRead; -import io.airbyte.api.model.generated.AttemptStreamStats; import io.airbyte.api.model.generated.ConnectionRead; import io.airbyte.api.model.generated.DestinationIdRequestBody; import io.airbyte.api.model.generated.DestinationRead; @@ -45,13 +42,9 @@ import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; -import io.airbyte.config.StreamSyncStats; -import io.airbyte.config.SyncStats; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.JobPersistence.AttemptStats; -import io.airbyte.persistence.job.JobPersistence.JobAttemptPair; import io.airbyte.persistence.job.models.Attempt; import io.airbyte.persistence.job.models.AttemptNormalizationStatus; import io.airbyte.persistence.job.models.AttemptStatus; @@ -70,7 +63,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.UUID; @@ -85,7 +77,7 @@ class JobHistoryHandlerTest { private static final long JOB_ID = 100L; - private static final int ATTEMPT_NUMBER = 0; + private static final long ATTEMPT_ID = 1002L; private static final String JOB_CONFIG_ID = "ef296385-6796-413f-ac1b-49c4caba3f2b"; private static final JobStatus JOB_STATUS = JobStatus.SUCCEEDED; private static final JobConfig.ConfigType CONFIG_TYPE = JobConfig.ConfigType.CHECK_CONNECTION_SOURCE; @@ -97,24 +89,17 @@ class JobHistoryHandlerTest { private static final LogRead EMPTY_LOG_READ = new LogRead().logLines(new ArrayList<>()); private static final long CREATED_AT = System.currentTimeMillis() / 1000; - private static final AttemptStats ATTEMPT_STATS = new AttemptStats(new SyncStats().withBytesEmitted(10L).withRecordsEmitted(10L), - List.of( - new StreamSyncStats().withStreamNamespace("ns1").withStreamName("stream1") - .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)), - new StreamSyncStats().withStreamName("stream2") - .withStats(new SyncStats().withRecordsEmitted(5L).withBytesEmitted(5L)))); - - private static final io.airbyte.api.model.generated.AttemptStats ATTEMPT_STATS_API = new io.airbyte.api.model.generated.AttemptStats() - .bytesEmitted(10L).recordsEmitted(10L); - - private static final List ATTEMPT_STREAM_STATS = List.of( - new AttemptStreamStats().streamNamespace("ns1").streamName("stream1") - .stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L)), - new AttemptStreamStats().streamName("stream2").stats(new io.airbyte.api.model.generated.AttemptStats().recordsEmitted(5L).bytesEmitted(5L))); - + private SourceRead sourceRead; + private ConnectionRead connectionRead; + private DestinationRead destinationRead; private ConnectionsHandler connectionsHandler; private SourceHandler sourceHandler; private DestinationHandler destinationHandler; + private SourceDefinitionsHandler sourceDefinitionsHandler; + private DestinationDefinitionsHandler destinationDefinitionsHandler; + private StandardDestinationDefinition standardDestinationDefinition; + private StandardSourceDefinition standardSourceDefinition; + private AirbyteVersion airbyteVersion; private Job testJob; private Attempt testJobAttempt; private JobPersistence jobPersistence; @@ -149,7 +134,7 @@ private static List toAttemptInfoList(final List attem private static AttemptRead toAttemptRead(final Attempt a) { return new AttemptRead() - .id((long) a.getAttemptNumber()) + .id(a.getId()) .status(Enums.convertTo(a.getStatus(), io.airbyte.api.model.generated.AttemptStatus.class)) .createdAt(a.getCreatedAtInSecond()) .updatedAt(a.getUpdatedAtInSecond()) @@ -157,22 +142,22 @@ private static AttemptRead toAttemptRead(final Attempt a) { } private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) { - return new Attempt(ATTEMPT_NUMBER, jobId, LOG_PATH, null, status, null, timestamps, timestamps, timestamps); + return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, status, null, timestamps, timestamps, timestamps); } @BeforeEach - void setUp() { + void setUp() throws IOException, JsonValidationException, ConfigNotFoundException { testJobAttempt = createAttempt(JOB_ID, CREATED_AT, AttemptStatus.SUCCEEDED); testJob = new Job(JOB_ID, JOB_CONFIG.getConfigType(), JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(testJobAttempt), JOB_STATUS, null, CREATED_AT, CREATED_AT); connectionsHandler = mock(ConnectionsHandler.class); sourceHandler = mock(SourceHandler.class); + sourceDefinitionsHandler = mock(SourceDefinitionsHandler.class); destinationHandler = mock(DestinationHandler.class); + destinationDefinitionsHandler = mock(DestinationDefinitionsHandler.class); + airbyteVersion = mock(AirbyteVersion.class); jobPersistence = mock(JobPersistence.class); - final SourceDefinitionsHandler sourceDefinitionsHandler = mock(SourceDefinitionsHandler.class); - final DestinationDefinitionsHandler destinationDefinitionsHandler = mock(DestinationDefinitionsHandler.class); - final AirbyteVersion airbyteVersion = mock(AirbyteVersion.class); jobHistoryHandler = new JobHistoryHandler(jobPersistence, WorkerEnvironment.DOCKER, LogConfigs.EMPTY, connectionsHandler, sourceHandler, sourceDefinitionsHandler, destinationHandler, destinationDefinitionsHandler, airbyteVersion); } @@ -197,9 +182,6 @@ void testListJobs() throws IOException { when(jobPersistence.listJobs(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, pagesize, rowOffset)) .thenReturn(List.of(latestJobNoAttempt, successfulJob)); when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L); - when(jobPersistence.getAttemptStats(List.of(200L, 100L))).thenReturn(Map.of( - new JobAttemptPair(100, 0), ATTEMPT_STATS, - new JobAttemptPair(jobId2, 0), ATTEMPT_STATS)); final var requestBody = new JobListRequestBody() .configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API)) @@ -207,8 +189,8 @@ void testListJobs() throws IOException { .pagination(new Pagination().pageSize(pagesize).rowOffset(rowOffset)); final var jobReadList = jobHistoryHandler.listJobsFor(requestBody); - final var expAttemptRead = toAttemptRead(testJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS); - final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(expAttemptRead)); + final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(toAttemptRead( + testJobAttempt))); final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)).attempts(Collections.emptyList()); final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead)).totalJobCount(2L); @@ -241,10 +223,6 @@ void testListJobsFor() throws IOException { when(jobPersistence.listJobs(configTypes, JOB_CONFIG_ID, pagesize, rowOffset)).thenReturn(List.of(latestJob, secondJob, firstJob)); when(jobPersistence.getJobCount(configTypes, JOB_CONFIG_ID)).thenReturn(3L); - when(jobPersistence.getAttemptStats(List.of(300L, 200L, 100L))).thenReturn(Map.of( - new JobAttemptPair(100, 0), ATTEMPT_STATS, - new JobAttemptPair(secondJobId, 0), ATTEMPT_STATS, - new JobAttemptPair(latestJobId, 0), ATTEMPT_STATS)); final JobListRequestBody requestBody = new JobListRequestBody() .configTypes(List.of(CONFIG_TYPE_FOR_API, JobConfigType.SYNC, JobConfigType.DISCOVER_SCHEMA)) @@ -253,11 +231,9 @@ void testListJobsFor() throws IOException { final JobReadList jobReadList = jobHistoryHandler.listJobsFor(requestBody); final var firstJobWithAttemptRead = - new JobWithAttemptsRead().job(toJobInfo(firstJob)) - .attempts(ImmutableList.of(toAttemptRead(testJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS))); + new JobWithAttemptsRead().job(toJobInfo(firstJob)).attempts(ImmutableList.of(toAttemptRead(testJobAttempt))); final var secondJobWithAttemptRead = - new JobWithAttemptsRead().job(toJobInfo(secondJob)) - .attempts(ImmutableList.of(toAttemptRead(secondJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS))); + new JobWithAttemptsRead().job(toJobInfo(secondJob)).attempts(ImmutableList.of(toAttemptRead(secondJobAttempt))); final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJob)).attempts(Collections.emptyList()); final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, secondJobWithAttemptRead, firstJobWithAttemptRead)).totalJobCount(3L); @@ -281,9 +257,6 @@ void testListJobsIncludingJobId() throws IOException { when(jobPersistence.listJobsIncludingId(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID, jobId2, pagesize)) .thenReturn(List.of(latestJobNoAttempt, successfulJob)); when(jobPersistence.getJobCount(Set.of(Enums.convertTo(CONFIG_TYPE_FOR_API, ConfigType.class)), JOB_CONFIG_ID)).thenReturn(2L); - when(jobPersistence.getAttemptStats(List.of(200L, 100L))).thenReturn(Map.of( - new JobAttemptPair(100, 0), ATTEMPT_STATS, - new JobAttemptPair(jobId2, 0), ATTEMPT_STATS)); final var requestBody = new JobListRequestBody() .configTypes(Collections.singletonList(CONFIG_TYPE_FOR_API)) @@ -293,7 +266,7 @@ void testListJobsIncludingJobId() throws IOException { final var jobReadList = jobHistoryHandler.listJobsFor(requestBody); final var successfulJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(successfulJob)).attempts(ImmutableList.of(toAttemptRead( - testJobAttempt).totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS))); + testJobAttempt))); final var latestJobWithAttemptRead = new JobWithAttemptsRead().job(toJobInfo(latestJobNoAttempt)).attempts(Collections.emptyList()); final JobReadList expectedJobReadList = new JobReadList().jobs(List.of(latestJobWithAttemptRead, successfulJobWithAttemptRead)).totalJobCount(2L); @@ -332,16 +305,16 @@ void testGetJobInfoLight() throws IOException { @Test @DisplayName("Should return the right info to debug this job") void testGetDebugJobInfo() throws IOException, JsonValidationException, ConfigNotFoundException, URISyntaxException { - final StandardSourceDefinition standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); + standardSourceDefinition = SourceDefinitionHelpers.generateSourceDefinition(); final SourceConnection source = SourceHelpers.generateSource(UUID.randomUUID()); - final SourceRead sourceRead = SourceHelpers.getSourceRead(source, standardSourceDefinition); + sourceRead = SourceHelpers.getSourceRead(source, standardSourceDefinition); - final StandardDestinationDefinition standardDestinationDefinition = DestinationDefinitionHelpers.generateDestination(); + standardDestinationDefinition = DestinationDefinitionHelpers.generateDestination(); final DestinationConnection destination = DestinationHelpers.generateDestination(UUID.randomUUID()); - final DestinationRead destinationRead = DestinationHelpers.getDestinationRead(destination, standardDestinationDefinition); + destinationRead = DestinationHelpers.getDestinationRead(destination, standardDestinationDefinition); final StandardSync standardSync = ConnectionHelpers.generateSyncWithSourceId(source.getSourceId()); - final ConnectionRead connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); + connectionRead = ConnectionHelpers.generateExpectedConnectionRead(standardSync); when(connectionsHandler.getConnection(UUID.fromString(testJob.getScope()))).thenReturn(connectionRead); final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody(); @@ -352,13 +325,10 @@ void testGetDebugJobInfo() throws IOException, JsonValidationException, ConfigNo destinationIdRequestBody.setDestinationId(connectionRead.getDestinationId()); when(destinationHandler.getDestination(destinationIdRequestBody)).thenReturn(destinationRead); when(jobPersistence.getJob(JOB_ID)).thenReturn(testJob); - when(jobPersistence.getAttemptStats(anyLong(), anyInt())).thenReturn(ATTEMPT_STATS); final JobIdRequestBody requestBody = new JobIdRequestBody().id(JOB_ID); final JobDebugInfoRead jobDebugInfoActual = jobHistoryHandler.getJobDebugInfo(requestBody); - final List attemptInfoReads = toAttemptInfoList(ImmutableList.of(testJobAttempt)); - attemptInfoReads.forEach(read -> read.getAttempt().totalStats(ATTEMPT_STATS_API).streamStats(ATTEMPT_STREAM_STATS)); - final JobDebugInfoRead exp = new JobDebugInfoRead().job(toDebugJobInfo(testJob)).attempts(attemptInfoReads); + final JobDebugInfoRead exp = new JobDebugInfoRead().job(toDebugJobInfo(testJob)).attempts(toAttemptInfoList(ImmutableList.of(testJobAttempt))); assertEquals(exp, jobDebugInfoActual); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 37a9f7647629..39b089f8fef2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -431,7 +431,7 @@ private boolean checkActiveJobPreviousAttempt(final Job activeJob, final int att if (activeJob.getAttempts().size() > minAttemptSize) { final Optional optionalAttempt = activeJob.getAttempts().stream() - .filter(attempt -> attempt.getAttemptNumber() == (attemptId - 1)).findFirst(); + .filter(attempt -> attempt.getId() == (attemptId - 1)).findFirst(); result = optionalAttempt.isPresent() && optionalAttempt.get().getStatus().equals(FAILED); } @@ -451,7 +451,8 @@ private void failNonTerminalJobs(final UUID connectionId) { continue; } - final int attemptNumber = attempt.getAttemptNumber(); + // the Attempt object 'id' is actually the value of the attempt_number column in the db + final int attemptNumber = (int) attempt.getId(); log.info("Failing non-terminal attempt {} for non-terminal job {}", attemptNumber, jobId); jobPersistence.failAttempt(jobId, attemptNumber); jobPersistence.writeAttemptFailureSummary(jobId, attemptNumber,