diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java index 040067cc8588..5623de419042 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/planprinter/PlanPrinter.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Streams; import io.airlift.json.JsonCodec; +import io.airlift.stats.TDigest; import io.airlift.units.Duration; import io.trino.Session; import io.trino.client.NodeVersion; @@ -153,6 +154,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.airlift.json.JsonCodec.mapJsonCodec; +import static io.airlift.units.DataSize.succinctBytes; import static io.airlift.units.Duration.succinctNanos; import static io.trino.execution.StageInfo.getAllStages; import static io.trino.metadata.ResolvedFunction.extractFunctionName; @@ -530,6 +532,23 @@ private static String formatFragment( formatDouble(outputBufferUtilization.get().getP99() * 100), formatDouble(outputBufferUtilization.get().getMax() * 100))); } + + TDigest taskOutputDistribution = new TDigest(); + stageInfo.get().getTasks().forEach(task -> taskOutputDistribution.add(task.getStats().getOutputDataSize().toBytes())); + TDigest taskInputDistribution = new TDigest(); + stageInfo.get().getTasks().forEach(task -> taskInputDistribution.add(task.getStats().getProcessedInputDataSize().toBytes())); + + if (verbose) { + builder.append(indentString(1)) + .append(format("Task output distribution: %s\n", formatSizeDistribution(taskOutputDistribution))); + builder.append(indentString(1)) + .append(format("Task input distribution: %s\n", formatSizeDistribution(taskInputDistribution))); + } + + if (taskInputDistribution.valueAt(0.99) > taskInputDistribution.valueAt(0.49) * 2) { + builder.append(indentString(1)) + .append("Amount of input data processed by the workers for this stage might be skewed\n"); + } } PartitioningScheme partitioningScheme = fragment.getPartitioningScheme(); @@ -581,6 +600,22 @@ private static String formatFragment( return builder.toString(); } + private static String formatSizeDistribution(TDigest digest) + { + return format("{count=%s, p01=%s, p05=%s, p10=%s, p25=%s, p50=%s, p75=%s, p90=%s, p95=%s, p99=%s, max=%s}", + formatDouble(digest.getCount()), + succinctBytes((long) digest.valueAt(0.01)), + succinctBytes((long) digest.valueAt(0.05)), + succinctBytes((long) digest.valueAt(0.10)), + succinctBytes((long) digest.valueAt(0.25)), + succinctBytes((long) digest.valueAt(0.50)), + succinctBytes((long) digest.valueAt(0.75)), + succinctBytes((long) digest.valueAt(0.90)), + succinctBytes((long) digest.valueAt(0.95)), + succinctBytes((long) digest.valueAt(0.99)), + succinctBytes((long) digest.getMax())); + } + private static TypeProvider getTypeProvider(List fragments) { return TypeProvider.copyOf(fragments.stream() diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java index 49e891aa2150..41fcf48f62c5 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractDistributedEngineOnlyQueries.java @@ -233,6 +233,8 @@ public void testExplainAnalyzeVerbose() "'CPU time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}", "'Scheduled time distribution \\(s\\)' = \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, min=.*, max=.*}", "Output buffer active time: .*, buffer utilization distribution \\(%\\): \\{p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, max=.*}", + "Task output distribution: \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, max=.*}", + "Task input distribution: \\{count=.*, p01=.*, p05=.*, p10=.*, p25=.*, p50=.*, p75=.*, p90=.*, p95=.*, p99=.*, max=.*}", "Trino version: .*"); }