From 07c5f13d5a6438e2b747807fe5c6c6ff9d19b7af Mon Sep 17 00:00:00 2001 From: Parker Mossman Date: Fri, 16 Sep 2022 11:16:11 -0700 Subject: [PATCH] rewrite buildWebBackendConnectionRead to avoid fetching all jobs (#16811) * rewrite buildWebBackendConnectionRead to avoid fetching all jobs * format * query for latest sync job in addition to latest running sync job * fix query * add test coverage for new methods * simplify optional logic and add another test case --- .../persistence/DefaultJobPersistence.java | 14 ++++ .../scheduler/persistence/JobPersistence.java | 2 + .../DefaultJobPersistenceTest.java | 74 +++++++++++++++++- .../server/converters/JobConverter.java | 2 +- .../server/handlers/JobHistoryHandler.java | 20 +++++ .../WebBackendConnectionsHandler.java | 45 +++-------- .../handlers/JobHistoryHandlerTest.java | 78 ++++++++++++++++++- .../WebBackendConnectionsHandlerTest.java | 9 +-- 8 files changed, 195 insertions(+), 49 deletions(-) diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 859b5a0d5eb9..6d8453a3e313 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -561,6 +561,20 @@ public Optional getLastReplicationJob(final UUID connectionId) throws IOExc .flatMap(r -> getJobOptional(ctx, r.get(JOB_ID, Long.class)))); } + @Override + public Optional getLastSyncJob(final UUID connectionId) throws IOException { + return jobDatabase.query(ctx -> ctx + .fetch(BASE_JOB_SELECT_AND_JOIN + WHERE + + "CAST(jobs.config_type AS VARCHAR) = ? " + AND + + "scope = ? " + + "ORDER BY jobs.created_at DESC LIMIT 1", + Sqls.toSqlName(ConfigType.SYNC), + connectionId.toString()) + .stream() + .findFirst() + .flatMap(r -> getJobOptional(ctx, r.get(JOB_ID, Long.class)))); + } + @Override public Optional getFirstReplicationJob(final UUID connectionId) throws IOException { return jobDatabase.query(ctx -> ctx diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index 66ec976baba8..b04539066bc4 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -207,6 +207,8 @@ List listJobStatusAndTimestampWithConnection(UUID con Optional getLastReplicationJob(UUID connectionId) throws IOException; + Optional getLastSyncJob(UUID connectionId) throws IOException; + Optional getFirstReplicationJob(UUID connectionId) throws IOException; Optional getNextJob() throws IOException; diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index dbf60c5af6cc..04d467e0aeeb 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -106,6 +106,10 @@ class DefaultJobPersistenceTest { .withConfigType(ConfigType.SYNC) .withSync(new JobSyncConfig()); + private static final JobConfig RESET_JOB_CONFIG = new JobConfig() + .withConfigType(ConfigType.RESET_CONNECTION) + .withSync(new JobSyncConfig()); + private static final int DEFAULT_MINIMUM_AGE_IN_DAYS = 30; private static final int DEFAULT_EXCESSIVE_NUMBER_OF_JOBS = 500; private static final int DEFAULT_MINIMUM_RECENCY_COUNT = 10; @@ -853,14 +857,14 @@ class GetLastReplicationJob { @Test @DisplayName("Should return nothing if no job exists") - void testGetLastSyncJobForConnectionIdEmpty() throws IOException { + void testGetLastReplicationJobForConnectionIdEmpty() throws IOException { final Optional actual = jobPersistence.getLastReplicationJob(CONNECTION_ID); assertTrue(actual.isEmpty()); } @Test - @DisplayName("Should return the last enqueued job") + @DisplayName("Should return the last sync job") void testGetLastSyncJobForConnectionId() throws IOException { final long jobId1 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); jobPersistence.succeedAttempt(jobId1, jobPersistence.createAttempt(jobId1, LOG_PATH)); @@ -875,6 +879,72 @@ void testGetLastSyncJobForConnectionId() throws IOException { assertEquals(Optional.of(expected), actual); } + @Test + @DisplayName("Should return the last reset job") + void testGetLastResetJobForConnectionId() throws IOException { + final long jobId1 = jobPersistence.enqueueJob(SCOPE, RESET_JOB_CONFIG).orElseThrow(); + jobPersistence.succeedAttempt(jobId1, jobPersistence.createAttempt(jobId1, LOG_PATH)); + + final Instant afterNow = NOW.plusSeconds(1000); + when(timeSupplier.get()).thenReturn(afterNow); + final long jobId2 = jobPersistence.enqueueJob(SCOPE, RESET_JOB_CONFIG).orElseThrow(); + + final Optional actual = jobPersistence.getLastReplicationJob(CONNECTION_ID); + final Job expected = createJob(jobId2, RESET_JOB_CONFIG, JobStatus.PENDING, Collections.emptyList(), afterNow.getEpochSecond()); + + assertEquals(Optional.of(expected), actual); + } + + } + + @Nested + @DisplayName("When getting last sync job") + class GetLastSyncJob { + + @Test + @DisplayName("Should return nothing if no job exists") + void testGetLastSyncJobForConnectionIdEmpty() throws IOException { + final Optional actual = jobPersistence.getLastSyncJob(CONNECTION_ID); + + assertTrue(actual.isEmpty()); + } + + @Test + @DisplayName("Should return the last enqueued sync job") + void testGetLastSyncJobForConnectionId() throws IOException { + final long jobId1 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); + jobPersistence.succeedAttempt(jobId1, jobPersistence.createAttempt(jobId1, LOG_PATH)); + + final Instant afterNow = NOW.plusSeconds(1000); + when(timeSupplier.get()).thenReturn(afterNow); + final long jobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); + final int attemptNumber = jobPersistence.createAttempt(jobId2, LOG_PATH); + + // Return the latest sync job even if failed + jobPersistence.failAttempt(jobId2, attemptNumber); + final Attempt attempt = jobPersistence.getJob(jobId2).getAttempts().stream().findFirst().orElseThrow(); + jobPersistence.failJob(jobId2); + + final Optional actual = jobPersistence.getLastSyncJob(CONNECTION_ID); + final Job expected = createJob(jobId2, SYNC_JOB_CONFIG, JobStatus.FAILED, List.of(attempt), afterNow.getEpochSecond()); + + assertEquals(Optional.of(expected), actual); + } + + @Test + @DisplayName("Should return nothing if only reset job exists") + void testGetLastSyncJobForConnectionIdEmptyBecauseOnlyReset() throws IOException { + final long jobId = jobPersistence.enqueueJob(SCOPE, RESET_JOB_CONFIG).orElseThrow(); + jobPersistence.succeedAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH)); + + final Instant afterNow = NOW.plusSeconds(1000); + when(timeSupplier.get()).thenReturn(afterNow); + + final Optional actual = jobPersistence.getLastSyncJob(CONNECTION_ID); + + assertTrue(actual.isEmpty()); + } + } @Nested diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index a7577dcc9264..20867983d6aa 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -90,7 +90,7 @@ public static JobWithAttemptsRead getJobWithAttemptsRead(final Job job) { .attempts(job.getAttempts().stream().map(JobConverter::getAttemptRead).toList()); } - private static JobRead getJobRead(final Job job) { + public static JobRead getJobRead(final Job job) { final String configId = job.getScope(); final JobConfigType configType = Enums.convertTo(job.getConfigType(), JobConfigType.class); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java index e0acc8c1cb25..6156e522440c 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/JobHistoryHandler.java @@ -16,6 +16,7 @@ import io.airbyte.api.model.generated.JobInfoLightRead; import io.airbyte.api.model.generated.JobInfoRead; import io.airbyte.api.model.generated.JobListRequestBody; +import io.airbyte.api.model.generated.JobRead; import io.airbyte.api.model.generated.JobReadList; import io.airbyte.api.model.generated.JobWithAttemptsRead; import io.airbyte.api.model.generated.SourceDefinitionIdRequestBody; @@ -30,11 +31,14 @@ import io.airbyte.config.helpers.LogConfigs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.scheduler.models.Job; +import io.airbyte.scheduler.models.JobStatus; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.server.converters.JobConverter; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; @@ -120,6 +124,22 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody) return buildJobDebugInfoRead(jobinfoRead); } + public Optional getLatestRunningSyncJob(final UUID connectionId) throws IOException { + final List nonTerminalSyncJobsForConnection = jobPersistence.listJobsForConnectionWithStatuses( + connectionId, + Collections.singleton(ConfigType.SYNC), + JobStatus.NON_TERMINAL_STATUSES); + + // there *should* only be a single running sync job for a connection, but + // jobPersistence.listJobsForConnectionWithStatuses orders by created_at desc so + // .findFirst will always return what we want. + return nonTerminalSyncJobsForConnection.stream().map(JobConverter::getJobRead).findFirst(); + } + + public Optional getLatestSyncJob(final UUID connectionId) throws IOException { + return jobPersistence.getLastSyncJob(connectionId).map(JobConverter::getJobRead); + } + private SourceRead getSourceRead(final ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException { final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(connectionRead.getSourceId()); return sourceHandler.getSource(sourceIdRequestBody); diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java index 2c5fcf56a7e6..bba7153b209a 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/WebBackendConnectionsHandler.java @@ -8,7 +8,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import io.airbyte.api.model.generated.AirbyteCatalog; import io.airbyte.api.model.generated.AirbyteStream; import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration; @@ -22,12 +21,7 @@ import io.airbyte.api.model.generated.ConnectionUpdate; import io.airbyte.api.model.generated.DestinationIdRequestBody; import io.airbyte.api.model.generated.DestinationRead; -import io.airbyte.api.model.generated.JobConfigType; -import io.airbyte.api.model.generated.JobListRequestBody; import io.airbyte.api.model.generated.JobRead; -import io.airbyte.api.model.generated.JobReadList; -import io.airbyte.api.model.generated.JobStatus; -import io.airbyte.api.model.generated.JobWithAttemptsRead; import io.airbyte.api.model.generated.OperationCreate; import io.airbyte.api.model.generated.OperationReadList; import io.airbyte.api.model.generated.OperationUpdate; @@ -74,8 +68,6 @@ @Slf4j public class WebBackendConnectionsHandler { - private static final Set TERMINAL_STATUSES = Sets.newHashSet(JobStatus.FAILED, JobStatus.SUCCEEDED, JobStatus.CANCELLED); - private final ConnectionsHandler connectionsHandler; private final StateHandler stateHandler; private final SourceHandler sourceHandler; @@ -128,20 +120,20 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR final SourceRead source = getSourceRead(connectionRead); final DestinationRead destination = getDestinationRead(connectionRead); final OperationReadList operations = getOperationReadList(connectionRead); - final JobReadList syncJobReadList = getSyncJobs(connectionRead); + final Optional latestSyncJob = jobHistoryHandler.getLatestSyncJob(connectionRead.getConnectionId()); + final Optional latestRunningSyncJob = jobHistoryHandler.getLatestRunningSyncJob(connectionRead.getConnectionId()); final WebBackendConnectionRead webBackendConnectionRead = getWebBackendConnectionRead(connectionRead, source, destination, operations) - .catalogId(connectionRead.getSourceCatalogId()) - .isSyncing(syncJobReadList.getJobs() - .stream() - .map(JobWithAttemptsRead::getJob) - .anyMatch(WebBackendConnectionsHandler::isRunningJob)); - setLatestSyncJobProperties(webBackendConnectionRead, syncJobReadList); - return webBackendConnectionRead; - } + .catalogId(connectionRead.getSourceCatalogId()); + + webBackendConnectionRead.setIsSyncing(latestRunningSyncJob.isPresent()); + + latestSyncJob.ifPresent(job -> { + webBackendConnectionRead.setLatestSyncJobCreatedAt(job.getCreatedAt()); + webBackendConnectionRead.setLatestSyncJobStatus(job.getStatus()); + }); - private static boolean isRunningJob(final JobRead job) { - return !TERMINAL_STATUSES.contains(job.getStatus()); + return webBackendConnectionRead; } private SourceRead getSourceRead(final ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException { @@ -185,21 +177,6 @@ private static WebBackendConnectionRead getWebBackendConnectionRead(final Connec .resourceRequirements(connectionRead.getResourceRequirements()); } - private JobReadList getSyncJobs(final ConnectionRead connectionRead) throws IOException { - final JobListRequestBody jobListRequestBody = new JobListRequestBody() - .configId(connectionRead.getConnectionId().toString()) - .configTypes(Collections.singletonList(JobConfigType.SYNC)); - return jobHistoryHandler.listJobsFor(jobListRequestBody); - } - - private static void setLatestSyncJobProperties(final WebBackendConnectionRead WebBackendConnectionRead, final JobReadList syncJobReadList) { - syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).findFirst() - .ifPresent(job -> { - WebBackendConnectionRead.setLatestSyncJobCreatedAt(job.getCreatedAt()); - WebBackendConnectionRead.setLatestSyncJobStatus(job.getStatus()); - }); - } - public WebBackendConnectionReadList webBackendSearchConnections(final WebBackendConnectionSearch webBackendConnectionSearch) throws ConfigNotFoundException, IOException, JsonValidationException { diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java index 73678b7a960a..6b89a27b874e 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/JobHistoryHandlerTest.java @@ -23,6 +23,7 @@ import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.models.JobStatus; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.server.converters.JobConverter; import io.airbyte.server.helpers.ConnectionHelpers; import io.airbyte.server.helpers.DestinationDefinitionHelpers; import io.airbyte.server.helpers.DestinationHelpers; @@ -108,13 +109,13 @@ private static AttemptRead toAttemptRead(final Attempt a) { .endedAt(a.getEndedAtInSecond().orElse(null)); } - private static Attempt createSuccessfulAttempt(final long jobId, final long timestamps) { - return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, AttemptStatus.SUCCEEDED, null, timestamps, timestamps, timestamps); + private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) { + return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, status, null, timestamps, timestamps, timestamps); } @BeforeEach void setUp() throws IOException, JsonValidationException, ConfigNotFoundException { - testJobAttempt = createSuccessfulAttempt(JOB_ID, CREATED_AT); + testJobAttempt = createAttempt(JOB_ID, CREATED_AT, AttemptStatus.SUCCEEDED); testJob = new Job(JOB_ID, JOB_CONFIG.getConfigType(), JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(testJobAttempt), JOB_STATUS, null, CREATED_AT, CREATED_AT); @@ -174,7 +175,7 @@ void testListJobsFor() throws IOException { final var secondJobId = JOB_ID + 100; final var createdAt2 = CREATED_AT + 1000; - final var secondJobAttempt = createSuccessfulAttempt(secondJobId, createdAt2); + final var secondJobAttempt = createAttempt(secondJobId, createdAt2, AttemptStatus.SUCCEEDED); final var secondJob = new Job(secondJobId, ConfigType.DISCOVER_SCHEMA, JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(secondJobAttempt), JobStatus.SUCCEEDED, null, createdAt2, createdAt2); @@ -300,6 +301,75 @@ void testGetDebugJobInfo() throws IOException, JsonValidationException, ConfigNo assertEquals(exp, jobDebugInfoActual); } + @Test + @DisplayName("Should return the latest running sync job") + void testGetLatestRunningSyncJob() throws IOException { + final var connectionId = UUID.randomUUID(); + + final var olderRunningJobId = JOB_ID + 100; + final var olderRunningCreatedAt = CREATED_AT + 1000; + final var olderRunningJobAttempt = createAttempt(olderRunningJobId, olderRunningCreatedAt, AttemptStatus.RUNNING); + final var olderRunningJob = new Job(olderRunningJobId, ConfigType.SYNC, JOB_CONFIG_ID, + JOB_CONFIG, ImmutableList.of(olderRunningJobAttempt), + JobStatus.RUNNING, null, olderRunningCreatedAt, olderRunningCreatedAt); + + // expect that we return the newer of the two running jobs. this should not happen in the real + // world but might as + // well test that we handle it properly. + final var newerRunningJobId = JOB_ID + 200; + final var newerRunningCreatedAt = CREATED_AT + 2000; + final var newerRunningJobAttempt = createAttempt(newerRunningJobId, newerRunningCreatedAt, AttemptStatus.RUNNING); + final var newerRunningJob = new Job(newerRunningJobId, ConfigType.SYNC, JOB_CONFIG_ID, + JOB_CONFIG, ImmutableList.of(newerRunningJobAttempt), + JobStatus.RUNNING, null, newerRunningCreatedAt, newerRunningCreatedAt); + + when(jobPersistence.listJobsForConnectionWithStatuses( + connectionId, + Collections.singleton(ConfigType.SYNC), + JobStatus.NON_TERMINAL_STATUSES)).thenReturn(List.of(newerRunningJob, olderRunningJob)); + + final Optional expectedJob = Optional.of(JobConverter.getJobRead(newerRunningJob)); + final Optional actualJob = jobHistoryHandler.getLatestRunningSyncJob(connectionId); + + assertEquals(expectedJob, actualJob); + } + + @Test + @DisplayName("Should return an empty optional if no running sync job") + void testGetLatestRunningSyncJobWhenNone() throws IOException { + final var connectionId = UUID.randomUUID(); + + when(jobPersistence.listJobsForConnectionWithStatuses( + connectionId, + Collections.singleton(ConfigType.SYNC), + JobStatus.NON_TERMINAL_STATUSES)).thenReturn(Collections.emptyList()); + + final Optional actual = jobHistoryHandler.getLatestRunningSyncJob(connectionId); + + assertTrue(actual.isEmpty()); + } + + @Test + @DisplayName("Should return the latest sync job") + void testGetLatestSyncJob() throws IOException { + final var connectionId = UUID.randomUUID(); + + // expect the newest job overall to be returned, even if it is failed + final var newerFailedJobId = JOB_ID + 200; + final var newerFailedCreatedAt = CREATED_AT + 2000; + final var newerFailedJobAttempt = createAttempt(newerFailedJobId, newerFailedCreatedAt, AttemptStatus.FAILED); + final var newerFailedJob = new Job(newerFailedJobId, ConfigType.SYNC, JOB_CONFIG_ID, + JOB_CONFIG, ImmutableList.of(newerFailedJobAttempt), + JobStatus.RUNNING, null, newerFailedCreatedAt, newerFailedCreatedAt); + + when(jobPersistence.getLastSyncJob(connectionId)).thenReturn(Optional.of(newerFailedJob)); + + final Optional expectedJob = Optional.of(JobConverter.getJobRead(newerFailedJob)); + final Optional actualJob = jobHistoryHandler.getLatestSyncJob(connectionId); + + assertEquals(expectedJob, actualJob); + } + @Test @DisplayName("Should have compatible config enums") void testEnumConversion() { diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java index 758610d07b4a..cba2c0a1ff56 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/WebBackendConnectionsHandlerTest.java @@ -39,9 +39,7 @@ import io.airbyte.api.model.generated.DestinationSyncMode; import io.airbyte.api.model.generated.JobConfigType; import io.airbyte.api.model.generated.JobInfoRead; -import io.airbyte.api.model.generated.JobListRequestBody; import io.airbyte.api.model.generated.JobRead; -import io.airbyte.api.model.generated.JobReadList; import io.airbyte.api.model.generated.JobStatus; import io.airbyte.api.model.generated.JobWithAttemptsRead; import io.airbyte.api.model.generated.NamespaceDefinitionType; @@ -186,12 +184,7 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio .updatedAt(now.getEpochSecond()) .endedAt(now.getEpochSecond()))); - final JobReadList jobReadList = new JobReadList(); - jobReadList.setJobs(Collections.singletonList(jobRead)); - final JobListRequestBody jobListRequestBody = new JobListRequestBody(); - jobListRequestBody.setConfigTypes(Collections.singletonList(JobConfigType.SYNC)); - jobListRequestBody.setConfigId(connectionRead.getConnectionId().toString()); - when(jobHistoryHandler.listJobsFor(jobListRequestBody)).thenReturn(jobReadList); + when(jobHistoryHandler.getLatestSyncJob(connectionRead.getConnectionId())).thenReturn(Optional.of(jobRead.getJob())); expected = new WebBackendConnectionRead() .connectionId(connectionRead.getConnectionId())