Skip to content

Commit

Permalink
Use explicit lease object in NodeAllocator
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Mar 10, 2022
1 parent abb4c03 commit 98c2500
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 206 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public class FaultTolerantStageScheduler
private ListenableFuture<Void> blocked = immediateVoidFuture();

@GuardedBy("this")
private ListenableFuture<NodeInfo> acquireNodeFuture;
private NodeAllocator.NodeLease nodeLease;
@GuardedBy("this")
private SettableFuture<Void> taskFinishedFuture;

Expand All @@ -120,7 +120,7 @@ public class FaultTolerantStageScheduler
@GuardedBy("this")
private final Map<TaskId, RemoteTask> runningTasks = new HashMap<>();
@GuardedBy("this")
private final Map<TaskId, NodeInfo> runningNodes = new HashMap<>();
private final Map<TaskId, NodeAllocator.NodeLease> runningNodes = new HashMap<>();
@GuardedBy("this")
private final Set<Integer> allPartitions = new HashSet<>();
@GuardedBy("this")
Expand Down Expand Up @@ -253,15 +253,14 @@ public synchronized void schedule()
}
TaskDescriptor taskDescriptor = taskDescriptorOptional.get();

if (acquireNodeFuture == null) {
acquireNodeFuture = nodeAllocator.acquire(taskDescriptor.getNodeRequirements());
if (nodeLease == null) {
nodeLease = nodeAllocator.acquire(taskDescriptor.getNodeRequirements());
}
if (!acquireNodeFuture.isDone()) {
blocked = asVoid(acquireNodeFuture);
if (!nodeLease.getNode().isDone()) {
blocked = asVoid(nodeLease.getNode());
return;
}
NodeInfo node = getFutureValue(acquireNodeFuture);
acquireNodeFuture = null;
NodeInfo node = getFutureValue(nodeLease.getNode());

queuedPartitions.poll();

Expand Down Expand Up @@ -311,7 +310,8 @@ public synchronized void schedule()

partitionToRemoteTaskMap.put(partition, task);
runningTasks.put(task.getTaskId(), task);
runningNodes.put(task.getTaskId(), node);
runningNodes.put(task.getTaskId(), nodeLease);
nodeLease = null;

if (taskFinishedFuture == null) {
taskFinishedFuture = SettableFuture.create();
Expand Down Expand Up @@ -402,16 +402,13 @@ private void cancelBlockedFuture()
private void releaseAcquiredNode()
{
verify(!Thread.holdsLock(this));
ListenableFuture<NodeInfo> future;
NodeAllocator.NodeLease lease;
synchronized (this) {
future = acquireNodeFuture;
acquireNodeFuture = null;
lease = nodeLease;
nodeLease = null;
}
if (future != null) {
future.cancel(true);
if (future.isDone() && !future.isCancelled()) {
nodeAllocator.release(getFutureValue(future));
}
if (lease != null) {
lease.release();
}
}

Expand Down Expand Up @@ -497,8 +494,8 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional<ExchangeSinkInstan
taskFinishedFuture = null;
}

NodeInfo node = requireNonNull(runningNodes.remove(taskId), () -> "node not found for task id: " + taskId);
nodeAllocator.release(node);
NodeAllocator.NodeLease nodeLease = requireNonNull(runningNodes.remove(taskId), () -> "node not found for task id: " + taskId);
nodeLease.release();

int partitionId = taskId.getPartitionId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import static com.google.common.collect.Sets.newConcurrentHashSet;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.trino.execution.scheduler.NodeInfo.unlimitedMemoryNode;
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
Expand Down Expand Up @@ -148,30 +149,23 @@ public FixedCountNodeAllocator(
}

@Override
public synchronized ListenableFuture<NodeInfo> acquire(NodeRequirements requirements)
public synchronized NodeLease acquire(NodeRequirements requirements)
{
try {
Optional<InternalNode> node = tryAcquireNode(requirements);
if (node.isPresent()) {
return immediateFuture(unlimitedMemoryNode(node.get()));
return new FixedCountNodeLease(immediateFuture(unlimitedMemoryNode(node.get())));
}
}
catch (RuntimeException e) {
return immediateFailedFuture(e);
return new FixedCountNodeLease(immediateFailedFuture(e));
}

SettableFuture<NodeInfo> future = SettableFuture.create();
PendingAcquire pendingAcquire = new PendingAcquire(requirements, future);
pendingAcquires.add(pendingAcquire);

return future;
}

@Override
public void release(NodeInfo node)
{
releaseNodeInternal(node.getNode());
processPendingAcquires();
return new FixedCountNodeLease(future);
}

public void updateNodes()
Expand Down Expand Up @@ -208,10 +202,13 @@ private synchronized Optional<InternalNode> tryAcquireNode(NodeRequirements requ
return selectedNode;
}

private synchronized void releaseNodeInternal(InternalNode node)
private void releaseNode(InternalNode node)
{
int allocationCount = allocationCountMap.compute(node, (key, value) -> value == null ? 0 : value - 1);
checkState(allocationCount >= 0, "allocation count for node %s is expected to be greater than or equal to zero: %s", node, allocationCount);
synchronized (this) {
int allocationCount = allocationCountMap.compute(node, (key, value) -> value == null ? 0 : value - 1);
checkState(allocationCount >= 0, "allocation count for node %s is expected to be greater than or equal to zero: %s", node, allocationCount);
}
processPendingAcquires();
}

private void processPendingAcquires()
Expand Down Expand Up @@ -247,7 +244,7 @@ private void processPendingAcquires()
SettableFuture<NodeInfo> future = pendingAcquire.getFuture();
future.set(unlimitedMemoryNode(node));
if (future.isCancelled()) {
releaseNodeInternal(node);
releaseNode(node);
}
});

Expand All @@ -262,6 +259,38 @@ public synchronized void close()
{
allocators.remove(this);
}

/**
 * Lease wrapping the node future handed out by the enclosing allocator.
 * <p>
 * The lease may be released exactly once; a second call to {@link #release()} fails with
 * {@link IllegalStateException}.
 */
private class FixedCountNodeLease
        implements NodeAllocator.NodeLease
{
    private final ListenableFuture<NodeInfo> nodeFuture;
    // flips to true on the first release() call; guards against double release
    private final AtomicBoolean released = new AtomicBoolean();

    private FixedCountNodeLease(ListenableFuture<NodeInfo> node)
    {
        this.nodeFuture = requireNonNull(node, "node is null");
    }

    /**
     * Returns the future passed in at construction time.
     */
    @Override
    public ListenableFuture<NodeInfo> getNode()
    {
        return nodeFuture;
    }

    /**
     * Cancels the node future and, when the future had already completed without being cancelled,
     * hands the node back to the enclosing allocator via {@code releaseNode}.
     *
     * @throws IllegalStateException if this lease was already released
     */
    @Override
    public void release()
    {
        if (!released.compareAndSet(false, true)) {
            throw new IllegalStateException("Node " + nodeFuture + " already released");
        }

        // Cancel first so a still-pending acquisition never gets handed out afterwards.
        nodeFuture.cancel(true);

        // NOTE(review): a future that completed exceptionally also satisfies this check, in which case
        // getFutureValue presumably rethrows the failure - confirm that is the intended behavior.
        boolean acquired = nodeFuture.isDone() && !nodeFuture.isCancelled();
        if (acquired) {
            releaseNode(getFutureValue(nodeFuture).getNode());
        }
    }
}
}

