Skip to content

Commit

Permalink
address yingyi comment
Browse files Browse the repository at this point in the history
  • Loading branch information
zzmao committed Jun 3, 2019
1 parent 37024b8 commit 6507055
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 LinkedIn Corp. All rights reserved.
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,7 @@ public class HelixVcrPopulateToolTest {
private static final String SRC_ZK_CONNECT_STRING = SRC_ZK_SERVER_HOSTNAME + ":" + SRC_ZK_SERVER_PORT;
private static TestUtils.ZkInfo srcZkInfo;
private static final String SRC_CLUSTER_NAME = "srcCluster";
private static HelixAdmin srcHelixAdmin;

@BeforeClass
public static void beforeClass() throws Exception {
Expand All @@ -51,7 +52,7 @@ public static void beforeClass() throws Exception {
zkClient.setZkSerializer(new ZNRecordSerializer());
ClusterSetup clusterSetup = new ClusterSetup(zkClient);
clusterSetup.addCluster(SRC_CLUSTER_NAME, true);
HelixAdmin admin = new HelixAdminFactory().getHelixAdmin(SRC_ZK_CONNECT_STRING);
srcHelixAdmin = new HelixAdminFactory().getHelixAdmin(SRC_ZK_CONNECT_STRING);

String resourceName = "1";
FullAutoModeISBuilder builder = new FullAutoModeISBuilder(resourceName);
Expand All @@ -61,7 +62,7 @@ public static void beforeClass() throws Exception {
}
builder.setRebalanceStrategy(CrushRebalanceStrategy.class.getName());
IdealState idealState = builder.build();
admin.addResource(SRC_CLUSTER_NAME, resourceName, idealState);
srcHelixAdmin.addResource(SRC_CLUSTER_NAME, resourceName, idealState);
}

@AfterClass
Expand All @@ -85,6 +86,24 @@ public void testCreateAndUpdateCluster() throws Exception {
destZkServerPort, true);
HelixVcrPopulateTool.createCluster(destZkConnectString, destVcrClusterName);

HelixVcrPopulateTool.updateResourceAndPartition(SRC_ZK_CONNECT_STRING, SRC_CLUSTER_NAME, destZkConnectString,
destVcrClusterName, false);
Assert.assertTrue("Dest and Src should be same",
isSrcDestSync(SRC_ZK_CONNECT_STRING, SRC_CLUSTER_NAME, destZkConnectString, destVcrClusterName));

// add one more partition to src cluster resource 1 and add one more resource to src cluster
srcHelixAdmin.dropResource(SRC_CLUSTER_NAME, "1");
String[] resourceNames = {"1", "2"};
for (String resourceName : resourceNames) {
FullAutoModeISBuilder builder = new FullAutoModeISBuilder(resourceName);
builder.setStateModel(LeaderStandbySMD.name);
for (int i = 0; i < 101; i++) {
builder.add(Integer.toString(i));
}
builder.setRebalanceStrategy(CrushRebalanceStrategy.class.getName());
srcHelixAdmin.addResource(SRC_CLUSTER_NAME, resourceName, builder.build());
}

HelixVcrPopulateTool.updateResourceAndPartition(SRC_ZK_CONNECT_STRING, SRC_CLUSTER_NAME, destZkConnectString,
destVcrClusterName, false);
Assert.assertTrue("Dest and Src should be same",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 LinkedIn Corp. All rights reserved.
* Copyright 2019 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,10 +48,10 @@ public class HelixVcrPopulateTool {
private static String SEPARATOR = "/";
static List<String> ignoreResourceKeyWords = Arrays.asList("aggregation", "trigger", "stats");

public static void main(String args[]) {
public static void main(String[] args) {
OptionParser parser = new OptionParser();
OptionSpec createClusterOpt = parser.accepts("createCluster",
"Create resources in dest by copying from src to dest. --createCluster --dest destZkEndpoint/destClusterName");
"Create resources in dest cluster. --createCluster --dest destZkEndpoint/destClusterName");
OptionSpec updateClusterOpt = parser.accepts("updateCluster",
"Update resources in dest by copying from src to dest. --updateCluster"
+ " --src srcZkEndpoint/srcClusterName --dest destZkEndpoint/destClusterName");
Expand All @@ -77,9 +77,9 @@ public static void main(String args[]) {
}

if (options.has(updateClusterOpt)) {
System.out.println("Updating cluster: " + destClusterName);
String srcZkString = options.valueOf(srcOpt).split(SEPARATOR)[0];
String srcClusterName = options.valueOf(srcOpt).split(SEPARATOR)[1];
System.out.println("Updating cluster: " + destClusterName + "by checking " + srcClusterName);
boolean dryRun = options.has(dryRunOpt);
updateResourceAndPartition(srcZkString, srcClusterName, destZkString, destClusterName, dryRun);
}
Expand All @@ -90,16 +90,15 @@ public static void main(String args[]) {
* Create a helix cluster with given information.
* @param destZkString the cluster's zk string
* @param destClusterName the cluster's name
* @return false if cluster already exist, otherwise true.
*/
static boolean createCluster(String destZkString, String destClusterName) {
static void createCluster(String destZkString, String destClusterName) {
HelixZkClient destZkClient =
DedicatedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(destZkString));
destZkClient.setZkSerializer(new ZNRecordSerializer());
HelixAdmin destAdmin = new ZKHelixAdmin(destZkClient);
if (destAdmin.getClusters().contains(destClusterName)) {
System.out.println("Create cluster failed. " + destClusterName + " already exist.");
return false;
System.out.println("Failed to create cluster becuase " + destClusterName + " already exist.");
return;
}
ClusterSetup clusterSetup = new ClusterSetup(destZkString);
clusterSetup.addCluster(destClusterName, true);
Expand All @@ -115,16 +114,18 @@ static boolean createCluster(String destZkString, String destClusterName) {
ClusterConfig clusterConfig = configAccessor.getClusterConfig(destClusterName);
clusterConfig.setPersistBestPossibleAssignment(true);
configAccessor.setClusterConfig(destClusterName, clusterConfig);
System.out.println("Cluster " + destClusterName + " create done.");
return true;
System.out.println("Cluster " + destClusterName + " is created successfully!");
return;
}

/**
* Update dest cluster information based on src cluster. Dest cluster resource will be recreated if src cluster has any change.
* Update dest cluster information based on src cluster.
* Dest cluster resource will be recreated if it mismatches that in src cluster.
* @param srcZkString the src cluster's zk string
* @param srcClusterName the src cluster's name
* @param destZkString the dest cluster's zk string
* @param destClusterName the dest cluster's name
* @param dryRun run the update process but without actual change.
*/
static void updateResourceAndPartition(String srcZkString, String srcClusterName, String destZkString,
String destClusterName, boolean dryRun) {
Expand All @@ -144,19 +145,23 @@ static void updateResourceAndPartition(String srcZkString, String srcClusterName
// check if every partition exist.
Set<String> srcPartitions = srcAdmin.getResourceIdealState(srcClusterName, resource).getPartitionSet();
Set<String> destPartitions = destAdmin.getResourceIdealState(destClusterName, resource).getPartitionSet();
for (String partition : srcPartitions) {
if (!destPartitions.contains(partition)) {
if (dryRun) {
System.out.println("DryRun: Drop Resource " + resource);
} else {
// This resource need to be recreate.
destAdmin.dropResource(destClusterName, resource);
System.out.println("Dropped Resource " + resource);
if (srcPartitions.size() != destPartitions.size()) {
createNewResource = true;
} else {
for (String partition : srcPartitions) {
if (!destPartitions.contains(partition)) {
createNewResource = true;
break;
}
}
}
if (dryRun) {
System.out.println("DryRun: Drop Resource " + resource);
} else {
// This resource need to be recreate.
destAdmin.dropResource(destClusterName, resource);
System.out.println("Dropped Resource " + resource);
}
} else {
createNewResource = true;
}
Expand All @@ -179,7 +184,7 @@ static void updateResourceAndPartition(String srcZkString, String srcClusterName
}
}
}
System.out.println("Cluster " + destClusterName + " update done.");
System.out.println("Cluster " + destClusterName + " is updated successfully!");
}
}

0 comments on commit 6507055

Please sign in to comment.