Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix helix test issues. #1177

Merged
merged 2 commits into from
Jun 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ public void onBecomeOfflineFromLeader(Message message, NotificationContext conte
helixVcrCluster.removePartition(message.getPartitionName());
}

/**
 * Transition callback for the OFFLINE to DROPPED state change, as declared by the
 * {@code @Transition} annotation. It only writes an info-level log line; no other action is taken here.
 * @param message the transition message; its partition name is included in the log line.
 * @param context the notification context (not used by this callback).
 */
@Transition(to = "DROPPED", from = "OFFLINE")
public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
  // Resolve the two log arguments in the same order the original call evaluated them.
  final Object currentNode = helixVcrCluster.getCurrentDataNodeId();
  final String droppedPartition = message.getPartitionName();
  logger.info("{} Becoming DROPPED from OFFLINE of Partition {}", currentNode, droppedPartition);
}

@Override
public void reset() {
// no op
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void startup() throws InstantiationException {
public void shutdown() {
long startTime = SystemTime.getInstance().milliseconds();
try {
logger.info("shutdown started");
logger.info("VCR shutdown started");
if (scheduler != null) {
shutDownExecutorService(scheduler, 5, TimeUnit.MINUTES);
}
Expand Down Expand Up @@ -213,8 +213,8 @@ public void shutdown() {
}
}

public void awaitShutdown(int timeoutMs) throws InterruptedException {
shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
public boolean awaitShutdown(int timeoutMs) throws InterruptedException {
return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
}

public VirtualReplicatorCluster getVirtualReplicatorCluster() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.github.ambry.config.VerifiableProperties;
import com.github.ambry.utils.HelixControllerManager;
import com.github.ambry.utils.TestUtils;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -73,14 +74,16 @@ public static void afterClass() {
@Test
public void helixVcrClusterTest() throws Exception {
StrictMatchExternalViewVerifier helixBalanceVerifier =
new StrictMatchExternalViewVerifier(ZK_CONNECT_STRING, VCR_CLUSTER_NAME, null, null);
new StrictMatchExternalViewVerifier(ZK_CONNECT_STRING, VCR_CLUSTER_NAME,
Collections.singleton(VcrTestUtil.helixResource), null);
// Create helixInstance1 and join the cluster. All partitions should be assigned to helixInstance1.
VirtualReplicatorCluster helixInstance1 = createHelixInstance(8123, 10123);
List<PartitionId> expectedPartitions = mockClusterMap.getAllPartitionIds(null);
MockVcrListener mockVcrListener = new MockVcrListener();
helixInstance1.addListener(mockVcrListener);
helixInstance1.participate(InstanceType.PARTICIPANT);
Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(2000));
TestUtils.checkAndSleep(true, () -> helixInstance1.getAssignedPartitionIds().size() > 0, 1000);
Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000));
Assert.assertEquals("Partition assignment are not correct.", helixInstance1.getAssignedPartitionIds(),
expectedPartitions);

Expand All @@ -89,8 +92,8 @@ public void helixVcrClusterTest() throws Exception {
helixInstance2.participate(InstanceType.PARTICIPANT);
// Detect any ideal state change first.
TestUtils.checkAndSleep(true, () -> helixInstance1.getAssignedPartitionIds().size() < expectedPartitions.size(),
500);
Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(2000));
1000);
Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000));
Assert.assertEquals("Number of partitions removed are not correct.", expectedPartitions.size() / 2,
mockVcrListener.getPartitionSet().size());

