Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore liveness update of current node if it is actually alive #1264

Merged
merged 3 commits into from
Sep 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
* @see <a href="http://helix.apache.org">http://helix.apache.org</a>
*/
class HelixClusterManager implements ClusterMap {
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final Logger logger = LoggerFactory.getLogger(HelixClusterManager.class);
private final String clusterName;
private final String selfInstanceName;
private final MetricRegistry metricRegistry;
Expand Down Expand Up @@ -590,7 +590,9 @@ private void updateInstanceLiveness(List<LiveInstance> liveInstances) {
liveInstancesSet.add(liveInstance.getInstanceName());
}
for (String instanceName : allInstances) {
if (liveInstancesSet.contains(instanceName)) {
// Here we ignore the live instance change if it is about the self instance. The reason is, during the server's
// startup, the current node should be AVAILABLE, but the list of live instances doesn't include the current node
// since it hasn't joined yet.
if (liveInstancesSet.contains(instanceName) || instanceName.equals(selfInstanceName)) {
instanceNameToAmbryDataNode.get(instanceName).setState(HardwareState.AVAILABLE);
} else {
instanceNameToAmbryDataNode.get(instanceName).setState(HardwareState.UNAVAILABLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,16 @@
@RunWith(Parameterized.class)
public class HelixClusterManagerTest {
private final HashMap<String, com.github.ambry.utils.TestUtils.ZkInfo> dcsToZkInfo = new HashMap<>();
private final String dcs[] = new String[]{"DC0", "DC1"};
private final String[] dcs = new String[]{"DC0", "DC1"};
private final TestUtils.TestHardwareLayout testHardwareLayout;
private final TestPartitionLayout testPartitionLayout;
private final String clusterNameStatic = "HelixClusterManagerTestCluster";
private final String clusterNamePrefixInHelix = "Ambry-";
private final ClusterMapConfig clusterMapConfig;
private final MockHelixCluster helixCluster;
private final String hostname;
private final int portNum;
private final String selfInstanceName;
private final String localDc;
private final String remoteDc;
private ClusterMap clusterManager;
Expand Down Expand Up @@ -182,11 +184,15 @@ public HelixClusterManagerTest(boolean useComposite, boolean overrideEnabled, bo
}
}

hostname = "localhost";
DataNode currentNode = testHardwareLayout.getRandomDataNodeFromDc(localDc);
hostname = currentNode.getHostname();
portNum = currentNode.getPort();
selfInstanceName = getInstanceName(hostname, portNum);
Properties props = new Properties();
props.setProperty("clustermap.host.name", hostname);
props.setProperty("clustermap.cluster.name", clusterNamePrefixInHelix + clusterNameStatic);
props.setProperty("clustermap.datacenter.name", localDc);
props.setProperty("clustermap.port", Integer.toString(portNum));
props.setProperty("clustermap.dcs.zk.connect.strings", zkJson.toString(2));
props.setProperty("clustermap.current.xid", Long.toString(CURRENT_XID));
props.setProperty("clustermap.enable.partition.override", Boolean.toString(overrideEnabled));
Expand All @@ -198,10 +204,10 @@ public HelixClusterManagerTest(boolean useComposite, boolean overrideEnabled, bo
new StaticClusterAgentsFactory(clusterMapConfig, hardwareLayoutPath, partitionLayoutPath);
metricRegistry = staticClusterAgentsFactory.getMetricRegistry();
clusterManager = new CompositeClusterManager(staticClusterAgentsFactory.getClusterMap(),
new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, metricRegistry));
new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, metricRegistry));
} else {
metricRegistry = new MetricRegistry();
clusterManager = new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, metricRegistry);
clusterManager = new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, metricRegistry);
}
}