private static class PendingAcquire
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,21 @@
public interface NodeAllocator
extends Closeable
{
ListenableFuture<NodeInfo> acquire(NodeRequirements requirements);

void release(NodeInfo node);
/**
* Requests acquisition of a node. The acquired node can be obtained via the {@link NodeLease#getNode()} method.
* The node may not be available immediately; the calling party needs to wait until the returned future is done.
*
* The calling party is obligated to release every lease it obtains by calling {@link NodeLease#release()}.
*/
NodeLease acquire(NodeRequirements requirements);

@Override
void close();

/**
* Handle for a single node acquisition requested through {@link NodeAllocator#acquire(NodeRequirements)}.
*/
interface NodeLease
{
/**
* Returns the future for the requested node. The node may not be available immediately;
* callers need to wait until the future is done.
*/
ListenableFuture<NodeInfo> getNode();

/**
* Releases this lease. The calling party is obliged to release every lease it obtained.
*/
void release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -361,14 +361,14 @@ public void testTaskFailure()
// waiting on node acquisition
assertBlocked(blocked);

ListenableFuture<NodeInfo> acquireNode1 = nodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
ListenableFuture<NodeInfo> acquireNode2 = nodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
NodeAllocator.NodeLease acquireNode1 = nodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
NodeAllocator.NodeLease acquireNode2 = nodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));

remoteTaskFactory.getTasks().get(getTaskId(0, 0)).fail(new RuntimeException("some failure"));

assertUnblocked(blocked);
assertUnblocked(acquireNode1);
assertUnblocked(acquireNode2);
assertUnblocked(acquireNode1.getNode());
assertUnblocked(acquireNode2.getNode());

assertThatThrownBy(scheduler::schedule)
.hasMessageContaining("some failure");
Expand Down Expand Up @@ -469,8 +469,8 @@ private void testCancellation(boolean abort)
// waiting on node acquisition
assertBlocked(blocked);

ListenableFuture<NodeInfo> acquireNode1 = nodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
ListenableFuture<NodeInfo> acquireNode2 = nodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
NodeAllocator.NodeLease acquireNode1 = nodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));
NodeAllocator.NodeLease acquireNode2 = nodeAllocator.acquire(new NodeRequirements(Optional.of(CATALOG), ImmutableSet.of()));

if (abort) {
scheduler.abort();
Expand All @@ -480,8 +480,8 @@ private void testCancellation(boolean abort)
}

assertUnblocked(blocked);
assertUnblocked(acquireNode1);
assertUnblocked(acquireNode2);
assertUnblocked(acquireNode1.getNode());
assertUnblocked(acquireNode2.getNode());

scheduler.schedule();

Expand Down
Loading

0 comments on commit 98c2500

Please sign in to comment.