From 1f51e08121797eaac0de989999b13d30a03aa88a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Thu, 7 Apr 2022 22:05:39 +0200 Subject: [PATCH 1/5] Static import --- .../scheduler/ExponentialGrowthPartitionMemoryEstimator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java index 586e3936cbcf..b237d24dfa0f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java @@ -16,9 +16,9 @@ import com.google.common.collect.Ordering; import io.airlift.units.DataSize; import io.trino.Session; -import io.trino.SystemSessionProperties; import io.trino.spi.ErrorCode; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor; import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY; import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT; @@ -39,7 +39,7 @@ public MemoryRequirements getNextRetryMemoryRequirements(Session session, Memory DataSize previousMemory = previousMemoryRequirements.getRequiredMemory(); DataSize baseMemory = Ordering.natural().max(peakMemoryUsage, previousMemory); if (shouldIncreaseMemory(errorCode)) { - double growthFactor = SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor(session); + double growthFactor = getFaultTolerantExecutionTaskMemoryGrowthFactor(session); return new MemoryRequirements(DataSize.of((long) (baseMemory.toBytes() * growthFactor), DataSize.Unit.BYTE), false); } return new MemoryRequirements(baseMemory, false); From 0621b7cba014bfd0a93e65b6feaa29a4190bfc2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 6 Apr 2022 16:19:33 +0200 Subject: [PATCH 2/5] Introduce factory for PartitionMemoryEstimator --- .../io/trino/execution/SqlQueryExecution.java | 18 ++++++++--------- .../PartitionMemoryEstimatorFactory.java | 20 +++++++++++++++++++ .../scheduler/SqlQueryScheduler.java | 12 +++++------ .../io/trino/server/CoordinatorModule.java | 8 ++++---- 4 files changed, 39 insertions(+), 19 deletions(-) create mode 100644 core/trino-main/src/main/java/io/trino/execution/scheduler/PartitionMemoryEstimatorFactory.java diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java index 459f008d08ed..d7231a11943f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlQueryExecution.java @@ -27,7 +27,7 @@ import io.trino.execution.StateMachine.StateChangeListener; import io.trino.execution.scheduler.NodeAllocatorService; import io.trino.execution.scheduler.NodeScheduler; -import io.trino.execution.scheduler.PartitionMemoryEstimator; +import io.trino.execution.scheduler.PartitionMemoryEstimatorFactory; import io.trino.execution.scheduler.SplitSchedulerStats; import io.trino.execution.scheduler.SqlQueryScheduler; import io.trino.execution.scheduler.TaskDescriptorStorage; @@ -102,7 +102,7 @@ public class SqlQueryExecution private final NodePartitioningManager nodePartitioningManager; private final NodeScheduler nodeScheduler; private final NodeAllocatorService 
nodeAllocatorService; - private final PartitionMemoryEstimator partitionMemoryEstimator; + private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory; private final List planOptimizers; private final PlanFragmenter planFragmenter; private final RemoteTaskFactory remoteTaskFactory; @@ -137,7 +137,7 @@ private SqlQueryExecution( NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, NodeAllocatorService nodeAllocatorService, - PartitionMemoryEstimator partitionMemoryEstimator, + PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory, List planOptimizers, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory, @@ -166,7 +166,7 @@ private SqlQueryExecution( this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null"); this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null"); - this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null"); + this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null"); this.planOptimizers = requireNonNull(planOptimizers, "planOptimizers is null"); this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null"); this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null"); @@ -506,7 +506,7 @@ private void planDistribution(PlanRoot plan) nodePartitioningManager, nodeScheduler, nodeAllocatorService, - partitionMemoryEstimator, + partitionMemoryEstimatorFactory, remoteTaskFactory, plan.isSummarizeTaskInfos(), scheduleSplitBatchSize, @@ -709,7 +709,7 @@ public static class SqlQueryExecutionFactory private final NodePartitioningManager nodePartitioningManager; private final NodeScheduler nodeScheduler; private final NodeAllocatorService nodeAllocatorService; - private final PartitionMemoryEstimator partitionMemoryEstimator; + private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory; private final List planOptimizers; private final PlanFragmenter planFragmenter; private final RemoteTaskFactory remoteTaskFactory; @@ -737,7 +737,7 @@ public static class SqlQueryExecutionFactory NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, NodeAllocatorService nodeAllocatorService, - PartitionMemoryEstimator partitionMemoryEstimator, + PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory, PlanOptimizersFactory planOptimizersFactory, PlanFragmenter planFragmenter, RemoteTaskFactory remoteTaskFactory, @@ -766,7 +766,7 @@ public static class SqlQueryExecutionFactory this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null"); this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null"); - this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null"); + this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null"); this.planFragmenter = requireNonNull(planFragmenter, "planFragmenter is null"); this.remoteTaskFactory = requireNonNull(remoteTaskFactory, "remoteTaskFactory is null"); this.queryExecutor = requireNonNull(queryExecutor, "queryExecutor is null"); @@ -807,7 +807,7 @@ public 
QueryExecution createQueryExecution( nodePartitioningManager, nodeScheduler, nodeAllocatorService, - partitionMemoryEstimator, + partitionMemoryEstimatorFactory, planOptimizers, planFragmenter, remoteTaskFactory, diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/PartitionMemoryEstimatorFactory.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/PartitionMemoryEstimatorFactory.java new file mode 100644 index 000000000000..59f27b30dd5a --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/PartitionMemoryEstimatorFactory.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.execution.scheduler; + +@FunctionalInterface +public interface PartitionMemoryEstimatorFactory +{ + PartitionMemoryEstimator createPartitionMemoryEstimator(); +} diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java index 602a6354e797..c74ea5c40075 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/SqlQueryScheduler.java @@ -177,7 +177,7 @@ public class SqlQueryScheduler private final NodePartitioningManager nodePartitioningManager; private final NodeScheduler nodeScheduler; private final NodeAllocatorService nodeAllocatorService; - private final PartitionMemoryEstimator partitionMemoryEstimator; + private final PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory; private final int splitBatchSize; private final ExecutorService executor; private final ScheduledExecutorService schedulerExecutor; @@ -216,7 +216,7 @@ public SqlQueryScheduler( NodePartitioningManager nodePartitioningManager, NodeScheduler nodeScheduler, NodeAllocatorService nodeAllocatorService, - PartitionMemoryEstimator partitionMemoryEstimator, + PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory, RemoteTaskFactory remoteTaskFactory, boolean summarizeTaskInfo, int splitBatchSize, @@ -239,7 +239,7 @@ public SqlQueryScheduler( this.nodePartitioningManager = requireNonNull(nodePartitioningManager, "nodePartitioningManager is null"); this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null"); this.nodeAllocatorService = requireNonNull(nodeAllocatorService, "nodeAllocatorService is null"); - this.partitionMemoryEstimator = requireNonNull(partitionMemoryEstimator, "partitionMemoryEstimator is null"); + this.partitionMemoryEstimatorFactory = requireNonNull(partitionMemoryEstimatorFactory, "partitionMemoryEstimatorFactory is null"); this.splitBatchSize = splitBatchSize; this.executor = requireNonNull(queryExecutor, "queryExecutor is null"); this.schedulerExecutor = requireNonNull(schedulerExecutor, "schedulerExecutor is null"); @@ -355,7 +355,7 @@ private synchronized Optional createDistributedStage schedulerExecutor, schedulerStats, nodeAllocatorService, - partitionMemoryEstimator); + 
partitionMemoryEstimatorFactory); break; case QUERY: case NONE: @@ -1757,7 +1757,7 @@ public static FaultTolerantDistributedStagesScheduler create( ScheduledExecutorService scheduledExecutorService, SplitSchedulerStats schedulerStats, NodeAllocatorService nodeAllocatorService, - PartitionMemoryEstimator partitionMemoryEstimator) + PartitionMemoryEstimatorFactory partitionMemoryEstimatorFactory) { taskDescriptorStorage.initialize(queryStateMachine.getQueryId()); queryStateMachine.addStateChangeListener(state -> { @@ -1821,7 +1821,7 @@ public static FaultTolerantDistributedStagesScheduler create( taskSourceFactory, nodeAllocator, taskDescriptorStorage, - partitionMemoryEstimator, + partitionMemoryEstimatorFactory.createPartitionMemoryEstimator(), taskLifecycleListener, exchange, bucketToPartitionCache.apply(fragment.getPartitioningScheme().getPartitioning().getHandle()).getBucketToPartitionMap(), diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java index af28b8be6250..8e94660eb3cf 100644 --- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java +++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java @@ -68,7 +68,7 @@ import io.trino.execution.scheduler.FullNodeCapableNodeAllocatorService; import io.trino.execution.scheduler.NodeAllocatorService; import io.trino.execution.scheduler.NodeSchedulerConfig; -import io.trino.execution.scheduler.PartitionMemoryEstimator; +import io.trino.execution.scheduler.PartitionMemoryEstimatorFactory; import io.trino.execution.scheduler.SplitSchedulerStats; import io.trino.execution.scheduler.StageTaskSourceFactory; import io.trino.execution.scheduler.TaskDescriptorStorage; @@ -227,21 +227,21 @@ protected void setup(Binder binder) config -> FIXED_COUNT == config.getNodeAllocatorType(), innerBinder -> { innerBinder.bind(NodeAllocatorService.class).to(FixedCountNodeAllocatorService.class).in(Scopes.SINGLETON); - innerBinder.bind(PartitionMemoryEstimator.class).to(ConstantPartitionMemoryEstimator.class).in(Scopes.SINGLETON); + innerBinder.bind(PartitionMemoryEstimatorFactory.class).toInstance(ConstantPartitionMemoryEstimator::new); })); install(conditionalModule( NodeSchedulerConfig.class, config -> FULL_NODE_CAPABLE == config.getNodeAllocatorType(), innerBinder -> { innerBinder.bind(NodeAllocatorService.class).to(FullNodeCapableNodeAllocatorService.class).in(Scopes.SINGLETON); - innerBinder.bind(PartitionMemoryEstimator.class).to(FallbackToFullNodePartitionMemoryEstimator.class).in(Scopes.SINGLETON); + innerBinder.bind(PartitionMemoryEstimatorFactory.class).toInstance(FallbackToFullNodePartitionMemoryEstimator::new); })); install(conditionalModule( NodeSchedulerConfig.class, config -> BIN_PACKING == config.getNodeAllocatorType(), innerBinder -> { innerBinder.bind(NodeAllocatorService.class).to(BinPackingNodeAllocatorService.class).in(Scopes.SINGLETON); - innerBinder.bind(PartitionMemoryEstimator.class).to(ExponentialGrowthPartitionMemoryEstimator.class).in(Scopes.SINGLETON); + innerBinder.bind(PartitionMemoryEstimatorFactory.class).toInstance(ExponentialGrowthPartitionMemoryEstimator::new); })); // node monitor From 6c08e66eb7375b42fd9f35ffc6c27e7bd4e3453a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Wed, 6 Apr 2022 18:58:02 +0200 Subject: [PATCH 3/5] Register partition processing finished in size estimator --- .../scheduler/ConstantPartitionMemoryEstimator.java | 5 +++++ 
.../ExponentialGrowthPartitionMemoryEstimator.java | 5 +++++ .../FallbackToFullNodePartitionMemoryEstimator.java | 5 +++++ .../execution/scheduler/FaultTolerantStageScheduler.java | 8 ++++++-- .../execution/scheduler/PartitionMemoryEstimator.java | 3 +++ 5 files changed, 24 insertions(+), 2 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ConstantPartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ConstantPartitionMemoryEstimator.java index d6bf6b77208a..d65615da50e5 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ConstantPartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ConstantPartitionMemoryEstimator.java @@ -17,6 +17,8 @@ import io.trino.Session; import io.trino.spi.ErrorCode; +import java.util.Optional; + public class ConstantPartitionMemoryEstimator implements PartitionMemoryEstimator { @@ -33,4 +35,7 @@ public MemoryRequirements getNextRetryMemoryRequirements(Session session, Memory { return previousMemoryRequirements; } + + @Override + public void registerPartitionFinished(Session session, MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, boolean success, Optional errorCode) {} } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java index b237d24dfa0f..6cf0b1df77e2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java @@ -18,6 +18,8 @@ import io.trino.Session; import io.trino.spi.ErrorCode; +import java.util.Optional; + import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor; import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY; import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT; @@ -50,4 +52,7 @@ private boolean shouldIncreaseMemory(ErrorCode errorCode) return EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode().equals(errorCode) // too many tasks from single query on a node || CLUSTER_OUT_OF_MEMORY.toErrorCode().equals(errorCode); // too many tasks in general on a node } + + @Override + public void registerPartitionFinished(Session session, MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, boolean success, Optional errorCode) {} } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FallbackToFullNodePartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FallbackToFullNodePartitionMemoryEstimator.java index e1fb689f3fc9..0a2dc40235bf 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FallbackToFullNodePartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FallbackToFullNodePartitionMemoryEstimator.java @@ -17,6 +17,8 @@ import io.trino.Session; import io.trino.spi.ErrorCode; +import java.util.Optional; + import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY; import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT; @@ -50,4 +52,7 @@ private boolean shouldRescheduleWithFullNode(ErrorCode errorCode) return EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode().equals(errorCode) // too many tasks from single query on a node || 
CLUSTER_OUT_OF_MEMORY.toErrorCode().equals(errorCode); // too many tasks in general on a node } + + @Override + public void registerPartitionFinished(Session session, MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, boolean success, Optional errorCode) {} } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java index 98d7519167b6..22b5acd67787 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java @@ -515,6 +515,8 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional 0 @@ -546,8 +552,6 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional errorCode); + class MemoryRequirements { private final DataSize requiredMemory; From 29e11fa582e1296b964c8eb26544e28fe29b9025 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Thu, 7 Apr 2022 22:05:09 +0200 Subject: [PATCH 4/5] Estimate partition memory usage based on previous attempts --- .../io/trino/SystemSessionProperties.java | 12 +++ ...nentialGrowthPartitionMemoryEstimator.java | 47 ++++++++-- .../io/trino/memory/MemoryManagerConfig.java | 17 ++++ ...nentialGrowthPartitionMemoryEstimator.java | 89 ++++++++++++++++--- .../trino/memory/TestMemoryManagerConfig.java | 7 +- 5 files changed, 150 insertions(+), 22 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 9d7a1e957e0d..0d05030187f0 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -162,6 +162,7 @@ public final class SystemSessionProperties public static final String FAULT_TOLERANT_EXECUTION_MAX_TASK_SPLIT_COUNT = "fault_tolerant_execution_max_task_split_count"; public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY = "fault_tolerant_execution_task_memory"; public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY_GROWTH_FACTOR = "fault_tolerant_execution_task_memory_growth_factor"; + public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE = "fault_tolerant_execution_task_memory_estimation_quantile"; public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled"; public static final String ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS = "adaptive_partial_aggregation_min_rows"; public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold"; @@ -777,6 +778,12 @@ public SystemSessionProperties( "Factor by which estimated task memory is increased if task execution runs out of memory; value is used allocating nodes for tasks execution", memoryManagerConfig.getFaultTolerantExecutionTaskMemoryGrowthFactor(), false), + doubleProperty( + FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE, + "What quantile of memory usage of completed tasks to look at when estimating memory usage for upcoming tasks", + memoryManagerConfig.getFaultTolerantExecutionTaskMemoryEstimationQuantile(), + value -> validateDoubleRange(value, FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE, 0.0, 1.0), + false), booleanProperty( ADAPTIVE_PARTIAL_AGGREGATION_ENABLED, "When enabled, partial 
aggregation might be adaptively turned off when it does not provide any performance gain", @@ -1414,6 +1421,11 @@ public static double getFaultTolerantExecutionTaskMemoryGrowthFactor(Session ses return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY_GROWTH_FACTOR, Double.class); } + public static double getFaultTolerantExecutionTaskMemoryEstimationQuantile(Session session) + { + return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY_ESTIMATION_QUANTILE, Double.class); + } + public static boolean isAdaptivePartialAggregationEnabled(Session session) { return session.getSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_ENABLED, Boolean.class); diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java index 6cf0b1df77e2..8b6f62037278 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java @@ -14,12 +14,14 @@ package io.trino.execution.scheduler; import com.google.common.collect.Ordering; +import io.airlift.stats.TDigest; import io.airlift.units.DataSize; import io.trino.Session; import io.trino.spi.ErrorCode; import java.util.Optional; +import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor; import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY; import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT; @@ -27,11 +29,13 @@ public class ExponentialGrowthPartitionMemoryEstimator implements PartitionMemoryEstimator { + private final TDigest memoryUsageDistribution = new TDigest(); + @Override public MemoryRequirements getInitialMemoryRequirements(Session session, DataSize defaultMemoryLimit) { return new MemoryRequirements( - defaultMemoryLimit, + Ordering.natural().max(defaultMemoryLimit, getEstimatedMemoryUsage(session)), false); } @@ -39,20 +43,49 @@ public MemoryRequirements getInitialMemoryRequirements(Session session, DataSize public MemoryRequirements getNextRetryMemoryRequirements(Session session, MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, ErrorCode errorCode) { DataSize previousMemory = previousMemoryRequirements.getRequiredMemory(); - DataSize baseMemory = Ordering.natural().max(peakMemoryUsage, previousMemory); - if (shouldIncreaseMemory(errorCode)) { + + // start with the maximum of previously used memory and actual usage + DataSize newMemory = Ordering.natural().max(peakMemoryUsage, previousMemory); + if (isOutOfMemoryError(errorCode)) { + // multiply if we hit an oom error double growthFactor = getFaultTolerantExecutionTaskMemoryGrowthFactor(session); - return new MemoryRequirements(DataSize.of((long) (baseMemory.toBytes() * growthFactor), DataSize.Unit.BYTE), false); + newMemory = DataSize.of((long) (newMemory.toBytes() * growthFactor), DataSize.Unit.BYTE); } - return new MemoryRequirements(baseMemory, false); + + // if we are still below current estimate for new partition let's bump further + newMemory = Ordering.natural().max(newMemory, getEstimatedMemoryUsage(session)); + + return new MemoryRequirements(newMemory, false); } - private boolean shouldIncreaseMemory(ErrorCode errorCode) + private boolean 
isOutOfMemoryError(ErrorCode errorCode) { return EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode().equals(errorCode) // too many tasks from single query on a node || CLUSTER_OUT_OF_MEMORY.toErrorCode().equals(errorCode); // too many tasks in general on a node } @Override - public void registerPartitionFinished(Session session, MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, boolean success, Optional errorCode) {} + public synchronized void registerPartitionFinished(Session session, MemoryRequirements previousMemoryRequirements, DataSize peakMemoryUsage, boolean success, Optional errorCode) + { + if (success) { + memoryUsageDistribution.add(peakMemoryUsage.toBytes()); + } + if (!success && errorCode.isPresent() && isOutOfMemoryError(errorCode.get())) { + double growthFactor = getFaultTolerantExecutionTaskMemoryGrowthFactor(session); + // take previousRequiredBytes into account when registering failure on oom. It is conservative hence safer (and in line with getNextRetryMemoryRequirements) + long previousRequiredBytes = previousMemoryRequirements.getRequiredMemory().toBytes(); + long previousPeakBytes = peakMemoryUsage.toBytes(); + memoryUsageDistribution.add(Math.max(previousRequiredBytes, previousPeakBytes) * growthFactor); + } + } + + private synchronized DataSize getEstimatedMemoryUsage(Session session) + { + double estimationQuantile = getFaultTolerantExecutionTaskMemoryEstimationQuantile(session); + double estimation = memoryUsageDistribution.valueAt(estimationQuantile); + if (Double.isNaN(estimation)) { + return DataSize.ofBytes(0); + } + return DataSize.ofBytes((long) estimation); + } } diff --git a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java index 81b021d3f0c7..9859ca322e94 100644 --- a/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java +++ b/core/trino-main/src/main/java/io/trino/memory/MemoryManagerConfig.java @@ -44,6 +44,7 @@ public class MemoryManagerConfig private DataSize maxQueryTotalMemory; private DataSize faultTolerantExecutionTaskMemory = DataSize.of(4, GIGABYTE); private double faultTolerantExecutionTaskMemoryGrowthFactor = 3.0; + private double faultTolerantExecutionTaskMemoryEstimationQuantile = 0.9; private LowMemoryKillerPolicy lowMemoryKillerPolicy = LowMemoryKillerPolicy.TOTAL_RESERVATION_ON_BLOCKED_NODES; private Duration killOnOutOfMemoryDelay = new Duration(5, MINUTES); @@ -132,6 +133,22 @@ public MemoryManagerConfig setFaultTolerantExecutionTaskMemoryGrowthFactor(doubl return this; } + @NotNull + public double getFaultTolerantExecutionTaskMemoryEstimationQuantile() + { + return faultTolerantExecutionTaskMemoryEstimationQuantile; + } + + @Config("fault-tolerant-execution-task-memory-estimation-quantile") + @ConfigDescription("What quantile of memory usage of completed tasks to look at when estimating memory usage for upcoming tasks") + public MemoryManagerConfig setFaultTolerantExecutionTaskMemoryEstimationQuantile(double faultTolerantExecutionTaskMemoryEstimationQuantile) + { + checkArgument(faultTolerantExecutionTaskMemoryEstimationQuantile >= 0.0 && faultTolerantExecutionTaskMemoryEstimationQuantile <= 1.0, + "fault-tolerant-execution-task-memory-estimation-quantile must be in [0.0, 1.0] range"); + this.faultTolerantExecutionTaskMemoryEstimationQuantile = faultTolerantExecutionTaskMemoryEstimationQuantile; + return this; + } + public enum LowMemoryKillerPolicy { NONE, diff --git 
a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java index 7e3c1007ad5b..62ef1f253ca3 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java @@ -15,11 +15,17 @@ import io.airlift.units.DataSize; import io.trino.Session; +import io.trino.execution.scheduler.PartitionMemoryEstimator.MemoryRequirements; import io.trino.spi.StandardErrorCode; import io.trino.testing.TestingSession; import org.testng.annotations.Test; +import java.util.Optional; + import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static io.trino.spi.StandardErrorCode.ADMINISTRATIVELY_PREEMPTED; +import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY; +import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT; import static org.assertj.core.api.Assertions.assertThat; public class TestExponentialGrowthPartitionMemoryEstimator @@ -32,48 +38,105 @@ public void testEstimator() Session session = TestingSession.testSessionBuilder().build(); assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(107, MEGABYTE))) - .isEqualTo(new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(107, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(107, MEGABYTE), false)); // peak memory of failed task 10MB assertThat( estimator.getNextRetryMemoryRequirements( session, - new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE), false), DataSize.of(10, MEGABYTE), StandardErrorCode.CORRUPT_PAGE.toErrorCode())) - .isEqualTo(new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(50, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(50, MEGABYTE), false)); assertThat( estimator.getNextRetryMemoryRequirements( session, - new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE), false), DataSize.of(10, MEGABYTE), StandardErrorCode.CLUSTER_OUT_OF_MEMORY.toErrorCode())) - .isEqualTo(new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(150, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE), false)); assertThat( estimator.getNextRetryMemoryRequirements( session, - new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE), false), DataSize.of(10, MEGABYTE), - StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())) - .isEqualTo(new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(150, MEGABYTE), false)); + EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())) + .isEqualTo(new MemoryRequirements(DataSize.of(150, MEGABYTE), false)); // peak memory of failed task 70MB assertThat( estimator.getNextRetryMemoryRequirements( session, - new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE), false), DataSize.of(70, MEGABYTE), StandardErrorCode.CORRUPT_PAGE.toErrorCode())) - .isEqualTo(new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(70, MEGABYTE), false)); + .isEqualTo(new MemoryRequirements(DataSize.of(70, MEGABYTE), false)); assertThat( 
estimator.getNextRetryMemoryRequirements( session, - new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(50, MEGABYTE), false), + new MemoryRequirements(DataSize.of(50, MEGABYTE), false), DataSize.of(70, MEGABYTE), - StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())) - .isEqualTo(new PartitionMemoryEstimator.MemoryRequirements(DataSize.of(210, MEGABYTE), false)); + EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())) + .isEqualTo(new MemoryRequirements(DataSize.of(210, MEGABYTE), false)); + + // register a couple successful attempts; 90th percentile is at 300MB + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(1000, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(300, MEGABYTE), true, Optional.empty()); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(100, MEGABYTE), true, Optional.empty()); + + // for initial we should pick estimate if greater than default + assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(100, MEGABYTE))) + .isEqualTo(new MemoryRequirements(DataSize.of(300, MEGABYTE), false)); + + // if default memory requirements is greater than estimate it should be picked still + assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(500, MEGABYTE))) + .isEqualTo(new MemoryRequirements(DataSize.of(500, MEGABYTE), false)); + + // for next we should still pick current initial if greater + assertThat( + estimator.getNextRetryMemoryRequirements( + session, + new MemoryRequirements(DataSize.of(50, MEGABYTE), false), + DataSize.of(70, MEGABYTE), + EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())) + .isEqualTo(new MemoryRequirements(DataSize.of(300, MEGABYTE), false)); + + // a couple oom errors are registered + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(200, MEGABYTE), false, Optional.of(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(200, MEGABYTE), true, 
Optional.of(CLUSTER_OUT_OF_MEMORY.toErrorCode())); + + // 90th percentile should be now at 200*3 (600) + assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(100, MEGABYTE))) + .isEqualTo(new MemoryRequirements(DataSize.of(600, MEGABYTE), false)); + + // a couple oom errors are registered with requested memory greater than peak + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(300, MEGABYTE), false), DataSize.of(200, MEGABYTE), false, Optional.of(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(300, MEGABYTE), false), DataSize.of(200, MEGABYTE), false, Optional.of(EXCEEDED_LOCAL_MEMORY_LIMIT.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(300, MEGABYTE), false), DataSize.of(200, MEGABYTE), true, Optional.of(CLUSTER_OUT_OF_MEMORY.toErrorCode())); + + // 90th percentile should be now at 300*3 (900) + assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(100, MEGABYTE))) + .isEqualTo(new MemoryRequirements(DataSize.of(900, MEGABYTE), false)); + + // other errors should not change estimate + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + estimator.registerPartitionFinished(session, new MemoryRequirements(DataSize.of(100, MEGABYTE), false), DataSize.of(500, MEGABYTE), false, Optional.of(ADMINISTRATIVELY_PREEMPTED.toErrorCode())); + + assertThat(estimator.getInitialMemoryRequirements(session, DataSize.of(100, MEGABYTE))) + .isEqualTo(new MemoryRequirements(DataSize.of(900, MEGABYTE), false)); } } diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java index e99cac9fb582..767f2f91e550 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryManagerConfig.java @@ -40,7 +40,8 @@ public void testDefaults() .setMaxQueryMemory(DataSize.of(20, GIGABYTE)) .setMaxQueryTotalMemory(DataSize.of(40, GIGABYTE)) .setFaultTolerantExecutionTaskMemory(DataSize.of(4, GIGABYTE)) - .setFaultTolerantExecutionTaskMemoryGrowthFactor(3.0)); + .setFaultTolerantExecutionTaskMemoryGrowthFactor(3.0) + .setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.9)); } @Test @@ -53,6 +54,7 @@ public void testExplicitPropertyMappings() .put("query.max-total-memory", "3GB") .put("fault-tolerant-execution-task-memory", "2GB") .put("fault-tolerant-execution-task-memory-growth-factor", "17.3") + .put("fault-tolerant-execution-task-memory-estimation-quantile", "0.7") .buildOrThrow(); MemoryManagerConfig expected = new MemoryManagerConfig() @@ -61,7 +63,8 @@ public void testExplicitPropertyMappings() 
.setMaxQueryMemory(DataSize.of(2, GIGABYTE)) .setMaxQueryTotalMemory(DataSize.of(3, GIGABYTE)) .setFaultTolerantExecutionTaskMemory(DataSize.of(2, GIGABYTE)) - .setFaultTolerantExecutionTaskMemoryGrowthFactor(17.3); + .setFaultTolerantExecutionTaskMemoryGrowthFactor(17.3) + .setFaultTolerantExecutionTaskMemoryEstimationQuantile(0.7); assertFullMapping(properties, expected); } From 57457cfe418e4b50c37b92f48fbd28ac760fb8fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Fri, 8 Apr 2022 15:03:59 +0200 Subject: [PATCH 5/5] Add memory requirements debug logs to stage scheduler --- ...nentialGrowthPartitionMemoryEstimator.java | 25 +++++++++++++++++++ .../FaultTolerantStageScheduler.java | 2 ++ 2 files changed, 27 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java index 8b6f62037278..de3f7b650f7f 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/ExponentialGrowthPartitionMemoryEstimator.java @@ -13,13 +13,17 @@ */ package io.trino.execution.scheduler; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Ordering; +import com.google.common.collect.Streams; import io.airlift.stats.TDigest; import io.airlift.units.DataSize; import io.trino.Session; import io.trino.spi.ErrorCode; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryEstimationQuantile; import static io.trino.SystemSessionProperties.getFaultTolerantExecutionTaskMemoryGrowthFactor; @@ -88,4 +92,25 @@ private synchronized DataSize getEstimatedMemoryUsage(Session session) } return DataSize.ofBytes((long) estimation); } + + private String memoryUsageDistributionInfo() + { + List quantiles = ImmutableList.of(0.01, 0.05, 0.1, 0.2, 0.5, 0.8, 0.9, 0.95, 0.99); + List values; + synchronized (this) { + values = memoryUsageDistribution.valuesAt(quantiles); + } + + return Streams.zip( + quantiles.stream(), + values.stream(), + (quantile, value) -> "" + quantile + "=" + value) + .collect(Collectors.joining(", ", "[", "]")); + } + + @Override + public String toString() + { + return "memoryUsageDistribution=" + memoryUsageDistributionInfo(); + } } diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java index 22b5acd67787..9c100e8555de 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/FaultTolerantStageScheduler.java @@ -266,6 +266,7 @@ public synchronized void schedule() TaskDescriptor taskDescriptor = taskDescriptorOptional.get(); MemoryRequirements memoryRequirements = partitionMemoryRequirements.computeIfAbsent(partition, ignored -> partitionMemoryEstimator.getInitialMemoryRequirements(session, taskDescriptor.getNodeRequirements().getMemory())); + log.debug("Computed initial memory requirements for task from stage %s; requirements=%s; estimator=%s", stage.getStageId(), memoryRequirements, partitionMemoryEstimator); if (nodeLease == null) { NodeRequirements nodeRequirements = 
taskDescriptor.getNodeRequirements(); nodeRequirements = nodeRequirements.withMemory(memoryRequirements.getRequiredMemory()); @@ -553,6 +554,7 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional