Skip to content

Commit

Permalink
Revert "Progress Bar Read APIs (#20937)" (#21115)
Browse files Browse the repository at this point in the history
Breaks when there is no config present
#21112
This reverts commit 3a2b040.
  • Loading branch information
supertopher authored Jan 6, 2023
1 parent e51853e commit ef335e2
Show file tree
Hide file tree
Showing 11 changed files with 87 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import io.airbyte.commons.enums.Enums;
Expand Down Expand Up @@ -73,7 +72,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.InsertValuesStepN;
Expand Down Expand Up @@ -513,72 +511,6 @@ public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) t
});
}

@Override
public Map<JobAttemptPair, AttemptStats> getAttemptStats(final List<Long> jobIds) throws IOException {
final var jobIdsStr = StringUtils.join(jobIds, ',');
return jobDatabase.query(ctx -> {
// Instead of one massive join query, separate this query into two queries for better readability
// for now.
// We can combine the queries at a later date if this still proves to be not efficient enough.
final Map<JobAttemptPair, AttemptStats> attemptStats = hydrateSyncStats(jobIdsStr, ctx);
hydrateStreamStats(jobIdsStr, ctx, attemptStats);
return attemptStats;
});
}

private static Map<JobAttemptPair, AttemptStats> hydrateSyncStats(final String jobIdsStr, final DSLContext ctx) {
final var attemptStats = new HashMap<JobAttemptPair, AttemptStats>();
final var syncResults = ctx.fetch(
"SELECT atmpt.attempt_number, atmpt.job_id,"
+ "stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
+ "FROM sync_stats stats "
+ "INNER JOIN attempts atmpt ON stats.attempt_id = atmpt.id "
+ "WHERE job_id IN ( " + jobIdsStr + ");");
syncResults.forEach(r -> {
final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER));
final var syncStats = new SyncStats()
.withBytesEmitted(r.get(SYNC_STATS.BYTES_EMITTED))
.withRecordsEmitted(r.get(SYNC_STATS.RECORDS_EMITTED))
.withEstimatedRecords(r.get(SYNC_STATS.ESTIMATED_RECORDS))
.withEstimatedBytes(r.get(SYNC_STATS.ESTIMATED_BYTES));
attemptStats.put(key, new AttemptStats(syncStats, Lists.newArrayList()));
});
return attemptStats;
}

/**
* This method needed to be called after
* {@link DefaultJobPersistence#hydrateSyncStats(String, DSLContext)} as it assumes hydrateSyncStats
* has prepopulated the map.
*/
private static void hydrateStreamStats(final String jobIdsStr, final DSLContext ctx, final Map<JobAttemptPair, AttemptStats> attemptStats) {
final var streamResults = ctx.fetch(
"SELECT atmpt.attempt_number, atmpt.job_id, "
+ "stats.stream_name, stats.stream_namespace, stats.estimated_bytes, stats.estimated_records, stats.bytes_emitted, stats.records_emitted "
+ "FROM stream_stats stats "
+ "INNER JOIN attempts atmpt ON atmpt.id = stats.attempt_id "
+ "WHERE attempt_id IN "
+ "( SELECT id FROM attempts WHERE job_id IN ( " + jobIdsStr + "));");

streamResults.forEach(r -> {
final var streamSyncStats = new StreamSyncStats()
.withStreamNamespace(r.get(STREAM_STATS.STREAM_NAMESPACE))
.withStreamName(r.get(STREAM_STATS.STREAM_NAME))
.withStats(new SyncStats()
.withBytesEmitted(r.get(STREAM_STATS.BYTES_EMITTED))
.withRecordsEmitted(r.get(STREAM_STATS.RECORDS_EMITTED))
.withEstimatedRecords(r.get(STREAM_STATS.ESTIMATED_RECORDS))
.withEstimatedBytes(r.get(STREAM_STATS.ESTIMATED_BYTES)));

final var key = new JobAttemptPair(r.get(ATTEMPTS.JOB_ID), r.get(ATTEMPTS.ATTEMPT_NUMBER));
if (!attemptStats.containsKey(key)) {
LOGGER.error("{} stream stats entry does not have a corresponding sync stats entry. This suggest the database is in a bad state.", key);
return;
}
attemptStats.get(key).perStreamStats().add(streamSyncStats);
});
}

