Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite buildWebBackendConnectionRead to avoid fetching all jobs #16811

Merged
merged 6 commits into from
Sep 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static JobWithAttemptsRead getJobWithAttemptsRead(final Job job) {
.attempts(job.getAttempts().stream().map(JobConverter::getAttemptRead).toList());
}

private static JobRead getJobRead(final Job job) {
public static JobRead getJobRead(final Job job) {
final String configId = job.getScope();
final JobConfigType configType = Enums.convertTo(job.getConfigType(), JobConfigType.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.api.model.generated.JobInfoLightRead;
import io.airbyte.api.model.generated.JobInfoRead;
import io.airbyte.api.model.generated.JobListRequestBody;
import io.airbyte.api.model.generated.JobRead;
import io.airbyte.api.model.generated.JobReadList;
import io.airbyte.api.model.generated.JobWithAttemptsRead;
import io.airbyte.api.model.generated.SourceDefinitionIdRequestBody;
Expand All @@ -30,11 +31,14 @@
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -120,6 +124,25 @@ public JobDebugInfoRead getJobDebugInfo(final JobIdRequestBody jobIdRequestBody)
return buildJobDebugInfoRead(jobinfoRead);
}

public Optional<JobRead> getLatestRunningSyncJob(final UUID connectionId) throws IOException {
final List<Job> nonTerminalSyncJobsForConnection = jobPersistence.listJobsForConnectionWithStatuses(
connectionId,
Collections.singleton(ConfigType.SYNC),
JobStatus.NON_TERMINAL_STATUSES);

if (!nonTerminalSyncJobsForConnection.isEmpty()) {

// jobPersistence.listJobsForConnectionWithStatuses orders by created_at desc, so index 0 is the latest.
// there *should* only be a single running sync job for a connection regardless so .get(0) should always be
// what we want.
final Job job = nonTerminalSyncJobsForConnection.get(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit for later: We could do something like nonTerminalSyncJobsForConnection.stream().findFirst() which returns an optional -- either the first element is there or the list is empty/there is nothing.


return Optional.of(JobConverter.getJobRead(job));
} else {
return Optional.empty();
}
}

private SourceRead getSourceRead(final ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException {
final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(connectionRead.getSourceId());
return sourceHandler.getSource(sourceIdRequestBody);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,6 @@
@Slf4j
public class WebBackendConnectionsHandler {

private static final Set<JobStatus> TERMINAL_STATUSES = Sets.newHashSet(JobStatus.FAILED, JobStatus.SUCCEEDED, JobStatus.CANCELLED);

private final ConnectionsHandler connectionsHandler;
private final StateHandler stateHandler;
private final SourceHandler sourceHandler;
Expand Down Expand Up @@ -128,20 +126,18 @@ private WebBackendConnectionRead buildWebBackendConnectionRead(final ConnectionR
final SourceRead source = getSourceRead(connectionRead);
final DestinationRead destination = getDestinationRead(connectionRead);
final OperationReadList operations = getOperationReadList(connectionRead);
final JobReadList syncJobReadList = getSyncJobs(connectionRead);
final Optional<JobRead> latestRunningSyncJob = jobHistoryHandler.getLatestRunningSyncJob(connectionRead.getConnectionId());

final WebBackendConnectionRead webBackendConnectionRead = getWebBackendConnectionRead(connectionRead, source, destination, operations)
.catalogId(connectionRead.getSourceCatalogId())
.isSyncing(syncJobReadList.getJobs()
.stream()
.map(JobWithAttemptsRead::getJob)
.anyMatch(WebBackendConnectionsHandler::isRunningJob));
setLatestSyncJobProperties(webBackendConnectionRead, syncJobReadList);
return webBackendConnectionRead;
}
.catalogId(connectionRead.getSourceCatalogId());

if (latestRunningSyncJob.isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit for later: could do latestRunningSyncJob.ifPresent( -> { //set the fields here }) to avoid the if check.

webBackendConnectionRead.isSyncing(true);
webBackendConnectionRead.setLatestSyncJobCreatedAt(latestRunningSyncJob.get().getCreatedAt());
webBackendConnectionRead.setLatestSyncJobStatus(latestRunningSyncJob.get().getStatus());
}

private static boolean isRunningJob(final JobRead job) {
return !TERMINAL_STATUSES.contains(job.getStatus());
return webBackendConnectionRead;
}

private SourceRead getSourceRead(final ConnectionRead connectionRead) throws JsonValidationException, IOException, ConfigNotFoundException {
Expand Down Expand Up @@ -185,21 +181,6 @@ private static WebBackendConnectionRead getWebBackendConnectionRead(final Connec
.resourceRequirements(connectionRead.getResourceRequirements());
}

private JobReadList getSyncJobs(final ConnectionRead connectionRead) throws IOException {
final JobListRequestBody jobListRequestBody = new JobListRequestBody()
.configId(connectionRead.getConnectionId().toString())
.configTypes(Collections.singletonList(JobConfigType.SYNC));
return jobHistoryHandler.listJobsFor(jobListRequestBody);
}

private static void setLatestSyncJobProperties(final WebBackendConnectionRead WebBackendConnectionRead, final JobReadList syncJobReadList) {
syncJobReadList.getJobs().stream().map(JobWithAttemptsRead::getJob).findFirst()
.ifPresent(job -> {
WebBackendConnectionRead.setLatestSyncJobCreatedAt(job.getCreatedAt());
WebBackendConnectionRead.setLatestSyncJobStatus(job.getStatus());
});
}

public WebBackendConnectionReadList webBackendSearchConnections(final WebBackendConnectionSearch webBackendConnectionSearch)
throws ConfigNotFoundException, IOException, JsonValidationException {

Expand Down