Skip to content

Commit

Permalink
Added HelixPopulateTool and tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
zzmao committed May 31, 2019
1 parent 5694e12 commit 37024b8
Show file tree
Hide file tree
Showing 3 changed files with 312 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public boolean uploadBlob(BlobId blobId, long inputLength, CloudBlobMetadata clo

Objects.requireNonNull(blobId, "BlobId cannot be null");
Objects.requireNonNull(blobInputStream, "Input stream cannot be null");

azureMetrics.blobUploadRate.mark();
azureMetrics.blobUploadRequestCount.inc();
try {
boolean uploaded = uploadIfNotExists(blobId, inputLength, cloudBlobMetadata, blobInputStream);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2017 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/

package com.github.ambry.clustermap;

import com.github.ambry.utils.TestUtils;
import java.util.HashSet;
import java.util.Set;
import org.apache.helix.HelixAdmin;
import org.apache.helix.controller.rebalancer.strategy.CrushRebalanceStrategy;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.manager.zk.client.SharedZkClientFactory;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.tools.ClusterSetup;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;


public class HelixVcrPopulateToolTest {

private static final String SRC_ZK_SERVER_HOSTNAME = "localhost";
private static final int SRC_ZK_SERVER_PORT = 31900;
private static final String SRC_ZK_CONNECT_STRING = SRC_ZK_SERVER_HOSTNAME + ":" + SRC_ZK_SERVER_PORT;
private static TestUtils.ZkInfo srcZkInfo;
private static final String SRC_CLUSTER_NAME = "srcCluster";

@BeforeClass
public static void beforeClass() throws Exception {
srcZkInfo = new com.github.ambry.utils.TestUtils.ZkInfo(TestUtils.getTempDir("helixVcr"), "DC1", (byte) 1,
SRC_ZK_SERVER_PORT, true);

HelixZkClient zkClient =
SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(SRC_ZK_CONNECT_STRING));
zkClient.setZkSerializer(new ZNRecordSerializer());
ClusterSetup clusterSetup = new ClusterSetup(zkClient);
clusterSetup.addCluster(SRC_CLUSTER_NAME, true);
HelixAdmin admin = new HelixAdminFactory().getHelixAdmin(SRC_ZK_CONNECT_STRING);

String resourceName = "1";
FullAutoModeISBuilder builder = new FullAutoModeISBuilder(resourceName);
builder.setStateModel(LeaderStandbySMD.name);
for (int i = 0; i < 100; i++) {
builder.add(Integer.toString(i));
}
builder.setRebalanceStrategy(CrushRebalanceStrategy.class.getName());
IdealState idealState = builder.build();
admin.addResource(SRC_CLUSTER_NAME, resourceName, idealState);
}

@AfterClass
public static void afterClass() {
srcZkInfo.shutdown();
}

/**
* Test {@link HelixVcrPopulateTool#createCluster(String, String)} and
* {@link HelixVcrPopulateTool#updateResourceAndPartition(String, String, String, String, boolean)} method.
*/
@Test
public void testCreateAndUpdateCluster() throws Exception {
String destZkHostName = "localhost";
int destZkServerPort = SRC_ZK_SERVER_PORT + 1;
String destZkConnectString = destZkHostName + ":" + destZkServerPort;
String destVcrClusterName = "DEST_VCR_CLUSTER1";
// set up dest zk
TestUtils.ZkInfo destZkInfo =
new com.github.ambry.utils.TestUtils.ZkInfo(TestUtils.getTempDir("helixDestVcr"), "DC1", (byte) 1,
destZkServerPort, true);
HelixVcrPopulateTool.createCluster(destZkConnectString, destVcrClusterName);

HelixVcrPopulateTool.updateResourceAndPartition(SRC_ZK_CONNECT_STRING, SRC_CLUSTER_NAME, destZkConnectString,
destVcrClusterName, false);
Assert.assertTrue("Dest and Src should be same",
isSrcDestSync(SRC_ZK_CONNECT_STRING, SRC_CLUSTER_NAME, destZkConnectString, destVcrClusterName));
destZkInfo.shutdown();
}

/**
* A method to verify resources and partitions in src cluster and dest cluster are same.
*/
private boolean isSrcDestSync(String srcZkString, String srcClusterName, String destZkString,
String destClusterName) {

HelixAdmin srcAdmin = new ZKHelixAdmin(srcZkString);
Set<String> srcResources = new HashSet<>(srcAdmin.getResourcesInCluster(srcClusterName));
HelixAdmin destAdmin = new ZKHelixAdmin(destZkString);
Set<String> destResources = new HashSet<>(destAdmin.getResourcesInCluster(destClusterName));

for (String resource : srcResources) {
if (HelixVcrPopulateTool.ignoreResourceKeyWords.stream().anyMatch(resource::contains)) {
System.out.println("Resource " + resource + " from src cluster is ignored");
continue;
}
if (destResources.contains(resource)) {
// check if every partition exist.
Set<String> srcPartitions = srcAdmin.getResourceIdealState(srcClusterName, resource).getPartitionSet();
Set<String> destPartitions = destAdmin.getResourceIdealState(destClusterName, resource).getPartitionSet();
for (String partition : srcPartitions) {
if (!destPartitions.contains(partition)) {
return false;
}
}
} else {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Copyright 2017 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.clustermap;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.client.DedicatedZkClientFactory;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LeaderStandbySMD;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterSetup;


/**
* This tool provides function to create vcr cluster and update vcr cluster by referencing src cluster.
*/
public class HelixVcrPopulateTool {

private static String SEPARATOR = "/";
static List<String> ignoreResourceKeyWords = Arrays.asList("aggregation", "trigger", "stats");

public static void main(String args[]) {
OptionParser parser = new OptionParser();
OptionSpec createClusterOpt = parser.accepts("createCluster",
"Create resources in dest by copying from src to dest. --createCluster --dest destZkEndpoint/destClusterName");
OptionSpec updateClusterOpt = parser.accepts("updateCluster",
"Update resources in dest by copying from src to dest. --updateCluster"
+ " --src srcZkEndpoint/srcClusterName --dest destZkEndpoint/destClusterName");
OptionSpec dryRunOpt = parser.accepts("dryRun", "Do dry run.");

ArgumentAcceptingOptionSpec<String> srcOpt =
parser.accepts("src").withRequiredArg().describedAs("src zk and cluster name").ofType(String.class);
ArgumentAcceptingOptionSpec<String> destOpt =
parser.accepts("dest").withRequiredArg().describedAs("dest zk and cluster name").ofType(String.class);

OptionSet options = parser.parse(args);

String destZkString = options.valueOf(destOpt).split(SEPARATOR)[0];
String destClusterName = options.valueOf(destOpt).split(SEPARATOR)[1];
if (!destClusterName.contains("VCR")) {
System.out.println("dest should be a VCR cluster.(VCR string should be included)");
return;
}

if (options.has(createClusterOpt)) {
System.out.println("Creating cluster: " + destClusterName);
createCluster(destZkString, destClusterName);
}

if (options.has(updateClusterOpt)) {
System.out.println("Updating cluster: " + destClusterName);
String srcZkString = options.valueOf(srcOpt).split(SEPARATOR)[0];
String srcClusterName = options.valueOf(srcOpt).split(SEPARATOR)[1];
boolean dryRun = options.has(dryRunOpt);
updateResourceAndPartition(srcZkString, srcClusterName, destZkString, destClusterName, dryRun);
}
System.out.println("Done.");
}

/**
* Create a helix cluster with given information.
* @param destZkString the cluster's zk string
* @param destClusterName the cluster's name
* @return false if cluster already exist, otherwise true.
*/
static boolean createCluster(String destZkString, String destClusterName) {
HelixZkClient destZkClient =
DedicatedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(destZkString));
destZkClient.setZkSerializer(new ZNRecordSerializer());
HelixAdmin destAdmin = new ZKHelixAdmin(destZkClient);
if (destAdmin.getClusters().contains(destClusterName)) {
System.out.println("Create cluster failed. " + destClusterName + " already exist.");
return false;
}
ClusterSetup clusterSetup = new ClusterSetup(destZkString);
clusterSetup.addCluster(destClusterName, true);

// set ALLOW_PARTICIPANT_AUTO_JOIN
HelixConfigScope configScope = new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).
forCluster(destClusterName).build();
Map<String, String> helixClusterProperties = new HashMap<>();
helixClusterProperties.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, String.valueOf(true));
destAdmin.setConfig(configScope, helixClusterProperties);
// set PersistBestPossibleAssignment
ConfigAccessor configAccessor = new ConfigAccessor(destZkClient);
ClusterConfig clusterConfig = configAccessor.getClusterConfig(destClusterName);
clusterConfig.setPersistBestPossibleAssignment(true);
configAccessor.setClusterConfig(destClusterName, clusterConfig);
System.out.println("Cluster " + destClusterName + " create done.");
return true;
}

/**
* Update dest cluster information based on src cluster. Dest cluster resource will be recreated if src cluster has any change.
* @param srcZkString the src cluster's zk string
* @param srcClusterName the src cluster's name
* @param destZkString the dest cluster's zk string
* @param destClusterName the dest cluster's name
*/
static void updateResourceAndPartition(String srcZkString, String srcClusterName, String destZkString,
String destClusterName, boolean dryRun) {

HelixAdmin srcAdmin = new ZKHelixAdmin(srcZkString);
Set<String> srcResources = new HashSet<>(srcAdmin.getResourcesInCluster(srcClusterName));
HelixAdmin destAdmin = new ZKHelixAdmin(destZkString);
Set<String> destResources = new HashSet<>(destAdmin.getResourcesInCluster(destClusterName));

for (String resource : srcResources) {
if (ignoreResourceKeyWords.stream().anyMatch(resource::contains)) {
System.out.println("Resource " + resource + " from src cluster is ignored");
continue;
}
boolean createNewResource = false;
if (destResources.contains(resource)) {
// check if every partition exist.
Set<String> srcPartitions = srcAdmin.getResourceIdealState(srcClusterName, resource).getPartitionSet();
Set<String> destPartitions = destAdmin.getResourceIdealState(destClusterName, resource).getPartitionSet();
for (String partition : srcPartitions) {
if (!destPartitions.contains(partition)) {
if (dryRun) {
System.out.println("DryRun: Drop Resource " + resource);
} else {
// This resource need to be recreate.
destAdmin.dropResource(destClusterName, resource);
System.out.println("Dropped Resource " + resource);
createNewResource = true;
break;
}
}
}
} else {
createNewResource = true;
}
if (createNewResource) {
// add new resource
Set<String> srcPartitions = srcAdmin.getResourceIdealState(srcClusterName, resource).getPartitionSet();
FullAutoModeISBuilder builder = new FullAutoModeISBuilder(resource);
builder.setStateModel(LeaderStandbySMD.name);
for (String partition : srcPartitions) {
builder.add(partition);
}
builder.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
IdealState idealState = builder.build();
if (dryRun) {
System.out.println("DryRun: Add Resource " + resource + " with partition " + srcPartitions);
} else {
destAdmin.addResource(destClusterName, resource, idealState);
destAdmin.rebalance(destClusterName, resource, 3, "", "");
System.out.println("Added Resource " + resource + " with partition " + srcPartitions);
}
}
}
System.out.println("Cluster " + destClusterName + " update done.");
}
}

0 comments on commit 37024b8

Please sign in to comment.