Skip to content

Commit

Permalink
Add memory requirements debug logs to stage scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Apr 9, 2022
1 parent 588209a commit 6fba6a4
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,17 @@
*/
package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Ordering;
import com.google.common.collect.Streams;
import io.airlift.stats.TDigest;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.spi.ErrorCode;

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile;
import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor;
Expand Down Expand Up @@ -88,4 +92,25 @@ private synchronized DataSize getEstimatedMemoryUsage(Session session)
}
return DataSize.ofBytes((long) estimation);
}

private String memoryUsageDistributionInfo()
{
List<Double> quantiles = ImmutableList.of(0.01, 0.05, 0.1, 0.2, 0.5, 0.8, 0.9, 0.95, 0.99);
List<Double> values;
synchronized (this) {
values = memoryUsageDistribution.valuesAt(quantiles);
}

return Streams.zip(
quantiles.stream(),
values.stream(),
(quantile, value) -> "" + quantile + "=" + value)
.collect(Collectors.joining(", ", "[", "]"));
}

@Override
public String toString()
{
return "memoryUsageDistribution=" + memoryUsageDistributionInfo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ public synchronized void schedule()
TaskDescriptor taskDescriptor = taskDescriptorOptional.get();

MemoryRequirements memoryRequirements = partitionMemoryRequirements.computeIfAbsent(partition, ignored -> partitionMemoryEstimator.getInitialMemoryRequirements(session, taskDescriptor.getNodeRequirements().getMemory()));
log.debug("Computed initial memory requirements for task from stage %s; requirements=%s; estimator=%s", stage.getStageId(), memoryRequirements, partitionMemoryEstimator);
if (nodeLease == null) {
NodeRequirements nodeRequirements = taskDescriptor.getNodeRequirements();
nodeRequirements = nodeRequirements.withMemory(memoryRequirements.getRequiredMemory());
Expand Down Expand Up @@ -553,6 +554,7 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional<ExchangeSinkInstan

// update memory limits for next attempt
MemoryRequirements newMemoryLimits = partitionMemoryEstimator.getNextRetryMemoryRequirements(session, memoryLimits, taskStatus.getPeakMemoryReservation(), errorCode);
log.debug("Computed next memory requirements for task from stage %s; previous=%s; new=%s; peak=%s; estimator=%s", stage.getStageId(), memoryLimits, newMemoryLimits, taskStatus.getPeakMemoryReservation(), partitionMemoryEstimator);
partitionMemoryRequirements.put(partitionId, newMemoryLimits);

// reschedule
Expand Down

0 comments on commit 6fba6a4

Please sign in to comment.