Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remerge Progress Bar Read API. #21124

Merged
merged 5 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import io.airbyte.commons.enums.Enums;
Expand Down Expand Up @@ -72,6 +73,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.InsertValuesStepN;
Expand Down Expand Up @@ -511,6 +513,76 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t
});
}

/**
 * Loads the combined and per-stream stats of every attempt that belongs to the given jobs.
 * <p>
 * When {@code jobIds} is null or empty an empty map is returned and no query is issued.
 *
 * @param jobIds ids of the jobs whose attempt stats should be loaded; may be null or empty
 * @return stats keyed by (job id, attempt number)
 * @throws IOException NOTE(review): presumably propagated from {@code jobDatabase.query} - confirm
 */
@Override
public Map<JobAttemptPair, AttemptStats> getAttemptStats(final List<Long> jobIds) throws IOException {
if (jobIds == null || jobIds.isEmpty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

first change from pre-revert

return Map.of();
}

// Both helpers below splice this comma-separated id list into an SQL IN clause.
final var jobIdsStr = StringUtils.join(jobIds, ',');
return jobDatabase.query(ctx -> {
// Instead of one massive join query, separate this query into two queries for better readability
// for now.
// We can combine the queries at a later date if this still proves to be not efficient enough.
// Order matters: hydrateStreamStats expects the map to already hold the entries created by
// hydrateSyncStats.
final Map<JobAttemptPair, AttemptStats> attemptStats = hydrateSyncStats(jobIdsStr, ctx);
hydrateStreamStats(jobIdsStr, ctx, attemptStats);
return attemptStats;
});
}

/**
 * Reads the sync-level stats for all attempts of the given jobs.
 *
 * @param jobIdsStr comma-separated job ids, spliced verbatim into the query's IN clause
 * @param ctx jOOQ context used to run the query
 * @return a new mutable map from (job id, attempt number) to stats; every entry starts with an
 *         empty per-stream list
 */
private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) {
  final Map<JobAttemptPair, AttemptStats> statsByAttempt = new HashMap<>();

  final String query = "SELECT atmpt.attempt_number, atmpt.job_id,"
      + "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
      + "FROM sync_stats stats "
      + "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id "
      + "WHERE job_id IN ( " + jobIdsStr + ");";

  for (final var row : ctx.fetch(query)) {
    final var combined = new SyncStats()
        .withBytesEmitted(row.get(SYNC_STATS.BYTES_EMITTED))
        .withRecordsEmitted(row.get(SYNC_STATS.RECORDS_EMITTED))
        .withEstimatedRecords(row.get(SYNC_STATS.ESTIMATED_RECORDS))
        .withEstimatedBytes(row.get(SYNC_STATS.ESTIMATED_BYTES));
    final var attemptKey = new JobAttemptPair(row.get(ATTEMPTS.JOB_ID), row.get(ATTEMPTS.ATTEMPT_NUMBER));
    // The per-stream list is left empty and mutable so it can be filled in afterwards.
    statsByAttempt.put(attemptKey, new AttemptStats(combined, Lists.newArrayList()));
  }
  return statsByAttempt;
}

/**
 * Adds the per-stream stats of all attempts of the given jobs to an already populated map.
 * <p>
 * Must run after {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)}: a stream row
 * is only attached to an entry that is already present in {@code attemptStats}. Rows without such
 * an entry are logged as errors and skipped.
 *
 * @param jobIdsStr comma-separated job ids, spliced verbatim into the query's IN clause
 * @param ctx jOOQ context used to run the query
 * @param attemptStats map keyed by (job id, attempt number) whose per-stream lists are appended to
 */
