Upload new replicas infos onto Helix PropertyStore #1249
Changes from all commits: 56cb657, bc159a2, c36a133, e6b3e2e, e63b79e
@@ -185,31 +185,29 @@ static void bootstrapOrUpgrade(String hardwareLayoutPath, String partitionLayout
   }
 
   /**
-   * Takes in the path to the files that make up the static cluster map and uploads the cluster configs(such as partition
-   * override) to HelixPropertyStore in zookeeper.
+   * Takes in the path to the files that make up the static cluster map and uploads the cluster admin configs (such as
+   * partition override, replica addition) to HelixPropertyStore in zookeeper.
    * @param hardwareLayoutPath the path to the hardware layout file.
    * @param partitionLayoutPath the path to the partition layout file.
    * @param zkLayoutPath the path to the zookeeper layout file.
    * @param clusterNamePrefix the prefix that when combined with the cluster name in the static cluster map files
    *                          will give the cluster name in Helix to bootstrap or upgrade.
-   * @param dcs the comma-separated list of datacenters that needs to be upgraded/bootstrapped.
+   * @param dcs the comma-separated list of data centers that need to be upgraded/bootstrapped.
    * @param maxPartitionsInOneResource the maximum number of Ambry partitions to group under a single Helix resource.
    * @param helixAdminFactory the {@link HelixAdminFactory} to use to instantiate {@link HelixAdmin}
-   * @param stateModelDef the state model definition to use in Ambry cluster.
+   * @param adminTypes the types of admin operations that require generating configs and uploading them to Helix PropertyStore.
    * @throws IOException if there is an error reading a file.
    * @throws JSONException if there is an error parsing the JSON content in any of the files.
    */
-  static void uploadClusterConfigs(String hardwareLayoutPath, String partitionLayoutPath, String zkLayoutPath,
+  static void uploadClusterAdminConfigs(String hardwareLayoutPath, String partitionLayoutPath, String zkLayoutPath,
       String clusterNamePrefix, String dcs, int maxPartitionsInOneResource, HelixAdminFactory helixAdminFactory,
-      String stateModelDef) throws Exception {
+      String[] adminTypes) throws Exception {
     HelixBootstrapUpgradeUtil clusterMapToHelixMapper =
         new HelixBootstrapUpgradeUtil(hardwareLayoutPath, partitionLayoutPath, zkLayoutPath, clusterNamePrefix, dcs,
-            maxPartitionsInOneResource, false, false, helixAdminFactory, stateModelDef);
-    Map<String, Map<String, String>> partitionOverrideInfos =
-        clusterMapToHelixMapper.generatePartitionOverrideFromAllDCs();
-    info("Uploading partition override to HelixPropertyStore based on override json file.");
-    clusterMapToHelixMapper.uploadPartitionOverride(partitionOverrideInfos);
-    info("Upload cluster configs completed.");
+            maxPartitionsInOneResource, false, false, helixAdminFactory, ClusterMapConfig.DEFAULT_STATE_MODEL_DEF);
Review comment: why is it the LeaderStandby model? I thought it should be the offline/bootstrap/leader/standby one?

Reply: I will make it configurable. For now, we are still using the default LeaderStandby model. To use the new state model, we first need to create the new model on Helix and have all participants register it.
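To illustrate what the reply describes, here is a hypothetical sketch (not part of this PR) of defining and registering an offline/bootstrap/standby/leader state model in Helix. The model name, cluster name, and ZK address are all illustrative:

    import org.apache.helix.HelixAdmin;
    import org.apache.helix.manager.zk.ZKHelixAdmin;
    import org.apache.helix.model.StateModelDefinition;

    public class RegisterNewStateModel {
      public static void main(String[] args) {
        // Hypothetical OFFLINE/BOOTSTRAP/STANDBY/LEADER model; all names are illustrative.
        StateModelDefinition.Builder builder = new StateModelDefinition.Builder("AmbryBootstrapLeaderStandby");
        builder.addState("LEADER", 0);      // highest-priority state
        builder.addState("STANDBY", 1);
        builder.addState("BOOTSTRAP", 2);
        builder.addState("OFFLINE", 3);
        builder.initialState("OFFLINE");
        builder.addTransition("OFFLINE", "BOOTSTRAP", 3);
        builder.addTransition("BOOTSTRAP", "STANDBY", 2);
        builder.addTransition("STANDBY", "LEADER", 1);
        builder.addTransition("LEADER", "STANDBY", 0);
        builder.dynamicUpperBound("LEADER", "1");  // at most one leader per partition
        builder.dynamicUpperBound("STANDBY", "R"); // up to the replica count
        // Register the definition; participants must also register a state model
        // factory for this model before any resource can reference it.
        HelixAdmin admin = new ZKHelixAdmin("localhost:2181");
        admin.addStateModelDef("AmbryCluster", "AmbryBootstrapLeaderStandby", builder.build());
        admin.close();
      }
    }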
+    for (String adminType : adminTypes) {
+      generateAdminInfosAndUpload(clusterMapToHelixMapper, adminType);
+    }
   }
 
   /**
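For context, a minimal sketch of how the renamed entry point might be invoked, e.g. from the bootstrap tool's main method. The layout paths, cluster name prefix, and DC names below are hypothetical:

    // Hypothetical invocation; all paths and names are illustrative.
    String[] adminTypes = {ClusterMapUtils.PARTITION_OVERRIDE_STR, ClusterMapUtils.REPLICA_ADDITION_STR};
    HelixBootstrapUpgradeUtil.uploadClusterAdminConfigs("/tmp/hardware-layout.json", "/tmp/partition-layout.json",
        "/tmp/zk-layout.json", "Ambry-", "DC1,DC2", 100 /* maxPartitionsInOneResource */,
        new HelixAdminFactory(), adminTypes);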
@@ -268,6 +266,33 @@ static void dropCluster(String zkLayoutPath, String clusterName, String dcs, Hel
     }
   }
 
+  /**
+   * Generate cluster admin configs based on the admin type and upload them to Helix PropertyStore.
+   * @param clusterMapToHelixMapper the {@link HelixBootstrapUpgradeUtil} to use.
+   * @param adminType the type of admin operation.
+   */
+  private static void generateAdminInfosAndUpload(HelixBootstrapUpgradeUtil clusterMapToHelixMapper, String adminType) {
+    switch (adminType) {
+      case ClusterMapUtils.PARTITION_OVERRIDE_STR:
+        Map<String, Map<String, Map<String, String>>> partitionOverrideInfosByDc =
+            clusterMapToHelixMapper.generatePartitionOverrideFromAllDCs();
+        clusterMapToHelixMapper.uploadClusterAdminInfos(partitionOverrideInfosByDc,
+            ClusterMapUtils.PARTITION_OVERRIDE_STR, ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH);
+        break;
+      case ClusterMapUtils.REPLICA_ADDITION_STR:
+        Map<String, Map<String, Map<String, String>>> replicaAdditionInfosByDc =
+            clusterMapToHelixMapper.generateReplicaAdditionMap();
+        clusterMapToHelixMapper.uploadClusterAdminInfos(replicaAdditionInfosByDc, ClusterMapUtils.REPLICA_ADDITION_STR,
+            ClusterMapUtils.REPLICA_ADDITION_ZNODE_PATH);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Unrecognized admin config type: " + adminType + ". Current supported types are: "
+                + ClusterMapUtils.PARTITION_OVERRIDE_STR + ", " + ClusterMapUtils.REPLICA_ADDITION_STR);
+    }
+    info("Upload cluster configs completed.");
+  }
+
   /**
    * Instantiates this class with the given information.
    * @param hardwareLayoutPath the path to the hardware layout file.
@@ -324,7 +349,7 @@ private HelixBootstrapUpgradeUtil(String hardwareLayoutPath, String partitionLay
 
   /**
    * Generate the partition override map containing partition state from all datacenters.
-   * @return the constructed partitionOverrideInfos. The format is as follows.
+   * @return the constructed partitionOverrideInfos by dc. The format is as follows.
    *
    * "mapFields": {
    *   "0": {
@@ -337,7 +362,7 @@ private HelixBootstrapUpgradeUtil(String hardwareLayoutPath, String partitionLay
    *   }
    * }
    */
-  private Map<String, Map<String, String>> generatePartitionOverrideFromAllDCs() {
+  private Map<String, Map<String, Map<String, String>>> generatePartitionOverrideFromAllDCs() {
     Map<String, Map<String, String>> partitionOverrideInfos = new HashMap<>();
     for (PartitionId partitionId : staticClusterMap.getAllPartitionIds(null)) {
       String partitionName = partitionId.toPathString();
@@ -347,37 +372,112 @@ private Map<String, Map<String, String>> generatePartitionOverrideFromAllDCs() {
           : ClusterMapUtils.READ_ONLY_STR);
       partitionOverrideInfos.put(partitionName, partitionProperties);
     }
-    return partitionOverrideInfos;
+    Map<String, Map<String, Map<String, String>>> partitionOverrideByDc = new HashMap<>();
+    for (String dc : dataCenterToZkAddress.keySet()) {
+      partitionOverrideByDc.put(dc, partitionOverrideInfos);
+    }
+    return partitionOverrideByDc;
   }
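Since every datacenter key maps to the same override map, the returned per-DC structure for a two-DC cluster would look like this (DC names illustrative, not from the source):

    {
      "DC1": { "0": { "state": "RW" }, "1": { "state": "RO" } },
      "DC2": { "0": { "state": "RW" }, "1": { "state": "RO" } }
    }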
 
   /**
-   * Uploads the seal state of all partitions in the format of map.
-   * @param partitionOverrideInfos the override information for each partition. The current format is as follows.
-   *
+   * Generate the replica addition infos map grouped by dc. This map contains detailed replica info (size, mount path,
+   * etc.) that will be used by a server to instantiate a newly added Ambry replica. The format is as follows.
    * "mapFields": {
-   *   "0": {
-   *     "state": "RW"
-   *   },
-   *   "1": {
-   *     "state": "RO"
-   *   }
+   *   "1": {
+   *     "replicaCapacityInBytes": 107374182400,
+   *     "partitionClass": "max-replicas-all-datacenters",
+   *     "localhost1_17088": "/tmp/c/1",
+   *     "localhost2_17088": "/tmp/d/1"
+   *   },
+   *   "2": {
+   *     "replicaCapacityInBytes": 107374182400,
+   *     "partitionClass": "max-replicas-all-datacenters",
+   *     "localhost3_17088": "/tmp/e/1"
+   *   }
    * }
+   *
+   * In the above example, two new replicas of partition[1] will be added to localhost1 and localhost2 respectively.
+   * Each host name is followed by the mount path on which the new replica should be placed.
+   * @return a map that contains detailed replica info.
    */
+  private Map<String, Map<String, Map<String, String>>> generateReplicaAdditionMap() {
+    // populate dcToInstanceNameToDataNodeId and instanceToDiskReplicasMap
+    populateInstancesAndPartitionsMap();
+    Map<String, Map<String, Map<String, String>>> newAddedReplicasByDc = new HashMap<>();
+    for (Map.Entry<String, HelixAdmin> entry : adminForDc.entrySet()) {
+      HelixAdmin dcAdmin = entry.getValue();
+      String dcName = entry.getKey();
+      info("Generating replica addition map for datacenter {}", dcName);
+      Map<String, Map<String, Replica>> partitionToInstancesAndReplicas = new HashMap<>();
+      Map<String, Map<String, String>> newAddedReplicasInDc = new HashMap<>();
+      for (String instanceName : dcToInstanceNameToDataNodeId.get(dcName).keySet()) {
+        Map<DiskId, SortedSet<Replica>> diskToReplica = instanceToDiskReplicasMap.get(instanceName);
+        for (SortedSet<Replica> replicas : diskToReplica.values()) {
+          for (Replica replica : replicas) {
+            partitionToInstancesAndReplicas.computeIfAbsent(replica.getPartitionId().toPathString(),
+                key -> new HashMap<>()).put(instanceName, replica);
+          }
+        }
+      }
+      List<String> resourcesInCluster = dcAdmin.getResourcesInCluster(clusterName);
+      for (String resourceName : resourcesInCluster) {
+        if (!resourceName.matches("\\d+")) {
+          continue;
+        }
+        IdealState idealState = dcAdmin.getResourceIdealState(clusterName, resourceName);
+        for (String partitionStr : new HashSet<>(idealState.getPartitionSet())) {
+          Set<String> instanceSetInHelix = idealState.getInstanceSet(partitionStr);
+          Map<String, Replica> instanceAndReplicaInStatic = partitionToInstancesAndReplicas.get(partitionStr);
+          if (instanceAndReplicaInStatic == null || instanceAndReplicaInStatic.isEmpty()) {
+            info(
+                "*** Partition {} no longer present in the static clustermap. Uploading cluster admin infos operation won't remove it *** ",
+                partitionStr);
+          } else if (!instanceAndReplicaInStatic.keySet().equals(instanceSetInHelix)) {
+            info(
+                "Different instance sets for partition {} under resource {}. Extracting new replicas from static clustermap.",
+                partitionStr, resourceName);
+            // instances in static only
+            Set<String> instanceSetInStatic = instanceAndReplicaInStatic.keySet();
+            instanceSetInStatic.removeAll(instanceSetInHelix);
+            for (String instance : instanceSetInStatic) {
+              Replica replica = instanceAndReplicaInStatic.get(instance);
+              info("New replica of partition[{}] will be added to instance {} on {}", partitionStr, instance,
+                  replica.getMountPath());
+              newAddedReplicasInDc.computeIfAbsent(partitionStr, key -> {
+                Map<String, String> partitionMap = new HashMap<>();
+                partitionMap.put(ClusterMapUtils.PARTITION_CLASS_STR, replica.getPartitionId().getPartitionClass());
+                partitionMap.put(ClusterMapUtils.REPLICAS_CAPACITY_STR, String.valueOf(replica.getCapacityInBytes()));
+                return partitionMap;
+              }).put(instance, replica.getMountPath());
+            }
+          }
+        }
+      }
+      newAddedReplicasByDc.put(dcName, newAddedReplicasInDc);
+    }
+    return newAddedReplicasByDc;
+  }
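To make the consumption side concrete, here is a hedged sketch (not from this PR) of how a server might pick out its own additions from the map above. The method name is hypothetical; the constants are the ones used in this diff:

    // Hypothetical helper: "replicaAddition" is the mapFields of the ReplicaAddition
    // ZNRecord for this server's datacenter.
    static void printMyNewReplicas(Map<String, Map<String, String>> replicaAddition, String myInstanceName) {
      for (Map.Entry<String, Map<String, String>> entry : replicaAddition.entrySet()) {
        Map<String, String> attrs = entry.getValue();
        String mountPath = attrs.get(myInstanceName);
        if (mountPath != null) {
          // This host should instantiate a new replica of the partition at mountPath.
          long capacityInBytes = Long.parseLong(attrs.get(ClusterMapUtils.REPLICAS_CAPACITY_STR));
          String partitionClass = attrs.get(ClusterMapUtils.PARTITION_CLASS_STR);
          System.out.println("Create replica of partition " + entry.getKey() + " (class=" + partitionClass
              + ", capacity=" + capacityInBytes + " bytes) at " + mountPath);
        }
      }
    }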
+
+  /**
+   * Uploads cluster config infos onto Helix PropertyStore.
+   * @param adminInfosByDc the cluster admin information (overridden partitions, added replicas) grouped by DC that
+   *                       would be applied to the cluster.
+   * @param clusterAdminType the type of cluster admin config to upload (e.g. PartitionOverride, ReplicaAddition).
+   * @param adminConfigZNodePath the ZNode path of the admin config associated with clusterAdminType.
+   */
-  private void uploadPartitionOverride(Map<String, Map<String, String>> partitionOverrideInfos) {
+  private void uploadClusterAdminInfos(Map<String, Map<String, Map<String, String>>> adminInfosByDc,
+      String clusterAdminType, String adminConfigZNodePath) {
     Properties storeProps = new Properties();
-    storeProps.setProperty("helix.property.store.root.path", "/" + clusterName);
+    storeProps.setProperty("helix.property.store.root.path",
+        "/" + clusterName + "/" + ClusterMapUtils.PROPERTYSTORE_STR);
     HelixPropertyStoreConfig propertyStoreConfig = new HelixPropertyStoreConfig(new VerifiableProperties(storeProps));
-    info("Setting partition override for specified datacenters.");
     for (Map.Entry<String, ClusterMapUtils.DcZkInfo> entry : dataCenterToZkAddress.entrySet()) {
-      info("Setting partition override for {}.", entry.getKey());
+      info("Uploading {} infos for datacenter {}.", clusterAdminType, entry.getKey());
       HelixPropertyStore<ZNRecord> helixPropertyStore =
           CommonUtils.createHelixPropertyStore(entry.getValue().getZkConnectStr(), propertyStoreConfig, null);
-      ZNRecord znRecord = new ZNRecord(ClusterMapUtils.ZNODE_NAME);
-      znRecord.setMapFields(partitionOverrideInfos);
-      String path = ClusterMapUtils.PROPERTYSTORE_ZNODE_PATH;
-      if (!helixPropertyStore.set(path, znRecord, AccessOption.PERSISTENT)) {
-        info("Failed to upload partition override for datacenter {}", entry.getKey());
+      ZNRecord znRecord = new ZNRecord(clusterAdminType);
+      znRecord.setMapFields(adminInfosByDc.get(entry.getKey()));
+      if (!helixPropertyStore.set(adminConfigZNodePath, znRecord, AccessOption.PERSISTENT)) {
+        info("Failed to upload {} infos for datacenter {}", clusterAdminType, entry.getKey());
       }
     }
   }
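As a hedged aside (not part of this diff), the uploaded record could be fetched back for verification through the same PropertyStore helper, assuming a zkConnectStr and the propertyStoreConfig set up above:

    // Read the PartitionOverride record back and inspect its map fields.
    HelixPropertyStore<ZNRecord> store =
        CommonUtils.createHelixPropertyStore(zkConnectStr, propertyStoreConfig, null);
    ZNRecord record = store.get(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH, null, AccessOption.PERSISTENT);
    if (record != null) {
      Map<String, Map<String, String>> overrideByPartition = record.getMapFields();
      info("Fetched override for {} partitions", overrideByPartition.size());
    }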
@@ -582,8 +682,16 @@ private void addUpdateResources(String dcName, Map<String, Set<String>> partitio
         // not the toString() method for the enum as that is what Helix uses).
         resourceIs.setReplicas(ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.name());
         resourceModified = true;
+        // TODO when the move replica feature is ready, the infos of newly added replicas should be uploaded to PropertyStore
       }
     }
+    // update state model def if necessary
+    if (!resourceIs.getStateModelDefRef().equals(stateModelDef)) {
+      info("Resource {} has different state model {}. Updating it with {}", resourceName,
+          resourceIs.getStateModelDefRef(), stateModelDef);
+      resourceIs.setStateModelDefRef(stateModelDef);
+      resourceModified = true;
+    }
     resourceIs.setNumPartitions(resourceIs.getPartitionSet().size());
     if (resourceModified) {
       if (resourceIs.getPartitionSet().isEmpty()) {
@@ -609,7 +717,7 @@ private void addUpdateResources(String dcName, Map<String, Set<String>> partitio
       List<Map.Entry<String, Set<String>>> partitionsUnderNextResource = newPartitions.subList(fromIndex, toIndex);
       fromIndex = toIndex;
       IdealState idealState = new IdealState(resourceName);
-      idealState.setStateModelDefRef(LeaderStandbySMD.name);
+      idealState.setStateModelDefRef(stateModelDef);
       info("Adding partitions for next resource in {}", dcName);
       for (Map.Entry<String, Set<String>> entry : partitionsUnderNextResource) {
         String partitionName = entry.getKey();
@@ -961,7 +1069,7 @@ private void verifyResourcesAndPartitionEquivalencyInDc(Datacenter dc, String cl
         continue;
       }
       IdealState resourceIS = admin.getResourceIdealState(clusterName, resourceName);
-      ensureOrThrow(resourceIS.getStateModelDefRef().equals(LeaderStandbySMD.name),
+      ensureOrThrow(resourceIS.getStateModelDefRef().equals(stateModelDef),
           "StateModel name mismatch for resource " + resourceName);
       Set<String> resourcePartitions = resourceIS.getPartitionSet();
       for (String resourcePartition : resourcePartitions) {
Review comment: could you add some comments to these three znode paths, describing what their purpose is?

Reply: sure, comments are added.
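The added comments themselves are outside this diff; here is a hypothetical sketch of what commented constants in ClusterMapUtils might look like. The path values shown are illustrative, not taken from the source:

    // Name of the PropertyStore segment under the cluster znode; admin configs live beneath it.
    public static final String PROPERTYSTORE_STR = "PROPERTYSTORE";
    // Znode storing the seal-state (RW/RO) override for each partition, consumed when the
    // cluster manager is configured to take partition states from ZK instead of instance configs.
    public static final String PARTITION_OVERRIDE_ZNODE_PATH = "/AdminConfigs/PartitionOverride";
    // Znode storing detailed infos (capacity, partition class, mount path) of newly added
    // replicas, consumed by servers to instantiate the replicas on the right disks.
    public static final String REPLICA_ADDITION_ZNODE_PATH = "/AdminConfigs/ReplicaAddition";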