Skip to content

Commit

Permalink
Remove support for reserved memory pool
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Feb 18, 2022
1 parent ec6553e commit fc01273
Show file tree
Hide file tree
Showing 20 changed files with 54 additions and 573 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ private static List<MemoryPool> getMemoryPools(LocalMemoryManager localMemoryMan
requireNonNull(localMemoryManager, "localMemoryManager cannot be null");
ImmutableList.Builder<MemoryPool> builder = new ImmutableList.Builder<>();
builder.add(localMemoryManager.getGeneralPool());
localMemoryManager.getReservedPool().ifPresent(builder::add);
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
import io.trino.execution.buffer.OutputBuffers.OutputBufferId;
import io.trino.execution.executor.TaskExecutor;
import io.trino.memory.LocalMemoryManager;
import io.trino.memory.MemoryPool;
import io.trino.memory.MemoryPoolAssignment;
import io.trino.memory.MemoryPoolAssignmentsRequest;
import io.trino.memory.NodeMemoryConfig;
import io.trino.memory.QueryContext;
import io.trino.spi.QueryId;
Expand All @@ -57,7 +54,6 @@

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.concurrent.GuardedBy;
import javax.inject.Inject;

import java.io.Closeable;
Expand All @@ -79,8 +75,6 @@
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.execution.SqlTask.createSqlTask;
import static io.trino.memory.LocalMemoryManager.GENERAL_POOL;
import static io.trino.memory.LocalMemoryManager.RESERVED_POOL;
import static io.trino.spi.StandardErrorCode.ABANDONED_TASK;
import static io.trino.spi.StandardErrorCode.SERVER_SHUTTING_DOWN;
import static java.lang.Math.min;
Expand All @@ -104,7 +98,6 @@ public class SqlTaskManager
private final Duration infoCacheTime;
private final Duration clientTimeout;

private final LocalMemoryManager localMemoryManager;
private final NonEvictableLoadingCache<QueryId, QueryContext> queryContexts;
private final NonEvictableLoadingCache<TaskId, SqlTask> tasks;

Expand All @@ -114,11 +107,6 @@ public class SqlTaskManager
private final long queryMaxMemoryPerNode;
private final Optional<DataSize> queryMaxMemoryPerTask;

@GuardedBy("this")
private long currentMemoryPoolAssignmentVersion;
@GuardedBy("this")
private String coordinatorId;

private final CounterStat failedTasks = new CounterStat();

@Inject
Expand Down Expand Up @@ -155,7 +143,6 @@ public SqlTaskManager(

SqlTaskExecutionFactory sqlTaskExecutionFactory = new SqlTaskExecutionFactory(taskNotificationExecutor, taskExecutor, planner, splitMonitor, config);

this.localMemoryManager = requireNonNull(localMemoryManager, "localMemoryManager is null");
DataSize maxQueryMemoryPerNode = nodeMemoryConfig.getMaxQueryMemoryPerNode();
queryMaxMemoryPerTask = nodeMemoryConfig.getMaxQueryMemoryPerTask();
DataSize maxQuerySpillPerNode = nodeSpillConfig.getQueryMaxSpillPerNode();
Expand Down Expand Up @@ -201,33 +188,6 @@ private QueryContext createQueryContext(
localSpillManager.getSpillSpaceTracker());
}

@Override
public synchronized void updateMemoryPoolAssignments(MemoryPoolAssignmentsRequest assignments)
{
if (coordinatorId != null && coordinatorId.equals(assignments.getCoordinatorId()) && assignments.getVersion() <= currentMemoryPoolAssignmentVersion) {
return;
}
currentMemoryPoolAssignmentVersion = assignments.getVersion();
if (coordinatorId != null && !coordinatorId.equals(assignments.getCoordinatorId())) {
log.warn("Switching coordinator affinity from %s to %s", coordinatorId, assignments.getCoordinatorId());
}
coordinatorId = assignments.getCoordinatorId();

for (MemoryPoolAssignment assignment : assignments.getAssignments()) {
if (assignment.getPoolId().equals(GENERAL_POOL)) {
queryContexts.getUnchecked(assignment.getQueryId()).setMemoryPool(localMemoryManager.getGeneralPool());
}
else if (assignment.getPoolId().equals(RESERVED_POOL)) {
MemoryPool reservedPool = localMemoryManager.getReservedPool()
.orElseThrow(() -> new IllegalArgumentException(format("Cannot move %s to the reserved pool as the reserved pool is not enabled", assignment.getQueryId())));
queryContexts.getUnchecked(assignment.getQueryId()).setMemoryPool(reservedPool);
}
else {
throw new IllegalArgumentException(format("Cannot move %s to %s as the target memory pool id is invalid", assignment.getQueryId(), assignment.getPoolId()));
}
}
}

@PostConstruct
public void start()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.trino.execution.buffer.BufferResult;
import io.trino.execution.buffer.OutputBuffers;
import io.trino.execution.buffer.OutputBuffers.OutputBufferId;
import io.trino.memory.MemoryPoolAssignmentsRequest;
import io.trino.spi.predicate.Domain;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.DynamicFilterId;
Expand Down Expand Up @@ -82,8 +81,6 @@ public interface TaskManager

VersionedDynamicFilterDomains acknowledgeAndGetNewDynamicFilterDomains(TaskId taskId, long currentDynamicFiltersVersion);

void updateMemoryPoolAssignments(MemoryPoolAssignmentsRequest assignments);

/**
* Updates the task plan, splitAssignments and output buffers. If the task does not
* already exist, it is created and then updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Streams;
Expand Down Expand Up @@ -64,7 +63,6 @@

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.base.Verify.verifyNotNull;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.collect.MoreCollectors.toOptional;
Expand All @@ -78,7 +76,6 @@
import static io.trino.SystemSessionProperties.getQueryMaxTotalMemory;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.memory.LocalMemoryManager.GENERAL_POOL;
import static io.trino.memory.LocalMemoryManager.RESERVED_POOL;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY;
Expand All @@ -98,14 +95,11 @@ public class ClusterMemoryManager
private final HttpClient httpClient;
private final MBeanExporter exporter;
private final JsonCodec<MemoryInfo> memoryInfoCodec;
private final JsonCodec<MemoryPoolAssignmentsRequest> assignmentsRequestJsonCodec;
private final DataSize maxQueryMemory;
private final DataSize maxQueryTotalMemory;
private final LowMemoryKiller lowMemoryKiller;
private final Duration killOnOutOfMemoryDelay;
private final String coordinatorId;
private final AtomicLong totalAvailableProcessors = new AtomicLong();
private final AtomicLong memoryPoolAssignmentsVersion = new AtomicLong();
private final AtomicLong clusterUserMemoryReservation = new AtomicLong();
private final AtomicLong clusterTotalMemoryReservation = new AtomicLong();
private final AtomicLong clusterMemoryBytes = new AtomicLong();
Expand Down Expand Up @@ -134,7 +128,6 @@ public ClusterMemoryManager(
LocationFactory locationFactory,
MBeanExporter exporter,
JsonCodec<MemoryInfo> memoryInfoCodec,
JsonCodec<MemoryPoolAssignmentsRequest> assignmentsRequestJsonCodec,
QueryIdGenerator queryIdGenerator,
LowMemoryKiller lowMemoryKiller,
ServerConfig serverConfig,
Expand All @@ -153,27 +146,22 @@ public ClusterMemoryManager(
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.exporter = requireNonNull(exporter, "exporter is null");
this.memoryInfoCodec = requireNonNull(memoryInfoCodec, "memoryInfoCodec is null");
this.assignmentsRequestJsonCodec = requireNonNull(assignmentsRequestJsonCodec, "assignmentsRequestJsonCodec is null");
this.lowMemoryKiller = requireNonNull(lowMemoryKiller, "lowMemoryKiller is null");
this.maxQueryMemory = config.getMaxQueryMemory();
this.maxQueryTotalMemory = config.getMaxQueryTotalMemory();
this.coordinatorId = queryIdGenerator.getCoordinatorId();
this.killOnOutOfMemoryDelay = config.getKillOnOutOfMemoryDelay();
this.isWorkScheduledOnCoordinator = schedulerConfig.isIncludeCoordinator();

verify(maxQueryMemory.toBytes() <= maxQueryTotalMemory.toBytes(),
"maxQueryMemory cannot be greater than maxQueryTotalMemory");

this.pools = createClusterMemoryPools(!nodeMemoryConfig.isReservedPoolDisabled());
this.pools = createClusterMemoryPools();
}

private Map<MemoryPoolId, ClusterMemoryPool> createClusterMemoryPools(boolean reservedPoolEnabled)
private Map<MemoryPoolId, ClusterMemoryPool> createClusterMemoryPools()
{
Set<MemoryPoolId> memoryPools = new HashSet<>();
memoryPools.add(GENERAL_POOL);
if (reservedPoolEnabled) {
memoryPools.add(RESERVED_POOL);
}

ImmutableMap.Builder<MemoryPoolId, ClusterMemoryPool> builder = ImmutableMap.builder();
for (MemoryPoolId poolId : memoryPools) {
Expand Down Expand Up @@ -268,19 +256,7 @@ public synchronized void process(Iterable<QueryExecution> runningQueries, Suppli
}

updatePools(countByPool);

MemoryPoolAssignmentsRequest assignmentsRequest;
if (pools.containsKey(RESERVED_POOL)) {
assignmentsRequest = updateAssignments(runningQueries);
}
else {
// If reserved pool is not enabled, we don't create a MemoryPoolAssignmentsRequest that puts all the queries
// in the general pool (as they already are). In this case we create an effectively NOOP MemoryPoolAssignmentsRequest.
// Once the reserved pool is removed we should get rid of the logic of putting queries into reserved pool including
// this piece of code.
assignmentsRequest = new MemoryPoolAssignmentsRequest(coordinatorId, Long.MIN_VALUE, ImmutableList.of());
}
updateNodes(assignmentsRequest);
updateNodes();
}

private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
Expand Down Expand Up @@ -369,52 +345,7 @@ public synchronized Map<MemoryPoolId, MemoryPoolInfo> getMemoryPoolInfo()

private synchronized boolean isClusterOutOfMemory()
{
ClusterMemoryPool reservedPool = pools.get(RESERVED_POOL);
ClusterMemoryPool generalPool = pools.get(GENERAL_POOL);
if (reservedPool == null) {
return generalPool.getBlockedNodes() > 0;
}
return reservedPool.getAssignedQueries() > 0 && generalPool.getBlockedNodes() > 0;
}

// TODO once the reserved pool is removed we can remove this method. We can also update
// RemoteNodeMemory as we don't need to POST anything.
private synchronized MemoryPoolAssignmentsRequest updateAssignments(Iterable<QueryExecution> queries)
{
ClusterMemoryPool reservedPool = verifyNotNull(pools.get(RESERVED_POOL), "reservedPool is null");
ClusterMemoryPool generalPool = verifyNotNull(pools.get(GENERAL_POOL), "generalPool is null");
long version = memoryPoolAssignmentsVersion.incrementAndGet();
// Check that all previous assignments have propagated to the visible nodes. This doesn't account for temporary network issues,
// and is more of a safety check than a guarantee
if (allAssignmentsHavePropagated(queries)) {
if (reservedPool.getAssignedQueries() == 0 && generalPool.getBlockedNodes() > 0) {
QueryExecution biggestQuery = null;
long maxMemory = -1;
for (QueryExecution queryExecution : queries) {
if (resourceOvercommit(queryExecution.getSession())) {
// Don't promote queries that requested resource overcommit to the reserved pool,
// since their memory usage is unbounded.
continue;
}

long bytesUsed = getQueryMemoryReservation(queryExecution);
if (bytesUsed > maxMemory) {
biggestQuery = queryExecution;
maxMemory = bytesUsed;
}
}
if (biggestQuery != null) {
log.info("Moving query %s to the reserved pool", biggestQuery.getQueryId());
biggestQuery.setMemoryPool(new VersionedMemoryPoolId(RESERVED_POOL, version));
}
}
}

ImmutableList.Builder<MemoryPoolAssignment> assignments = ImmutableList.builder();
for (QueryExecution queryExecution : queries) {
assignments.add(new MemoryPoolAssignment(queryExecution.getQueryId(), queryExecution.getMemoryPool().getId()));
}
return new MemoryPoolAssignmentsRequest(coordinatorId, version, assignments.build());
return pools.get(GENERAL_POOL).getBlockedNodes() > 0;
}

private QueryMemoryInfo createQueryMemoryInfo(QueryExecution query)
Expand All @@ -427,27 +358,7 @@ private long getQueryMemoryReservation(QueryExecution query)
return query.getTotalMemoryReservation().toBytes();
}

private synchronized boolean allAssignmentsHavePropagated(Iterable<QueryExecution> queries)
{
if (nodes.isEmpty()) {
// Assignments can't have propagated, if there are no visible nodes.
return false;
}
long newestAssignment = ImmutableList.copyOf(queries).stream()
.map(QueryExecution::getMemoryPool)
.mapToLong(VersionedMemoryPoolId::getVersion)
.min()
.orElse(-1);

long mostOutOfDateNode = nodes.values().stream()
.mapToLong(RemoteNodeMemory::getCurrentAssignmentVersion)
.min()
.orElse(Long.MAX_VALUE);

return newestAssignment <= mostOutOfDateNode;
}

private synchronized void updateNodes(MemoryPoolAssignmentsRequest assignments)
private synchronized void updateNodes()
{
ImmutableSet.Builder<InternalNode> builder = ImmutableSet.builder();
Set<InternalNode> aliveNodes = builder
Expand All @@ -467,19 +378,19 @@ private synchronized void updateNodes(MemoryPoolAssignmentsRequest assignments)
// Add new nodes
for (InternalNode node : aliveNodes) {
if (!nodes.containsKey(node.getNodeIdentifier())) {
nodes.put(node.getNodeIdentifier(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, assignmentsRequestJsonCodec, locationFactory.createMemoryInfoLocation(node)));
nodes.put(node.getNodeIdentifier(), new RemoteNodeMemory(node, httpClient, memoryInfoCodec, locationFactory.createMemoryInfoLocation(node)));
}
}

// If work isn't scheduled on the coordinator (the current node) there is no point
// in polling or updating (when moving queries to the reserved pool) its memory pools
// in polling or updating its memory pools
if (!isWorkScheduledOnCoordinator) {
nodes.remove(nodeManager.getCurrentNode().getNodeIdentifier());
}

// Schedule refresh
for (RemoteNodeMemory node : nodes.values()) {
node.asyncRefresh(assignments);
node.asyncRefresh();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.lang.management.OperatingSystemMXBean;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.google.common.base.Verify.verify;
import static java.lang.String.format;
Expand All @@ -35,7 +34,6 @@
public final class LocalMemoryManager
{
public static final MemoryPoolId GENERAL_POOL = new MemoryPoolId("general");
public static final MemoryPoolId RESERVED_POOL = new MemoryPoolId("reserved");
private static final OperatingSystemMXBean OPERATING_SYSTEM_MX_BEAN = ManagementFactory.getOperatingSystemMXBean();

private DataSize maxMemory;
Expand All @@ -60,10 +58,6 @@ private void configureMemoryPools(NodeMemoryConfig config, long availableMemory)
maxMemory = DataSize.ofBytes(availableMemory - config.getHeapHeadroom().toBytes());
ImmutableMap.Builder<MemoryPoolId, MemoryPool> builder = ImmutableMap.builder();
long generalPoolSize = maxMemory.toBytes();
if (!config.isReservedPoolDisabled()) {
builder.put(RESERVED_POOL, new MemoryPool(RESERVED_POOL, config.getMaxQueryMemoryPerNode()));
generalPoolSize -= config.getMaxQueryMemoryPerNode().toBytes();
}
verify(generalPoolSize > 0, "general memory pool size is 0");
builder.put(GENERAL_POOL, new MemoryPool(GENERAL_POOL, DataSize.ofBytes(generalPoolSize)));
this.pools = builder.buildOrThrow();
Expand Down Expand Up @@ -102,9 +96,4 @@ public MemoryPool getGeneralPool()
{
return pools.get(GENERAL_POOL);
}

public Optional<MemoryPool> getReservedPool()
{
return Optional.ofNullable(pools.get(RESERVED_POOL));
}
}
Loading

0 comments on commit fc01273

Please sign in to comment.