Skip to content

Commit

Permalink
rebase master
Browse files Browse the repository at this point in the history
  • Loading branch information
jsjtzyy committed Jan 15, 2020
1 parent 675abb4 commit f679e74
Show file tree
Hide file tree
Showing 11 changed files with 444 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ void registerPartitionStateChangeListener(StateModelListenerType listenerType,
*/
ReplicaSyncUpManager getReplicaSyncUpManager();

/**
* Update disk/replica infos associated with current data node in cluster (this occurs when replica addition/removal
* on current node is complete and local changes will be broadcast to all listeners in this cluster)
* @param replicaId the {@link ReplicaId} whose info should be updated on current node
* @param shouldExist Whether the replica info should exist or not. When {@code true}, replica info will be added if
* it is missing in current node info. When {@code false}, replica info will be removed if present.
* @return if {@code true}, node info is successfully updated. {@code false} otherwise.
*/
boolean updateDataNodeInfoInCluster(ReplicaId replicaId, boolean shouldExist);

/**
* Terminate the participant.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public enum TransitionErrorCode {
/**
* If disconnection process fails on specific replica.
*/
DisconnectionFailure
DisconnectionFailure,
/**
* If updating cluster info in Helix fails at some point for specific replica.
*/
HelixUpdateFailure
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,13 @@ public class ClusterMapConfig {
@Config("clustermap.replica.catchup.target")
public final int clustermapReplicaCatchupTarget;

/**
* Whether to allow participant to dynamically update its datanode info in cluster.
*/
@Config("clustermap.update.datanode.info")
@Default("false")
public final boolean clustermapUpdateDatanodeInfo;

public ClusterMapConfig(VerifiableProperties verifiableProperties) {
clusterMapFixedTimeoutDatanodeErrorThreshold =
verifiableProperties.getIntInRange("clustermap.fixedtimeout.datanode.error.threshold", 3, 1, 100);
Expand Down Expand Up @@ -261,5 +268,6 @@ public ClusterMapConfig(VerifiableProperties verifiableProperties) {
verifiableProperties.getLongInRange("clustermap.replica.catchup.acceptable.lag.bytes", 0L, 0L, Long.MAX_VALUE);
clustermapReplicaCatchupTarget =
verifiableProperties.getIntInRange("clustermap.replica.catchup.target", 0, 0, Integer.MAX_VALUE);
clustermapUpdateDatanodeInfo = verifiableProperties.getBoolean("clustermap.update.datanode.info", false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,154 @@ public ReplicaSyncUpManager getReplicaSyncUpManager() {
return replicaSyncUpManager;
}

@Override
public boolean updateDataNodeInfoInCluster(ReplicaId replicaId, boolean shouldExist) {
boolean updateResult = true;
if (clusterMapConfig.clustermapUpdateDatanodeInfo) {
synchronized (helixAdministrationLock) {
InstanceConfig instanceConfig = helixAdmin.getInstanceConfig(clusterName, instanceName);
if (instanceConfig == null) {
throw new IllegalStateException(
"No instance config found for cluster: \"" + clusterName + "\", instance: \"" + instanceName + "\"");
}
updateResult = shouldExist ? addNewReplicaInfo(replicaId, instanceConfig)
: removeOldReplicaInfo(replicaId, instanceConfig);
}
}
return updateResult;
}

/**
* @return a snapshot of registered state change listeners.
*/
public Map<StateModelListenerType, PartitionStateChangeListener> getPartitionStateChangeListeners() {
return Collections.unmodifiableMap(partitionStateChangeListeners);
}

/**
* @return {@link HelixAdmin} that manages current data node.
*/
public HelixAdmin getHelixAdmin() {
return helixAdmin;
}

/**
* Add new replica info into {@link InstanceConfig} of current data node.
* @param replicaId new replica whose info should be added into {@link InstanceConfig}.
* @param instanceConfig the {@link InstanceConfig} to update.
* @return {@code true} replica info is successfully added. {@code false} otherwise.
*/
private boolean addNewReplicaInfo(ReplicaId replicaId, InstanceConfig instanceConfig) {
boolean additionResult = true;
String partitionName = replicaId.getPartitionId().toPathString();
String newReplicaInfo = partitionName + ClusterMapUtils.REPLICAS_STR_SEPARATOR + replicaId.getCapacityInBytes()
+ ClusterMapUtils.REPLICAS_STR_SEPARATOR + replicaId.getPartitionId().getPartitionClass()
+ ClusterMapUtils.REPLICAS_DELIM_STR;
Map<String, Map<String, String>> mountPathToDiskInfos = instanceConfig.getRecord().getMapFields();
Map<String, String> diskInfo = mountPathToDiskInfos.get(replicaId.getMountPath());
boolean newReplicaInfoAdded = false;
boolean duplicateFound = false;
if (diskInfo != null) {
// add replica to an existing disk (need to sort replicas by partition id)
String replicasStr = diskInfo.get(ClusterMapUtils.REPLICAS_STR);
String[] replicaInfos = replicasStr.split(ClusterMapUtils.REPLICAS_DELIM_STR);
StringBuilder replicasStrBuilder = new StringBuilder();
long idToAdd = Long.valueOf(partitionName);
for (String replicaInfo : replicaInfos) {
String[] infos = replicaInfo.split(ClusterMapUtils.REPLICAS_STR_SEPARATOR);
long currentId = Long.valueOf(infos[0]);
if (currentId == idToAdd) {
logger.info("Partition {} is already on instance {}, skipping adding it into InstanceConfig in Helix.",
partitionName, instanceName);
duplicateFound = true;
break;
} else if (currentId < idToAdd || newReplicaInfoAdded) {
replicasStrBuilder.append(replicaInfo).append(ClusterMapUtils.REPLICAS_DELIM_STR);
} else {
// newReplicaInfo already contains delimiter, no need to append REPLICAS_DELIM_STR
replicasStrBuilder.append(newReplicaInfo);
replicasStrBuilder.append(replicaInfo).append(ClusterMapUtils.REPLICAS_DELIM_STR);
newReplicaInfoAdded = true;
}
}
if (!duplicateFound && !newReplicaInfoAdded) {
// this means new replica id is larger than all existing replicas' ids
replicasStrBuilder.append(newReplicaInfo);
newReplicaInfoAdded = true;
}
if (newReplicaInfoAdded) {
diskInfo.put(ClusterMapUtils.REPLICAS_STR, replicasStrBuilder.toString());
mountPathToDiskInfos.put(replicaId.getMountPath(), diskInfo);
}
} else {
// add replica onto a brand new disk
Map<String, String> diskInfoToAdd = new HashMap<>();
diskInfoToAdd.put(ClusterMapUtils.DISK_CAPACITY_STR,
Long.toString(replicaId.getDiskId().getRawCapacityInBytes()));
diskInfoToAdd.put(ClusterMapUtils.DISK_STATE, ClusterMapUtils.AVAILABLE_STR);
diskInfoToAdd.put(ClusterMapUtils.REPLICAS_STR, newReplicaInfo);
mountPathToDiskInfos.put(replicaId.getMountPath(), diskInfoToAdd);
newReplicaInfoAdded = true;
}
if (newReplicaInfoAdded) {
// we update InstanceConfig only when new replica info is added (skip updating if replica is already present)
instanceConfig.getRecord().setMapFields(mountPathToDiskInfos);
logger.info("Updating config: {} in Helix by adding partition {}", instanceConfig, partitionName);
additionResult = helixAdmin.setInstanceConfig(clusterName, instanceName, instanceConfig);
}
return additionResult;
}

/**
* Remove old/existing replica info from {@link InstanceConfig} that associates with current data node.
* @param replicaId the {@link ReplicaId} whose info should be removed.
* @param instanceConfig {@link InstanceConfig} to update.
* @return {@code true} replica info is successfully removed. {@code false} otherwise.
*/
private boolean removeOldReplicaInfo(ReplicaId replicaId, InstanceConfig instanceConfig) {
boolean removalResult = true;
boolean replicaFound = false;
String partitionName = replicaId.getPartitionId().toPathString();
Map<String, Map<String, String>> mountPathToDiskInfos = instanceConfig.getRecord().getMapFields();
Map<String, String> diskInfo = mountPathToDiskInfos.get(replicaId.getMountPath());
if (diskInfo != null) {
String replicasStr = diskInfo.get(ClusterMapUtils.REPLICAS_STR);
if (!replicasStr.isEmpty()) {
String[] replicaInfos = replicasStr.split(ClusterMapUtils.REPLICAS_DELIM_STR);
for (String replicaInfo : replicaInfos) {
String[] infos = replicaInfo.split(ClusterMapUtils.REPLICAS_STR_SEPARATOR);
if (infos[0].equals(partitionName)) {
replicaFound = true;
break;
}
}
// We update InstanceConfig only when replica is found in current instanceConfig. (This is to avoid unnecessary
// notification traffic due to InstanceConfig change)
if (replicaFound) {
StringBuilder newReplicasStrBuilder = new StringBuilder();
for (String replicaInfo : replicaInfos) {
String[] infos = replicaInfo.split(ClusterMapUtils.REPLICAS_STR_SEPARATOR);
if (!infos[0].equals(partitionName)) {
newReplicasStrBuilder.append(replicaInfo).append(ClusterMapUtils.REPLICAS_DELIM_STR);
}
}
// update diskInfo and MountPathToDisk map
diskInfo.put(ClusterMapUtils.REPLICAS_STR, newReplicasStrBuilder.toString());
mountPathToDiskInfos.put(replicaId.getMountPath(), diskInfo);
// update InstanceConfig
instanceConfig.getRecord().setMapFields(mountPathToDiskInfos);
logger.info("Updating config: {} in Helix by removing partition {}", instanceConfig, partitionName);
removalResult = helixAdmin.setInstanceConfig(clusterName, instanceName, instanceConfig);
}
}
}
if (!replicaFound) {
logger.warn("Partition {} is not found on instance {}, skipping removing it from InstanceConfig in Helix.",
partitionName, instanceName);
}
return removalResult;
}

/**
* Register {@link HelixHealthReportAggregatorTask}s for appropriate {@link AmbryHealthReport}s.
* @param engine the {@link StateMachineEngine} to register the task state model.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ public void registerPartitionStateChangeListener(StateModelListenerType listener
public ReplicaSyncUpManager getReplicaSyncUpManager() {
return null;
}

@Override
public boolean updateDataNodeInfoInCluster(ReplicaId replicaId, boolean shouldExist) {
// static clustermap doesn't support updating node info dynamically.
return false;
}
};
}
return clusterParticipant;
Expand Down
Loading

0 comments on commit f679e74

Please sign in to comment.