Commit 0f28333: merge main

ceekay committed Apr 4, 2024
2 parents 08150fe + 3467db1
Showing 49 changed files with 1,466 additions and 214 deletions.
@@ -217,11 +217,12 @@ List<HddsProtos.Node> queryNode(HddsProtos.NodeOperationalState opState,
* Allows a list of hosts to be decommissioned. The hosts are identified
* by their hostname and optionally port in the format foo.com:port.
* @param hosts A list of hostnames, optionally with port
+ * @param force true to forcefully decommission Datanodes
* @throws IOException
* @return A list of DatanodeAdminError for any hosts which failed to
* decommission
*/
-List<DatanodeAdminError> decommissionNodes(List<String> hosts)
+List<DatanodeAdminError> decommissionNodes(List<String> hosts, boolean force)
throws IOException;
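A minimal caller sketch for the new signature (not part of this commit; the ScmClient package, the host names, and the DatanodeAdminError getters are assumptions):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.client.ScmClient;

public class DecommissionSketch {
  // Decommission two datanodes; force = false keeps the fail-early
  // capacity check introduced by this commit enabled.
  static void decommission(ScmClient scmClient) throws IOException {
    List<DatanodeAdminError> errors = scmClient.decommissionNodes(
        Arrays.asList("dn1.example.com:9866", "dn2.example.com"), false);
    for (DatanodeAdminError e : errors) {
      System.err.println(e.getHostname() + ": " + e.getError());
    }
  }
}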
@@ -30,6 +30,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.NavigableMap;
+import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReadWriteLock;
@@ -232,10 +233,10 @@ public boolean contains(Node node) {

private boolean containsNode(Node node) {
Node parent = node.getParent();
-while (parent != null && parent != clusterTree) {
+while (parent != null && !Objects.equals(parent, clusterTree)) {
parent = parent.getParent();
}
-return parent == clusterTree;
+return Objects.equals(parent, clusterTree);
}

/**
@@ -249,7 +250,9 @@ public boolean isSameAncestor(Node node1, Node node2, int ancestorGen) {
}
netlock.readLock().lock();
try {
-return node1.getAncestor(ancestorGen) == node2.getAncestor(ancestorGen);
+Node ancestor1 = node1.getAncestor(ancestorGen);
+Node ancestor2 = node2.getAncestor(ancestorGen);
+return Objects.equals(ancestor1, ancestor2);
} finally {
netlock.readLock().unlock();
}
@@ -268,7 +271,7 @@ public boolean isSameParent(Node node1, Node node2) {
try {
node1 = node1.getParent();
node2 = node2.getParent();
-return node1 == node2;
+return Objects.equals(node1, node2);
} finally {
netlock.readLock().unlock();
}
@@ -713,8 +716,7 @@ private Node chooseNodeInternal(String scope, int leafIndex,
*/
@Override
public int getDistanceCost(Node node1, Node node2) {
-if ((node1 != null && node1.equals(node2)) ||
-(node1 == null && node2 == null)) {
+if (Objects.equals(node1, node2)) {
return 0;
}
if (node1 == null || node2 == null) {
@@ -736,12 +738,9 @@ public int getDistanceCost(Node node1, Node node2) {
netlock.readLock().lock();
try {
Node ancestor1 = node1.getAncestor(level1 - 1);
-boolean node1Topology = (ancestor1 != null && clusterTree != null &&
-!ancestor1.equals(clusterTree)) || (ancestor1 != clusterTree);
Node ancestor2 = node2.getAncestor(level2 - 1);
-boolean node2Topology = (ancestor2 != null && clusterTree != null &&
-!ancestor2.equals(clusterTree)) || (ancestor2 != clusterTree);
-if (node1Topology || node2Topology) {
+if (!Objects.equals(ancestor1, clusterTree) ||
+!Objects.equals(ancestor2, clusterTree)) {
LOG.debug("One of the nodes is outside of network topology");
return Integer.MAX_VALUE;
}
@@ -755,7 +754,7 @@ public int getDistanceCost(Node node1, Node node2) {
level2--;
cost += node2 == null ? 0 : node2.getCost();
}
-while (node1 != null && node2 != null && !node1.equals(node2)) {
+while (node1 != null && node2 != null && !Objects.equals(node1, node2)) {
node1 = node1.getParent();
node2 = node2.getParent();
cost += node1 == null ? 0 : node1.getCost();
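The substitutions above replace reference equality with Objects.equals(a, b), which returns true when both arguments are null and otherwise delegates to a.equals(b), so a null clusterTree or ancestor no longer needs special-casing. A standalone illustration (String stands in for Node):

import java.util.Objects;

public class EqualsSketch {
  public static void main(String[] args) {
    String a = new String("rack1");
    String b = new String("rack1");
    System.out.println(a == b);                     // false: identity only
    System.out.println(Objects.equals(a, b));       // true: logical equality
    System.out.println(Objects.equals(null, null)); // true, without an NPE
  }
}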
@@ -245,7 +245,7 @@ List<HddsProtos.Node> queryNode(HddsProtos.NodeOperationalState opState,

HddsProtos.Node queryNode(UUID uuid) throws IOException;

-List<DatanodeAdminError> decommissionNodes(List<String> nodes)
+List<DatanodeAdminError> decommissionNodes(List<String> nodes, boolean force)
throws IOException;

List<DatanodeAdminError> recommissionNodes(List<String> nodes)
@@ -528,15 +528,16 @@ public HddsProtos.Node queryNode(UUID uuid) throws IOException {
/**
* Attempts to decommission the list of nodes.
* @param nodes The list of hostnames or hostname:ports to decommission
+ * @param force true to skip fail-early checks and try to decommission nodes
* @throws IOException
*/
@Override
-public List<DatanodeAdminError> decommissionNodes(List<String> nodes)
+public List<DatanodeAdminError> decommissionNodes(List<String> nodes, boolean force)
throws IOException {
Preconditions.checkNotNull(nodes);
DecommissionNodesRequestProto request =
DecommissionNodesRequestProto.newBuilder()
-.addAllHosts(nodes)
+.addAllHosts(nodes).setForce(force)
.build();
DecommissionNodesResponseProto response =
submitRequest(Type.DecommissionNodes,
Expand Down
@@ -362,6 +362,7 @@ message DatanodeUsageInfoResponseProto {
*/
message DecommissionNodesRequestProto {
repeated string hosts = 1;
+  optional bool force = 2;
}


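Because force is an optional field, requests built by older clients simply leave it unset and the server reads the default. A sketch of that behavior (imports of the generated proto class and java.util.Arrays assumed):

// An unset "force" deserializes as false, so older clients keep the
// fail-early checks enabled without any change on their side.
DecommissionNodesRequestProto request =
    DecommissionNodesRequestProto.newBuilder()
        .addAllHosts(Arrays.asList("dn1.example.com"))
        .build();                   // force never set
assert !request.hasForce();         // field absent on the wire
assert !request.getForce();         // reads as the proto default: false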
@@ -21,10 +21,14 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.scm.DatanodeAdminError;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -42,6 +46,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -56,6 +61,7 @@ public class NodeDecommissionManager {
private final DatanodeAdminMonitor monitor;

private final NodeManager nodeManager;
+private ContainerManager containerManager;
private final SCMContext scmContext;
private final boolean useHostnames;

@@ -252,10 +258,11 @@ private boolean validateDNPortMatch(int port, DatanodeDetails dn) {
return false;
}

-public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm,
+public NodeDecommissionManager(OzoneConfiguration config, NodeManager nm, ContainerManager cm,
SCMContext scmContext,
EventPublisher eventQueue, ReplicationManager rm) {
this.nodeManager = nm;
+this.containerManager = cm;
this.scmContext = scmContext;

executor = Executors.newScheduledThreadPool(1,
@@ -305,9 +312,21 @@ public DatanodeAdminMonitor getMonitor() {
}

public synchronized List<DatanodeAdminError> decommissionNodes(
-List<String> nodes) {
+List<String> nodes, boolean force) {
List<DatanodeAdminError> errors = new ArrayList<>();
List<DatanodeDetails> dns = mapHostnamesToDatanodes(nodes, errors);
+// fail early if the force flag is not set
+if (!force) {
+LOG.info("Force flag = {}. Checking if decommission is possible for dns: {}", force, dns);
+boolean decommissionPossible = checkIfDecommissionPossible(dns, errors);
+if (!decommissionPossible) {
+LOG.error("Cannot decommission nodes as sufficient nodes are not available.");
+errors.add(new DatanodeAdminError("AllHosts", "Sufficient nodes are not available."));
+return errors;
+}
+} else {
+LOG.info("Force flag = {}. Skip checking if decommission is possible for dns: {}", force, dns);
+}
for (DatanodeDetails dn : dns) {
try {
startDecommission(dn);
Expand Down Expand Up @@ -368,6 +387,61 @@ public synchronized void startDecommission(DatanodeDetails dn)
}
}

+private synchronized boolean checkIfDecommissionPossible(List<DatanodeDetails> dns, List<DatanodeAdminError> errors) {
+int numDecom = dns.size();
+List<DatanodeDetails> validDns = new ArrayList<>(dns);
+int inServiceTotal = nodeManager.getNodeCount(NodeStatus.inServiceHealthy());
+for (DatanodeDetails dn : dns) {
+try {
+NodeStatus nodeStatus = getNodeStatus(dn);
+NodeOperationalState opState = nodeStatus.getOperationalState();
+if (opState != NodeOperationalState.IN_SERVICE) {
+numDecom--;
+validDns.remove(dn);
+}
+} catch (NodeNotFoundException ex) {
+numDecom--;
+validDns.remove(dn);
+}
+}

+for (DatanodeDetails dn : validDns) {
+Set<ContainerID> containers;
+try {
+containers = nodeManager.getContainers(dn);
+} catch (NodeNotFoundException ex) {
+LOG.warn("The host {} was not found in SCM. Ignoring the request to " +
+"decommission it", dn.getHostName());
+continue; // ignore the DN and continue to next one
+}

+for (ContainerID cid : containers) {
+ContainerInfo cif;
+try {
+cif = containerManager.getContainer(cid);
+} catch (ContainerNotFoundException ex) {
+LOG.warn("Could not find container info for container {}.", cid);
+continue; // ignore the container and continue to next one
+}
+synchronized (cif) {
+if (cif.getState().equals(HddsProtos.LifeCycleState.DELETED) ||
+cif.getState().equals(HddsProtos.LifeCycleState.DELETING)) {
+continue;
+}
+int reqNodes = cif.getReplicationConfig().getRequiredNodes();
+if ((inServiceTotal - numDecom) < reqNodes) {
+LOG.info("Cannot decommission nodes. Tried to decommission {} nodes of which valid nodes = {}. " +
+"Cluster state: In-service nodes = {}, nodes required for replication = {}. " +
+"Failing due to datanode : {}, container : {}",
+dns.size(), numDecom, inServiceTotal, reqNodes, dn, cid);
+return false;
+}
+}
+}
+}
+return true;
+}
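A worked instance of the rule above, with hypothetical numbers: 5 healthy IN_SERVICE datanodes, 3 valid decommission candidates, and a container requiring 3 replicas gives 5 - 3 = 2 < 3, so the request is refused unless the force flag is set. A standalone sketch:

// Sketch of the fail-early condition (numbers are illustrative).
int inServiceTotal = 5; // healthy IN_SERVICE datanodes in the cluster
int numDecom = 3;       // valid nodes in the decommission request
int reqNodes = 3;       // required replicas, e.g. RATIS/THREE
boolean possible = (inServiceTotal - numDecom) >= reqNodes;
System.out.println(possible); // false: only 2 nodes would remain in service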

public synchronized List<DatanodeAdminError> recommissionNodes(
List<String> nodes) {
List<DatanodeAdminError> errors = new ArrayList<>();
@@ -1199,7 +1199,7 @@ public ContainerBalancerStatusResponseProto getContainerBalancerStatus(
public DecommissionNodesResponseProto decommissionNodes(
DecommissionNodesRequestProto request) throws IOException {
List<DatanodeAdminError> errors =
-impl.decommissionNodes(request.getHostsList());
+impl.decommissionNodes(request.getHostsList(), request.getForce());
DecommissionNodesResponseProto.Builder response =
DecommissionNodesResponseProto.newBuilder();
for (DatanodeAdminError e : errors) {
@@ -22,6 +22,7 @@
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

/**
* This class is used for maintaining SafeMode metric information, which can
@@ -33,16 +34,16 @@ public class SafeModeMetrics {


// These all values will be set to some values when safemode is enabled.
-private @Metric MutableCounterLong
+private @Metric MutableGaugeLong
numContainerWithOneReplicaReportedThreshold;
private @Metric MutableCounterLong
currentContainersWithOneReplicaReportedCount;

// When hdds.scm.safemode.pipeline-availability.check is set then only
// below metrics will have some values, otherwise they will be zero.
-private @Metric MutableCounterLong numHealthyPipelinesThreshold;
+private @Metric MutableGaugeLong numHealthyPipelinesThreshold;
private @Metric MutableCounterLong currentHealthyPipelinesCount;
-private @Metric MutableCounterLong
+private @Metric MutableGaugeLong
numPipelinesWithAtleastOneReplicaReportedThreshold;
private @Metric MutableCounterLong
currentPipelinesWithAtleastOneReplicaReportedCount;
@@ -55,38 +56,38 @@ public static SafeModeMetrics create() {
}

public void setNumHealthyPipelinesThreshold(long val) {
-this.numHealthyPipelinesThreshold.incr(val);
+this.numHealthyPipelinesThreshold.set(val);
}

public void incCurrentHealthyPipelinesCount() {
this.currentHealthyPipelinesCount.incr();
}

public void setNumPipelinesWithAtleastOneReplicaReportedThreshold(long val) {
-this.numPipelinesWithAtleastOneReplicaReportedThreshold.incr(val);
+this.numPipelinesWithAtleastOneReplicaReportedThreshold.set(val);
}

public void incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount() {
this.currentPipelinesWithAtleastOneReplicaReportedCount.incr();
}

public void setNumContainerWithOneReplicaReportedThreshold(long val) {
-this.numContainerWithOneReplicaReportedThreshold.incr(val);
+this.numContainerWithOneReplicaReportedThreshold.set(val);
}

public void incCurrentContainersWithOneReplicaReportedCount() {
this.currentContainersWithOneReplicaReportedCount.incr();
}

-MutableCounterLong getNumHealthyPipelinesThreshold() {
+MutableGaugeLong getNumHealthyPipelinesThreshold() {
return numHealthyPipelinesThreshold;
}

MutableCounterLong getCurrentHealthyPipelinesCount() {
return currentHealthyPipelinesCount;
}

-MutableCounterLong
+MutableGaugeLong
getNumPipelinesWithAtleastOneReplicaReportedThreshold() {
return numPipelinesWithAtleastOneReplicaReportedThreshold;
}
@@ -95,7 +96,7 @@ MutableCounterLong getCurrentPipelinesWithAtleastOneReplicaCount() {
return currentPipelinesWithAtleastOneReplicaReportedCount;
}

-MutableCounterLong getNumContainerWithOneReplicaReportedThreshold() {
+MutableGaugeLong getNumContainerWithOneReplicaReportedThreshold() {
return numContainerWithOneReplicaReportedThreshold;
}

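The switch from counters to gauges for the threshold metrics matters because MutableCounterLong only accumulates: calling incr(val) again, for example when safemode is re-initialized, adds to the previous threshold, while MutableGaugeLong.set(val) overwrites it. A minimal sketch (registry and metric names are illustrative):

import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;

public class ThresholdMetricSketch {
  public static void main(String[] args) {
    MetricsRegistry registry = new MetricsRegistry("ThresholdMetricSketch");
    MutableCounterLong counter = registry.newCounter("thresholdCounter", "", 0L);
    counter.incr(10);
    counter.incr(10); // safemode re-entered: counter now reads 20, not 10
    MutableGaugeLong gauge = registry.newGauge("thresholdGauge", "", 0L);
    gauge.set(10);
    gauge.set(10);    // gauge still reads 10, the intended threshold
    System.out.println(counter.value() + " vs " + gauge.value());
  }
}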
@@ -646,11 +646,11 @@ public HddsProtos.Node queryNode(UUID uuid)
}

@Override
-public List<DatanodeAdminError> decommissionNodes(List<String> nodes)
+public List<DatanodeAdminError> decommissionNodes(List<String> nodes, boolean force)
throws IOException {
try {
getScm().checkAdminAccess(getRemoteUser(), false);
-return scm.getScmDecommissionManager().decommissionNodes(nodes);
+return scm.getScmDecommissionManager().decommissionNodes(nodes, force);
} catch (Exception ex) {
LOG.error("Failed to decommission nodes", ex);
throw ex;
@@ -845,7 +845,7 @@ private void initializeSystemManagers(OzoneConfiguration conf,
pipelineManager, eventQueue, serviceManager, scmContext);
}

-scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
+scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager, containerManager,
scmContext, eventQueue, replicationManager);

statefulServiceStateManager = StatefulServiceStateManagerImpl.newBuilder()