Skip to content

Commit

Permalink
Take runtime node memory usage into account in node allocator
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Apr 12, 2022
1 parent df3bb54 commit 7146165
Show file tree
Hide file tree
Showing 2 changed files with 298 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.airlift.log.Logger;
Expand All @@ -35,6 +37,7 @@

import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -43,6 +46,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand All @@ -58,6 +62,7 @@
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static java.lang.Math.max;
import static java.lang.Thread.currentThread;
import static java.util.Comparator.comparing;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -166,6 +171,7 @@ synchronized void processPendingAcquires()
BinPackingSimulation simulation = new BinPackingSimulation(
nodeManager.getActiveNodesSnapshot(),
nodePoolMemoryInfos.get(),
fulfilledAcquires,
allocatedMemory,
scheduleOnCoordinator);

Expand All @@ -182,13 +188,15 @@ synchronized void processPendingAcquires()
switch (result.getStatus()) {
case RESERVED:
InternalNode reservedNode = result.getNode().orElseThrow();
fulfilledAcquires.add(pendingAcquire.getLease());
pendingAcquire.getFuture().set(reservedNode);
if (!pendingAcquire.getFuture().isCancelled()) {
updateAllocatedMemory(reservedNode, pendingAcquire.getMemoryLease());
fulfilledAcquires.add(pendingAcquire.getLease());
}
else {
// request was cancelled in the meantime
fulfilledAcquires.remove(pendingAcquire.getLease());

// run once again when we are done
wakeupProcessPendingAcquires();
}
Expand Down Expand Up @@ -300,6 +308,16 @@ public ListenableFuture<InternalNode> getNode()
return node;
}

/**
 * Returns the node held by this lease's node future.
 * <p>
 * The value is read with {@code Futures.getDone}, so the future is expected to be
 * complete when this is called. If the future completed with a failure, the resulting
 * {@link ExecutionException} is rethrown wrapped in a {@link RuntimeException}.
 */
InternalNode getAssignedNode()
{
    final InternalNode assigned;
    try {
        assigned = Futures.getDone(node);
    }
    catch (ExecutionException failure) {
        throw new RuntimeException(failure);
    }
    return assigned;
}

SettableFuture<InternalNode> getNodeSettableFuture()
{
return node;
Expand All @@ -318,6 +336,11 @@ public Optional<TaskId> getAttachedTaskId()
return Optional.ofNullable(this.taskId.get());
}

/**
 * Returns the amount of memory reserved by this lease.
 */
