
Drop mechanism of per-task memory limit
It turned out we do not need that functionality for task-level retries after all.
Removing it, as we currently do not see the benefit of the mechanism and it
increases complexity.
losipiuk committed Mar 10, 2022
1 parent 9d3783d commit abb4c03
Showing 26 changed files with 10 additions and 552 deletions.
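
For context, the dropped mechanism enforced a hard cap on the memory a single task could reserve: the deleted TaskAllocationValidator wrapped each task's aggregate user memory context through ValidatingAggregateContext (see the TaskContext hunk below) and failed allocations with "Query exceeded per-task memory limit of ...". The following self-contained sketch only illustrates that idea; the class and method names are hypothetical, not the deleted Trino code.

public class PerTaskMemoryLimiter
{
    private final long maxBytes;
    private long reservedBytes;

    public PerTaskMemoryLimiter(long maxBytes)
    {
        this.maxBytes = maxBytes;
    }

    // Reject a reservation once the running total would exceed the per-task cap.
    public synchronized void reserve(long bytes)
    {
        if (reservedBytes + bytes > maxBytes) {
            throw new IllegalStateException("Query exceeded per-task memory limit of " + maxBytes + " bytes");
        }
        reservedBytes += bytes;
    }

    public synchronized void free(long bytes)
    {
        reservedBytes -= bytes;
    }
}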
@@ -40,12 +40,6 @@ public static ExceededMemoryLimitException exceededLocalUserMemoryLimit(DataSize
format("Query exceeded per-node memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

public static ExceededMemoryLimitException exceededTaskMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
{
return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT,
format("Query exceeded per-task memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

private ExceededMemoryLimitException(StandardErrorCode errorCode, String message)
{
super(errorCode, message);
@@ -130,7 +130,6 @@ public final class SystemSessionProperties
public static final String ENABLE_COORDINATOR_DYNAMIC_FILTERS_DISTRIBUTION = "enable_coordinator_dynamic_filters_distribution";
public static final String ENABLE_LARGE_DYNAMIC_FILTERS = "enable_large_dynamic_filters";
public static final String QUERY_MAX_MEMORY_PER_NODE = "query_max_memory_per_node";
public static final String QUERY_MAX_MEMORY_PER_TASK = "query_max_memory_per_task";
public static final String IGNORE_DOWNSTREAM_PREFERENCES = "ignore_downstream_preferences";
public static final String FILTERING_SEMI_JOIN_TO_INNER = "rewrite_filtering_semi_join_to_inner_join";
public static final String OPTIMIZE_DUPLICATE_INSENSITIVE_JOINS = "optimize_duplicate_insensitive_joins";
@@ -597,11 +596,6 @@ public SystemSessionProperties(
"Maximum amount of memory a query can use per node",
nodeMemoryConfig.getMaxQueryMemoryPerNode(),
true),
dataSizeProperty(
QUERY_MAX_MEMORY_PER_TASK,
"Maximum amount of memory a single task can use",
nodeMemoryConfig.getMaxQueryMemoryPerTask().orElse(null),
true),
booleanProperty(
IGNORE_DOWNSTREAM_PREFERENCES,
"Ignore Parent's PreferredProperties in AddExchange optimizer",
@@ -1178,11 +1172,6 @@ public static DataSize getQueryMaxMemoryPerNode(Session session)
return session.getSystemProperty(QUERY_MAX_MEMORY_PER_NODE, DataSize.class);
}

public static Optional<DataSize> getQueryMaxMemoryPerTask(Session session)
{
return Optional.ofNullable(session.getSystemProperty(QUERY_MAX_MEMORY_PER_TASK, DataSize.class));
}

public static boolean ignoreDownStreamPreferences(Session session)
{
return session.getSystemProperty(IGNORE_DOWNSTREAM_PREFERENCES, Boolean.class);
@@ -71,7 +71,6 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.concurrent.Threads.threadsNamed;
import static io.trino.SystemSessionProperties.getQueryMaxMemoryPerNode;
import static io.trino.SystemSessionProperties.getQueryMaxMemoryPerTask;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.execution.SqlTask.createSqlTask;
@@ -105,7 +104,6 @@ public class SqlTaskManager
private final SqlTaskIoStats finishedTaskStats = new SqlTaskIoStats();

private final long queryMaxMemoryPerNode;
private final Optional<DataSize> queryMaxMemoryPerTask;

private final CounterStat failedTasks = new CounterStat();

@@ -144,13 +142,12 @@ public SqlTaskManager(
SqlTaskExecutionFactory sqlTaskExecutionFactory = new SqlTaskExecutionFactory(taskNotificationExecutor, taskExecutor, planner, splitMonitor, config);

DataSize maxQueryMemoryPerNode = nodeMemoryConfig.getMaxQueryMemoryPerNode();
queryMaxMemoryPerTask = nodeMemoryConfig.getMaxQueryMemoryPerTask();
DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode();

queryMaxMemoryPerNode = maxQueryMemoryPerNode.toBytes();

queryContexts = buildNonEvictableCache(CacheBuilder.newBuilder().weakValues(), CacheLoader.from(
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, queryMaxMemoryPerTask, maxQuerySpillPerNode)));
queryId -> createQueryContext(queryId, localMemoryManager, localSpillManager, gcMonitor, maxQueryMemoryPerNode, maxQuerySpillPerNode)));

tasks = buildNonEvictableCache(CacheBuilder.newBuilder(), CacheLoader.from(
taskId -> createSqlTask(
@@ -173,13 +170,11 @@ private QueryContext createQueryContext(
LocalSpillManager localSpillManager,
GcMonitor gcMonitor,
DataSize maxQueryUserMemoryPerNode,
Optional<DataSize> maxQueryMemoryPerTask,
DataSize maxQuerySpillPerNode)
{
return new QueryContext(
queryId,
maxQueryUserMemoryPerNode,
maxQueryMemoryPerTask,
localMemoryManager.getMemoryPool(),
gcMonitor,
taskNotificationExecutor,
@@ -401,17 +396,10 @@ private TaskInfo doUpdateTask(
if (!queryContext.isMemoryLimitsInitialized()) {
long sessionQueryMaxMemoryPerNode = getQueryMaxMemoryPerNode(session).toBytes();

Optional<DataSize> effectiveQueryMaxMemoryPerTask = getQueryMaxMemoryPerTask(session);
if (queryMaxMemoryPerTask.isPresent() &&
(effectiveQueryMaxMemoryPerTask.isEmpty() || effectiveQueryMaxMemoryPerTask.get().toBytes() > queryMaxMemoryPerTask.get().toBytes())) {
effectiveQueryMaxMemoryPerTask = queryMaxMemoryPerTask;
}

// Session properties are only allowed to decrease memory limits, not increase them
queryContext.initializeMemoryLimits(
resourceOvercommit(session),
min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode),
effectiveQueryMaxMemoryPerTask);
min(sessionQueryMaxMemoryPerNode, queryMaxMemoryPerNode));
}

sqlTask.recordHeartbeat();
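
With the per-task clamp gone, doUpdateTask only negotiates the per-node limit, and the existing rule still applies: a session property may lower the configured limit but never raise it, hence the min(...) call above. A tiny self-contained illustration of that clamp (class and variable names below are invented for the example):

public class EffectiveNodeLimit
{
    // Session properties are only allowed to decrease memory limits, not increase them,
    // so the effective per-node limit is the smaller of the configured and session values.
    static long effectiveQueryMaxMemoryPerNode(long configuredBytes, long sessionBytes)
    {
        return Math.min(configuredBytes, sessionBytes);
    }

    public static void main(String[] args)
    {
        long configured = 8L * 1024 * 1024 * 1024; // e.g. query.max-memory-per-node=8GB
        long session = 4L * 1024 * 1024 * 1024;    // e.g. SET SESSION query_max_memory_per_node='4GB'
        System.out.println(effectiveQueryMaxMemoryPerNode(configured, session)); // prints 4294967296
    }
}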
@@ -16,30 +16,25 @@
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;

import javax.validation.constraints.NotNull;

import java.util.Optional;

// This is separate from MemoryManagerConfig because it's difficult to test the default value of maxQueryMemoryPerNode
@DefunctConfig({
"deprecated.legacy-system-pool-enabled",
"experimental.reserved-pool-disabled",
"experimental.reserved-pool-enabled",
"query.max-total-memory-per-node",
"query.max-memory-per-task"
})
public class NodeMemoryConfig
{
public static final long AVAILABLE_HEAP_MEMORY = Runtime.getRuntime().maxMemory();
public static final String QUERY_MAX_MEMORY_PER_NODE_CONFIG = "query.max-memory-per-node";
public static final String QUERY_MAX_MEMORY_PER_TASK_CONFIG = "query.max-memory-per-task";

private DataSize maxQueryMemoryPerNode = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.3));

private Optional<DataSize> maxQueryMemoryPerTask = Optional.empty();

private DataSize heapHeadroom = DataSize.ofBytes(Math.round(AVAILABLE_HEAP_MEMORY * 0.3));

@NotNull
@@ -55,21 +50,6 @@ public NodeMemoryConfig setMaxQueryMemoryPerNode(DataSize maxQueryMemoryPerNode)
return this;
}

@NotNull
public Optional<DataSize> getMaxQueryMemoryPerTask()
{
return maxQueryMemoryPerTask;
}

@Config(QUERY_MAX_MEMORY_PER_TASK_CONFIG)
@LegacyConfig("query.max-total-memory-per-task")
@ConfigDescription("Sets memory limit enforced for a single task; there is no memory limit by default")
public NodeMemoryConfig setMaxQueryMemoryPerTask(DataSize maxQueryMemoryPerTask)
{
this.maxQueryMemoryPerTask = Optional.ofNullable(maxQueryMemoryPerTask);
return this;
}

@NotNull
public DataSize getHeapHeadroom()
{
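
Note that query.max-memory-per-task is added to @DefunctConfig rather than being dropped silently: with the airlift configuration framework used here, a defunct property that still appears in a deployment's configuration is reported as an error instead of being ignored, which surfaces stale configs after an upgrade. A minimal sketch of that pattern (the class and property names below are made up; only the annotations come from the imports above):

import io.airlift.configuration.Config;
import io.airlift.configuration.DefunctConfig;
import io.airlift.units.DataSize;

// Illustrative only: configs that still set "example.removed-property" are rejected
// by the configuration framework instead of being silently ignored.
@DefunctConfig("example.removed-property")
public class ExampleConfig
{
    private DataSize maxMemory = DataSize.of(1, DataSize.Unit.GIGABYTE);

    public DataSize getMaxMemory()
    {
        return maxMemory;
    }

    @Config("example.max-memory")
    public ExampleConfig setMaxMemory(DataSize maxMemory)
    {
        this.maxMemory = maxMemory;
        return this;
    }
}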
14 changes: 2 additions & 12 deletions core/trino-main/src/main/java/io/trino/memory/QueryContext.java
@@ -33,7 +33,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
@@ -74,8 +73,6 @@ public class QueryContext
// TODO: This field should be final. However, due to the way QueryContext is constructed the memory limit is not known in advance
@GuardedBy("this")
private long maxUserMemory;
@GuardedBy("this")
private Optional<DataSize> maxTaskMemory;

private final MemoryTrackingContext queryMemoryContext;
private final MemoryPool memoryPool;
@@ -86,7 +83,6 @@
public QueryContext(
QueryId queryId,
DataSize maxUserMemory,
Optional<DataSize> maxTaskMemory,
MemoryPool memoryPool,
GcMonitor gcMonitor,
Executor notificationExecutor,
@@ -97,7 +93,6 @@
this(
queryId,
maxUserMemory,
maxTaskMemory,
memoryPool,
GUARANTEED_MEMORY,
gcMonitor,
@@ -110,7 +105,6 @@
public QueryContext(
QueryId queryId,
DataSize maxUserMemory,
Optional<DataSize> maxTaskMemory,
MemoryPool memoryPool,
long guaranteedMemory,
GcMonitor gcMonitor,
@@ -121,7 +115,6 @@
{
this.queryId = requireNonNull(queryId, "queryId is null");
this.maxUserMemory = requireNonNull(maxUserMemory, "maxUserMemory is null").toBytes();
this.maxTaskMemory = requireNonNull(maxTaskMemory, "maxTaskMemory is null");
this.memoryPool = requireNonNull(memoryPool, "memoryPool is null");
this.gcMonitor = requireNonNull(gcMonitor, "gcMonitor is null");
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
@@ -139,18 +132,16 @@ public boolean isMemoryLimitsInitialized()
}

// TODO: This method should be removed, and the correct limit set in the constructor. However, due to the way QueryContext is constructed the memory limit is not known in advance
public synchronized void initializeMemoryLimits(boolean resourceOverCommit, long maxUserMemory, Optional<DataSize> maxTaskMemory)
public synchronized void initializeMemoryLimits(boolean resourceOverCommit, long maxUserMemory)
{
checkArgument(maxUserMemory >= 0, "maxUserMemory must be >= 0, found: %s", maxUserMemory);
if (resourceOverCommit) {
// Allow the query to use the entire pool. This way the worker will kill the query, if it uses the entire local memory pool.
// The coordinator will kill the query if the cluster runs out of memory.
this.maxUserMemory = memoryPool.getMaxBytes();
this.maxTaskMemory = Optional.empty(); // disabled
}
else {
this.maxUserMemory = maxUserMemory;
this.maxTaskMemory = maxTaskMemory;
}
memoryLimitsInitialized = true;
}
@@ -260,8 +251,7 @@ public TaskContext addTaskContext(
queryMemoryContext.newMemoryTrackingContext(),
notifyStatusChanged,
perOperatorCpuTimerEnabled,
cpuTimerEnabled,
maxTaskMemory);
cpuTimerEnabled);
taskContexts.put(taskStateMachine.getTaskId(), taskContext);
return taskContext;
}

This file was deleted.

24 changes: 4 additions & 20 deletions core/trino-main/src/main/java/io/trino/operator/TaskContext.java
@@ -33,9 +33,7 @@
import io.trino.memory.QueryContext;
import io.trino.memory.QueryContextVisitor;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.memory.context.MemoryAllocationValidator;
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.memory.context.ValidatingAggregateContext;
import io.trino.spi.predicate.Domain;
import io.trino.sql.planner.LocalDynamicFiltersCollector;
import io.trino.sql.planner.plan.DynamicFilterId;
@@ -46,7 +44,6 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
@@ -122,8 +119,7 @@ public static TaskContext createTaskContext(
MemoryTrackingContext taskMemoryContext,
Runnable notifyStatusChanged,
boolean perOperatorCpuTimerEnabled,
boolean cpuTimerEnabled,
Optional<DataSize> maxMemory)
boolean cpuTimerEnabled)
{
TaskContext taskContext = new TaskContext(
queryContext,
Expand All @@ -135,8 +131,7 @@ public static TaskContext createTaskContext(
taskMemoryContext,
notifyStatusChanged,
perOperatorCpuTimerEnabled,
cpuTimerEnabled,
maxMemory);
cpuTimerEnabled);
taskContext.initialize();
return taskContext;
}
@@ -151,26 +146,15 @@ private TaskContext(
MemoryTrackingContext taskMemoryContext,
Runnable notifyStatusChanged,
boolean perOperatorCpuTimerEnabled,
boolean cpuTimerEnabled,
Optional<DataSize> maxMemory)
boolean cpuTimerEnabled)
{
this.taskStateMachine = requireNonNull(taskStateMachine, "taskStateMachine is null");
this.gcMonitor = requireNonNull(gcMonitor, "gcMonitor is null");
this.queryContext = requireNonNull(queryContext, "queryContext is null");
this.notificationExecutor = requireNonNull(notificationExecutor, "notificationExecutor is null");
this.yieldExecutor = requireNonNull(yieldExecutor, "yieldExecutor is null");
this.session = session;

requireNonNull(taskMemoryContext, "taskMemoryContext is null");
if (maxMemory.isPresent()) {
MemoryAllocationValidator memoryValidator = new TaskAllocationValidator(maxMemory.get());
this.taskMemoryContext = new MemoryTrackingContext(
new ValidatingAggregateContext(taskMemoryContext.aggregateUserMemoryContext(), memoryValidator),
taskMemoryContext.aggregateRevocableMemoryContext());
}
else {
this.taskMemoryContext = taskMemoryContext;
}
this.taskMemoryContext = requireNonNull(taskMemoryContext, "taskMemoryContext is null");

// Initialize the local memory contexts with the LazyOutputBuffer tag as LazyOutputBuffer will do the local memory allocations
this.taskMemoryContext.initializeLocalMemoryContexts(LazyOutputBuffer.class.getSimpleName());