Expand All @@ -99,7 +102,7 @@ public void helixVcrClusterTest() throws Exception {
// Detect any ideal state change first.
TestUtils.checkAndSleep(true, () -> helixInstance1.getAssignedPartitionIds().size() > expectedPartitions.size() / 2,
500);
Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(2000));
Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000));
Assert.assertEquals("Partition assignment are not correct.", helixInstance1.getAssignedPartitionIds(),
expectedPartitions);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
*/
public class VcrTestUtil {

public static String helixResource = "resource1";
/**
* Create a {@link VcrServer}.
* @param properties the config properties to use.
Expand Down Expand Up @@ -87,16 +88,15 @@ public static HelixControllerManager populateZkInfoAndStartController(String zKC
clusterConfig.setPersistBestPossibleAssignment(true);
configAccessor.setClusterConfig(vcrClusterName, clusterConfig);

String resourceName = "1";
FullAutoModeISBuilder builder = new FullAutoModeISBuilder(resourceName);
FullAutoModeISBuilder builder = new FullAutoModeISBuilder(helixResource);
builder.setStateModel(LeaderStandbySMD.name);
for (PartitionId partitionId : clusterMap.getAllPartitionIds(null)) {
builder.add(partitionId.toPathString());
}
builder.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName());
IdealState idealState = builder.build();
admin.addResource(vcrClusterName, resourceName, idealState);
admin.rebalance(vcrClusterName, resourceName, 3, "", "");
admin.addResource(vcrClusterName, helixResource, idealState);
admin.rebalance(vcrClusterName, helixResource, 3, "", "");
HelixControllerManager helixControllerManager = new HelixControllerManager(zKConnectString, vcrClusterName);
helixControllerManager.syncStart();
return helixControllerManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.*;

Expand All @@ -55,6 +57,7 @@
* On shutdown we ensure the servers are shutdown.
*/
public class MockCluster {
private Logger logger = LoggerFactory.getLogger(getClass());
private final MockClusterAgentsFactory mockClusterAgentsFactory;
private final MockClusterMap clusterMap;
private List<AmbryServer> serverList = null;
Expand Down Expand Up @@ -179,6 +182,7 @@ public void startServers() throws InstantiationException, IOException {
*/
public void stopServers() throws IOException {
if (serverInitialized) {
logger.info("Stopping servers......");
CountDownLatch shutdownLatch = new CountDownLatch(serverList.size());
for (AmbryServer server : serverList) {
new Thread(new ServerShutdown(shutdownLatch, server)).start();
Expand Down Expand Up @@ -221,6 +225,7 @@ public List<DataNodeId> getOneDataNodeFromEachDatacenter(ArrayList<String> datac
}

class ServerShutdown implements Runnable {
private Logger logger = LoggerFactory.getLogger(getClass());
private final CountDownLatch latch;
private final AmbryServer server;

Expand All @@ -232,6 +237,11 @@ public ServerShutdown(CountDownLatch latch, AmbryServer ambryServer) {
@Override
public void run() {
server.shutdown();
try {
server.awaitShutdown();
} catch (InterruptedException e) {
logger.warn("Server awaitShutdown is interrupted.");
}
latch.countDown();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.github.ambry.utils.Utils;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -46,11 +47,14 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.*;


public class VcrBackupTest {
private Logger logger = LoggerFactory.getLogger(getClass());
private Properties routerProps;
private MockNotificationSystem notificationSystem;
private MockCluster mockCluster;
Expand Down Expand Up @@ -81,6 +85,7 @@ public void setup() throws Exception {

@After
public void cleanup() throws IOException {
logger.info("Start to clean up.");
mockCluster.cleanup();
helixControllerManager.syncStop();
zkInfo.shutdown();
Expand All @@ -107,7 +112,7 @@ public void basicTest() throws Exception {
// Wait for the backup to finish.
assertTrue("Did not backup all blobs in 2 minutes", latchBasedInMemoryCloudDestination.await(2, TimeUnit.MINUTES));
vcrServer.shutdown();
vcrServer.awaitShutdown(1000);
assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000));
}

/**
Expand All @@ -116,7 +121,8 @@ public void basicTest() throws Exception {
@Test
public void singleNodeUpDownTestWithoutPersist() throws Exception {
StrictMatchExternalViewVerifier helixBalanceVerifier =
new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName, null, null);
new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName,
Collections.singleton(VcrTestUtil.helixResource), null);
int numberOfBlobs = 100;
sendBlobToDataNode(dataNode, numberOfBlobs);
// Create in memory cloud destination.
Expand All @@ -139,11 +145,11 @@ public void singleNodeUpDownTestWithoutPersist() throws Exception {
assertTrue("Blob count is not correct.",
TestUtils.checkAndSleep(numberOfBlobs, () -> vcrNotificationSystemCopy.getBlobIds().size(), 200));
vcrServer.shutdown();
vcrServer.awaitShutdown(1000);
assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000));
// Error metrics should be zero.
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount());
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount());
assertEquals("No token is expected.", 0, latchBasedInMemoryCloudDestination.getTokenMap().size());

Expand All @@ -157,11 +163,11 @@ public void singleNodeUpDownTestWithoutPersist() throws Exception {
// Because the same cloud destination is used, getMissingKey() will filter out all keys.
assertEquals("Number of blobs doesn't match", 0, vcrNotificationSystem.getBlobIds().size());
vcrServer.shutdown();
vcrServer.awaitShutdown(1000);
assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000));
// Error metrics should be zero.
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount());
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount());

// Start VCR again with different cloud destination
Expand All @@ -179,11 +185,11 @@ public void singleNodeUpDownTestWithoutPersist() throws Exception {
assertTrue("Blob count is not correct.",
TestUtils.checkAndSleep(numberOfBlobs, () -> vcrNotificationSystemCopy2.getBlobIds().size(), 200));
vcrServer.shutdown();
vcrServer.awaitShutdown(1000);
assertTrue("VCR shutdown timeout.", vcrServer.awaitShutdown(5000));
// Error metrics should be zero.
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount());
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount());
}

Expand All @@ -193,7 +199,8 @@ public void singleNodeUpDownTestWithoutPersist() throws Exception {
@Test
public void singleNodeUpDownTestWithPersist() throws Exception {
StrictMatchExternalViewVerifier helixBalanceVerifier =
new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName, null, null);
new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName,
Collections.singleton(VcrTestUtil.helixResource), null);
int numberOfBlobs = 100;
sendBlobToDataNode(dataNode, numberOfBlobs);
// Create in memory cloud destination.
Expand All @@ -216,11 +223,11 @@ public void singleNodeUpDownTestWithPersist() throws Exception {
assertTrue("Blob count is not correct.",
TestUtils.checkAndSleep(numberOfBlobs, () -> vcrNotificationSystemCopy.getBlobIds().size(), 200));
vcrServer.shutdown();
vcrServer.awaitShutdown(1000);
assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000));
// Error metrics should be zero.
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount());
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount());
assertTrue("Token is expected.", latchBasedInMemoryCloudDestination.getTokenMap().size() > 0);

Expand All @@ -234,11 +241,11 @@ public void singleNodeUpDownTestWithPersist() throws Exception {
// Because the token is reloaded, the number of blobs backed up is 0.
assertEquals("Number of blobs doesn't match", 0, vcrNotificationSystem.getBlobIds().size());
vcrServer.shutdown();
vcrServer.awaitShutdown(1000);
assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000));
// Error metrics should be zero.
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount());
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount());

// Start VCR again with token.
Expand All @@ -259,11 +266,11 @@ public void singleNodeUpDownTestWithPersist() throws Exception {
assertTrue("Blob count is not correct.",
TestUtils.checkAndSleep(0, () -> vcrNotificationSystemCopy2.getBlobIds().size(), 200));
vcrServer.shutdown();
vcrServer.awaitShutdown(1000);
assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000));
// Error metrics should be zero.
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount());
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount());
}

Expand All @@ -273,7 +280,8 @@ public void singleNodeUpDownTestWithPersist() throws Exception {
@Test
public void multipleVcrTest() throws Exception {
StrictMatchExternalViewVerifier helixBalanceVerifier =
new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName, null, null);
new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName,
Collections.singleton(VcrTestUtil.helixResource), null);
int initialNumOfVcrs = 5;
// create a shared in memory destination.
LatchBasedInMemoryCloudDestination latchBasedInMemoryCloudDestination =
Expand Down Expand Up @@ -317,6 +325,7 @@ public void multipleVcrTest() throws Exception {
assertEquals("Each VCR should only backup its assigned partitions.",
new HashSet<>(vcrServers.get(i).getVirtualReplicatorCluster().getAssignedPartitionIds()), partitionIdSet);
}
logger.info("Phase 1 done.");

// 2nd phase: Add a new VCR to cluster.
Properties props = VcrTestUtil.createVcrProperties(dataNode.getDatacenterName(), vcrClusterName, zkConnectString,
Expand All @@ -334,38 +343,42 @@ public void multipleVcrTest() throws Exception {
sendBlobToDataNode(dataNode, secondNumOfBlobs);
Assert.assertTrue("All blobs should be back up.", TestUtils.checkAndSleep(numOfBlobs + secondNumOfBlobs,
() -> vcrNotificationSystems.stream().mapToInt(i -> i.getBlobIds().size()).sum(), 5000));
logger.info("Phase 2 done.");

// 3rd phase: Remove last VCR from cluster.
vcrServers.get(vcrServers.size() - 1).shutdown();
vcrServers.get(vcrServers.size() - 1).awaitShutdown(1000);
assertTrue("VCR server shutdown timeout.", vcrServers.get(vcrServers.size() - 1).awaitShutdown(5000));
// Error metrics should be zero.
Assert.assertEquals("Error count shoud be zero", 0, vcrServers.get(vcrServers.size() - 1)
Assert.assertEquals("Error count should be zero", 0, vcrServers.get(vcrServers.size() - 1)
.getCloudBackupManager()
.getVcrMetrics().addPartitionErrorCount.getCount());
Assert.assertEquals("Error count shoud be zero", 0, vcrServers.get(vcrServers.size() - 1)
Assert.assertEquals("Error count should be zero", 0, vcrServers.get(vcrServers.size() - 1)
.getCloudBackupManager()
.getVcrMetrics().removePartitionErrorCount.getCount());
int temp = vcrNotificationSystems.get(vcrNotificationSystems.size() - 1).getBlobIds().size();

assertTrue("Wait for Helix balance.", helixBalanceVerifier.verify(3000));
assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000));
int thirdNumOfBlobs = 1000;
sendBlobToDataNode(dataNode, thirdNumOfBlobs);
Assert.assertTrue("All blobs should be back up.",
TestUtils.checkAndSleep(numOfBlobs + secondNumOfBlobs + thirdNumOfBlobs,
() -> vcrNotificationSystems.stream().mapToInt(i -> i.getBlobIds().size()).sum(), 5000));
Assert.assertEquals("The removed vcr shouldn't have any change", temp,
vcrNotificationSystems.get(vcrNotificationSystems.size() - 1).getBlobIds().size());
logger.info("Phase 3 done.");

// Shutdown all others.
for (int i = 0; i < initialNumOfVcrs; i++) {
// Error metrics should be zero.
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServers.get(i).getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount());
Assert.assertEquals("Error count shoud be zero", 0,
Assert.assertEquals("Error count should be zero", 0,
vcrServers.get(i).getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount());
vcrServers.get(i).shutdown();
vcrServers.get(i).awaitShutdown(1000);
assertTrue("VCR server shutdown timeout.", vcrServers.get(i).awaitShutdown(5000));
}

logger.info("Test done.");
}

/**
Expand All @@ -374,9 +387,9 @@ public void multipleVcrTest() throws Exception {
* @param helixBalanceVerifier helix balance verifier.
*/
private void makeSureHelixBalance(VcrServer vcrServer, StrictMatchExternalViewVerifier helixBalanceVerifier) {
Assert.assertTrue("Make sure topology change happen.", TestUtils.checkAndSleep(true,
() -> vcrServer.getVirtualReplicatorCluster().getAssignedPartitionIds().size() > 0, 3000));
assertTrue("Wait for Helix balance.", helixBalanceVerifier.verify(3000));
Assert.assertTrue("Helix topology change timeout.", TestUtils.checkAndSleep(true,
() -> vcrServer.getVirtualReplicatorCluster().getAssignedPartitionIds().size() > 0, 10000));
assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000));
}

/**
Expand Down