Skip to content

Commit

Permalink
rewrite buildWebBackendConnectionRead to avoid fetching all jobs (air…
Browse files Browse the repository at this point in the history
…bytehq#16811)

* rewrite buildWebBackendConnectionRead to avoid fetching all jobs

* format

* query for latest sync job in addition to latest running sync job

* fix query

* add test coverage for new methods

* simplify optional logic and add another test case
  • Loading branch information
pmossman authored and robbinhan committed Sep 29, 2022
1 parent a11b757 commit f247683
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,20 @@ public Optional<Job> getLastReplicationJob(final UUID connectionId) throws IOExc
.flatMap(r -> getJobOptional(ctx, r.get(JOB_ID, Long.class))));
}

@Override
public Optional<Job> getLastSyncJob(final UUID connectionId) throws IOException {
return jobDatabase.query(ctx -> ctx
.fetch(BASE_JOB_SELECT_AND_JOIN + WHERE +
"CAST(jobs.config_type AS VARCHAR) = ? " + AND +
"scope = ? " +
"ORDER BY jobs.created_at DESC LIMIT 1",
Sqls.toSqlName(ConfigType.SYNC),
connectionId.toString())
.stream()
.findFirst()
.flatMap(r -> getJobOptional(ctx, r.get(JOB_ID, Long.class))));
}