@Override
public List<NormalizationSummary> getNormalizationSummary(final long jobId, final int attemptNumber) throws IOException {
return jobDatabase
Expand Down Expand Up @@ -917,7 +849,7 @@ private static Job getJobFromRecord(final Record record) {

private static Attempt getAttemptFromRecord(final Record record) {
return new Attempt(
record.get(ATTEMPT_NUMBER, int.class),
record.get(ATTEMPT_NUMBER, Long.class),
record.get(JOB_ID, Long.class),
Path.of(record.get("log_path", String.class)),
record.get("attempt_output", String.class) == null ? null : Jsons.deserialize(record.get("attempt_output", String.class), JobOutput.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public interface JobPersistence {
*/
record AttemptStats(SyncStats combinedStats, List<StreamSyncStats> perStreamStats) {}

record JobAttemptPair(long id, int attemptNumber) {}

/**
* Retrieve the combined and per stream stats for a single attempt.
*
Expand All @@ -59,19 +57,6 @@ record JobAttemptPair(long id, int attemptNumber) {}
*/
AttemptStats getAttemptStats(long jobId, int attemptNumber) throws IOException;

/**
* Alternative method to retrieve combined and per stream stats per attempt for a list of jobs to
* avoid overloading the database with too many queries.
* <p>
* This implementation is intended to utilise complex joins under the hood to reduce the potential
* N+1 database pattern.
*
* @param jobIds
* @return
* @throws IOException
*/
Map<JobAttemptPair, AttemptStats> getAttemptStats(List<Long> jobIds) throws IOException;

List<NormalizationSummary> getNormalizationSummary(long jobId, int attemptNumber) throws IOException;

Job getJob(long jobId) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public class Attempt {

private final int attemptNumber;
private final long id;
private final long jobId;
private final JobOutput output;
private final AttemptStatus status;
Expand All @@ -23,7 +23,7 @@ public class Attempt {
private final long createdAtInSecond;
private final Long endedAtInSecond;

public Attempt(final int attemptNumber,
public Attempt(final long id,
final long jobId,
final Path logPath,
final @Nullable JobOutput output,
Expand All @@ -32,7 +32,7 @@ public Attempt(final int attemptNumber,
final long createdAtInSecond,
final long updatedAtInSecond,
final @Nullable Long endedAtInSecond) {
this.attemptNumber = attemptNumber;
this.id = id;
this.jobId = jobId;
this.output = output;
this.status = status;
Expand All @@ -43,8 +43,8 @@ public Attempt(final int attemptNumber,
this.endedAtInSecond = endedAtInSecond;
}

public int getAttemptNumber() {
return attemptNumber;
public long getId() {
return id;
}

public long getJobId() {
Expand Down Expand Up @@ -92,7 +92,7 @@ public boolean equals(final Object o) {
return false;
}
final Attempt attempt = (Attempt) o;
return attemptNumber == attempt.attemptNumber &&
return id == attempt.id &&
jobId == attempt.jobId &&
updatedAtInSecond == attempt.updatedAtInSecond &&
createdAtInSecond == attempt.createdAtInSecond &&
Expand All @@ -105,13 +105,13 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(attemptNumber, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
return Objects.hash(id, jobId, output, status, failureSummary, logPath, updatedAtInSecond, createdAtInSecond, endedAtInSecond);
}

@Override
public String toString() {
return "Attempt{" +
"id=" + attemptNumber +
"id=" + id +
", jobId=" + jobId +
", output=" + output +
", status=" + status +
Expand Down
Loading

0 comments on commit ef335e2

Please sign in to comment.