Skip to content

Commit

Permalink
Reactivation of worker nodes
Browse files Browse the repository at this point in the history
Adds new node states to enable full control over shutdown and reactivation of workers.
- state: DRAINING - a reversible shutdown,
- state: DRAINED - all tasks are finished, server can be safely and quickly stopped. Can still go back to ACTIVE.
  • Loading branch information
brybacki authored and losipiuk committed Dec 16, 2024
1 parent 1179ade commit e369f66
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

import static io.trino.metadata.MetadataUtil.TableMetadataBuilder.tableMetadataBuilder;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.DRAINED;
import static io.trino.metadata.NodeState.DRAINING;
import static io.trino.metadata.NodeState.INACTIVE;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
import static io.trino.spi.connector.SystemTable.Distribution.SINGLE_COORDINATOR;
Expand Down Expand Up @@ -81,6 +83,9 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
addRows(table, allNodes.getActiveNodes(), ACTIVE);
addRows(table, allNodes.getInactiveNodes(), INACTIVE);
addRows(table, allNodes.getShuttingDownNodes(), SHUTTING_DOWN);
addRows(table, allNodes.getDrainingNodes(), DRAINING);
addRows(table, allNodes.getDrainedNodes(), DRAINED);

return table.build().cursor();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.DRAINED;
import static io.trino.metadata.NodeState.DRAINING;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY;
import static java.lang.Math.min;
Expand Down Expand Up @@ -433,6 +435,8 @@ private synchronized void updateNodes()
Set<InternalNode> aliveNodes = builder
.addAll(nodeManager.getNodes(ACTIVE))
.addAll(nodeManager.getNodes(SHUTTING_DOWN))
.addAll(nodeManager.getNodes(DRAINING))
.addAll(nodeManager.getNodes(DRAINED))
.build();

ImmutableSet<String> aliveNodeIds = aliveNodes.stream()
Expand Down
25 changes: 23 additions & 2 deletions core/trino-main/src/main/java/io/trino/metadata/AllNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,22 @@ public class AllNodes
{
private final Set<InternalNode> activeNodes;
private final Set<InternalNode> inactiveNodes;
private final Set<InternalNode> drainingNodes;
private final Set<InternalNode> drainedNodes;
private final Set<InternalNode> shuttingDownNodes;
private final Set<InternalNode> activeCoordinators;

public AllNodes(Set<InternalNode> activeNodes, Set<InternalNode> inactiveNodes, Set<InternalNode> shuttingDownNodes, Set<InternalNode> activeCoordinators)
public AllNodes(Set<InternalNode> activeNodes,
Set<InternalNode> inactiveNodes,
Set<InternalNode> drainingNodes,
Set<InternalNode> drainedNodes,
Set<InternalNode> shuttingDownNodes,
Set<InternalNode> activeCoordinators)
{
this.activeNodes = ImmutableSet.copyOf(requireNonNull(activeNodes, "activeNodes is null"));
this.inactiveNodes = ImmutableSet.copyOf(requireNonNull(inactiveNodes, "inactiveNodes is null"));
this.drainedNodes = ImmutableSet.copyOf(requireNonNull(drainedNodes, "drainedNodes is null"));
this.drainingNodes = ImmutableSet.copyOf(requireNonNull(drainingNodes, "drainingNodes is null"));
this.shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null"));
this.activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null"));
}
Expand All @@ -50,6 +59,16 @@ public Set<InternalNode> getShuttingDownNodes()
return shuttingDownNodes;
}

public Set<InternalNode> getDrainedNodes()
{
return drainedNodes;
}

public Set<InternalNode> getDrainingNodes()
{
return drainingNodes;
}

public Set<InternalNode> getActiveCoordinators()
{
return activeCoordinators;
Expand All @@ -67,13 +86,15 @@ public boolean equals(Object o)
AllNodes allNodes = (AllNodes) o;
return Objects.equals(activeNodes, allNodes.activeNodes) &&
Objects.equals(inactiveNodes, allNodes.inactiveNodes) &&
Objects.equals(drainedNodes, allNodes.drainedNodes) &&
Objects.equals(drainingNodes, allNodes.drainingNodes) &&
Objects.equals(shuttingDownNodes, allNodes.shuttingDownNodes) &&
Objects.equals(activeCoordinators, allNodes.activeCoordinators);
}

@Override
public int hashCode()
{
return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators);
return Objects.hash(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, activeCoordinators);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
Expand Down Expand Up @@ -56,7 +55,6 @@
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.trino.connector.system.GlobalSystemConnector.CATALOG_HANDLE;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.INACTIVE;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -169,6 +167,8 @@ private void pollWorkers()
AllNodes allNodes = getAllNodes();
Set<InternalNode> aliveNodes = ImmutableSet.<InternalNode>builder()
.addAll(allNodes.getActiveNodes())
.addAll(allNodes.getDrainingNodes())
.addAll(allNodes.getDrainedNodes())
.addAll(allNodes.getShuttingDownNodes())
.build();

Expand Down Expand Up @@ -216,6 +216,8 @@ private synchronized void refreshNodesInternal()

