Skip to content

Commit

Permalink
[trinodb#30] Add Stats SkippedSplitsByIndex and IndexReadTime
Browse files Browse the repository at this point in the history
  • Loading branch information
yangjinde authored and fengguangyuan committed Mar 9, 2022
1 parent 2ed4235 commit 8825f29
Show file tree
Hide file tree
Showing 34 changed files with 370 additions and 16 deletions.
10 changes: 10 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/Split.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ public SplitWeight getSplitWeight()
return connectorSplit.getSplitWeight();
}

public boolean isSkippedByIndex()
{
return connectorSplit.isSkippedByIndex();
}

public long getIndexReadTime()
{
return connectorSplit.getIndexReadTime();
}

@Override
public String toString()
{
Expand Down
11 changes: 11 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/DriverContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,9 @@ public DriverStats getDriverStats()
List<OperatorStats> operators = getOperatorStats();
OperatorStats inputOperator = getFirst(operators, null);

long skippedSplitsByIndex;
Duration indexReadTime;

DataSize physicalInputDataSize;
long physicalInputPositions;
Duration physicalInputReadTime;
Expand All @@ -352,6 +355,9 @@ public DriverStats getDriverStats()
DataSize outputDataSize;
long outputPositions;
if (inputOperator != null) {
skippedSplitsByIndex = inputOperator.getSkippedSplitsByIndex();
indexReadTime = inputOperator.getIndexReadTime();

physicalInputDataSize = inputOperator.getPhysicalInputDataSize();
physicalInputPositions = inputOperator.getPhysicalInputPositions();
physicalInputReadTime = inputOperator.getAddInputWall();
Expand All @@ -372,6 +378,9 @@ public DriverStats getDriverStats()
outputPositions = outputOperator.getOutputPositions();
}
else {
skippedSplitsByIndex = 0;
indexReadTime = new Duration(0, MILLISECONDS);

physicalInputDataSize = DataSize.ofBytes(0);
physicalInputPositions = 0;
physicalInputReadTime = new Duration(0, MILLISECONDS);
Expand Down Expand Up @@ -415,6 +424,8 @@ public DriverStats getDriverStats()
new Duration(totalBlockedTime, NANOSECONDS).convertToMostSuccinctTimeUnit(),
blockedMonitor != null,
builder.build(),
skippedSplitsByIndex,
indexReadTime,
physicalInputDataSize.succinct(),
physicalInputPositions,
physicalInputReadTime,
Expand Down
24 changes: 24 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/DriverStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public class DriverStats
private final boolean fullyBlocked;
private final Set<BlockedReason> blockedReasons;

private final long skippedSplitsByIndex;
private final Duration indexReadTime;

private final DataSize physicalInputDataSize;
private final long physicalInputPositions;
private final Duration physicalInputReadTime;
Expand Down Expand Up @@ -96,6 +99,9 @@ public DriverStats()
this.fullyBlocked = false;
this.blockedReasons = ImmutableSet.of();

this.skippedSplitsByIndex = 0;
this.indexReadTime = new Duration(0, MILLISECONDS);

this.physicalInputDataSize = DataSize.ofBytes(0);
this.physicalInputPositions = 0;
this.physicalInputReadTime = new Duration(0, MILLISECONDS);
Expand Down Expand Up @@ -139,6 +145,9 @@ public DriverStats(
@JsonProperty("fullyBlocked") boolean fullyBlocked,
@JsonProperty("blockedReasons") Set<BlockedReason> blockedReasons,

@JsonProperty("skippedSplitsByIndex") long skippedSplitsByIndex,
@JsonProperty("indexReadTime") Duration indexReadTime,

@JsonProperty("physicalInputDataSize") DataSize physicalInputDataSize,
@JsonProperty("physicalInputPositions") long physicalInputPositions,
@JsonProperty("physicalInputReadTime") Duration physicalInputReadTime,
Expand Down Expand Up @@ -179,6 +188,9 @@ public DriverStats(
this.fullyBlocked = fullyBlocked;
this.blockedReasons = ImmutableSet.copyOf(requireNonNull(blockedReasons, "blockedReasons is null"));

this.skippedSplitsByIndex = skippedSplitsByIndex;
this.indexReadTime = requireNonNull(indexReadTime, "indexReadTIme is null");

this.physicalInputDataSize = requireNonNull(physicalInputDataSize, "physicalInputDataSize is null");
checkArgument(physicalInputPositions >= 0, "physicalInputPositions is negative");
this.physicalInputPositions = physicalInputPositions;
Expand Down Expand Up @@ -293,6 +305,18 @@ public Set<BlockedReason> getBlockedReasons()
return blockedReasons;
}

@JsonProperty
public long getSkippedSplitsByIndex()
{
return skippedSplitsByIndex;
}

@JsonProperty
public Duration getIndexReadTime()
{
return indexReadTime;
}

@JsonProperty
public DataSize getPhysicalInputDataSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static java.lang.Math.max;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
Expand All @@ -72,6 +73,9 @@ public class OperatorContext
private final DriverContext driverContext;
private final Executor executor;

private final AtomicLong skippedSplitsByIndex = new AtomicLong();
private final AtomicLong indexReadTime = new AtomicLong();

private final CounterStat physicalInputDataSize = new CounterStat();
private final CounterStat physicalInputPositions = new CounterStat();

Expand Down Expand Up @@ -165,6 +169,16 @@ public boolean isDone()
return driverContext.isDone();
}

/**
* Record the number of splits skipped by index and the total time of reading indices in an operator.
* This metric is valid only for source operators reading iceberg tables.
*/
public void recordIndexStats(long skippedSplitsByIndex, long indexReadTime)
{
this.skippedSplitsByIndex.getAndAdd(skippedSplitsByIndex);
this.indexReadTime.getAndAdd(indexReadTime);
}

void recordAddInput(OperationTimer operationTimer, Page page)
{
operationTimer.recordOperationComplete(addInputTiming);
Expand Down Expand Up @@ -553,6 +567,9 @@ private OperatorStats getOperatorStats()

1,

skippedSplitsByIndex.get(),
new Duration(indexReadTime.get(), MILLISECONDS).convertToMostSuccinctTimeUnit(),

addInputTiming.getCalls(),
new Duration(addInputTiming.getWallNanos(), NANOSECONDS).convertToMostSuccinctTimeUnit(),
new Duration(addInputTiming.getCpuNanos(), NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down
33 changes: 33 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/OperatorStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static com.google.common.base.Verify.verify;
import static java.lang.Math.max;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

@Immutable
Expand All @@ -44,6 +45,9 @@ public class OperatorStats

private final long totalDrivers;

private final long skippedSplitsByIndex;
private final Duration indexReadTime;

private final long addInputCalls;
private final Duration addInputWall;
private final Duration addInputCpu;
Expand Down Expand Up @@ -99,6 +103,9 @@ public OperatorStats(

@JsonProperty("totalDrivers") long totalDrivers,

@JsonProperty("skippedSplitsByIndex") long skippedSplitsByIndex,
@JsonProperty("indexReadTime") Duration indexReadTime,

@JsonProperty("addInputCalls") long addInputCalls,
@JsonProperty("addInputWall") Duration addInputWall,
@JsonProperty("addInputCpu") Duration addInputCpu,
Expand Down Expand Up @@ -154,6 +161,9 @@ public OperatorStats(

this.totalDrivers = totalDrivers;

this.skippedSplitsByIndex = skippedSplitsByIndex;
this.indexReadTime = requireNonNull(indexReadTime, "indexReadTime is null");

this.addInputCalls = addInputCalls;
this.addInputWall = requireNonNull(addInputWall, "addInputWall is null");
this.addInputCpu = requireNonNull(addInputCpu, "addInputCpu is null");
Expand Down Expand Up @@ -238,6 +248,18 @@ public long getTotalDrivers()
return totalDrivers;
}

@JsonProperty
public long getSkippedSplitsByIndex()
{
return skippedSplitsByIndex;
}

@JsonProperty
public Duration getIndexReadTime()
{
return indexReadTime;
}

@JsonProperty
public long getAddInputCalls()
{
Expand Down Expand Up @@ -452,6 +474,9 @@ public OperatorStats add(Iterable<OperatorStats> operators)
{
long totalDrivers = this.totalDrivers;

long skippedSplitsByIndex = this.skippedSplitsByIndex;
long indexReadTime = this.indexReadTime.roundTo(MILLISECONDS);

long addInputCalls = this.addInputCalls;
long addInputWall = this.addInputWall.roundTo(NANOSECONDS);
long addInputCpu = this.addInputCpu.roundTo(NANOSECONDS);
Expand Down Expand Up @@ -501,6 +526,9 @@ public OperatorStats add(Iterable<OperatorStats> operators)

totalDrivers += operator.totalDrivers;

skippedSplitsByIndex += operator.getSkippedSplitsByIndex();
indexReadTime += operator.getIndexReadTime().roundTo(MILLISECONDS);

addInputCalls += operator.getAddInputCalls();
addInputWall += operator.getAddInputWall().roundTo(NANOSECONDS);
addInputCpu += operator.getAddInputCpu().roundTo(NANOSECONDS);
Expand Down Expand Up @@ -562,6 +590,9 @@ public OperatorStats add(Iterable<OperatorStats> operators)

totalDrivers,

skippedSplitsByIndex,
new Duration(indexReadTime, MILLISECONDS).convertToMostSuccinctTimeUnit(),

addInputCalls,
new Duration(addInputWall, NANOSECONDS).convertToMostSuccinctTimeUnit(),
new Duration(addInputCpu, NANOSECONDS).convertToMostSuccinctTimeUnit(),
Expand Down Expand Up @@ -636,6 +667,8 @@ public OperatorStats summarize()
planNodeId,
operatorType,
totalDrivers,
skippedSplitsByIndex,
indexReadTime,
addInputCalls,
addInputWall,
addInputCpu,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.succinctBytes;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.stream.Collectors.toList;

Expand All @@ -69,6 +70,9 @@ public class PipelineContext
private final AtomicInteger completedDrivers = new AtomicInteger();
private final AtomicLong completedSplitsWeight = new AtomicLong();

private final AtomicLong skippedSplitsByIndex = new AtomicLong();
private final Distribution indexReadTime = new Distribution();

private final AtomicReference<DateTime> executionStartTime = new AtomicReference<>();
private final AtomicReference<DateTime> lastExecutionStartTime = new AtomicReference<>();
private final AtomicReference<DateTime> lastExecutionEndTime = new AtomicReference<>();
Expand Down Expand Up @@ -192,6 +196,12 @@ public void driverFinished(DriverContext driverContext)
completedSplitsWeight.addAndGet(driverContext.getSplitWeight());
}

skippedSplitsByIndex.getAndAdd(driverStats.getSkippedSplitsByIndex());
long driverIndexReadTime = driverStats.getIndexReadTime().roundTo(MILLISECONDS);
if (driverIndexReadTime > 0) {
indexReadTime.add(driverIndexReadTime);
}

queuedTime.add(driverStats.getQueuedTime().roundTo(NANOSECONDS));
elapsedTime.add(driverStats.getElapsedTime().roundTo(NANOSECONDS));

Expand Down Expand Up @@ -353,6 +363,9 @@ public PipelineStats getPipelineStats()

int totalDrivers = completedDrivers + driverContexts.size();

long skippedSplitsByIndex = this.skippedSplitsByIndex.get();
Distribution indexReadTime = this.indexReadTime.duplicate();

Distribution queuedTime = this.queuedTime.duplicate();
Distribution elapsedTime = this.elapsedTime.duplicate();

Expand Down Expand Up @@ -469,6 +482,9 @@ public PipelineStats getPipelineStats()
pipelineStatus.getBlockedDrivers(),
completedDrivers,

skippedSplitsByIndex,
indexReadTime.snapshot(),

succinctBytes(pipelineMemoryContext.getUserMemory()),
succinctBytes(pipelineMemoryContext.getRevocableMemory()),
succinctBytes(pipelineMemoryContext.getSystemMemory()),
Expand Down
23 changes: 23 additions & 0 deletions core/trino-main/src/main/java/io/trino/operator/PipelineStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class PipelineStats
private final int blockedDrivers;
private final int completedDrivers;

private final long skippedSplitsByIndex;
private final DistributionSnapshot indexReadTime;

private final DataSize userMemoryReservation;
private final DataSize revocableMemoryReservation;
private final DataSize systemMemoryReservation;
Expand Down Expand Up @@ -108,6 +111,9 @@ public PipelineStats(
@JsonProperty("blockedDrivers") int blockedDrivers,
@JsonProperty("completedDrivers") int completedDrivers,

@JsonProperty("skippedSplitsByIndex") long skippedSplitsByIndex,
@JsonProperty("indexReadTime") DistributionSnapshot indexReadTime,

@JsonProperty("userMemoryReservation") DataSize userMemoryReservation,
@JsonProperty("revocableMemoryReservation") DataSize revocableMemoryReservation,
@JsonProperty("systemMemoryReservation") DataSize systemMemoryReservation,
Expand Down Expand Up @@ -170,6 +176,9 @@ public PipelineStats(
checkArgument(completedDrivers >= 0, "completedDrivers is negative");
this.completedDrivers = completedDrivers;

this.skippedSplitsByIndex = skippedSplitsByIndex;
this.indexReadTime = requireNonNull(indexReadTime, "indexReadTime is null");

this.userMemoryReservation = requireNonNull(userMemoryReservation, "userMemoryReservation is null");
this.revocableMemoryReservation = requireNonNull(revocableMemoryReservation, "revocableMemoryReservation is null");
this.systemMemoryReservation = requireNonNull(systemMemoryReservation, "systemMemoryReservation is null");
Expand Down Expand Up @@ -303,6 +312,18 @@ public int getCompletedDrivers()
return completedDrivers;
}

@JsonProperty
public long getSkippedSplitsByIndex()
{
return skippedSplitsByIndex;
}

@JsonProperty
public DistributionSnapshot getIndexReadTime()
{
return indexReadTime;
}

@JsonProperty
public DataSize getUserMemoryReservation()
{
Expand Down Expand Up @@ -465,6 +486,8 @@ public PipelineStats summarize()
runningPartitionedSplitsWeight,
blockedDrivers,
completedDrivers,
skippedSplitsByIndex,
indexReadTime,
userMemoryReservation,
revocableMemoryReservation,
systemMemoryReservation,
Expand Down
Loading

0 comments on commit 8825f29

Please sign in to comment.