diff --git a/core/trino-main/src/main/java/io/trino/ExceededMemoryLimitException.java b/core/trino-main/src/main/java/io/trino/ExceededMemoryLimitException.java index b0f57af45bcd..6b95473610b4 100644 --- a/core/trino-main/src/main/java/io/trino/ExceededMemoryLimitException.java +++ b/core/trino-main/src/main/java/io/trino/ExceededMemoryLimitException.java @@ -40,12 +40,6 @@ public static ExceededMemoryLimitException exceededLocalUserMemoryLimit(DataSize format("Query exceeded per-node memory limit of %s [%s]", maxMemory, additionalFailureInfo)); } - public static ExceededMemoryLimitException exceededTaskMemoryLimit(DataSize maxMemory, String additionalFailureInfo) - { - return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT, - format("Query exceeded per-task memory limit of %s [%s]", maxMemory, additionalFailureInfo)); - } - private ExceededMemoryLimitException(StandardErrorCode errorCode, String message) { super(errorCode, message); diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 998fbe04ce77..8368cf1e2716 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -130,7 +130,6 @@ public final class SystemSessionProperties public static final String ENABLE_COORDINATOR_DYNAMIC_FILTERS_DISTRIBUTION = "enable_coordinator_dynamic_filters_distribution"; public static final String ENABLE_LARGE_DYNAMIC_FILTERS = "enable_large_dynamic_filters"; public static final String QUERY_MAX_MEMORY_PER_NODE = "query_max_memory_per_node"; - public static final String QUERY_MAX_MEMORY_PER_TASK = "query_max_memory_per_task"; public static final String IGNORE_DOWNSTREAM_PREFERENCES = "ignore_downstream_preferences"; public static final String FILTERING_SEMI_JOIN_TO_INNER = "rewrite_filtering_semi_join_to_inner_join"; public static final String OPTIMIZE_DUPLICATE_INSENSITIVE_JOINS = "optimize_duplicate_insensitive_joins"; @@ -597,11 +596,6 @@ public SystemSessionProperties( "Maximum amount of memory a query can use per node", nodeMemoryConfig.getMaxQueryMemoryPerNode(), true), - dataSizeProperty( - QUERY_MAX_MEMORY_PER_TASK, - "Maximum amount of memory a single task can use", - nodeMemoryConfig.getMaxQueryMemoryPerTask().orElse(null), - true), booleanProperty( IGNORE_DOWNSTREAM_PREFERENCES, "Ignore Parent's PreferredProperties in AddExchange optimizer", @@ -1178,11 +1172,6 @@ public static DataSize getQueryMaxMemoryPerNode(Session session) return session.getSystemProperty(QUERY_MAX_MEMORY_PER_NODE, DataSize.class); } - public static Optional getQueryMaxMemoryPerTask(Session session) - { - return Optional.ofNullable(session.getSystemProperty(QUERY_MAX_MEMORY_PER_TASK, DataSize.class)); - } - public static boolean ignoreDownStreamPreferences(Session session) { return session.getSystemProperty(IGNORE_DOWNSTREAM_PREFERENCES, Boolean.class); diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java index 0beb3ac9c66d..5fb155a58dc6 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java @@ -71,7 +71,6 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.concurrent.Threads.threadsNamed; import static io.trino.SystemSessionProperties.getQueryMaxMemoryPerNode; -import static io.trino.SystemSessionProperties.getQueryMaxMemoryPerTask; import static io.trino.SystemSessionProperties.resourceOvercommit; import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache; import static io.trino.execution.SqlTask.createSqlTask; @@ -105,7 +104,6 @@ public class SqlTaskManager private final SqlTaskIoStats finishedTaskStats = new SqlTaskIoStats(); private final long queryMaxMemoryPerNode; - private final Optional queryMaxMemoryPerTask; private final CounterStat failedTasks = new CounterStat(); @@ -144,13 +142,12 @@ public SqlTaskManager( SqlTaskExecutionFactory sqlTaskExecutionFactory = new SqlTaskExecutionFactory(taskNotificationExecutor, taskExecutor, planner, splitMonitor, config); DataSize maxQueryMemoryPerNode = nodeMemoryConfig.getMaxQueryMemoryPerNode(); - queryMaxMemoryPerTask = nodeMemoryConfig.getMaxQueryMemoryPerTask(); DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode(); queryMaxMemoryPerNode = maxQueryMemoryPerNode.toBytes(); queryContexts = buildNonEvictableCache(CacheBuilder.newBuilder().weakValues(), CacheLoader.from( - queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, queryMaxMemoryPerTask, maxQuerySpillPerNode))); + queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, maxQuerySpillPerNode))); tasks = buildNonEvictableCache(CacheBuilder.newBuilder(), CacheLoader.from( taskId -> createSqlTask( @@ -173,13 +170,11 @@ private QueryContext createQueryContext( LocalSpillManager localSpillManager, GcMonitor gcMonitor, DataSize maxQueryUserMemoryPerNode, - Optional maxQueryMemoryPerTask, DataSize maxQuerySpillPerNode) { return new QueryContext( queryId, maxQueryUserMemoryPerNode, - maxQueryMemoryPerTask, localMemoryManager.getMemoryPool(), gcMonitor, taskNotificationExecutor, @@ -401,17 +396,10 @@ private TaskInfo doUpdateTask( if (!queryContext.isMemoryLimitsInitialized()) { long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes(); - Optional effectiveQueryMaxMemoryPerTask = getQueryMaxMemoryPerTask(session); - if (queryMaxMemoryPerTask.isPresent() && - (effectiveQueryMaxMemoryPerTask.isEmpty() || effectiveQueryMaxMemoryPerTask.get().toBytes() > queryMaxMemoryPerTask.get().toBytes())) { - effectiveQueryMaxMemoryPerTask = queryMaxMemoryPerTask; - } - // Session properties are only allowed to decrease memory limits, not increase them queryContext.initializeMemoryLimits( resourceOvercommit(session), - min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode), - effectiveQueryMaxMemoryPerTask); + min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode)); } sqlTask.recordHeartbeat(); diff --git a/core/trino-main/src/main/java/io/trino/memory/NodeMemoryConfig.java b/core/trino-main/src/main/java/io/trino/memory/NodeMemoryConfig.java index 7e0ef24df113..a7d54dacf3a7 100644 --- a/core/trino-main/src/main/java/io/trino/memory/NodeMemoryConfig.java +++ b/core/trino-main/src/main/java/io/trino/memory/NodeMemoryConfig.java @@ -16,30 +16,25 @@ import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.DefunctConfig; -import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; import javax.validation.constraints.NotNull; -import java.util.Optional; - // This is separate from MemoryManagerConfig because it's difficult to test the default value of maxQueryMemoryPerNode @DefunctConfig({ "deprecated.legacy-system-pool-enabled", "experimental.reserved-pool-disabled", "experimental.reserved-pool-enabled", "query.max-total-memory-per-node", + "query.max-memory-per-task" }) public class NodeMemoryConfig { public static final long AVAILABLE_HEAP_MEMORY = Runtime.getRuntime().maxMemory(); public static final String QUERY_MAX_MEMORY_PER_NODE_CONFIG = "query.max-memory-per-node"; - public static final String QUERY_MAX_MEMORY_PER_TASK_CONFIG = "query.max-memory-per-task"; private DataSize maxQueryMemoryPerNode = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.3)); - private Optional maxQueryMemoryPerTask = Optional.empty(); - private DataSize heapHeadroom = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.3)); @NotNull @@ -55,21 +50,6 @@ public NodeMemoryConfig setMaxQueryMemoryPerNode(DataSize maxQueryMemoryPerNode) return this; } - @NotNull - public Optional getMaxQueryMemoryPerTask() - { - return maxQueryMemoryPerTask; - } - - @Config(QUERY_MAX_MEMORY_PER_TASK_CONFIG) - @LegacyConfig("query.max-total-memory-per-task") - @ConfigDescription("Sets memory limit enforced for a single task; there is no memory limit by default") - public NodeMemoryConfig setMaxQueryMemoryPerTask(DataSize maxQueryMemoryPerTask) - { - this.maxQueryMemoryPerTask = Optional.ofNullable(maxQueryMemoryPerTask); - return this; - } - @NotNull public DataSize getHeapHeadroom() { diff --git a/core/trino-main/src/main/java/io/trino/memory/QueryContext.java b/core/trino-main/src/main/java/io/trino/memory/QueryContext.java index 3f374de27b57..778b1fb2671e 100644 --- a/core/trino-main/src/main/java/io/trino/memory/QueryContext.java +++ b/core/trino-main/src/main/java/io/trino/memory/QueryContext.java @@ -33,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -74,8 +73,6 @@ public class QueryContext // TODO: This field should be final. However, due to the way QueryContext is constructed the memory limit is not known in advance @GuardedBy("this") private long maxUserMemory; - @GuardedBy("this") - private Optional maxTaskMemory; private final MemoryTrackingContext queryMemoryContext; private final MemoryPool memoryPool; @@ -86,7 +83,6 @@ public class QueryContext public QueryContext( QueryId queryId, DataSize maxUserMemory, - Optional maxTaskMemory, MemoryPool memoryPool, GcMonitor gcMonitor, Executor notificationExecutor, @@ -97,7 +93,6 @@ public QueryContext( this( queryId, maxUserMemory, - maxTaskMemory, memoryPool, GUARANTEED_MEMORY, gcMonitor, @@ -110,7 +105,6 @@ public QueryContext( public QueryContext( QueryId queryId, DataSize maxUserMemory, - Optional maxTaskMemory, MemoryPool memoryPool, long guaranteedMemory, GcMonitor gcMonitor, @@ -121,7 +115,6 @@ public QueryContext( { this.queryId = requireNonNull(queryId, "queryId is null"); this.maxUserMemory = requireNonNull(maxUserMemory, "maxUserMemory is null").toBytes(); - this.maxTaskMemory = requireNonNull(maxTaskMemory, "maxTaskMemory is null"); this.memoryPool = requireNonNull(memoryPool, "memoryPool is null"); this.gcMonitor = requireNonNull(gcMonitor, "gcMonitor is null"); this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null"); @@ -139,18 +132,16 @@ public boolean isMemoryLimitsInitialized() } // TODO: This method should be removed, and the correct limit set in the constructor. However, due to the way QueryContext is constructed the memory limit is not known in advance - public synchronized void initializeMemoryLimits(boolean resourceOverCommit, long maxUserMemory, Optional maxTaskMemory) + public synchronized void initializeMemoryLimits(boolean resourceOverCommit, long maxUserMemory) { checkArgument(maxUserMemory >= 0, "maxUserMemory must be >= 0, found: %s", maxUserMemory); if (resourceOverCommit) { // Allow the query to use the entire pool. This way the worker will kill the query, if it uses the entire local memory pool. // The coordinator will kill the query if the cluster runs out of memory. this.maxUserMemory = memoryPool.getMaxBytes(); - this.maxTaskMemory = Optional.empty(); // disabled } else { this.maxUserMemory = maxUserMemory; - this.maxTaskMemory = maxTaskMemory; } memoryLimitsInitialized = true; } @@ -260,8 +251,7 @@ public TaskContext addTaskContext( queryMemoryContext.newMemoryTrackingContext(), notifyStatusChanged, perOperatorCpuTimerEnabled, - cpuTimerEnabled, - maxTaskMemory); + cpuTimerEnabled); taskContexts.put(taskStateMachine.getTaskId(), taskContext); return taskContext; } diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskAllocationValidator.java b/core/trino-main/src/main/java/io/trino/operator/TaskAllocationValidator.java deleted file mode 100644 index 41edc9c6aa55..000000000000 --- a/core/trino-main/src/main/java/io/trino/operator/TaskAllocationValidator.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.operator; - -import io.airlift.units.DataSize; -import io.trino.memory.context.MemoryAllocationValidator; - -import javax.annotation.concurrent.GuardedBy; -import javax.annotation.concurrent.ThreadSafe; - -import java.util.Comparator; -import java.util.HashMap; -import java.util.Map; - -import static com.google.common.base.Verify.verify; -import static com.google.common.collect.ImmutableMap.toImmutableMap; -import static io.airlift.units.DataSize.succinctBytes; -import static io.trino.ExceededMemoryLimitException.exceededTaskMemoryLimit; -import static java.lang.String.format; -import static java.util.Map.Entry.comparingByValue; -import static java.util.Objects.requireNonNull; - -@ThreadSafe -// Keeps track of per-node memory usage of given task. Single instance is shared by multiple ValidatingLocalMemoryContext instances -// originating from single ValidatingAggregateContext. -public class TaskAllocationValidator - implements MemoryAllocationValidator -{ - private final long limitBytes; - @GuardedBy("this") - private long usedBytes; - @GuardedBy("this") - private final Map taggedAllocations = new HashMap<>(); - - public TaskAllocationValidator(DataSize memoryLimit) - { - this.limitBytes = requireNonNull(memoryLimit, "memoryLimit is null").toBytes(); - } - - @Override - public synchronized void reserveMemory(String allocationTag, long delta) - { - if (usedBytes + delta > limitBytes) { - verify(delta > 0, "exceeded limit with negative delta (%s); usedBytes=%s, limitBytes=%s", delta, usedBytes, limitBytes); - raiseLimitExceededFailure(allocationTag, delta); - } - usedBytes += delta; - taggedAllocations.merge(allocationTag, delta, Long::sum); - } - - private synchronized void raiseLimitExceededFailure(String currentAllocationTag, long currentAllocationDelta) - { - Map tmpTaggedAllocations = new HashMap<>(taggedAllocations); - // include current allocation in the output of top-consumers - tmpTaggedAllocations.merge(currentAllocationTag, currentAllocationDelta, Long::sum); - String topConsumers = tmpTaggedAllocations.entrySet().stream() - .sorted(comparingByValue(Comparator.reverseOrder())) - .limit(3) - .filter(e -> e.getValue() >= 0) - .collect(toImmutableMap(Map.Entry::getKey, e -> succinctBytes(e.getValue()))) - .toString(); - - String additionalInfo = format("Allocated: %s, Delta: %s, Top Consumers: %s", succinctBytes(usedBytes), succinctBytes(currentAllocationDelta), topConsumers); - throw exceededTaskMemoryLimit(DataSize.succinctBytes(limitBytes), additionalInfo); - } - - @Override - public synchronized boolean tryReserveMemory(String allocationTag, long delta) - { - if (usedBytes + delta > limitBytes) { - verify(delta > 0, "exceeded limit with negative delta (%s); usedBytes=%s, limitBytes=%s", delta, usedBytes, limitBytes); - return false; - } - usedBytes += delta; - return true; - } -} diff --git a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java index d24964f1b305..ed5005aed39c 100644 --- a/core/trino-main/src/main/java/io/trino/operator/TaskContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/TaskContext.java @@ -33,9 +33,7 @@ import io.trino.memory.QueryContext; import io.trino.memory.QueryContextVisitor; import io.trino.memory.context.LocalMemoryContext; -import io.trino.memory.context.MemoryAllocationValidator; import io.trino.memory.context.MemoryTrackingContext; -import io.trino.memory.context.ValidatingAggregateContext; import io.trino.spi.predicate.Domain; import io.trino.sql.planner.LocalDynamicFiltersCollector; import io.trino.sql.planner.plan.DynamicFilterId; @@ -46,7 +44,6 @@ import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; @@ -122,8 +119,7 @@ public static TaskContext createTaskContext( MemoryTrackingContext taskMemoryContext, Runnable notifyStatusChanged, boolean perOperatorCpuTimerEnabled, - boolean cpuTimerEnabled, - Optional maxMemory) + boolean cpuTimerEnabled) { TaskContext taskContext = new TaskContext( queryContext, @@ -135,8 +131,7 @@ public static TaskContext createTaskContext( taskMemoryContext, notifyStatusChanged, perOperatorCpuTimerEnabled, - cpuTimerEnabled, - maxMemory); + cpuTimerEnabled); taskContext.initialize(); return taskContext; } @@ -151,8 +146,7 @@ private TaskContext( MemoryTrackingContext taskMemoryContext, Runnable notifyStatusChanged, boolean perOperatorCpuTimerEnabled, - boolean cpuTimerEnabled, - Optional maxMemory) + boolean cpuTimerEnabled) { this.taskStateMachine = requireNonNull(taskStateMachine, "taskStateMachine is null"); this.gcMonitor = requireNonNull(gcMonitor, "gcMonitor is null"); @@ -160,17 +154,7 @@ private TaskContext( this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null"); this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null"); this.session = session; - - requireNonNull(taskMemoryContext, "taskMemoryContext is null"); - if (maxMemory.isPresent()) { - MemoryAllocationValidator memoryValidator = new TaskAllocationValidator(maxMemory.get()); - this.taskMemoryContext = new MemoryTrackingContext( - new ValidatingAggregateContext(taskMemoryContext.aggregateUserMemoryContext(), memoryValidator), - taskMemoryContext.aggregateRevocableMemoryContext()); - } - else { - this.taskMemoryContext = taskMemoryContext; - } + this.taskMemoryContext = requireNonNull(taskMemoryContext, "taskMemoryContext is null"); // Initialize the local memory contexts with the LazyOutputBuffer tag as LazyOutputBuffer will do the local memory allocations this.taskMemoryContext.initializeLocalMemoryContexts(LazyOutputBuffer.class.getSimpleName()); diff --git a/core/trino-main/src/main/java/io/trino/testing/TestingTaskContext.java b/core/trino-main/src/main/java/io/trino/testing/TestingTaskContext.java index 6d3b33162a9e..6ab3dfdea00b 100644 --- a/core/trino-main/src/main/java/io/trino/testing/TestingTaskContext.java +++ b/core/trino-main/src/main/java/io/trino/testing/TestingTaskContext.java @@ -26,7 +26,6 @@ import io.trino.spi.QueryId; import io.trino.spiller.SpillSpaceTracker; -import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; @@ -142,7 +141,6 @@ public TaskContext build() QueryContext queryContext = new QueryContext( queryId, queryMaxMemory, - Optional.empty(), memoryPool, 0L, GC_MONITOR, diff --git a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java index 2108fe6e2f57..fb1e31d69e34 100644 --- a/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java +++ b/core/trino-main/src/test/java/io/trino/execution/MockRemoteTaskFactory.java @@ -196,7 +196,6 @@ public MockRemoteTask( SpillSpaceTracker spillSpaceTracker = new SpillSpaceTracker(DataSize.of(1, GIGABYTE)); QueryContext queryContext = new QueryContext(taskId.getQueryId(), DataSize.of(1, MEGABYTE), - Optional.empty(), memoryPool, new TestingGcMonitor(), executor, diff --git a/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java b/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java index c54a5614a920..31932e6c88e2 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestMemoryRevokingScheduler.java @@ -44,7 +44,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ScheduledExecutorService; @@ -278,7 +277,6 @@ private QueryContext getOrCreateQueryContext(QueryId queryId) { return queryContexts.computeIfAbsent(queryId, id -> new QueryContext(id, DataSize.of(1, MEGABYTE), - Optional.empty(), memoryPool, new TestingGcMonitor(), executor, diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java index f6db181ccf07..d300aa7e516a 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTask.java @@ -346,7 +346,6 @@ private SqlTask createInitialTask() QueryContext queryContext = new QueryContext(new QueryId("query"), DataSize.of(1, MEGABYTE), - Optional.empty(), new MemoryPool(DataSize.of(1, GIGABYTE)), new TestingGcMonitor(), taskNotificationExecutor, diff --git a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java index 2e6f91ec2728..d9402ccfa50c 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestSqlTaskExecution.java @@ -596,7 +596,6 @@ private TaskContext newTestingTaskContext(ScheduledExecutorService taskNotificat QueryContext queryContext = new QueryContext( new QueryId("queryid"), DataSize.of(1, MEGABYTE), - Optional.empty(), new MemoryPool(DataSize.of(1, GIGABYTE)), new TestingGcMonitor(), taskNotificationExecutor, diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java index da190489cc97..cb6b1fd05761 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryPools.java @@ -40,7 +40,6 @@ import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -94,7 +93,6 @@ private void setUp(Supplier> driversSupplier) SpillSpaceTracker spillSpaceTracker = new SpillSpaceTracker(DataSize.of(1, GIGABYTE)); QueryContext queryContext = new QueryContext(new QueryId("query"), TEN_MEGABYTES, - Optional.empty(), userPool, new TestingGcMonitor(), localQueryRunner.getExecutor(), diff --git a/core/trino-main/src/test/java/io/trino/memory/TestMemoryTracking.java b/core/trino-main/src/test/java/io/trino/memory/TestMemoryTracking.java index 331b5af2705b..8e2dc2744227 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestMemoryTracking.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestMemoryTracking.java @@ -38,7 +38,6 @@ import org.testng.annotations.Test; import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; @@ -93,17 +92,11 @@ public void tearDown() @BeforeMethod public void setUpTest() - { - setupTestWithLimits(queryMaxMemory, Optional.empty()); - } - - private void setupTestWithLimits(DataSize queryMaxMemory, Optional queryMaxTaskMemory) { memoryPool = new MemoryPool(memoryPoolSize); queryContext = new QueryContext( new QueryId("test_query"), queryMaxMemory, - queryMaxTaskMemory, memoryPool, new TestingGcMonitor(), notificationExecutor, @@ -156,21 +149,6 @@ public void testLocalTotalMemoryLimitExceeded() .hasMessage("Query exceeded per-node memory limit of %1$s [Allocated: %1$s, Delta: 1B, Top Consumers: {test=%1$s}]", queryMaxMemory); } - @Test - public void testTaskMemoryLimitExceeded() - { - DataSize taskMaxMemory = DataSize.of(1, GIGABYTE); - setupTestWithLimits(DataSize.of(2, GIGABYTE), Optional.of(taskMaxMemory)); - LocalMemoryContext memoryContext = operatorContext.newLocalUserMemoryContext("test"); - memoryContext.setBytes(100); - assertOperatorMemoryAllocations(operatorContext.getOperatorMemoryContext(), 100, 0); - memoryContext.setBytes(taskMaxMemory.toBytes()); - assertOperatorMemoryAllocations(operatorContext.getOperatorMemoryContext(), taskMaxMemory.toBytes(), 0); - assertThatThrownBy(() -> memoryContext.setBytes(taskMaxMemory.toBytes() + 1)) - .isInstanceOf(ExceededMemoryLimitException.class) - .hasMessage("Query exceeded per-task memory limit of %1$s [Allocated: %s, Delta: 1B, Top Consumers: {test=%s}]", taskMaxMemory, DataSize.succinctBytes(taskMaxMemory.toBytes() + 1)); - } - @Test public void testLocalAllocations() { diff --git a/core/trino-main/src/test/java/io/trino/memory/TestNodeMemoryConfig.java b/core/trino-main/src/test/java/io/trino/memory/TestNodeMemoryConfig.java index 87499c756ee6..9e1177efb0e8 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestNodeMemoryConfig.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestNodeMemoryConfig.java @@ -23,7 +23,6 @@ import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.airlift.units.DataSize.Unit.GIGABYTE; -import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.memory.NodeMemoryConfig.AVAILABLE_HEAP_MEMORY; public class TestNodeMemoryConfig @@ -33,7 +32,6 @@ public void testDefaults() { assertRecordedDefaults(recordDefaults(NodeMemoryConfig.class) .setMaxQueryMemoryPerNode(DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.3))) - .setMaxQueryMemoryPerTask(null) .setHeapHeadroom(DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.3)))); } @@ -42,13 +40,11 @@ public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() .put("query.max-memory-per-node", "1GB") - .put("query.max-memory-per-task", "200MB") .put("memory.heap-headroom-per-node", "1GB") .buildOrThrow(); NodeMemoryConfig expected = new NodeMemoryConfig() .setMaxQueryMemoryPerNode(DataSize.of(1, GIGABYTE)) - .setMaxQueryMemoryPerTask(DataSize.of(200, MEGABYTE)) .setHeapHeadroom(DataSize.of(1, GIGABYTE)); assertFullMapping(properties, expected); diff --git a/core/trino-main/src/test/java/io/trino/memory/TestQueryContext.java b/core/trino-main/src/test/java/io/trino/memory/TestQueryContext.java index 84e96f13ab13..122117ef056c 100644 --- a/core/trino-main/src/test/java/io/trino/memory/TestQueryContext.java +++ b/core/trino-main/src/test/java/io/trino/memory/TestQueryContext.java @@ -22,7 +22,6 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.Test; -import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import static io.airlift.concurrent.Threads.threadsNamed; @@ -47,7 +46,6 @@ public void testSetMemoryPool() QueryContext queryContext = new QueryContext( new QueryId("query"), DataSize.ofBytes(10), - Optional.empty(), new MemoryPool(DataSize.ofBytes(10)), new TestingGcMonitor(), localQueryRunner.getExecutor(), diff --git a/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java b/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java index 3165b6647886..681e9d44eb02 100644 --- a/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java +++ b/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java @@ -26,7 +26,6 @@ import java.util.LinkedList; import java.util.List; -import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Function; @@ -82,7 +81,6 @@ public static GroupByHashYieldResult finishOperatorWithYieldingGroupByHash(List< QueryContext queryContext = new QueryContext( queryId, DataSize.of(512, MEGABYTE), - Optional.empty(), memoryPool, new TestingGcMonitor(), EXECUTOR, diff --git a/core/trino-spi/src/main/java/io/trino/spi/StandardErrorCode.java b/core/trino-spi/src/main/java/io/trino/spi/StandardErrorCode.java index 20c45fa9c4cc..59ae471582b4 100644 --- a/core/trino-spi/src/main/java/io/trino/spi/StandardErrorCode.java +++ b/core/trino-spi/src/main/java/io/trino/spi/StandardErrorCode.java @@ -170,8 +170,7 @@ public enum StandardErrorCode EXCEEDED_LOCAL_MEMORY_LIMIT(131079, INSUFFICIENT_RESOURCES), ADMINISTRATIVELY_PREEMPTED(131080, INSUFFICIENT_RESOURCES), EXCEEDED_SCAN_LIMIT(131081, INSUFFICIENT_RESOURCES), - EXCEEDED_TASK_MEMORY_LIMIT(131082, INSUFFICIENT_RESOURCES), - EXCEEDED_TASK_DESCRIPTOR_STORAGE_CAPACITY(131083, INSUFFICIENT_RESOURCES), + EXCEEDED_TASK_DESCRIPTOR_STORAGE_CAPACITY(131082, INSUFFICIENT_RESOURCES), /**/; diff --git a/docs/src/main/sphinx/admin/properties-resource-management.rst b/docs/src/main/sphinx/admin/properties-resource-management.rst index f831ff093787..ccbb9be4bc59 100644 --- a/docs/src/main/sphinx/admin/properties-resource-management.rst +++ b/docs/src/main/sphinx/admin/properties-resource-management.rst @@ -48,16 +48,6 @@ including revocable memory. When the memory allocated by a query across all workers hits this limit it is killed. The value of ``query.max-total-memory`` must be greater than ``query.max-memory``. -``query.max-memory-per-task`` -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -* **Type:** :ref:`prop-type-data-size` -* **Default value:** none, and therefore unrestricted -* **Session property:** ``query_max_total_memory_per_task`` - -This is the max amount of the memory a task can use on a node in the -cluster. Support for using this property is experimental only. - ``memory.heap-headroom-per-node`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/lib/trino-memory-context/pom.xml b/lib/trino-memory-context/pom.xml index f901bc0bb1a1..6e0523822ae8 100644 --- a/lib/trino-memory-context/pom.xml +++ b/lib/trino-memory-context/pom.xml @@ -18,11 +18,6 @@ - - io.airlift - log - - io.airlift units diff --git a/lib/trino-memory-context/src/main/java/io/trino/memory/context/MemoryAllocationValidator.java b/lib/trino-memory-context/src/main/java/io/trino/memory/context/MemoryAllocationValidator.java deleted file mode 100644 index 316a2a0b7b16..000000000000 --- a/lib/trino-memory-context/src/main/java/io/trino/memory/context/MemoryAllocationValidator.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.memory.context; - -public interface MemoryAllocationValidator -{ - MemoryAllocationValidator NO_MEMORY_VALIDATION = new MemoryAllocationValidator() { - @Override - public void reserveMemory(String allocationTag, long delta) {} - - @Override - public boolean tryReserveMemory(String allocationTag, long delta) - { - return true; - } - }; - - /** - * Check if memory can be reserved. Account for reserved memory if reservation is possible. Throw exception otherwise. - */ - void reserveMemory(String allocationTag, long delta); - - /** - * Check if memory can be reserved. Account for reserved memory if reservation is possible and return true. Return false otherwise. - */ - boolean tryReserveMemory(String allocationTag, long delta); -} diff --git a/lib/trino-memory-context/src/main/java/io/trino/memory/context/ValidatingAggregateContext.java b/lib/trino-memory-context/src/main/java/io/trino/memory/context/ValidatingAggregateContext.java deleted file mode 100644 index e9d854528cf9..000000000000 --- a/lib/trino-memory-context/src/main/java/io/trino/memory/context/ValidatingAggregateContext.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.memory.context; - -import static java.util.Objects.requireNonNull; - -public class ValidatingAggregateContext - implements AggregatedMemoryContext -{ - private final AggregatedMemoryContext delegate; - private final MemoryAllocationValidator memoryValidator; - - public ValidatingAggregateContext(AggregatedMemoryContext delegate, MemoryAllocationValidator memoryValidator) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - this.memoryValidator = requireNonNull(memoryValidator, "memoryValidator is null"); - } - - @Override - public AggregatedMemoryContext newAggregatedMemoryContext() - { - return new ValidatingAggregateContext(delegate.newAggregatedMemoryContext(), memoryValidator); - } - - @Override - public LocalMemoryContext newLocalMemoryContext(String allocationTag) - { - return new ValidatingLocalMemoryContext(delegate.newLocalMemoryContext(allocationTag), allocationTag, memoryValidator); - } - - @Override - public long getBytes() - { - return delegate.getBytes(); - } - - @Override - public void close() - { - delegate.close(); - } -} diff --git a/lib/trino-memory-context/src/main/java/io/trino/memory/context/ValidatingLocalMemoryContext.java b/lib/trino-memory-context/src/main/java/io/trino/memory/context/ValidatingLocalMemoryContext.java deleted file mode 100644 index be2ee36f097f..000000000000 --- a/lib/trino-memory-context/src/main/java/io/trino/memory/context/ValidatingLocalMemoryContext.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.trino.memory.context; - -import com.google.common.util.concurrent.ListenableFuture; -import io.airlift.log.Logger; - -import static java.util.Objects.requireNonNull; - -public class ValidatingLocalMemoryContext - implements LocalMemoryContext -{ - private static final Logger log = Logger.get(ValidatingLocalMemoryContext.class); - - private final LocalMemoryContext delegate; - private final String allocationTag; - private final MemoryAllocationValidator memoryValidator; - - public ValidatingLocalMemoryContext(LocalMemoryContext delegate, String allocationTag, MemoryAllocationValidator memoryValidator) - { - this.delegate = requireNonNull(delegate, "delegate is null"); - this.allocationTag = requireNonNull(allocationTag, "allocationTag is null"); - this.memoryValidator = requireNonNull(memoryValidator, "memoryValidator is null"); - } - - @Override - public long getBytes() - { - return delegate.getBytes(); - } - - @Override - public ListenableFuture setBytes(long bytes) - { - long delta = bytes - delegate.getBytes(); - - // first consult validator if allocation is possible - memoryValidator.reserveMemory(allocationTag, delta); - - // update the parent before updating usedBytes as it may throw a runtime exception (e.g., ExceededMemoryLimitException) - try { - // do actual allocation - return delegate.setBytes(bytes); - } - catch (Exception e) { - revertReservationInValidatorSuppressing(allocationTag, delta, e); - throw e; - } - } - - @Override - public boolean trySetBytes(long bytes) - { - long delta = bytes - delegate.getBytes(); - - if (!memoryValidator.tryReserveMemory(allocationTag, delta)) { - return false; - } - - try { - if (delegate.trySetBytes(bytes)) { - return true; - } - } - catch (Exception e) { - revertReservationInValidatorSuppressing(allocationTag, delta, e); - throw e; - } - - revertReservationInValidator(allocationTag, delta); - return false; - } - - @Override - public void close() - { - delegate.close(); - } - - private void revertReservationInValidatorSuppressing(String allocationTag, long delta, Exception revertCause) - { - try { - revertReservationInValidator(allocationTag, delta); - } - catch (Exception suppressed) { - log.warn(suppressed, "Could not rollback memory reservation within allocation validator"); - if (suppressed != revertCause) { - revertCause.addSuppressed(suppressed); - } - } - } - - private void revertReservationInValidator(String allocationTag, long delta) - { - memoryValidator.reserveMemory(allocationTag, -delta); - } -} diff --git a/lib/trino-memory-context/src/test/java/io/trino/memory/context/TestMemoryContexts.java b/lib/trino-memory-context/src/test/java/io/trino/memory/context/TestMemoryContexts.java index 4b758cc61b6a..df54ce687e1e 100644 --- a/lib/trino-memory-context/src/test/java/io/trino/memory/context/TestMemoryContexts.java +++ b/lib/trino-memory-context/src/test/java/io/trino/memory/context/TestMemoryContexts.java @@ -22,7 +22,6 @@ import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.memory.context.AggregatedMemoryContext.newRootAggregatedMemoryContext; import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotEquals; @@ -165,135 +164,16 @@ public void testClosedAggregateMemoryContext() localContext.setBytes(100); } - @Test - public void testValidatingAggregateContext() - { - TestMemoryReservationHandler reservationHandler = new TestMemoryReservationHandler(1_000, true); - AggregatedMemoryContext rootContext = newRootAggregatedMemoryContext(reservationHandler, GUARANTEED_MEMORY); - - AggregatedMemoryContext childContext = new ValidatingAggregateContext(rootContext, new TestAllocationValidator(500)); - - LocalMemoryContext localContext = childContext.newLocalMemoryContext("test"); - - assertEquals(localContext.setBytes(500), NOT_BLOCKED); - assertEquals(localContext.getBytes(), 500); - assertEquals(rootContext.getBytes(), 500); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - - // reserve above validator limit - assertThatThrownBy(() -> localContext.setBytes(501)).hasMessage("limit exceeded"); - assertEquals(localContext.getBytes(), 500); - assertEquals(rootContext.getBytes(), 500); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - - // try reserve above validator limit - assertFalse(localContext.trySetBytes(501)); - assertEquals(localContext.getBytes(), 500); - assertEquals(rootContext.getBytes(), 500); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - - // unreserve a bit - assertEquals(localContext.setBytes(400), NOT_BLOCKED); - assertEquals(localContext.getBytes(), 400); - assertEquals(rootContext.getBytes(), 400); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - - // unreserve a bit using trySetBytes - assertTrue(localContext.trySetBytes(300)); - assertEquals(localContext.getBytes(), 300); - assertEquals(rootContext.getBytes(), 300); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - - // another context based directly on rootContext - LocalMemoryContext anotherLocalContext = rootContext.newLocalMemoryContext("another"); - - assertEquals(anotherLocalContext.setBytes(650), NOT_BLOCKED); - // total reservation is 950 at root level now - assertEquals(localContext.getBytes(), 300); - assertEquals(anotherLocalContext.getBytes(), 650); - assertEquals(rootContext.getBytes(), 950); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - - // exceed root context limit but be within validator boundaries - assertThatThrownBy(() -> localContext.setBytes(400)).hasMessage("out of memory"); - assertEquals(localContext.getBytes(), 300); - assertEquals(anotherLocalContext.getBytes(), 650); - assertEquals(rootContext.getBytes(), 950); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - - // exceed root context limit but be within validator boundaries using trySetBytes - assertFalse(localContext.trySetBytes(400)); - assertEquals(localContext.getBytes(), 300); - assertEquals(anotherLocalContext.getBytes(), 650); - assertEquals(rootContext.getBytes(), 950); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - - // if we free space in root context we can still allocate up to validator imposed limit - assertEquals(anotherLocalContext.setBytes(499), NOT_BLOCKED); - - // reserve using setBytes - assertEquals(localContext.setBytes(400), NOT_BLOCKED); - assertEquals(localContext.getBytes(), 400); - assertEquals(anotherLocalContext.getBytes(), 499); - assertEquals(rootContext.getBytes(), 899); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - - // reserve using trySetBytes - assertEquals(localContext.setBytes(500), NOT_BLOCKED); - assertEquals(localContext.getBytes(), 500); - assertEquals(anotherLocalContext.getBytes(), 499); - assertEquals(rootContext.getBytes(), 999); - assertEquals(reservationHandler.getReservation(), rootContext.getBytes()); - } - - private static class TestAllocationValidator - implements MemoryAllocationValidator - { - private final long limit; - private long reserved; - - public TestAllocationValidator(long limit) - { - this.limit = limit; - } - - @Override - public void reserveMemory(String allocationTag, long delta) - { - if (reserved + delta > limit) { - throw new IllegalArgumentException("limit exceeded"); - } - reserved = reserved + delta; - } - - @Override - public boolean tryReserveMemory(String allocationTag, long delta) - { - if (reserved + delta > limit) { - return false; - } - reserved = reserved + delta; - return true; - } - } - private static class TestMemoryReservationHandler implements MemoryReservationHandler { private long reservation; private final long maxMemory; - private final boolean throwWhenExceeded; private SettableFuture future; public TestMemoryReservationHandler(long maxMemory) - { - this(maxMemory, false); - } - - public TestMemoryReservationHandler(long maxMemory, boolean throwWhenExceeded) { this.maxMemory = maxMemory; - this.throwWhenExceeded = throwWhenExceeded; } public long getReservation() @@ -304,9 +184,6 @@ public long getReservation() @Override public ListenableFuture reserveMemory(String allocationTag, long delta) { - if (delta > 0 && reservation + delta > maxMemory && throwWhenExceeded) { - throw new IllegalStateException("out of memory"); - } reservation += delta; if (delta >= 0) { if (reservation >= maxMemory) { diff --git a/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java b/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java index 9b20bfa9ba5a..65a580694784 100644 --- a/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java +++ b/testing/trino-benchmark/src/main/java/io/trino/benchmark/AbstractOperatorBenchmark.java @@ -300,7 +300,6 @@ protected Map runOnce() TaskContext taskContext = new QueryContext( new QueryId("test"), DataSize.of(256, MEGABYTE), - Optional.empty(), memoryPool, new TestingGcMonitor(), localQueryRunner.getExecutor(), diff --git a/testing/trino-benchmark/src/test/java/io/trino/benchmark/MemoryLocalQueryRunner.java b/testing/trino-benchmark/src/test/java/io/trino/benchmark/MemoryLocalQueryRunner.java index fe8d5607e6f5..488e51a1c3fd 100644 --- a/testing/trino-benchmark/src/test/java/io/trino/benchmark/MemoryLocalQueryRunner.java +++ b/testing/trino-benchmark/src/test/java/io/trino/benchmark/MemoryLocalQueryRunner.java @@ -72,7 +72,6 @@ public List execute(@Language("SQL") String query) QueryContext queryContext = new QueryContext( new QueryId("test"), DataSize.of(1, GIGABYTE), - Optional.empty(), memoryPool, new TestingGcMonitor(), localQueryRunner.getExecutor(),