Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upload new replicas infos onto Helix PropertyStore #1249

Merged
merged 5 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ public void createHelixPropertyStoreTest() throws IOException {

// Ensure the HelixPropertyStore works correctly
List<String> list = Arrays.asList("first", "second", "third");
String path = propertyStoreConfig.rootPath + ClusterMapUtils.PROPERTYSTORE_ZNODE_PATH;
ZNRecord znRecord = new ZNRecord(ClusterMapUtils.ZNODE_NAME);
String path = propertyStoreConfig.rootPath + ClusterMapUtils.PROPERTYSTORE_ZNODE_PATH
+ ClusterMapUtils.PARTITION_OVERRIDE_STR;
ZNRecord znRecord = new ZNRecord(ClusterMapUtils.PARTITION_OVERRIDE_STR);
znRecord.setListField("AmbryList", list);
if (!propertyStore.set(path, znRecord, AccessOption.PERSISTENT)) {
fail("Failed to set HelixPropertyStore");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,19 @@
public class ClusterMapUtils {
// datacenterId == UNKNOWN_DATACENTER_ID indicate datacenterId is not available at the time when this blobId is formed.
public static final byte UNKNOWN_DATACENTER_ID = -1;
public static final String ZNODE_NAME = "PartitionOverride";
public static final String ZNODE_PATH = "/ClusterConfigs/" + ZNODE_NAME;
public static final String PROPERTYSTORE_ZNODE_PATH = "/PROPERTYSTORE/ClusterConfigs/" + ZNODE_NAME;
public static final String PARTITION_OVERRIDE_STR = "PartitionOverride";
public static final String REPLICA_ADDITION_STR = "ReplicaAddition";
public static final String PARTITION_OVERRIDE_ZNODE_PATH = "/AdminConfigs/" + PARTITION_OVERRIDE_STR;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add comments to these three znode paths to describe the purpose of each one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, comments are added

public static final String REPLICA_ADDITION_ZNODE_PATH = "/AdminConfigs/" + REPLICA_ADDITION_STR;
public static final String PROPERTYSTORE_ZNODE_PATH = "/PROPERTYSTORE/AdminConfigs/";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the purpose of having this znode when we already have PARTITION_OVERRIDE_ZNODE_PATH?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point, I removed PARTITION_OVERRIDE_ZNODE_PATH

static final String DISK_CAPACITY_STR = "capacityInBytes";
static final String DISK_STATE = "diskState";
static final String PARTITION_STATE = "state";
static final String PARTITION_CLASS_STR = "partitionClass";
static final String REPLICAS_STR = "Replicas";
static final String REPLICAS_DELIM_STR = ",";
static final String REPLICAS_STR_SEPARATOR = ":";
static final String REPLICAS_CAPACITY_STR = "replicaCapacityInBytes";
static final String SSLPORT_STR = "sslPort";
static final String RACKID_STR = "rackId";
static final String SEALED_STR = "SEALED";
Expand Down Expand Up @@ -432,7 +436,7 @@ PartitionId getRandomWritablePartition(String partitionClass, List<PartitionId>
if (partitionsToExclude == null || partitionsToExclude.size() == 0 || !partitionsToExclude.contains(selected)) {
if (selected.getPartitionState() == PartitionState.READ_WRITE) {
anyWritablePartition = selected;
if(areEnoughReplicasForPartitionUp(selected)) {
if (areEnoughReplicasForPartitionUp(selected)) {
return selected;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,31 +185,29 @@ static void bootstrapOrUpgrade(String hardwareLayoutPath, String partitionLayout
}

/**
* Takes in the path to the files that make up the static cluster map and uploads the cluster configs(such as partition
* override) to HelixPropertyStore in zookeeper.
* Takes in the path to the files that make up the static cluster map and uploads the cluster admin configs (such as
* partition override, replica addition) to HelixPropertyStore in zookeeper.
* @param hardwareLayoutPath the path to the hardware layout file.
* @param partitionLayoutPath the path to the partition layout file.
* @param zkLayoutPath the path to the zookeeper layout file.
* @param clusterNamePrefix the prefix that when combined with the cluster name in the static cluster map files
* will give the cluster name in Helix to bootstrap or upgrade.
* @param dcs the comma-separated list of datacenters that needs to be upgraded/bootstrapped.
* @param dcs the comma-separated list of data centers that need to be upgraded/bootstrapped.
* @param maxPartitionsInOneResource the maximum number of Ambry partitions to group under a single Helix resource.
* @param helixAdminFactory the {@link HelixAdminFactory} to use to instantiate {@link HelixAdmin}
* @param stateModelDef the state model definition to use in Ambry cluster.
* @param adminTypes the types of admin operations for which configs are generated and uploaded to Helix PropertyStore.
* @throws IOException if there is an error reading a file.
* @throws JSONException if there is an error parsing the JSON content in any of the files.
*/
static void uploadClusterConfigs(String hardwareLayoutPath, String partitionLayoutPath, String zkLayoutPath,
static void uploadClusterAdminConfigs(String hardwareLayoutPath, String partitionLayoutPath, String zkLayoutPath,
String clusterNamePrefix, String dcs, int maxPartitionsInOneResource, HelixAdminFactory helixAdminFactory,
String stateModelDef) throws Exception {
String[] adminTypes) throws Exception {
HelixBootstrapUpgradeUtil clusterMapToHelixMapper =
new HelixBootstrapUpgradeUtil(hardwareLayoutPath, partitionLayoutPath, zkLayoutPath, clusterNamePrefix, dcs,
maxPartitionsInOneResource, false, false, helixAdminFactory, stateModelDef);
Map<String, Map<String, String>> partitionOverrideInfos =
clusterMapToHelixMapper.generatePartitionOverrideFromAllDCs();
info("Uploading partition override to HelixPropertyStore based on override json file.");
clusterMapToHelixMapper.uploadPartitionOverride(partitionOverrideInfos);
info("Upload cluster configs completed.");
maxPartitionsInOneResource, false, false, helixAdminFactory, ClusterMapConfig.DEFAULT_STATE_MODEL_DEF);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it the LeaderStandby model? I thought it should be the offline/bootstrap/leader/standby model.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will make it configurable. For now, we are still using default LeaderStandby model. To use new state model, we first need to create new model on Helix and make all participants register the new model.

for (String adminType : adminTypes) {
generateAdminInfosAndUpload(clusterMapToHelixMapper, adminType);
}
}

/**
Expand Down Expand Up @@ -268,6 +266,32 @@ static void dropCluster(String zkLayoutPath, String clusterName, String dcs, Hel
}
}

/**
 * Builds the cluster admin infos that correspond to the requested admin type and hands them to the given mapper for
 * upload to Helix PropertyStore.
 * @param clusterMapToHelixMapper the {@link HelixBootstrapUpgradeUtil} that generates and uploads the admin infos.
 * @param adminType the type of admin operation (partition override or replica addition).
 * @throws IllegalArgumentException if {@code adminType} is not one of the supported types.
 */
private static void generateAdminInfosAndUpload(HelixBootstrapUpgradeUtil clusterMapToHelixMapper, String adminType) {
  // Step 1: pick the generator that matches the admin type; anything unrecognized is rejected before any upload.
  final Map<String, Map<String, Map<String, String>>> adminInfosByDc;
  switch (adminType) {
    case ClusterMapUtils.PARTITION_OVERRIDE_STR:
      adminInfosByDc = clusterMapToHelixMapper.generatePartitionOverrideFromAllDCs();
      break;
    case ClusterMapUtils.REPLICA_ADDITION_STR:
      adminInfosByDc = clusterMapToHelixMapper.generateReplicaAdditionMap();
      break;
    default:
      throw new IllegalArgumentException(
          "Unrecognized admin config type: " + adminType + ". Current supported types are: "
              + ClusterMapUtils.PARTITION_OVERRIDE_STR + ", " + ClusterMapUtils.REPLICA_ADDITION_STR);
  }
  // Step 2: upload the generated infos. adminType equals the matched constant at this point.
  clusterMapToHelixMapper.uploadClusterAdminInfos(adminInfosByDc, adminType);
  info("Upload cluster configs completed.");
}

/**
* Instantiates this class with the given information.
* @param hardwareLayoutPath the path to the hardware layout file.
Expand Down Expand Up @@ -324,7 +348,7 @@ private HelixBootstrapUpgradeUtil(String hardwareLayoutPath, String partitionLay

/**
* Generate the partition override map containing partition state from all datacenters.
* @return the constructed partitionOverrideInfos. The format is as follows.
* @return the constructed partitionOverrideInfos by dc. The format is as follows.
*
* "mapFields": {
* "0": {
Expand All @@ -337,7 +361,7 @@ private HelixBootstrapUpgradeUtil(String hardwareLayoutPath, String partitionLay
* }
* }
*/
private Map<String, Map<String, String>> generatePartitionOverrideFromAllDCs() {
private Map<String, Map<String, Map<String, String>>> generatePartitionOverrideFromAllDCs() {
Map<String, Map<String, String>> partitionOverrideInfos = new HashMap<>();
for (PartitionId partitionId : staticClusterMap.getAllPartitionIds(null)) {
String partitionName = partitionId.toPathString();
Expand All @@ -347,37 +371,111 @@ private Map<String, Map<String, String>> generatePartitionOverrideFromAllDCs() {
: ClusterMapUtils.READ_ONLY_STR);
partitionOverrideInfos.put(partitionName, partitionProperties);
}
return partitionOverrideInfos;
Map<String, Map<String, Map<String, String>>> partitionOverrideByDc = new HashMap<>();
for (String dc : dataCenterToZkAddress.keySet()) {
partitionOverrideByDc.put(dc, partitionOverrideInfos);
}
return partitionOverrideByDc;
}

/**
* Uploads the seal state of all partitions in the format of map.
* @param partitionOverrideInfos the override information for each partition. The current format is as follows.
*
* Generate a map of replica addition infos grouped by dc. This map contains detailed replica info (size, mount path, etc.)
* that will be used by the target server to instantiate a newly added Ambry replica. The format is as follows.
* "mapFields": {
* "0": {
* "state": "RW"
* },
* "1": {
* "state": "RO"
* }
* "1": {
* "replicaCapacityInBytes": 107374182400,
* "partitionClass": "max-replicas-all-datacenters",
* "localhost1_17088": "/tmp/c/1",
* "localhost2_17088": "/tmp/d/1"
* },
* "2": {
* "replicaCapacityInBytes": 107374182400,
* "partitionClass": "max-replicas-all-datacenters",
* "localhost3_17088": "/tmp/e/1"
* }
* }
*
* In above example, two new replicas of partition[1] will be added to localhost1 and localhost2 respectively.
* The host name is followed by mount path on which the new replica should be placed.
* @return a map that contains detailed replica info.
*/
private Map<String, Map<String, Map<String, String>>> generateReplicaAdditionMap() {
  // Populates dcToInstanceNameToDataNodeId and instanceToDiskReplicasMap, both of which are read below.
  populateInstancesAndPartitionsMap();
  Map<String, Map<String, Map<String, String>>> newAddedReplicasByDc = new HashMap<>();
  for (Map.Entry<String, HelixAdmin> entry : adminForDc.entrySet()) {
    HelixAdmin dcAdmin = entry.getValue();
    String dcName = entry.getKey();
    info("Generating replica addition map for datacenter {}", dcName);
    // partition name -> (instance name -> replica), built from the static clustermap for this dc
    Map<String, Map<String, Replica>> partitionToInstancesAndReplicas = new HashMap<>();
    // partition name -> properties of the replicas that have to be added in this dc
    Map<String, Map<String, String>> newAddedReplicasInDc = new HashMap<>();
    for (String instanceName : dcToInstanceNameToDataNodeId.get(dcName).keySet()) {
      Map<DiskId, SortedSet<Replica>> diskToReplica = instanceToDiskReplicasMap.get(instanceName);
      for (SortedSet<Replica> replicas : diskToReplica.values()) {
        for (Replica replica : replicas) {
          partitionToInstancesAndReplicas.computeIfAbsent(replica.getPartitionId().toPathString(),
              key -> new HashMap<>()).put(instanceName, replica);
        }
      }
    }
    List<String> resourcesInCluster = dcAdmin.getResourcesInCluster(clusterName);
    for (String resourceName : resourcesInCluster) {
      // Only resources whose name is purely numeric are inspected; all others are skipped.
      if (!resourceName.matches("\\d+")) {
        continue;
      }
      IdealState idealState = dcAdmin.getResourceIdealState(clusterName, resourceName);
      for (String partitionStr : new HashSet<>(idealState.getPartitionSet())) {
        Set<String> instanceSetInHelix = idealState.getInstanceSet(partitionStr);
        Map<String, Replica> instanceAndReplicaInStatic = partitionToInstancesAndReplicas.get(partitionStr);
        if (instanceAndReplicaInStatic == null || instanceAndReplicaInStatic.isEmpty()) {
          info(
              "*** Partition {} no longer present in the static clustermap. Uploading cluster admin infos operation won't remove it *** ",
              partitionStr);
        } else if (!instanceAndReplicaInStatic.keySet().equals(instanceSetInHelix)) {
          info(
              "Different instance sets for partition {} under resource {}. Extracting new replicas from static clustermap.",
              partitionStr, resourceName);
          // Instances present in the static clustermap only. Map#keySet() is a live view of the map, so the
          // difference is computed on a copy; calling removeAll on the view itself would silently delete entries
          // from partitionToInstancesAndReplicas.
          Set<String> instanceSetInStatic = new HashSet<>(instanceAndReplicaInStatic.keySet());
          instanceSetInStatic.removeAll(instanceSetInHelix);
          for (String instance : instanceSetInStatic) {
            Replica replica = instanceAndReplicaInStatic.get(instance);
            info("New replica of partition[{}] will be added to instance {} on {}", partitionStr, instance,
                replica.getMountPath());
            // The first new replica of a partition seeds the partition-level properties (class and capacity);
            // every new replica then contributes its own "instance -> mount path" entry.
            newAddedReplicasInDc.computeIfAbsent(partitionStr, key -> {
              Map<String, String> partitionMap = new HashMap<>();
              partitionMap.put(ClusterMapUtils.PARTITION_CLASS_STR, replica.getPartitionId().getPartitionClass());
              partitionMap.put(ClusterMapUtils.REPLICAS_CAPACITY_STR, String.valueOf(replica.getCapacityInBytes()));
              return partitionMap;
            }).put(instance, replica.getMountPath());
          }
        }
      }
    }
    newAddedReplicasByDc.put(dcName, newAddedReplicasInDc);
  }
  return newAddedReplicasByDc;
}

/**
* Uploads cluster config infos onto Helix PropertyStore.
* @param adminInfosByDc the cluster admin information (overridden partitions, added replicas) grouped by DC that would
* be applied to cluster.
* @param clusterAdminType the type of cluster admin that would be uploaded (i.e. PartitionOverride, ReplicaAddition)
*/
private void uploadPartitionOverride(Map<String, Map<String, String>> partitionOverrideInfos) {
private void uploadClusterAdminInfos(Map<String, Map<String, Map<String, String>>> adminInfosByDc,
String clusterAdminType) {
Properties storeProps = new Properties();
storeProps.setProperty("helix.property.store.root.path", "/" + clusterName);
HelixPropertyStoreConfig propertyStoreConfig = new HelixPropertyStoreConfig(new VerifiableProperties(storeProps));
info("Setting partition override for specified datacenters.");
for (Map.Entry<String, ClusterMapUtils.DcZkInfo> entry : dataCenterToZkAddress.entrySet()) {
info("Setting partition override for {}.", entry.getKey());
info("Uploading {} infos for datacenter {}.", clusterAdminType, entry.getKey());
HelixPropertyStore<ZNRecord> helixPropertyStore =
CommonUtils.createHelixPropertyStore(entry.getValue().getZkConnectStr(), propertyStoreConfig, null);
ZNRecord znRecord = new ZNRecord(ClusterMapUtils.ZNODE_NAME);
znRecord.setMapFields(partitionOverrideInfos);
String path = ClusterMapUtils.PROPERTYSTORE_ZNODE_PATH;
ZNRecord znRecord = new ZNRecord(clusterAdminType);
znRecord.setMapFields(adminInfosByDc.get(entry.getKey()));
String path = ClusterMapUtils.PROPERTYSTORE_ZNODE_PATH + clusterAdminType;
if (!helixPropertyStore.set(path, znRecord, AccessOption.PERSISTENT)) {
info("Failed to upload partition override for datacenter {}", entry.getKey());
info("Failed to upload {} infos for datacenter {}", clusterAdminType, entry.getKey());
}
}
}
Expand Down Expand Up @@ -582,8 +680,16 @@ private void addUpdateResources(String dcName, Map<String, Set<String>> partitio
// not the toString() method for the enum as that is what Helix uses).
resourceIs.setReplicas(ResourceConfig.ResourceConfigConstants.ANY_LIVEINSTANCE.name());
resourceModified = true;
// TODO when move replica feature is ready, the infos of new added replicas should be uploaded to PropertyStore
}
}
// update state model def if necessary
if (!resourceIs.getStateModelDefRef().equals(stateModelDef)) {
info("Resource {} has different state model {}. Updating it with {}", resourceName,
resourceIs.getStateModelDefRef(), stateModelDef);
resourceIs.setStateModelDefRef(stateModelDef);
resourceModified = true;
}
resourceIs.setNumPartitions(resourceIs.getPartitionSet().size());
if (resourceModified) {
if (resourceIs.getPartitionSet().isEmpty()) {
Expand All @@ -609,7 +715,7 @@ private void addUpdateResources(String dcName, Map<String, Set<String>> partitio
List<Map.Entry<String, Set<String>>> partitionsUnderNextResource = newPartitions.subList(fromIndex, toIndex);
fromIndex = toIndex;
IdealState idealState = new IdealState(resourceName);
idealState.setStateModelDefRef(LeaderStandbySMD.name);
idealState.setStateModelDefRef(stateModelDef);
info("Adding partitions for next resource in {}", dcName);
for (Map.Entry<String, Set<String>> entry : partitionsUnderNextResource) {
String partitionName = entry.getKey();
Expand Down Expand Up @@ -961,7 +1067,7 @@ private void verifyResourcesAndPartitionEquivalencyInDc(Datacenter dc, String cl
continue;
}
IdealState resourceIS = admin.getResourceIdealState(clusterName, resourceName);
ensureOrThrow(resourceIS.getStateModelDefRef().equals(LeaderStandbySMD.name),
ensureOrThrow(resourceIS.getStateModelDefRef().equals(stateModelDef),
"StateModel name mismatch for resource " + resourceName);
Set<String> resourcePartitions = resourceIS.getPartitionSet();
for (String resourcePartition : resourcePartitions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,10 @@ public void handleDataDeleted(String dataPath) {
}
};
logger.info("Subscribing data listener to HelixPropertyStore.");
helixPropertyStore.subscribeDataChanges(ClusterMapUtils.ZNODE_PATH, dataListener);
helixPropertyStore.subscribeDataChanges(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH, dataListener);
logger.info("Getting ZNRecord from HelixPropertyStore");
ZNRecord zNRecord = helixPropertyStore.get(ClusterMapUtils.ZNODE_PATH, null, AccessOption.PERSISTENT);
ZNRecord zNRecord =
helixPropertyStore.get(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH, null, AccessOption.PERSISTENT);
if (clusterMapConfig.clusterMapEnablePartitionOverride) {
if (zNRecord != null) {
partitionOverrideInfoMap.putAll(zNRecord.getMapFields());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public HelixClusterManagerTest(boolean useComposite, boolean overrideEnabled, bo
partitionOverrideMap.computeIfAbsent(String.valueOf(i), k -> new HashMap<>())
.put(ClusterMapUtils.PARTITION_STATE, ClusterMapUtils.READ_ONLY_STR);
}
znRecord = new ZNRecord(ClusterMapUtils.ZNODE_NAME);
znRecord = new ZNRecord(ClusterMapUtils.PARTITION_OVERRIDE_STR);
znRecord.setMapFields(partitionOverrideMap);

helixCluster =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ record = znRecord;
this.beBadException = beBadException;

MockitoAnnotations.initMocks(this);
Mockito.when(helixPropertyStore.get(eq(ClusterMapUtils.ZNODE_PATH), eq(null), eq(AccessOption.PERSISTENT)))
.thenReturn(record);
Mockito.when(helixPropertyStore.get(eq(ClusterMapUtils.PARTITION_OVERRIDE_ZNODE_PATH), eq(null),
eq(AccessOption.PERSISTENT))).thenReturn(record);
}

@Override
Expand Down
Loading