private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map<JobAttemptPair, AttemptStats> attemptStats) {
  final String query = "SELECT atmpt.attempt_number, atmpt.job_id, "
      + "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
      + "FROM stream_stats stats "
      + "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id "
      + "WHERE attempt_id IN "
      + "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));";

  for (final var row : ctx.fetch(query)) {
    final var perStreamCounters = new SyncStats()
        .withBytesEmitted(row.get(STREAM_STATS.BYTES_EMITTED))
        .withRecordsEmitted(row.get(STREAM_STATS.RECORDS_EMITTED))
        .withEstimatedRecords(row.get(STREAM_STATS.ESTIMATED_RECORDS))
        .withEstimatedBytes(row.get(STREAM_STATS.ESTIMATED_BYTES));
    final var streamEntry = new StreamSyncStats()
        .withStreamNamespace(row.get(STREAM_STATS.STREAM_NAMESPACE))
        .withStreamName(row.get(STREAM_STATS.STREAM_NAME))
        .withStats(perStreamCounters);

    final var attemptKey = new JobAttemptPair(row.get(ATTEMPTS.JOB_ID), row.get(ATTEMPTS.ATTEMPT_NUMBER));
    if (attemptStats.containsKey(attemptKey)) {
      attemptStats.get(attemptKey).perStreamStats().add(streamEntry);
    } else {
      // A stream row without its sync row is reported and dropped rather than failing the whole read.
      LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", attemptKey);
    }
  }
}

@Override
public List<NormalizationSummary> getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException {
return jobDatabase
Expand All @@ -528,6 +600,10 @@ static Long getAttemptId(final long jobId, final int attemptNumber, final DSLCon
final Optional<Record> record =
ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
attemptNumber).stream().findFirst();
if (record.isEmpty()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

second change from pre-revert

return -1L;
}

return record.get().get("id", Long.class);
}

Expand Down Expand Up @@ -849,7 +925,7 @@ private static Job getJobFromRecord(final Record record) {

private static Attempt getAttemptFromRecord(final Record record) {
return new Attempt(
record.get(ATTEMPT_NUMBER, Long.class),
record.get(ATTEMPT_NUMBER, int.class),
record.get(JOB_ID, Long.class),
Path.of(record.get("log_path", String.class)),
record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public interface JobPersistence {
*/
record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStats) {}

/**
 * Key identifying a single attempt of a job.
 *
 * @param id id of the job the attempt belongs to (the job id, not the attempt's own row id)
 * @param attemptNumber number of the attempt within that job
 */
record JobAttemptPair(long id, int attemptNumber) {}

/**
* Retrieve the combined and per stream stats for a single attempt.
*
Expand All @@ -57,6 +59,19 @@ record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStat
*/
AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException;

/**
 * Alternative method to retrieve combined and per stream stats per attempt for a list of jobs to
 * avoid overloading the database with too many queries.
 * <p>
 * This implementation is intended to utilise complex joins under the hood to reduce the potential
 * N+1 database pattern.
 *
 * @param jobIds ids of the jobs whose attempts' stats should be retrieved
 * @return the stats of each attempt, keyed by the (job id, attempt number) pair identifying it
 * @throws IOException NOTE(review): presumably when the underlying database read fails - confirm
 *         against the implementation
 */
Map<JobAttemptPair, AttemptStats> getAttemptStats(List<Long> jobIds) throws IOException;

List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException;

Job getJob(long jobId) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public class Attempt {

private final long id;
private final int attemptNumber;
private final long jobId;
private final JobOutput output;
private final AttemptStatus status;
Expand All @@ -23,7 +23,7 @@ public class Attempt {
private final long createdAtInSecond;
private final Long endedAtInSecond;

public Attempt(final long id,
public Attempt(final int attemptNumber,
final long jobId,
final Path logPath,
final @Nullable JobOutput output,
Expand All @@ -32,7 +32,7 @@ public Attempt(final long id,
final long createdAtInSecond,
final long updatedAtInSecond,
final @Nullable Long endedAtInSecond) {
this.id = id;
this.attemptNumber = attemptNumber;
this.jobId = jobId;
this.output = output;
this.status = status;
Expand All @@ -43,8 +43,8 @@ public Attempt(final long id,
this.endedAtInSecond = endedAtInSecond;
}

public long getId() {
return id;
public int getAttemptNumber() {
return attemptNumber;
}

public long getJobId() {
Expand Down Expand Up @@ -92,7 +92,7 @@ public boolean equals(final Object o) {
return false;
}
final Attempt attempt = (Attempt) o;
return id == attempt.id &&
return attemptNumber == attempt.attemptNumber &&
jobId == attempt.jobId &&
updatedAtInSecond == attempt.updatedAtInSecond &&
createdAtInSecond == attempt.createdAtInSecond &&
Expand All @@ -105,13 +105,13 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
}

@Override
public String toString() {
return "Attempt{" +
"id=" + id +
"id=" + attemptNumber +
", jobId=" + jobId +
", output=" + output +
", status=" + status +
Expand Down
Loading