public long getMemoryLease()
{
    return this.memoryLease;
}
@Override
public void release()
{
Expand All @@ -340,13 +363,15 @@ private static class BinPackingSimulation
private final NodesSnapshot nodesSnapshot;
private final List<InternalNode> allNodesSorted;
private final Map<String, Long> nodesRemainingMemory;
private final Map<String, Long> nodesRemainingMemoryRuntimeAdjusted;

private final Map<String, MemoryPoolInfo> nodeMemoryPoolInfos;
private final boolean scheduleOnCoordinator;

public BinPackingSimulation(
NodesSnapshot nodesSnapshot,
Map<String, MemoryPoolInfo> nodeMemoryPoolInfos,
Set<BinPackingNodeLease> fulfilledAcquires,
Map<String, Long> preReservedMemory,
boolean scheduleOnCoordinator)
{
Expand All @@ -362,6 +387,28 @@ public BinPackingSimulation(
requireNonNull(preReservedMemory, "preReservedMemory is null");
this.scheduleOnCoordinator = scheduleOnCoordinator;

// Per node: the task memory reservations reported by that node's memory pool.
// Every node in the snapshot must get an entry, because the runtime-adjusted
// computation below looks this map up for each node that has memory pool info.
Map<String, Map<String, Long>> realtimeTasksMemoryPerNode = new HashMap<>();
for (InternalNode node : nodesSnapshot.getAllNodes()) {
    MemoryPoolInfo memoryPoolInfo = nodeMemoryPoolInfos.get(node.getNodeIdentifier());
    if (memoryPoolInfo == null) {
        // No memory pool info for this node: record an empty map and move on to the next node.
        // This must be `continue`, not `break`; stopping here would leave all remaining nodes
        // without an entry and cause a NullPointerException when their task memory is looked up.
        realtimeTasksMemoryPerNode.put(node.getNodeIdentifier(), ImmutableMap.of());
        continue;
    }
    realtimeTasksMemoryPerNode.put(node.getNodeIdentifier(), memoryPoolInfo.getTaskMemoryReservations());
}

Map<String, Set<BinPackingNodeLease>> fulfilledAcquiresByNode = new HashMap<>();
for (BinPackingNodeLease fulfilledAcquire : fulfilledAcquires) {
InternalNode node = fulfilledAcquire.getAssignedNode();
fulfilledAcquiresByNode.compute(node.getNodeIdentifier(), (key, set) -> {
if (set == null) {
set = new HashSet<>();
}
set.add(fulfilledAcquire);
return set;
});
}

nodesRemainingMemory = new HashMap<>();
for (InternalNode node : nodesSnapshot.getAllNodes()) {
MemoryPoolInfo memoryPoolInfo = nodeMemoryPoolInfos.get(node.getNodeIdentifier());
Expand All @@ -372,14 +419,41 @@ public BinPackingSimulation(
long nodeReservedMemory = preReservedMemory.getOrDefault(node.getNodeIdentifier(), 0L);
nodesRemainingMemory.put(node.getNodeIdentifier(), memoryPoolInfo.getMaxBytes() - nodeReservedMemory);
}

nodesRemainingMemoryRuntimeAdjusted = new HashMap<>();
for (InternalNode node : nodesSnapshot.getAllNodes()) {
MemoryPoolInfo memoryPoolInfo = nodeMemoryPoolInfos.get(node.getNodeIdentifier());
if (memoryPoolInfo == null) {
nodesRemainingMemoryRuntimeAdjusted.put(node.getNodeIdentifier(), 0L);
continue;
}

Map<String, Long> realtimeNodeMemory = realtimeTasksMemoryPerNode.get(node.getNodeIdentifier());
Set<BinPackingNodeLease> nodeFulfilledAcquires = fulfilledAcquiresByNode.getOrDefault(node.getNodeIdentifier(), ImmutableSet.of());

long nodeUsedMemoryRuntimeAdjusted = 0;
for (BinPackingNodeLease lease : nodeFulfilledAcquires) {
long realtimeTaskMemory = 0;
if (lease.getAttachedTaskId().isPresent()) {
realtimeTaskMemory = realtimeNodeMemory.getOrDefault(lease.getAttachedTaskId().get().toString(), 0L);
}
long reservedTaskMemory = lease.getMemoryLease();
nodeUsedMemoryRuntimeAdjusted += max(realtimeTaskMemory, reservedTaskMemory);
}

// if globally reported memory usage of node is greater than computed one, let's use that.
// it can be greater if there are tasks executed on cluster which do not have task retries enabled.
nodeUsedMemoryRuntimeAdjusted = max(nodeUsedMemoryRuntimeAdjusted, memoryPoolInfo.getReservedBytes());
nodesRemainingMemoryRuntimeAdjusted.put(node.getNodeIdentifier(), memoryPoolInfo.getMaxBytes() - nodeUsedMemoryRuntimeAdjusted);
}
}

public ReserveResult tryReserve(PendingAcquire acquire)
{
NodeRequirements requirements = acquire.getNodeRequirements();
Optional<Set<InternalNode>> catalogNodes = requirements.getCatalogName().map(nodesSnapshot::getConnectorNodes);

Optional<InternalNode> selectedNode = allNodesSorted.stream()
List<InternalNode> candidates = allNodesSorted.stream()
.filter(node -> catalogNodes.isEmpty() || catalogNodes.get().contains(node))
.filter(node -> {
// Allow using coordinator if explicitly requested
Expand All @@ -391,32 +465,46 @@ public ReserveResult tryReserve(PendingAcquire acquire)
}
return false;
})
.max(comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier())));
.collect(toImmutableList());

if (selectedNode.isEmpty()) {
if (candidates.isEmpty()) {
return ReserveResult.NONE_MATCHING;
}

boolean selectedCandidateMatches = false;
if (nodesRemainingMemory.get(selectedNode.get().getNodeIdentifier()) >= acquire.getMemoryLease() || isNodeEmpty(selectedNode.get().getNodeIdentifier())) {
InternalNode selectedNode = candidates.stream()
.max(comparing(node -> nodesRemainingMemoryRuntimeAdjusted.get(node.getNodeIdentifier())))
.orElseThrow();

if (nodesRemainingMemoryRuntimeAdjusted.get(selectedNode.getNodeIdentifier()) >= acquire.getMemoryLease() || isNodeEmpty(selectedNode.getNodeIdentifier())) {
// there is enough unreserved memory on the node
// OR
// there is not enough memory available on the node but the node is empty so we cannot do better anyway

// todo: current logic does not handle heterogeneous clusters best. There is a chance that there is a larger node in the cluster but
// with less memory available right now, hence that one was not selected as a candidate.
selectedCandidateMatches = true;
// mark memory reservation
subtractFromRemainingMemory(selectedNode.getNodeIdentifier(), acquire.getMemoryLease());
return ReserveResult.reserved(selectedNode);
}

// mark memory reservation
nodesRemainingMemory.compute(
selectedNode.get().getNodeIdentifier(),
(key, free) -> free - acquire.getMemoryLease());
// If selected node cannot be used right now, select best one ignoring runtime memory usage and reserve space there
// for later use. This is important from algorithm liveness perspective. If we did not reserve space for a task which
// is too big to be scheduled right now, it could be starved by smaller tasks coming later.
InternalNode fallbackNode = candidates.stream()
.max(comparing(node -> nodesRemainingMemory.get(node.getNodeIdentifier())))
.orElseThrow();
subtractFromRemainingMemory(fallbackNode.getNodeIdentifier(), acquire.getMemoryLease());
return ReserveResult.NOT_ENOUGH_RESOURCES_NOW;
}

if (!selectedCandidateMatches) {
return ReserveResult.NOT_ENOUGH_RESOURCES_NOW;
}
return ReserveResult.reserved(selectedNode.get());
/**
 * Records a reservation of {@code memoryLease} on the given node by lowering the node's
 * remaining memory in both bookkeeping maps: the runtime-adjusted one first, then the
 * reservation-only one.
 *
 * @param nodeIdentifier identifier of the node the memory is reserved on; expected to be present in both maps
 * @param memoryLease amount of memory to subtract from the node's remaining memory
 */
private void subtractFromRemainingMemory(String nodeIdentifier, long memoryLease)
{
    // unboxing fails for an unknown node, same as the arithmetic inside a compute() remapping would
    long runtimeAdjustedFree = nodesRemainingMemoryRuntimeAdjusted.get(nodeIdentifier);
    nodesRemainingMemoryRuntimeAdjusted.put(nodeIdentifier, runtimeAdjustedFree - memoryLease);

    long reservationFree = nodesRemainingMemory.get(nodeIdentifier);
    nodesRemainingMemory.put(nodeIdentifier, reservationFree - memoryLease);
}

private boolean isNodeEmpty(String nodeIdentifier)
Expand Down
Loading

0 comments on commit 7146165

Please sign in to comment.