Skip to content

Commit

Permalink
HDDS-11243. SCM SafeModeRule Support EC.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 committed Oct 3, 2024
1 parent 31f9f2c commit 57f59f9
Show file tree
Hide file tree
Showing 15 changed files with 366 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public final class HddsConfigKeys {
public static final double
HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT_DEFAULT = 0.90;

public static final String HDDS_SCM_SAFEMODE_REPORTED_DATANODE_PCT =
"hdds.scm.safemode.reported.datanode.pct";
public static final double HDDS_SCM_SAFEMODE_REPORTED_DATANODE_PCT_DEFAULT = 0.90;

// This configuration setting is used as a fallback location by all
// Ozone/HDDS services for their metadata. It is useful as a single
// config point for test/PoC clusters.
Expand Down
9 changes: 9 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,15 @@
</description>
</property>

<property>
<name>hdds.scm.safemode.reported.datanode.pct</name>
<value>0.90</value>
<tag>HDDS,SCM,OPERATION</tag>
<description>
Percentage of successfully reported datanodes.
</description>
</property>

<property>
<name>hdds.container.action.max.limit</name>
<value>20</value>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.report.ContainerReportValidator;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerReportFromDatanode;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
Expand Down Expand Up @@ -199,6 +201,11 @@ public void onMessage(final ContainerReportFromDatanode reportFromDatanode,
// list
processMissingReplicas(datanodeDetails, expectedContainersInDatanode);
containerManager.notifyContainerReportProcessing(true, true);
if (reportFromDatanode.isRegister()) {
publisher.fireEvent(SCMEvents.CONTAINER_REGISTRATION_REPORT,
new SCMDatanodeProtocolServer.NodeRegistrationContainerReport(datanodeDetails,
reportFromDatanode.getReport()));
}
}
} catch (NodeNotFoundException ex) {
containerManager.notifyContainerReportProcessing(true, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,13 @@ public final class SCMEvents {
NodeRegistrationContainerReport.class,
"Node_Registration_Container_Report");

/**
* Event generated on DataNode Registration Container Report.
*/
public static final TypedEvent<NodeRegistrationContainerReport>
CONTAINER_REGISTRATION_REPORT = new TypedEvent<>(
NodeRegistrationContainerReport.class, "Container_Registration_Report");

/**
* ContainerReports are sent out by Datanodes. This report is received by
* SCMDatanodeHeartbeatDispatcher and Container_Report Event is generated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,22 @@

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import com.google.common.collect.Sets;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
Expand All @@ -50,10 +58,15 @@ public class ContainerSafeModeRule extends
// Required cutoff % for containers with at least 1 reported replica.
private double safeModeCutoff;
// Containers read from scm db (excluding containers in ALLOCATED state).
private Map<Long, ContainerInfo> containerMap;
private double maxContainer;

private AtomicLong containerWithMinReplicas = new AtomicLong(0);
private Set<Long> reportedContainerIDSet = new HashSet<>();
private Map<Long, ContainerInfo> ratisContainerMap;
private Map<Long, Set<UUID>> ratisContainerDNsMap;
private Map<Long, ContainerInfo> ecContainerMap;
private Map<Long, Set<UUID>> ecContainerDNsMap;
private double ratisMaxContainer;
private double ecMaxContainer;
private AtomicLong ratisContainerWithMinReplicas = new AtomicLong(0);
private AtomicLong ecContainerWithMinReplicas = new AtomicLong(0);
private final ContainerManager containerManager;

public ContainerSafeModeRule(String ruleName, EventQueue eventQueue,
Expand All @@ -71,83 +84,175 @@ public ContainerSafeModeRule(String ruleName, EventQueue eventQueue,
HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT +
" value should be >= 0.0 and <= 1.0");

containerMap = new ConcurrentHashMap<>();
containers.forEach(container -> {
// There can be containers in OPEN/CLOSING state which were never
// created by the client. We are not considering these containers for
// now. These containers can be handled by tracking pipelines.

Optional.ofNullable(container.getState())
.filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
state == HddsProtos.LifeCycleState.CLOSED)
&& container.getNumberOfKeys() > 0)
.ifPresent(s -> containerMap.put(container.getContainerID(),
container));
});
maxContainer = containerMap.size();
long cutOff = (long) Math.ceil(maxContainer * safeModeCutoff);
getSafeModeMetrics().setNumContainerWithOneReplicaReportedThreshold(cutOff);
ratisContainerMap = new ConcurrentHashMap<>();
ratisContainerDNsMap = new ConcurrentHashMap<>();
ecContainerMap = new ConcurrentHashMap<>();
ecContainerDNsMap = new ConcurrentHashMap<>();

LOG.info("containers with one replica threshold count {}", cutOff);
initializeRule(containers);
}


@Override
protected TypedEvent<NodeRegistrationContainerReport> getEventType() {
return SCMEvents.NODE_REGISTRATION_CONT_REPORT;
return SCMEvents.CONTAINER_REGISTRATION_REPORT;
}


@Override
protected synchronized boolean validate() {
return getCurrentContainerThreshold() >= safeModeCutoff;
return (getCurrentContainerThreshold() >= safeModeCutoff) &&
(getCurrentECContainerThreshold() >= safeModeCutoff);
}

@VisibleForTesting
public synchronized double getCurrentContainerThreshold() {
if (maxContainer == 0) {
if (ratisMaxContainer == 0) {
return 1;
}
return (containerWithMinReplicas.doubleValue() / maxContainer);
return (ratisContainerWithMinReplicas.doubleValue() / ratisMaxContainer);
}

@VisibleForTesting
public synchronized double getCurrentECContainerThreshold() {
if (ecMaxContainer == 0) {
return 1;
}
return (ecContainerWithMinReplicas.doubleValue() / ecMaxContainer);
}

public synchronized double getEcMaxContainer() {
if (ecMaxContainer == 0) {
return 1;
}
return ecMaxContainer;
}

private synchronized double getRatisMaxContainer() {
if (ratisMaxContainer == 0) {
return 1;
}
return ratisMaxContainer;
}

@Override
protected synchronized void process(
NodeRegistrationContainerReport reportsProto) {
DatanodeDetails datanodeDetails = reportsProto.getDatanodeDetails();
UUID datanodeUUID = datanodeDetails.getUuid();
StorageContainerDatanodeProtocolProtos.ContainerReportsProto report = reportsProto.getReport();

reportsProto.getReport().getReportsList().forEach(c -> {
if (containerMap.containsKey(c.getContainerID())) {
if (containerMap.remove(c.getContainerID()) != null) {
containerWithMinReplicas.getAndAdd(1);
getSafeModeMetrics()
.incCurrentContainersWithOneReplicaReportedCount();
}
report.getReportsList().forEach(c -> {
long containerID = c.getContainerID();

// If it is a Ratis container.
if (ratisContainerMap.containsKey(containerID)) {
initContainerDNsMap(containerID, ratisContainerDNsMap, datanodeUUID);
recordReportedContainer(containerID, Boolean.FALSE);
}

// If it is a EC container.
if (ecContainerMap.containsKey(containerID)) {
initContainerDNsMap(containerID, ecContainerDNsMap, datanodeUUID);
recordReportedContainer(containerID, Boolean.TRUE);
}
});

if (scmInSafeMode()) {
SCMSafeModeManager.getLogger().info(
"SCM in safe mode. {} % containers have at least one"
+ " reported replica.",
(containerWithMinReplicas.doubleValue() / maxContainer) * 100);
"SCM in safe mode. {} % containers [Ratis] have at least one"
+ " reported replica, {} % containers [EC] have at N reported replica.",
((ratisContainerWithMinReplicas.doubleValue() / getRatisMaxContainer()) * 100),
((ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 100)
);
}
}

/**
* Record the reported Container.
*
* We will differentiate and count according to the type of Container.
*
* @param containerID containerID
* @param isEcContainer true, means ECContainer, false, means not ECContainer.
*/
private void recordReportedContainer(long containerID, boolean isEcContainer) {
if (!reportedContainerIDSet.contains(containerID)) {
Set<UUID> uuids = isEcContainer ? ecContainerDNsMap.get(containerID) :
ratisContainerDNsMap.get(containerID);
int minReplica = getMinReplica(containerID, isEcContainer);
if (uuids != null && uuids.size() >= minReplica) {
reportedContainerIDSet.add(containerID);
if (isEcContainer) {
getSafeModeMetrics()
.incCurrentContainersWithECDataReplicaReportedCount();
ecContainerWithMinReplicas.getAndAdd(1);
} else {
ratisContainerWithMinReplicas.getAndAdd(1);
getSafeModeMetrics()
.incCurrentContainersWithOneReplicaReportedCount();
}
}
}
}

/**
* Get the minimum replica.
*
* If it is a Ratis Contianer, the minimum copy is 1.
* If it is an EC Container, the minimum copy will be the number of Data in replicationConfig.
*
* @param containerID containerID
* @param isEcContainer true, means ECContainer, false, means not ECContainer.
* @return MinReplica.
*/
private int getMinReplica(long containerID, boolean isEcContainer) {
if (isEcContainer) {
ContainerInfo containerInfo = ecContainerMap.get(containerID);
if (containerInfo != null) {
ReplicationConfig replicationConfig = containerInfo.getReplicationConfig();
if (replicationConfig != null && replicationConfig instanceof ECReplicationConfig) {
ECReplicationConfig ecReplicationConfig = (ECReplicationConfig) replicationConfig;
return ecReplicationConfig.getData();
}
}
}
return 1;
}

private void initContainerDNsMap(long containerID, Map<Long, Set<UUID>> containerDNsMap,
UUID datanodeUUID) {
containerDNsMap.computeIfAbsent(containerID, key -> Sets.newHashSet());
containerDNsMap.get(containerID).add(datanodeUUID);
}

@Override
protected synchronized void cleanup() {
containerMap.clear();
ratisContainerMap.clear();
ratisContainerDNsMap.clear();
ecContainerMap.clear();
ecContainerDNsMap.clear();
reportedContainerIDSet.clear();
}

@Override
public String getStatusText() {
List<Long> sampleContainers = containerMap.keySet()
List<Long> sampleContainers = ratisContainerMap.keySet()
.stream()
.limit(SAMPLE_CONTAINER_DISPLAY_LIMIT)
.collect(Collectors.toList());

String status = String.format("%% of containers with at least one reported"
+ " replica (=%1.2f) >= safeModeCutoff (=%1.2f)",
getCurrentContainerThreshold(), this.safeModeCutoff);
String status = String.format(
"%1.2f%% of [Ratis] Containers(%s / %s) with at least one reported replica (=%1.2f) >= " +
"safeModeCutoff (=%1.2f);" +
"%1.2f%% of [EC] Containers(%s / %s) with at least N reported replica (=%1.2f) >= " +
"safeModeCutoff (=%1.2f)",
(ratisContainerWithMinReplicas.doubleValue() / getRatisMaxContainer()) * 100,
ratisContainerWithMinReplicas, (long) getRatisMaxContainer(),
getCurrentContainerThreshold(), this.safeModeCutoff,
(ecContainerWithMinReplicas.doubleValue() / getEcMaxContainer()) * 100,
ecContainerWithMinReplicas, (long) getEcMaxContainer(),
getCurrentECContainerThreshold(), this.safeModeCutoff);

if (!sampleContainers.isEmpty()) {
String sampleContainerText =
Expand All @@ -161,37 +266,53 @@ public String getStatusText() {

@Override
public synchronized void refresh(boolean forceRefresh) {
List<ContainerInfo> containers = containerManager.getContainers();
if (forceRefresh) {
reInitializeRule();
initializeRule(containers);
} else {
if (!validate()) {
reInitializeRule();
initializeRule(containers);
}
}
}

private void reInitializeRule() {
containerMap.clear();
containerManager.getContainers().forEach(container -> {
private boolean checkContainerState(LifeCycleState state) {
if (state == LifeCycleState.QUASI_CLOSED || state == LifeCycleState.CLOSED) {
return true;
}
return false;
}

private void initializeRule(List<ContainerInfo> containers) {

containers.forEach(container -> {
// There can be containers in OPEN/CLOSING state which were never
// created by the client. We are not considering these containers for
// now. These containers can be handled by tracking pipelines.

Optional.ofNullable(container.getState())
.filter(state -> (state == HddsProtos.LifeCycleState.QUASI_CLOSED ||
state == HddsProtos.LifeCycleState.CLOSED)
&& container.getNumberOfKeys() > 0)
.ifPresent(s -> containerMap.put(container.getContainerID(),
container));
LifeCycleState containerState = container.getState();
ReplicationConfig replicationConfig = container.getReplicationConfig();

if (checkContainerState(containerState) && container.getNumberOfKeys() > 0) {
if (replicationConfig instanceof RatisReplicationConfig) {
ratisContainerMap.put(container.getContainerID(), container);
}
if (replicationConfig instanceof ECReplicationConfig) {
ecContainerMap.put(container.getContainerID(), container);
}
}
});

maxContainer = containerMap.size();
long cutOff = (long) Math.ceil(maxContainer * safeModeCutoff);
ratisMaxContainer = ratisContainerMap.size();
ecMaxContainer = ecContainerMap.size();

LOG.info("Refreshed one replica container threshold {}, " +
"currentThreshold {}", cutOff, containerWithMinReplicas.get());
getSafeModeMetrics()
.setNumContainerWithOneReplicaReportedThreshold(cutOff);
}
long ratisCutOff = (long) Math.ceil(ratisMaxContainer * safeModeCutoff);
long ecCutOff = (long) Math.ceil(ecMaxContainer * safeModeCutoff);

getSafeModeMetrics().setNumContainerWithOneReplicaReportedThreshold(ratisCutOff);
getSafeModeMetrics().setNumContainerWithECDataReplicaReportedThreshold(ecCutOff);

LOG.info("Refreshed Containers with one replica threshold count {}, " +
"with ec n replica threshold count {}.", ratisCutOff, ecCutOff);
}
}
Loading

0 comments on commit 57f59f9

Please sign in to comment.