Expand Down Expand Up @@ -234,13 +240,14 @@ public void instantiationTest() throws Exception {
JSONObject invalidZkJson = constructZkLayoutJSON(zkInfos);
Properties props = new Properties();
props.setProperty("clustermap.host.name", hostname);
props.setProperty("clustermap.port", Integer.toString(portNum));
props.setProperty("clustermap.cluster.name", clusterNamePrefixInHelix + clusterNameStatic);
props.setProperty("clustermap.datacenter.name", localDc);
props.setProperty("clustermap.dcs.zk.connect.strings", invalidZkJson.toString(2));
ClusterMapConfig invalidClusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
metricRegistry = new MetricRegistry();
new HelixClusterManager(invalidClusterMapConfig, hostname, new MockHelixManagerFactory(helixCluster, null, null),
metricRegistry);
new HelixClusterManager(invalidClusterMapConfig, selfInstanceName,
new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);
assertEquals(0L,
metricRegistry.getGauges().get(HelixClusterManager.class.getName() + ".instantiationFailed").getValue());
assertEquals(1L, metricRegistry.getGauges()
Expand All @@ -256,8 +263,8 @@ public void instantiationTest() throws Exception {
invalidClusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props));
metricRegistry = new MetricRegistry();
try {
new HelixClusterManager(invalidClusterMapConfig, hostname, new MockHelixManagerFactory(helixCluster, null, null),
metricRegistry);
new HelixClusterManager(invalidClusterMapConfig, selfInstanceName,
new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);
fail("Instantiation should have failed with invalid zk addresses");
} catch (IOException e) {
assertEquals(1L,
Expand All @@ -269,7 +276,7 @@ public void instantiationTest() throws Exception {

metricRegistry = new MetricRegistry();
try {
new HelixClusterManager(clusterMapConfig, hostname,
new HelixClusterManager(clusterMapConfig, selfInstanceName,
new MockHelixManagerFactory(helixCluster, null, new Exception("beBad")), metricRegistry);
fail("Instantiation should fail with a HelixManager factory that throws exception on listener registrations");
} catch (Exception e) {
Expand All @@ -289,9 +296,8 @@ public void emptyPartitionOverrideTest() throws Exception {
assumeTrue(overrideEnabled);
metricRegistry = new MetricRegistry();
// create a MockHelixManagerFactory
ClusterMap clusterManagerWithEmptyRecord =
new HelixClusterManager(clusterMapConfig, hostname, new MockHelixManagerFactory(helixCluster, null, null),
metricRegistry);
ClusterMap clusterManagerWithEmptyRecord = new HelixClusterManager(clusterMapConfig, selfInstanceName,
new MockHelixManagerFactory(helixCluster, null, null), metricRegistry);

Set<String> writableInClusterManager = new HashSet<>();
for (PartitionId partition : clusterManagerWithEmptyRecord.getWritablePartitionIds(null)) {
Expand All @@ -314,14 +320,11 @@ public void emptyPartitionOverrideTest() throws Exception {
public void basicInterfaceTest() throws Exception {
assumeTrue(!overrideEnabled);

for (String metricName : clusterManager.getMetricRegistry().getNames()) {
System.out.println(metricName);
}
assertEquals("Incorrect local datacenter ID", 0, clusterManager.getLocalDatacenterId());
testPartitionReplicaConsistency();
testInvalidPartitionId();
testDatacenterDatanodeReplicas();
assertStateEquivalency();
assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
}

/**
Expand All @@ -334,24 +337,39 @@ public void helixInitiatedLivenessChangeTest() throws Exception {
assumeTrue(!useComposite && !overrideEnabled);

// all instances are up initially.
assertStateEquivalency();
assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());

// Bring one instance down in each dc.
// Bring one instance (not current host) down in each dc
for (String zkAddr : helixCluster.getZkAddrs()) {
helixCluster.bringInstanceDown(helixCluster.getUpInstances(zkAddr).get(0));
String instance =
helixCluster.getUpInstances(zkAddr).stream().filter(name -> !name.equals(selfInstanceName)).findFirst().get();
helixCluster.bringInstanceDown(instance);
}

assertStateEquivalency();
assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());

// Bring all instances down in all dcs.
helixCluster.bringAllInstancesDown();
assertStateEquivalency();
Set<String> expectedDownInstances = helixCluster.getDownInstances();
Set<String> expectedUpInstances = helixCluster.getUpInstances();
expectedDownInstances.remove(selfInstanceName);
expectedUpInstances.add(selfInstanceName);
assertStateEquivalency(expectedDownInstances, expectedUpInstances);

// Bring one instance up in each dc.
boolean selfInstanceIsChosen = false;
for (String zkAddr : helixCluster.getZkAddrs()) {
helixCluster.bringInstanceUp(helixCluster.getDownInstances(zkAddr).get(0));
String instanceName = helixCluster.getDownInstances(zkAddr).get(0);
selfInstanceIsChosen = instanceName.equals(selfInstanceName);
helixCluster.bringInstanceUp(instanceName);
}
expectedDownInstances = helixCluster.getDownInstances();
expectedUpInstances = helixCluster.getUpInstances();
if (!selfInstanceIsChosen) {
expectedDownInstances.remove(selfInstanceName);
expectedUpInstances.add(selfInstanceName);
}
assertStateEquivalency();
assertStateEquivalency(expectedDownInstances, expectedUpInstances);
}

/**
Expand Down Expand Up @@ -422,7 +440,7 @@ public void clientInitiatedLivenessChangeTest() {

// The following does not do anything currently.
clusterManager.onReplicaEvent(replica, ReplicaEventType.Partition_ReadOnly);
assertStateEquivalency();
assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
}

/**
Expand Down Expand Up @@ -488,7 +506,7 @@ public void sealedReplicaChangeTest() throws Exception {
assumeTrue(!useComposite && !overrideEnabled && listenCrossColo);

// all instances are up initially.
assertStateEquivalency();
assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());

AmbryPartition partition = (AmbryPartition) clusterManager.getWritablePartitionIds(null).get(0);
List<String> instances = helixCluster.getInstancesForPartition((partition.toPathString()));
Expand All @@ -513,7 +531,7 @@ public void sealedReplicaChangeTest() throws Exception {
clusterManager.getWritablePartitionIds(null).contains(partition));
assertEquals("If no replica is SEALED, the whole partition should be Writable", PartitionState.READ_WRITE,
partition.getPartitionState());
assertStateEquivalency();
assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());
}

/**
Expand Down Expand Up @@ -568,7 +586,7 @@ public void clusterMapOverrideEnabledAndDisabledTest() throws Exception {
clusterManager.close();
MockHelixManagerFactory helixManagerFactory = new MockHelixManagerFactory(helixCluster, znRecord, null);
HelixClusterManager clusterManager =
new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, new MetricRegistry());
new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, new MetricRegistry());
// Ensure the new RW partition is added
assertEquals("Mismatch in writable partitions when instanceConfig changes", writableInOverrideMap.size() + 1,
clusterManager.getWritablePartitionIds(null).size());
Expand Down Expand Up @@ -621,7 +639,7 @@ public void stoppedReplicaChangeTest() {
assumeTrue(!useComposite && !overrideEnabled && listenCrossColo);

// all instances are up initially.
assertStateEquivalency();
assertStateEquivalency(helixCluster.getDownInstances(), helixCluster.getUpInstances());

AmbryPartition partition = (AmbryPartition) clusterManager.getWritablePartitionIds(null).get(0);
List<String> instances = helixCluster.getInstancesForPartition((partition.toPathString()));
Expand Down Expand Up @@ -679,11 +697,19 @@ public void xidTest() throws Exception {
MockHelixManagerFactory helixManagerFactory = new MockHelixManagerFactory(helixCluster, null, null);
List<InstanceConfig> instanceConfigs = helixCluster.getAllInstanceConfigs();
int instanceCount = instanceConfigs.size();
int randomIndex = com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceConfigs.size());
// Find the self instance config and move it to the end of the list. This ensures the subsequent test
// won't choose the self instance config.
for (int i = 0; i < instanceCount; ++i) {
if (instanceConfigs.get(i).getInstanceName().equals(selfInstanceName)) {
Collections.swap(instanceConfigs, i, instanceConfigs.size() - 1);
break;
}
}
int randomIndex = com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceCount - 1);
InstanceConfig aheadInstanceConfig = instanceConfigs.get(randomIndex);
Collections.swap(instanceConfigs, randomIndex, instanceConfigs.size() - 1);
Collections.swap(instanceConfigs, randomIndex, instanceConfigs.size() - 2);
aheadInstanceConfig.getRecord().setSimpleField(XID_STR, Long.toString(CURRENT_XID + 1));
clusterManager = new HelixClusterManager(clusterMapConfig, hostname, helixManagerFactory, new MetricRegistry());
clusterManager =
new HelixClusterManager(clusterMapConfig, selfInstanceName, helixManagerFactory, new MetricRegistry());
assertEquals(instanceCount - 1, clusterManager.getDataNodeIds().size());
for (DataNodeId dataNode : clusterManager.getDataNodeIds()) {
String instanceName = ClusterMapUtils.getInstanceName(dataNode.getHostname(), dataNode.getPort());
Expand All @@ -696,9 +722,9 @@ public void xidTest() throws Exception {
assertEquals(instanceCount, aheadInstanceClusterManager.getDataNodeIds().size());
}

// Post-initialization InstanceConfig change:
// Post-initialization InstanceConfig change: pick an instance that is neither the previously chosen instance
// nor the self instance
InstanceConfig ignoreInstanceConfig =
instanceConfigs.get(com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceConfigs.size() - 1));
instanceConfigs.get(com.github.ambry.utils.TestUtils.RANDOM.nextInt(instanceCount - 2));
String ignoreInstanceName = ignoreInstanceConfig.getInstanceName();
ignoreInstanceConfig.getRecord().setSimpleField(XID_STR, Long.toString(CURRENT_XID + 2));

Expand Down Expand Up @@ -785,9 +811,11 @@ public void metricsTest() throws Exception {
// live instance trigger happens once initially.
long instanceTriggerCount = dcs.length;

// Bring one instance down in each dc in order to test the metrics more generally.
// Bring one instance (not current instance) down in each dc in order to test the metrics more generally.
for (String zkAddr : helixCluster.getZkAddrs()) {
helixCluster.bringInstanceDown(helixCluster.getUpInstances(zkAddr).get(0));
String instance =
helixCluster.getUpInstances(zkAddr).stream().filter(name -> !name.equals(selfInstanceName)).findFirst().get();
helixCluster.bringInstanceDown(instance);
instanceTriggerCount++;
}

Expand Down Expand Up @@ -981,11 +1009,10 @@ private void testDatacenterDatanodeReplicas() {
/**
* Assert that the state of datanodes in the cluster manager's view are consistent with their actual states in the
* cluster.
* @param expectedDownInstances the expected down instances set in cluster manager.
* @param expectedUpInstances the expected up instances set in cluster manager.
*/
private void assertStateEquivalency() {
Set<String> upInstancesInCluster = helixCluster.getUpInstances();
Set<String> downInstancesInCluster = helixCluster.getDownInstances();

private void assertStateEquivalency(Set<String> expectedDownInstances, Set<String> expectedUpInstances) {
Set<String> upInstancesInClusterManager = new HashSet<>();
Set<String> downInstancesInClusterManager = new HashSet<>();
for (DataNodeId dataNode : clusterManager.getDataNodeIds()) {
Expand All @@ -997,8 +1024,8 @@ private void assertStateEquivalency() {
ClusterMapUtils.getInstanceName(dataNode.getHostname(), dataNode.getPort())));
}
}
assertEquals(downInstancesInCluster, downInstancesInClusterManager);
assertEquals(upInstancesInCluster, upInstancesInClusterManager);
assertEquals(expectedDownInstances, downInstancesInClusterManager);
assertEquals(expectedUpInstances, upInstancesInClusterManager);
Pair<Set<String>, Set<String>> writablePartitionsInTwoPlaces = getWritablePartitions();
assertEquals(writablePartitionsInTwoPlaces.getFirst(), writablePartitionsInTwoPlaces.getSecond());
testAllPartitions();
Expand Down