Simplify node listing in FullNodeCapableNodeAllocatorService

Previously NodeScheduler and NodeSelector were used for listing
nodes in FullNodeCapableNodeAllocatorService. Those abstractions were not
well suited for the job: they satisfy a much broader set of
responsibilities, while all we needed was a simple way of listing
all nodes (or all nodes that have a given catalog installed).
The fact that NodeSelector is Session-dependent made the code even more
complex for no real reason, as the code paths involved are not
actually session-dependent.

This PR switches node listing to be based on InternalNodeManager,
which sits lower in the stack.
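
For reference, the listing logic this change introduces (getAllNodes in the
diff below) boils down to the sketch that follows. It is a condensed
restatement rather than standalone code: nodeManager is the injected
InternalNodeManager, and scheduleOnCoordinator is populated from
NodeSchedulerConfig.isIncludeCoordinator().

// Sketch of the new listing path, assuming the fields described above.
private Set<InternalNode> getAllNodes(Optional<CatalogName> catalogName)
{
    // A catalog requirement narrows the listing to nodes that have that
    // catalog's connector registered; otherwise every ACTIVE node qualifies.
    Set<InternalNode> activeNodes = catalogName
            .map(nodeManager::getActiveConnectorNodes)
            .orElseGet(() -> nodeManager.getNodes(NodeState.ACTIVE));
    if (scheduleOnCoordinator) {
        return activeNodes;
    }
    // Otherwise exclude the coordinator from the scheduling candidates.
    return activeNodes.stream()
            .filter(node -> !node.isCoordinator())
            .collect(toImmutableSet());
}
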
losipiuk committed Mar 16, 2022
1 parent 010e840 commit 504ba97
Showing 3 changed files with 141 additions and 73 deletions.
@@ -25,6 +25,8 @@
import io.trino.memory.ClusterMemoryManager;
import io.trino.memory.MemoryInfo;
import io.trino.metadata.InternalNode;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.NodeState;
import io.trino.spi.QueryId;
import io.trino.spi.TrinoException;
import org.assertj.core.util.VisibleForTesting;
@@ -59,6 +61,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.Futures.transform;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
@@ -93,7 +96,7 @@ public class FullNodeCapableNodeAllocatorService
@VisibleForTesting
static final int PROCESS_PENDING_ACQUIRES_DELAY_SECONDS = 5;

private final NodeScheduler nodeScheduler;
private final InternalNodeManager nodeManager;
private final Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier;
private final int maxAbsoluteFullNodesPerQuery;
private final double maxFractionFullNodesPerQuery;
@@ -113,27 +116,34 @@ public class FullNodeCapableNodeAllocatorService
private final Semaphore processSemaphore = new Semaphore(0);
private final ConcurrentMap<String, Long> nodePoolSizes = new ConcurrentHashMap<>();
private final AtomicLong maxNodePoolSize = new AtomicLong(FULL_NODE_MEMORY.toBytes());
private final boolean scheduleOnCoordinator;

@Inject
public FullNodeCapableNodeAllocatorService(
NodeScheduler nodeScheduler,
InternalNodeManager nodeManager,
ClusterMemoryManager clusterMemoryManager,
NodeSchedulerConfig config)
{
this(nodeScheduler, requireNonNull(clusterMemoryManager, "clusterMemoryManager is null")::getWorkerMemoryInfo, config.getMaxAbsoluteFullNodesPerQuery(), config.getMaxFractionFullNodesPerQuery());
this(nodeManager,
requireNonNull(clusterMemoryManager, "clusterMemoryManager is null")::getWorkerMemoryInfo,
config.getMaxAbsoluteFullNodesPerQuery(),
config.getMaxFractionFullNodesPerQuery(),
config.isIncludeCoordinator());
}

@VisibleForTesting
FullNodeCapableNodeAllocatorService(
NodeScheduler nodeScheduler,
InternalNodeManager nodeManager,
Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier,
int maxAbsoluteFullNodesPerQuery,
double maxFractionFullNodesPerQuery)
double maxFractionFullNodesPerQuery,
boolean scheduleOnCoordinator)
{
this.nodeScheduler = requireNonNull(nodeScheduler, "nodeScheduler is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.workerMemoryInfoSupplier = requireNonNull(workerMemoryInfoSupplier, "workerMemoryInfoSupplier is null");
this.maxAbsoluteFullNodesPerQuery = maxAbsoluteFullNodesPerQuery;
this.maxFractionFullNodesPerQuery = maxFractionFullNodesPerQuery;
this.scheduleOnCoordinator = scheduleOnCoordinator;
}

private void refreshNodePoolSizes()
@@ -227,7 +237,7 @@ private void processSharedPendingAcquires()
}

try {
Candidates candidates = selectCandidates(pendingAcquire.getNodeRequirements(), pendingAcquire.getNodeSelector());
Candidates candidates = selectCandidates(pendingAcquire.getNodeRequirements());
if (candidates.isEmpty()) {
throw new TrinoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
@@ -277,7 +287,7 @@ private void processFullNodePendingAcquires()
continue;
}

Candidates currentCandidates = selectCandidates(pendingAcquire.getNodeRequirements(), pendingAcquire.getNodeSelector());
Candidates currentCandidates = selectCandidates(pendingAcquire.getNodeRequirements());
if (currentCandidates.isEmpty()) {
throw new TrinoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
@@ -309,7 +319,7 @@ private void processFullNodePendingAcquires()
continue;
}
try {
Candidates currentCandidates = selectCandidates(pendingAcquire.getNodeRequirements(), pendingAcquire.getNodeSelector());
Candidates currentCandidates = selectCandidates(pendingAcquire.getNodeRequirements());
if (currentCandidates.isEmpty()) {
throw new TrinoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
@@ -435,9 +445,9 @@ private synchronized Optional<InternalNode> tryAcquireSharedNode(Candidates cand
return selectedNode;
}

private synchronized PendingAcquire registerPendingAcquire(NodeRequirements requirements, NodeSelector nodeSelector, Candidates candidates, QueryId queryId)
private synchronized PendingAcquire registerPendingAcquire(NodeRequirements requirements, Candidates candidates, QueryId queryId)
{
PendingAcquire pendingAcquire = new PendingAcquire(requirements, nodeSelector, queryId);
PendingAcquire pendingAcquire = new PendingAcquire(requirements, queryId);
if (isFullNode(requirements)) {
Optional<InternalNode> targetNode = findTargetPendingFullNode(queryId, candidates);

@@ -496,9 +506,9 @@ public NodeAllocator getNodeAllocator(Session session)
return new FullNodeCapableNodeAllocator(session);
}

private static Candidates selectCandidates(NodeRequirements requirements, NodeSelector nodeSelector)
private Candidates selectCandidates(NodeRequirements requirements)
{
List<InternalNode> allNodes = nodeSelector.allNodes();
Set<InternalNode> allNodes = getAllNodes(requirements.getCatalogName());
return new Candidates(
allNodes.size(),
allNodes.stream()
@@ -533,11 +543,27 @@ public boolean isEmpty()
}
}

private Set<InternalNode> getAllNodes(Optional<CatalogName> catalogName)
{
Set<InternalNode> activeNodes;
if (catalogName.isPresent()) {
activeNodes = nodeManager.getActiveConnectorNodes(catalogName.get());
}
else {
activeNodes = nodeManager.getNodes(NodeState.ACTIVE);
}
if (scheduleOnCoordinator) {
return activeNodes;
}
return activeNodes.stream()
.filter(node -> !node.isCoordinator())
.collect(toImmutableSet());
}

private class FullNodeCapableNodeAllocator
implements NodeAllocator
{
@GuardedBy("this")
private final Map<Optional<CatalogName>, NodeSelector> nodeSelectorCache = new HashMap<>();
private final Session session;

public FullNodeCapableNodeAllocator(Session session)
@@ -548,9 +574,7 @@ public FullNodeCapableNodeAllocator(Session session)
@Override
public NodeLease acquire(NodeRequirements requirements)
{
NodeSelector nodeSelector = nodeSelectorCache.computeIfAbsent(requirements.getCatalogName(), catalogName -> nodeScheduler.createNodeSelector(session, catalogName));

Candidates candidates = selectCandidates(requirements, nodeSelector);
Candidates candidates = selectCandidates(requirements);
if (candidates.isEmpty()) {
throw new TrinoException(NO_NODES_AVAILABLE, "No nodes available to run query");
}
@@ -567,7 +591,7 @@ public NodeLease acquire(NodeRequirements requirements)
queryId);
}

PendingAcquire pendingAcquire = registerPendingAcquire(requirements, nodeSelector, candidates, queryId);
PendingAcquire pendingAcquire = registerPendingAcquire(requirements, candidates, queryId);
return new FullNodeCapableNodeLease(
transform(pendingAcquire.getFuture(), this::nodeInfoForNode, directExecutor()),
requirements.getMemory().toBytes(),
@@ -593,14 +617,12 @@ public void close()
private static class PendingAcquire
{
private final NodeRequirements nodeRequirements;
private final NodeSelector nodeSelector;
private final SettableFuture<InternalNode> future;
private final QueryId queryId;

private PendingAcquire(NodeRequirements nodeRequirements, NodeSelector nodeSelector, QueryId queryId)
private PendingAcquire(NodeRequirements nodeRequirements, QueryId queryId)
{
this.nodeRequirements = requireNonNull(nodeRequirements, "nodeRequirements is null");
this.nodeSelector = requireNonNull(nodeSelector, "nodeSelector is null");
this.queryId = requireNonNull(queryId, "queryId is null");
this.future = SettableFuture.create();
}
@@ -610,11 +632,6 @@ public NodeRequirements getNodeRequirements()
return nodeRequirements;
}

public NodeSelector getNodeSelector()
{
return nodeSelector;
}

public QueryId getQueryId()
{
return queryId;
@@ -74,6 +74,18 @@ public void addNode(CatalogName catalogName, Iterable<InternalNode> nodes)
listeners.forEach(listener -> listener.accept(allNodes));
}

public void removeNode(InternalNode node)
{
for (CatalogName catalog : ImmutableSet.copyOf(remoteNodes.keySet())) {
removeNode(catalog, node);
}
}

public void removeNode(CatalogName catalogName, InternalNode node)
{
remoteNodes.remove(catalogName, node);
}

@Override
public Set<InternalNode> getNodes(NodeState state)
{