Skip to content

Commit

Permalink
Hold fulfilled acquires in bin packing node allocator
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Apr 12, 2022
1 parent 1438161 commit 2fb9897
Showing 1 changed file with 24 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
Expand Down Expand Up @@ -80,6 +82,7 @@ public class BinPackingNodeAllocatorService

private final ConcurrentMap<String, Long> allocatedMemory = new ConcurrentHashMap<>();
private final Deque<PendingAcquire> pendingAcquires = new ConcurrentLinkedDeque<>();
private final Set<BinPackingNodeLease> fulfilledAcquires = newConcurrentHashSet();

@Inject
public BinPackingNodeAllocatorService(
Expand Down Expand Up @@ -180,6 +183,7 @@ synchronized void processPendingAcquires()
pendingAcquire.getFuture().set(reservedNode);
if (!pendingAcquire.getFuture().isCancelled()) {
updateAllocatedMemory(reservedNode, pendingAcquire.getMemoryLease());
fulfilledAcquires.add(pendingAcquire.getLease());
}
else {
// request was cancelled in the meantime
Expand Down Expand Up @@ -214,13 +218,11 @@ public NodeAllocator getNodeAllocator(Session session)
@Override
public NodeLease acquire(NodeRequirements requirements)
{
PendingAcquire pendingAcquire = new PendingAcquire(requirements);
BinPackingNodeLease nodeLease = new BinPackingNodeLease(requirements.getMemory().toBytes());
PendingAcquire pendingAcquire = new PendingAcquire(requirements, nodeLease);
pendingAcquires.add(pendingAcquire);
wakeupProcessPendingAcquires();

return new BinPackingNodeLease(
pendingAcquire.getFuture(),
requirements.getMemory().toBytes());
return nodeLease;
}

@Override
Expand Down Expand Up @@ -248,22 +250,27 @@ private void updateAllocatedMemory(InternalNode node, long delta)
private static class PendingAcquire
{
private final NodeRequirements nodeRequirements;
private final SettableFuture<InternalNode> future;
private final BinPackingNodeLease lease;

private PendingAcquire(NodeRequirements nodeRequirements)
private PendingAcquire(NodeRequirements nodeRequirements, BinPackingNodeLease lease)
{
this.nodeRequirements = requireNonNull(nodeRequirements, "nodeRequirements is null");
this.future = SettableFuture.create();
this.lease = requireNonNull(lease, "lease is null");
}

public NodeRequirements getNodeRequirements()
{
return nodeRequirements;
}

public BinPackingNodeLease getLease()
{
return lease;
}

public SettableFuture<InternalNode> getFuture()
{
return future;
return lease.getNodeSettableFuture();
}

public long getMemoryLease()
Expand All @@ -275,14 +282,13 @@ public long getMemoryLease()
private class BinPackingNodeLease
implements NodeAllocator.NodeLease
{
private final ListenableFuture<InternalNode> node;
private final SettableFuture<InternalNode> node = SettableFuture.create();
private final AtomicBoolean released = new AtomicBoolean();
private final long memoryLease;
private final AtomicReference<TaskId> taskId = new AtomicReference<>();

private BinPackingNodeLease(ListenableFuture<InternalNode> node, long memoryLease)
private BinPackingNodeLease(long memoryLease)
{
this.node = requireNonNull(node, "node is null");
this.memoryLease = memoryLease;
}

Expand All @@ -292,6 +298,11 @@ public ListenableFuture<InternalNode> getNode()
return node;
}

SettableFuture<InternalNode> getNodeSettableFuture()
{
return node;
}

@Override
public void attachTaskId(TaskId taskId)
{
Expand All @@ -313,6 +324,7 @@ public void release()
if (node.isDone() && !node.isCancelled()) {
updateAllocatedMemory(getFutureValue(node), -memoryLease);
wakeupProcessPendingAcquires();
checkState(fulfilledAcquires.remove(this), "node lease %s not found in fulfilledAcquires %s", this, fulfilledAcquires);
}
}
else {
Expand Down

0 comments on commit 2fb9897

Please sign in to comment.