@Override
public Optional<Job> getFirstReplicationJob(final UUID connectionId) throws IOException {
return jobDatabase.query(ctx -> ctx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(UUID con

Optional<Job> getLastReplicationJob(UUID connectionId) throws IOException;

Optional<Job> getLastSyncJob(UUID connectionId) throws IOException;

Optional<Job> getFirstReplicationJob(UUID connectionId) throws IOException;

Optional<Job> getNextJob() throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ class DefaultJobPersistenceTest {
.withConfigType(ConfigType.SYNC)
.withSync(new JobSyncConfig());

private static final JobConfig RESET_JOB_CONFIG = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withSync(new JobSyncConfig());

private static final int DEFAULT_MINIMUM_AGE_IN_DAYS = 30;
private static final int DEFAULT_EXCESSIVE_NUMBER_OF_JOBS = 500;
private static final int DEFAULT_MINIMUM_RECENCY_COUNT = 10;
Expand Down Expand Up @@ -853,14 +857,14 @@ class GetLastReplicationJob {

@Test
@DisplayName("Should return nothing if no job exists")
void testGetLastSyncJobForConnectionIdEmpty() throws IOException {
void testGetLastReplicationJobForConnectionIdEmpty() throws IOException {
final Optional<Job> actual = jobPersistence.getLastReplicationJob(CONNECTION_ID);

assertTrue(actual.isEmpty());
}

@Test
@DisplayName("Should return the last enqueued job")
@DisplayName("Should return the last sync job")
void testGetLastSyncJobForConnectionId() throws IOException {
final long jobId1 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
jobPersistence.succeedAttempt(jobId1, jobPersistence.createAttempt(jobId1, LOG_PATH));
Expand All @@ -875,6 +879,72 @@ void testGetLastSyncJobForConnectionId() throws IOException {
assertEquals(Optional.of(expected), actual);
}

@Test
@DisplayName("Should return the last reset job")
void testGetLastResetJobForConnectionId() throws IOException {
final long jobId1 = jobPersistence.enqueueJob(SCOPE, RESET_JOB_CONFIG).orElseThrow();
jobPersistence.succeedAttempt(jobId1, jobPersistence.createAttempt(jobId1, LOG_PATH));

final Instant afterNow = NOW.plusSeconds(1000);
when(timeSupplier.get()).thenReturn(afterNow);
final long jobId2 = jobPersistence.enqueueJob(SCOPE, RESET_JOB_CONFIG).orElseThrow();

final Optional<Job> actual = jobPersistence.getLastReplicationJob(CONNECTION_ID);
final Job expected = createJob(jobId2, RESET_JOB_CONFIG, JobStatus.PENDING, Collections.emptyList(), afterNow.getEpochSecond());

assertEquals(Optional.of(expected), actual);
}

}

@Nested
@DisplayName("When getting last sync job")
class GetLastSyncJob {

@Test
@DisplayName("Should return nothing if no job exists")
void testGetLastSyncJobForConnectionIdEmpty() throws IOException {
final Optional<Job> actual = jobPersistence.getLastSyncJob(CONNECTION_ID);

assertTrue(actual.isEmpty());
}

@Test
@DisplayName("Should return the last enqueued sync job")
void testGetLastSyncJobForConnectionId() throws IOException {
final long jobId1 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
jobPersistence.succeedAttempt(jobId1, jobPersistence.createAttempt(jobId1, LOG_PATH));

final Instant afterNow = NOW.plusSeconds(1000);
when(timeSupplier.get()).thenReturn(afterNow);
final long jobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow();
final int attemptNumber = jobPersistence.createAttempt(jobId2, LOG_PATH);

// Return the latest sync job even if failed
jobPersistence.failAttempt(jobId2, attemptNumber);
final Attempt attempt = jobPersistence.getJob(jobId2).getAttempts().stream().findFirst().orElseThrow();
jobPersistence.failJob(jobId2);

final Optional<Job> actual = jobPersistence.getLastSyncJob(CONNECTION_ID);
final Job expected = createJob(jobId2, SYNC_JOB_CONFIG, JobStatus.FAILED, List.of(attempt), afterNow.getEpochSecond());

assertEquals(Optional.of(expected), actual);
}

@Test
@DisplayName("Should return nothing if only reset job exists")
void testGetLastSyncJobForConnectionIdEmptyBecauseOnlyReset() throws IOException {
final long jobId = jobPersistence.enqueueJob(SCOPE, RESET_JOB_CONFIG).orElseThrow();
jobPersistence.succeedAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH));

final Instant afterNow = NOW.plusSeconds(1000);
when(timeSupplier.get()).thenReturn(afterNow);

final Optional<Job> actual = jobPersistence.getLastSyncJob(CONNECTION_ID);

assertTrue(actual.isEmpty());
}

}

@Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static JobWithAttemptsRead getJobWithAttemptsRead(final Job job) {
.attempts(job.getAttempts().stream().map(JobConverter::getAttemptRead).toList());
}

private static JobRead getJobRead(final Job job) {
public static JobRead getJobRead(final Job job) {
final String configId = job.getScope();
final JobConfigType configType = Enums.convertTo(job.getConfigType(), JobConfigType.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.api.model.generated.JobInfoLightRead;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobListRequestBody;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.JobReadList;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
import io.airbyte.api.model.generated.SourceDefinitionIdRequestBody;
Expand All @@ -30,11 +31,14 @@
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -120,6 +124,22 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody)
return buildJobDebugInfoRead(jobinfoRead);
}

public Optional<JobRead> getLatestRunningSyncJob(final UUID connectionId) throws IOException {
final List<Job> nonTerminalSyncJobsForConnection = jobPersistence.listJobsForConnectionWithStatuses(
connectionId,
Collections.singleton(ConfigType.SYNC),
JobStatus.NON_TERMINAL_STATUSES);

// there *should* only be a single running sync job for a connection, but
// jobPersistence.listJobsForConnectionWithStatuses orders by created_at desc so
// .findFirst will always return what we want.
return nonTerminalSyncJobsForConnection.stream().map(JobConverter::getJobRead).findFirst();
}

public Optional<JobRead> getLatestSyncJob(final UUID connectionId) throws IOException {
return jobPersistence.getLastSyncJob(connectionId).map(JobConverter::getJobRead);
}

private SourceRead getSourceRead(final ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException {
final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(connectionRead.getSourceId());
return sourceHandler.getSource(sourceIdRequestBody);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.api.model.generated.AirbyteCatalog;
import io.airbyte.api.model.generated.AirbyteStream;
import io.airbyte.api.model.generated.AirbyteStreamAndConfiguration;
Expand All @@ -22,12 +21,7 @@
import io.airbyte.api.model.generated.ConnectionUpdate;
import io.airbyte.api.model.generated.DestinationIdRequestBody;
import io.airbyte.api.model.generated.DestinationRead;
import io.airbyte.api.model.generated.JobConfigType;
import io.airbyte.api.model.generated.JobListRequestBody;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.JobReadList;
import io.airbyte.api.model.generated.JobStatus;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
import io.airbyte.api.model.generated.OperationCreate;
import io.airbyte.api.model.generated.OperationReadList;
import io.airbyte.api.model.generated.OperationUpdate;
Expand Down Expand Up @@ -74,8 +68,6 @@
@Slf4j
public class WebBackendConnectionsHandler {

private static final Set<JobStatus> TERMINAL_STATUSES = Sets.newHashSet(JobStatus.FAILED, JobStatus.SUCCEEDED, JobStatus.CANCELLED);

private final ConnectionsHandler connectionsHandler;
private final StateHandler stateHandler;
private final SourceHandler sourceHandler;
Expand Down Expand Up @@ -128,20 +120,20 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR
final SourceRead source = getSourceRead(connectionRead);
final DestinationRead destination = getDestinationRead(connectionRead);
final OperationReadList operations = getOperationReadList(connectionRead);
final JobReadList syncJobReadList = getSyncJobs(connectionRead);
final Optional<JobRead> latestSyncJob = jobHistoryHandler.getLatestSyncJob(connectionRead.getConnectionId());
final Optional<JobRead> latestRunningSyncJob = jobHistoryHandler.getLatestRunningSyncJob(connectionRead.getConnectionId());

final WebBackendConnectionRead webBackendConnectionRead = getWebBackendConnectionRead(connectionRead, source, destination, operations)
.catalogId(connectionRead.getSourceCatalogId())
.isSyncing(syncJobReadList.getJobs()
.stream()
.map(JobWithAttemptsRead::getJob)
.anyMatch(WebBackendConnectionsHandler::isRunningJob));
setLatestSyncJobProperties(webBackendConnectionRead, syncJobReadList);
return webBackendConnectionRead;
}
.catalogId(connectionRead.getSourceCatalogId());

webBackendConnectionRead.setIsSyncing(latestRunningSyncJob.isPresent());

latestSyncJob.ifPresent(job -> {
webBackendConnectionRead.setLatestSyncJobCreatedAt(job.getCreatedAt());
webBackendConnectionRead.setLatestSyncJobStatus(job.getStatus());
});

private static boolean isRunningJob(final JobRead job) {
return !TERMINAL_STATUSES.contains(job.getStatus());
return webBackendConnectionRead;
}

private SourceRead getSourceRead(final ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException {
Expand Down Expand Up @@ -185,21 +177,6 @@ private static WebBackendConnectionRead getWebBackendConnectionRead(final Connec
.resourceRequirements(connectionRead.getResourceRequirements());
}

private JobReadList getSyncJobs(final ConnectionRead connectionRead) throws IOException {
final JobListRequestBody jobListRequestBody = new JobListRequestBody()
.configId(connectionRead.getConnectionId().toString())
.configTypes(Collections.singletonList(JobConfigType.SYNC));
return jobHistoryHandler.listJobsFor(jobListRequestBody);
}

private static void setLatestSyncJobProperties(final WebBackendConnectionRead WebBackendConnectionRead, final JobReadList syncJobReadList) {
syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).findFirst()
.ifPresent(job -> {
WebBackendConnectionRead.setLatestSyncJobCreatedAt(job.getCreatedAt());
WebBackendConnectionRead.setLatestSyncJobStatus(job.getStatus());
});
}

public WebBackendConnectionReadList webBackendSearchConnections(final WebBackendConnectionSearch webBackendConnectionSearch)
throws ConfigNotFoundException, IOException, JsonValidationException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.server.helpers.ConnectionHelpers;
import io.airbyte.server.helpers.DestinationDefinitionHelpers;
import io.airbyte.server.helpers.DestinationHelpers;
Expand Down Expand Up @@ -108,13 +109,13 @@ private static AttemptRead toAttemptRead(final Attempt a) {
.endedAt(a.getEndedAtInSecond().orElse(null));
}

private static Attempt createSuccessfulAttempt(final long jobId, final long timestamps) {
return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, AttemptStatus.SUCCEEDED, null, timestamps, timestamps, timestamps);
private static Attempt createAttempt(final long jobId, final long timestamps, final AttemptStatus status) {
return new Attempt(ATTEMPT_ID, jobId, LOG_PATH, null, status, null, timestamps, timestamps, timestamps);
}

@BeforeEach
void setUp() throws IOException, JsonValidationException, ConfigNotFoundException {
testJobAttempt = createSuccessfulAttempt(JOB_ID, CREATED_AT);
testJobAttempt = createAttempt(JOB_ID, CREATED_AT, AttemptStatus.SUCCEEDED);
testJob = new Job(JOB_ID, JOB_CONFIG.getConfigType(), JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(testJobAttempt), JOB_STATUS, null, CREATED_AT,
CREATED_AT);

Expand Down Expand Up @@ -174,7 +175,7 @@ void testListJobsFor() throws IOException {

final var secondJobId = JOB_ID + 100;
final var createdAt2 = CREATED_AT + 1000;
final var secondJobAttempt = createSuccessfulAttempt(secondJobId, createdAt2);
final var secondJobAttempt = createAttempt(secondJobId, createdAt2, AttemptStatus.SUCCEEDED);
final var secondJob = new Job(secondJobId, ConfigType.DISCOVER_SCHEMA, JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(secondJobAttempt),
JobStatus.SUCCEEDED, null, createdAt2, createdAt2);

Expand Down Expand Up @@ -300,6 +301,75 @@ void testGetDebugJobInfo() throws IOException, JsonValidationException, ConfigNo
assertEquals(exp, jobDebugInfoActual);
}

@Test
@DisplayName("Should return the latest running sync job")
void testGetLatestRunningSyncJob() throws IOException {
final var connectionId = UUID.randomUUID();

final var olderRunningJobId = JOB_ID + 100;
final var olderRunningCreatedAt = CREATED_AT + 1000;
final var olderRunningJobAttempt = createAttempt(olderRunningJobId, olderRunningCreatedAt, AttemptStatus.RUNNING);
final var olderRunningJob = new Job(olderRunningJobId, ConfigType.SYNC, JOB_CONFIG_ID,
JOB_CONFIG, ImmutableList.of(olderRunningJobAttempt),
JobStatus.RUNNING, null, olderRunningCreatedAt, olderRunningCreatedAt);

// expect that we return the newer of the two running jobs. this should not happen in the real
// world but might as
// well test that we handle it properly.
final var newerRunningJobId = JOB_ID + 200;
final var newerRunningCreatedAt = CREATED_AT + 2000;
final var newerRunningJobAttempt = createAttempt(newerRunningJobId, newerRunningCreatedAt, AttemptStatus.RUNNING);
final var newerRunningJob = new Job(newerRunningJobId, ConfigType.SYNC, JOB_CONFIG_ID,
JOB_CONFIG, ImmutableList.of(newerRunningJobAttempt),
JobStatus.RUNNING, null, newerRunningCreatedAt, newerRunningCreatedAt);

when(jobPersistence.listJobsForConnectionWithStatuses(
connectionId,
Collections.singleton(ConfigType.SYNC),
JobStatus.NON_TERMINAL_STATUSES)).thenReturn(List.of(newerRunningJob, olderRunningJob));

final Optional<JobRead> expectedJob = Optional.of(JobConverter.getJobRead(newerRunningJob));
final Optional<JobRead> actualJob = jobHistoryHandler.getLatestRunningSyncJob(connectionId);

assertEquals(expectedJob, actualJob);
}

@Test
@DisplayName("Should return an empty optional if no running sync job")
void testGetLatestRunningSyncJobWhenNone() throws IOException {
final var connectionId = UUID.randomUUID();

when(jobPersistence.listJobsForConnectionWithStatuses(
connectionId,
Collections.singleton(ConfigType.SYNC),
JobStatus.NON_TERMINAL_STATUSES)).thenReturn(Collections.emptyList());

final Optional<JobRead> actual = jobHistoryHandler.getLatestRunningSyncJob(connectionId);

assertTrue(actual.isEmpty());
}

@Test
@DisplayName("Should return the latest sync job")
void testGetLatestSyncJob() throws IOException {
final var connectionId = UUID.randomUUID();

// expect the newest job overall to be returned, even if it is failed
final var newerFailedJobId = JOB_ID + 200;
final var newerFailedCreatedAt = CREATED_AT + 2000;
final var newerFailedJobAttempt = createAttempt(newerFailedJobId, newerFailedCreatedAt, AttemptStatus.FAILED);
final var newerFailedJob = new Job(newerFailedJobId, ConfigType.SYNC, JOB_CONFIG_ID,
JOB_CONFIG, ImmutableList.of(newerFailedJobAttempt),
JobStatus.RUNNING, null, newerFailedCreatedAt, newerFailedCreatedAt);

when(jobPersistence.getLastSyncJob(connectionId)).thenReturn(Optional.of(newerFailedJob));

final Optional<JobRead> expectedJob = Optional.of(JobConverter.getJobRead(newerFailedJob));
final Optional<JobRead> actualJob = jobHistoryHandler.getLatestSyncJob(connectionId);

assertEquals(expectedJob, actualJob);
}

@Test
@DisplayName("Should have compatible config enums")
void testEnumConversion() {
Expand Down
Loading

0 comments on commit f247683

Please sign in to comment.