ImmutableSet.Builder<InternalNode> activeNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> inactiveNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> drainingNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> drainedNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> shuttingDownNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> coordinatorsBuilder = ImmutableSet.builder();
ImmutableSetMultimap.Builder<CatalogHandle, InternalNode> byCatalogHandleBuilder = ImmutableSetMultimap.builder();
Expand Down Expand Up @@ -250,6 +252,12 @@ private synchronized void refreshNodesInternal()
case INACTIVE:
inactiveNodesBuilder.add(node);
break;
case DRAINING:
drainingNodesBuilder.add(node);
break;
case DRAINED:
drainedNodesBuilder.add(node);
break;
case SHUTTING_DOWN:
shuttingDownNodesBuilder.add(node);
break;
Expand All @@ -260,12 +268,20 @@ private synchronized void refreshNodesInternal()
}

Set<InternalNode> activeNodes = activeNodesBuilder.build();
Set<InternalNode> drainingNodes = drainingNodesBuilder.build();
Set<InternalNode> drainedNodes = drainedNodesBuilder.build();
Set<InternalNode> inactiveNodes = inactiveNodesBuilder.build();
Set<InternalNode> coordinators = coordinatorsBuilder.build();
Set<InternalNode> shuttingDownNodes = shuttingDownNodesBuilder.build();
if (allNodes != null) {
// log node that are no longer active (but not shutting down)
SetView<InternalNode> missingNodes = difference(allNodes.getActiveNodes(), Sets.union(activeNodes, shuttingDownNodes));
Set<InternalNode> aliveNodes = ImmutableSet.<InternalNode>builder()
.addAll(activeNodes)
.addAll(drainingNodes)
.addAll(drainedNodes)
.addAll(shuttingDownNodes)
.build();
SetView<InternalNode> missingNodes = difference(allNodes.getActiveNodes(), aliveNodes);
for (InternalNode missingNode : missingNodes) {
log.info("Previously active node is missing: %s (last seen at %s)", missingNode.getNodeIdentifier(), missingNode.getHost());
}
Expand All @@ -276,7 +292,7 @@ private synchronized void refreshNodesInternal()
activeNodesByCatalogHandle = Optional.of(byCatalogHandleBuilder.build());
}

AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, shuttingDownNodes, coordinators);
AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators);
// only update if all nodes actually changed (note: this does not include the connectors registered with the nodes)
if (!allNodes.equals(this.allNodes)) {
// assign allNodes to a local variable for use in the callback below
Expand All @@ -292,21 +308,17 @@ private synchronized void refreshNodesInternal()
private NodeState getNodeState(InternalNode node)
{
if (expectedNodeVersion.equals(node.getNodeVersion())) {
if (isNodeShuttingDown(node.getNodeIdentifier())) {
return SHUTTING_DOWN;
}
return ACTIVE;
String nodeId = node.getNodeIdentifier();
// The empty case that is being set to a default value of ACTIVE is limited to the case where a node
// has announced itself but no state has yet been successfully retrieved. RemoteNodeState will retain
// the previously known state if any has been reported.
return Optional.ofNullable(nodeStates.get(nodeId))
.flatMap(RemoteNodeState::getNodeState)
.orElse(NodeState.ACTIVE);
}
return INACTIVE;
}

private boolean isNodeShuttingDown(String nodeId)
{
return Optional.ofNullable(nodeStates.get(nodeId))
.flatMap(RemoteNodeState::getNodeState)
.orElse(NodeState.ACTIVE) == SHUTTING_DOWN;
}

@Override
public synchronized AllNodes getAllNodes()
{
Expand All @@ -325,6 +337,18 @@ public int getInactiveNodeCount()
return getAllNodes().getInactiveNodes().size();
}

@Managed
public int getDrainingNodeCount()
{
return getAllNodes().getDrainingNodes().size();
}

@Managed
public int getDrainedNodeCount()
{
return getAllNodes().getDrainedNodes().size();
}

@Managed
public int getShuttingDownNodeCount()
{
Expand All @@ -337,6 +361,8 @@ public Set<InternalNode> getNodes(NodeState state)
return switch (state) {
case ACTIVE -> getAllNodes().getActiveNodes();
case INACTIVE -> getAllNodes().getInactiveNodes();
case DRAINING -> getAllNodes().getDrainingNodes();
case DRAINED -> getAllNodes().getDrainedNodes();
case SHUTTING_DOWN -> getAllNodes().getShuttingDownNodes();
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Set<InternalNode> getNodes(NodeState state)
{
return switch (state) {
case ACTIVE -> ImmutableSet.copyOf(allNodes);
case INACTIVE, SHUTTING_DOWN -> ImmutableSet.of();
case DRAINING, DRAINED, INACTIVE, SHUTTING_DOWN -> ImmutableSet.of();
};
}

Expand All @@ -84,6 +84,8 @@ public AllNodes getAllNodes()
ImmutableSet.copyOf(allNodes),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(CURRENT_NODE));
}

Expand Down
17 changes: 17 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/NodeState.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,24 @@

public enum NodeState
{
/**
* Server is up and running ready to handle tasks
*/
ACTIVE,
/**
* Never used internally, might be used by discoveryNodeManager when communication error occurs
*/
INACTIVE,
/**
* A reversible graceful shutdown, can go to forward to DRAINED or back to ACTIVE.
*/
DRAINING,
/**
* All tasks are finished, server can be safely and quickly stopped. Can also go back to ACTIVE.
*/
DRAINED,
/**
* Graceful shutdown, non-reversible, when observed will drain and terminate
*/
SHUTTING_DOWN
}
Loading

0 comments on commit e369f66

Please sign in to comment.