Skip to content

Commit

Permalink
HDDS-11243. SCM SafeModeRule Support EC.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Aug 5, 2024
1 parent 5a3b798 commit b65a565
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public final class HddsConfigKeys {
public static final double
HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT = 0.90;

public static final String HDDS_SCM_SAFEMODE_REPORTED_DATANODE_PCT =
"hdds.scm.safemode.reported.datanode.pct";
public static final double HDDS_SCM_SAFEMODE_REPORTED_DATANODE_PCT_DEFAULT = 0.90;

// This configuration setting is used as a fallback location by all
// Ozone/HDDS services for their metadata. It is useful as a single
// config point for test/PoC clusters.
Expand Down
9 changes: 9 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1636,6 +1636,15 @@
</description>
</property>

<property>
<name>hdds.scm.safemode.reported.datanode.pct</name>
<value>0.90</value>
<tag>HDDS,SCM,OPERATION</tag>
<description>
Percentage of successfully reported datanodes.
</description>
</property>

<property>
<name>hdds.container.action.max.limit</name>
<value>20</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.report.ContainerReportValidator;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
Expand Down Expand Up @@ -199,6 +201,11 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
// list
processMissingReplicas(datanodeDetails, expectedContainersInDatanode);
containerManager.notifyContainerReportProcessing(true, true);
if (reportFromDatanode.isRegister()) {
publisher.fireEvent(SCMEvents.CONTAINER_REGISTRATION_REPORT,
new SCMDatanodeProtocolServer.NodeRegistrationContainerReport(datanodeDetails,
reportFromDatanode.getReport()));
}
}
} catch (NodeNotFoundException ex) {
containerManager.notifyContainerReportProcessing(true, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public final class SCMEvents {
NodeRegistrationContainerReport.class,
"Node_Registration_Container_Report");

/**
* Event generated on DataNode Registration Container Report.
*/
public static final TypedEvent<NodeRegistrationContainerReport>
CONTAINER_REGISTRATION_REPORT = new TypedEvent<>(
NodeRegistrationContainerReport.class, "Container_Registration_Report");

/**
* ContainerReports are sent out by Datanodes. This report is received by
* SCMDatanodeHeartbeatDispatcher and Container_Report Event is generated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,22 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import com.google.common.collect.Sets;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
Expand All @@ -50,10 +58,15 @@ public class ContainerSafeModeRule extends
// Required cutoff % for containers with at least 1 reported replica.
private double safeModeCutoff;
// Containers read from scm db (excluding containers in ALLOCATED state).
private Map<Long, ContainerInfo> containerMap;
private double maxContainer;

private AtomicLong containerWithMinReplicas = new AtomicLong(0);
private Set<Long> reportedConatinerIDSet = new HashSet<>();
private Map<Long, ContainerInfo> ratisContainerMap;
private Map<Long, Set<UUID>> ratisContainerDNsMap;
private Map<Long, ContainerInfo> ecContainerMap;
private Map<Long, Set<UUID>> ecContainerDNsMap;
private double ratisMaxContainer;
private double ecMaxContainer;
private AtomicLong ratisContainerWithMinReplicas = new AtomicLong(0);
private AtomicLong ecContainerWithMinReplicas = new AtomicLong(0);
private final ContainerManager containerManager;

public ContainerSafeModeRule(String ruleName, EventQueue eventQueue,
Expand All @@ -71,83 +84,166 @@ public ContainerSafeModeRule(String ruleName, EventQueue eventQueue,
HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT +
" value should be >= 0.0 and <= 1.0");

containerMap = new ConcurrentHashMap<>();
ratisContainerMap = new ConcurrentHashMap<>();
ratisContainerDNsMap = new ConcurrentHashMap<>();
ecContainerMap = new ConcurrentHashMap<>();
ecContainerDNsMap = new ConcurrentHashMap<>();

containers.forEach(container -> {
// There can be containers in OPEN/CLOSING state which were never
// created by the client. We are not considering these containers for
// now. These containers can be handled by tracking pipelines.

Optional.ofNullable(container.getState())
.filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
state == HddsProtos.LifeCycleState.CLOSED)
&& container.getNumberOfKeys() > 0)
.ifPresent(s -> containerMap.put(container.getContainerID(),
container));
LifeCycleState containerState = container.getState();
ReplicationConfig replicationConfig = container.getReplicationConfig();

if (checkContainerState(containerState) && container.getNumberOfKeys() > 0) {
if (replicationConfig instanceof RatisReplicationConfig) {
ratisContainerMap.put(container.getContainerID(), container);
}
if (replicationConfig instanceof ECReplicationConfig) {
ecContainerMap.put(container.getContainerID(), container);
}
}
});
maxContainer = containerMap.size();
long cutOff = (long) Math.ceil(maxContainer * safeModeCutoff);
getSafeModeMetrics().setNumContainerWithOneReplicaReportedThreshold(cutOff);

LOG.info("containers with one replica threshold count {}", cutOff);
ratisMaxContainer = ratisContainerMap.size();
ecMaxContainer = ecContainerMap.size();

long ratisCutOff = (long) Math.ceil(ratisMaxContainer * safeModeCutoff);
long ecCutOff = (long) Math.ceil(ecMaxContainer * safeModeCutoff);

getSafeModeMetrics().setNumContainerWithOneReplicaReportedThreshold(ratisCutOff);

LOG.info("Containers with one replica threshold count {}, with ec n replica threshold count {}.",
ratisCutOff, ecCutOff);
}


@Override
protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
return SCMEvents.CONTAINER_REGISTRATION_REPORT;
}


@Override
protected synchronized boolean validate() {
return getCurrentContainerThreshold() >= safeModeCutoff;
return (getCurrentContainerThreshold() >= safeModeCutoff) &&
(getCurrentECContainerThreshold() >= safeModeCutoff);
}

@VisibleForTesting
public synchronized double getCurrentContainerThreshold() {
if (maxContainer == 0) {
if (ratisMaxContainer == 0) {
return 1;
}
return (ratisContainerWithMinReplicas.doubleValue() / ratisMaxContainer);
}

@VisibleForTesting
public synchronized double getCurrentECContainerThreshold() {
if (ecMaxContainer == 0) {
return 1;
}
return (ecContainerWithMinReplicas.doubleValue() / ecMaxContainer);
}

public synchronized double getEcMaxContainer() {
if (ecMaxContainer == 0) {
return 1;
}
return (containerWithMinReplicas.doubleValue() / maxContainer);
return ecMaxContainer;
}

private synchronized double getRatisMaxContainer() {
if (ratisMaxContainer == 0) {
return 1;
}
return ratisMaxContainer;
}

@Override
protected synchronized void process(
NodeRegistrationContainerReport reportsProto) {
DatanodeDetails datanodeDetails = reportsProto.getDatanodeDetails();
UUID datanodeUUID = datanodeDetails.getUuid();
StorageContainerDatanodeProtocolProtos.ContainerReportsProto report = reportsProto.getReport();

report.getReportsList().forEach(c -> {
long containerID = c.getContainerID();

reportsProto.getReport().getReportsList().forEach(c -> {
if (containerMap.containsKey(c.getContainerID())) {
if (containerMap.remove(c.getContainerID()) != null) {
containerWithMinReplicas.getAndAdd(1);
getSafeModeMetrics()
.incCurrentContainersWithOneReplicaReportedCount();
if (ratisContainerMap.containsKey(containerID)) {
ratisContainerDNsMap.computeIfAbsent(containerID, key -> Sets.newHashSet());
ratisContainerDNsMap.get(containerID).add(datanodeUUID);
if (!reportedConatinerIDSet.contains(containerID)) {
Set<UUID> uuids = ratisContainerDNsMap.get(containerID);
if (uuids != null && uuids.size() >= 1) {
ratisContainerWithMinReplicas.getAndAdd(1);
reportedConatinerIDSet.add(containerID);
getSafeModeMetrics()
.incCurrentContainersWithOneReplicaReportedCount();
}
}
}

if (ecContainerMap.containsKey(containerID)) {
ecContainerDNsMap.computeIfAbsent(containerID, key -> Sets.newHashSet());
ecContainerDNsMap.get(containerID).add(datanodeUUID);
if (!reportedConatinerIDSet.contains(containerID)) {
Set<UUID> uuids = ecContainerDNsMap.get(containerID);
ContainerInfo containerInfo = ecContainerMap.get(containerID);
ReplicationConfig replicationConfig = containerInfo.getReplicationConfig();
if (replicationConfig != null && replicationConfig instanceof ECReplicationConfig) {
ECReplicationConfig ecReplicationConfig = (ECReplicationConfig) replicationConfig;
int data = ecReplicationConfig.getData();
if (uuids != null && uuids.size() > data) {
ecContainerWithMinReplicas.getAndAdd(1);
reportedConatinerIDSet.add(containerID);
getSafeModeMetrics()
.incCurrentContainersWithECDataReplicaReportedCount();
}
}
}
}
});

if (scmInSafeMode()) {
SCMSafeModeManager.getLogger().info(
"SCM in safe mode. {} % containers have at least one"
+ " reported replica.",
(containerWithMinReplicas.doubleValue() / maxContainer) * 100);
"SCM in safe mode. {} % containers [Ratis] have at least one"
+ " reported replica, {} % containers [EC] have at N reported replica.",
((ratisContainerWithMinReplicas.doubleValue() / getRatisMaxContainer()) * 100),
((ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 100)
);
}
}

@Override
protected synchronized void cleanup() {
containerMap.clear();
ratisContainerMap.clear();
ratisContainerDNsMap.clear();
ecContainerMap.clear();
ecContainerDNsMap.clear();
reportedConatinerIDSet.clear();
}

@Override
public String getStatusText() {
List<Long> sampleContainers = containerMap.keySet()
List<Long> sampleContainers = ratisContainerMap.keySet()
.stream()
.limit(SAMPLE_CONTAINER_DISPLAY_LIMIT)
.collect(Collectors.toList());

String status = String.format("%% of containers with at least one reported"
+ " replica (=%1.2f) >= safeModeCutoff (=%1.2f)",
getCurrentContainerThreshold(), this.safeModeCutoff);
String status = String.format(
"%1.2f%% of [Ratis] Containers(%s / %s) with at least one reported replica (=%1.2f) >= " +
"safeModeCutoff (=%1.2f);" +
"%1.2f%% of [EC] Containers(%s / %s) with at least N reported replica (=%1.2f) >= " +
"safeModeCutoff (=%1.2f)",
(ratisContainerWithMinReplicas.doubleValue() / getRatisMaxContainer()) * 100,
ratisContainerWithMinReplicas, (long) getRatisMaxContainer(),
getCurrentContainerThreshold(), this.safeModeCutoff,
(ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 100,
ecContainerWithMinReplicas, (long) getEcMaxContainer(),
getCurrentECContainerThreshold(), this.safeModeCutoff);

if (!sampleContainers.isEmpty()) {
String sampleContainerText =
Expand All @@ -170,28 +266,44 @@ public synchronized void refresh(boolean forceRefresh) {
}
}

private boolean checkContainerState(LifeCycleState state) {
if (state == LifeCycleState.QUASI_CLOSED || state == LifeCycleState.CLOSED) {
return true;
}
return false;
}

private void reInitializeRule() {
containerMap.clear();

containerManager.getContainers().forEach(container -> {
// There can be containers in OPEN/CLOSING state which were never
// created by the client. We are not considering these containers for
// now. These containers can be handled by tracking pipelines.

Optional.ofNullable(container.getState())
.filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
state == HddsProtos.LifeCycleState.CLOSED)
&& container.getNumberOfKeys() > 0)
.ifPresent(s -> containerMap.put(container.getContainerID(),
container));
LifeCycleState containerState = container.getState();
ReplicationConfig replicationConfig = container.getReplicationConfig();

if (checkContainerState(containerState) && container.getNumberOfKeys() > 0) {
if (replicationConfig instanceof RatisReplicationConfig) {
ratisContainerMap.put(container.getContainerID(), container);
}
if (replicationConfig instanceof ECReplicationConfig) {
ecContainerMap.put(container.getContainerID(), container);
}
}
});

maxContainer = containerMap.size();
long cutOff = (long) Math.ceil(maxContainer * safeModeCutoff);
ratisMaxContainer = ratisContainerMap.size();
ecMaxContainer = ecContainerMap.size();

long ratisCutOff = (long) Math.ceil(ratisMaxContainer * safeModeCutoff);
long ecCutOff = (long) Math.ceil(ecMaxContainer * safeModeCutoff);

getSafeModeMetrics().setNumContainerWithOneReplicaReportedThreshold(ratisCutOff);
getSafeModeMetrics().setNumContainerWithECDataReplicaReportedThreshold(ecCutOff);

LOG.info("Refreshed one replica container threshold {}, " +
"currentThreshold {}", cutOff, containerWithMinReplicas.get());
getSafeModeMetrics()
.setNumContainerWithOneReplicaReportedThreshold(cutOff);
LOG.info("Refreshed Containers with one replica threshold count {}, " +
"with ec n replica threshold count {}.", ratisCutOff, ecCutOff);
}

}
Loading

0 comments on commit b65a565

Please sign in to comment.