diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModel.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModel.java index 04e80ce8b6..8a1389f72d 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModel.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/HelixVcrStateModel.java @@ -68,6 +68,12 @@ public void onBecomeOfflineFromLeader(Message message, NotificationContext conte helixVcrCluster.removePartition(message.getPartitionName()); } + @Transition(to = "DROPPED", from = "OFFLINE") + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { + logger.info("{} Becoming DROPPED from OFFLINE of Partition {}", helixVcrCluster.getCurrentDataNodeId(), + message.getPartitionName()); + } + @Override public void reset() { // no op diff --git a/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrServer.java b/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrServer.java index 3675fa5ad0..0c4c1670d6 100644 --- a/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrServer.java +++ b/ambry-cloud/src/main/java/com.github.ambry.cloud/VcrServer.java @@ -174,7 +174,7 @@ public void startup() throws InstantiationException { public void shutdown() { long startTime = SystemTime.getInstance().milliseconds(); try { - logger.info("shutdown started"); + logger.info("VCR shutdown started"); if (scheduler != null) { shutDownExecutorService(scheduler, 5, TimeUnit.MINUTES); } @@ -213,8 +213,8 @@ public void shutdown() { } } - public void awaitShutdown(int timeoutMs) throws InterruptedException { - shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); + public boolean awaitShutdown(int timeoutMs) throws InterruptedException { + return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS); } public VirtualReplicatorCluster getVirtualReplicatorCluster() { diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/HelixVcrClusterTest.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/HelixVcrClusterTest.java index 6c6dba4a05..5b90faed48 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/HelixVcrClusterTest.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/HelixVcrClusterTest.java @@ -23,6 +23,7 @@ import com.github.ambry.config.VerifiableProperties; import com.github.ambry.utils.HelixControllerManager; import com.github.ambry.utils.TestUtils; +import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.Set; @@ -73,14 +74,16 @@ public static void afterClass() { @Test public void helixVcrClusterTest() throws Exception { StrictMatchExternalViewVerifier helixBalanceVerifier = - new StrictMatchExternalViewVerifier(ZK_CONNECT_STRING, VCR_CLUSTER_NAME, null, null); + new StrictMatchExternalViewVerifier(ZK_CONNECT_STRING, VCR_CLUSTER_NAME, + Collections.singleton(VcrTestUtil.helixResource), null); // Create helixInstance1 and join the cluster. All partitions should be assigned to helixInstance1. VirtualReplicatorCluster helixInstance1 = createHelixInstance(8123, 10123); List expectedPartitions = mockClusterMap.getAllPartitionIds(null); MockVcrListener mockVcrListener = new MockVcrListener(); helixInstance1.addListener(mockVcrListener); helixInstance1.participate(InstanceType.PARTICIPANT); - Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(2000)); + TestUtils.checkAndSleep(true, () -> helixInstance1.getAssignedPartitionIds().size() > 0, 1000); + Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000)); Assert.assertEquals("Partition assignment are not correct.", helixInstance1.getAssignedPartitionIds(), expectedPartitions); @@ -89,8 +92,8 @@ public void helixVcrClusterTest() throws Exception { helixInstance2.participate(InstanceType.PARTICIPANT); // Detect any ideal state change first. TestUtils.checkAndSleep(true, () -> helixInstance1.getAssignedPartitionIds().size() < expectedPartitions.size(), - 500); - Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(2000)); + 1000); + Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000)); Assert.assertEquals("Number of partitions removed are not correct.", expectedPartitions.size() / 2, mockVcrListener.getPartitionSet().size()); @@ -99,7 +102,7 @@ public void helixVcrClusterTest() throws Exception { // Detect any ideal state change first. TestUtils.checkAndSleep(true, () -> helixInstance1.getAssignedPartitionIds().size() > expectedPartitions.size() / 2, 500); - Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(2000)); + Assert.assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000)); Assert.assertEquals("Partition assignment are not correct.", helixInstance1.getAssignedPartitionIds(), expectedPartitions); diff --git a/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrTestUtil.java b/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrTestUtil.java index 81b1354acc..36f8930141 100644 --- a/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrTestUtil.java +++ b/ambry-cloud/src/test/java/com.github.ambry.cloud/VcrTestUtil.java @@ -46,6 +46,7 @@ */ public class VcrTestUtil { + public static String helixResource = "resource1"; /** * Create a {@link VcrServer}. * @param properties the config properties to use. @@ -87,16 +88,15 @@ public static HelixControllerManager populateZkInfoAndStartController(String zKC clusterConfig.setPersistBestPossibleAssignment(true); configAccessor.setClusterConfig(vcrClusterName, clusterConfig); - String resourceName = "1"; - FullAutoModeISBuilder builder = new FullAutoModeISBuilder(resourceName); + FullAutoModeISBuilder builder = new FullAutoModeISBuilder(helixResource); builder.setStateModel(LeaderStandbySMD.name); for (PartitionId partitionId : clusterMap.getAllPartitionIds(null)) { builder.add(partitionId.toPathString()); } builder.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); IdealState idealState = builder.build(); - admin.addResource(vcrClusterName, resourceName, idealState); - admin.rebalance(vcrClusterName, resourceName, 3, "", ""); + admin.addResource(vcrClusterName, helixResource, idealState); + admin.rebalance(vcrClusterName, helixResource, 3, "", ""); HelixControllerManager helixControllerManager = new HelixControllerManager(zKConnectString, vcrClusterName); helixControllerManager.syncStart(); return helixControllerManager; diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java index f1ed980dac..fa051010e3 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java @@ -44,6 +44,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import junit.framework.Assert; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assert.*; @@ -55,6 +57,7 @@ * On shutdown we ensure the servers are shutdown. */ public class MockCluster { + private Logger logger = LoggerFactory.getLogger(getClass()); private final MockClusterAgentsFactory mockClusterAgentsFactory; private final MockClusterMap clusterMap; private List serverList = null; @@ -179,6 +182,7 @@ public void startServers() throws InstantiationException, IOException { */ public void stopServers() throws IOException { if (serverInitialized) { + logger.info("Stopping servers......"); CountDownLatch shutdownLatch = new CountDownLatch(serverList.size()); for (AmbryServer server : serverList) { new Thread(new ServerShutdown(shutdownLatch, server)).start(); @@ -221,6 +225,7 @@ public List getOneDataNodeFromEachDatacenter(ArrayList datac } class ServerShutdown implements Runnable { + private Logger logger = LoggerFactory.getLogger(getClass()); private final CountDownLatch latch; private final AmbryServer server; @@ -232,6 +237,11 @@ public ServerShutdown(CountDownLatch latch, AmbryServer ambryServer) { @Override public void run() { server.shutdown(); + try { + server.awaitShutdown(); + } catch (InterruptedException e) { + logger.warn("Server awaitShutdown is interrupted."); + } latch.countDown(); } } diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/VcrBackupTest.java b/ambry-server/src/integration-test/java/com.github.ambry.server/VcrBackupTest.java index c087cc7542..c3e16c7b20 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/VcrBackupTest.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/VcrBackupTest.java @@ -33,6 +33,7 @@ import com.github.ambry.utils.Utils; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -46,11 +47,14 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.junit.Assert.*; public class VcrBackupTest { + private Logger logger = LoggerFactory.getLogger(getClass()); private Properties routerProps; private MockNotificationSystem notificationSystem; private MockCluster mockCluster; @@ -81,6 +85,7 @@ public void setup() throws Exception { @After public void cleanup() throws IOException { + logger.info("Start to clean up."); mockCluster.cleanup(); helixControllerManager.syncStop(); zkInfo.shutdown(); @@ -107,7 +112,7 @@ public void basicTest() throws Exception { // Waiting for backup done assertTrue("Did not backup all blobs in 2 minutes", latchBasedInMemoryCloudDestination.await(2, TimeUnit.MINUTES)); vcrServer.shutdown(); - vcrServer.awaitShutdown(1000); + assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000)); } /** @@ -116,7 +121,8 @@ public void basicTest() throws Exception { @Test public void singleNodeUpDownTestWithoutPersist() throws Exception { StrictMatchExternalViewVerifier helixBalanceVerifier = - new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName, null, null); + new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName, + Collections.singleton(VcrTestUtil.helixResource), null); int numberOfBlobs = 100; sendBlobToDataNode(dataNode, numberOfBlobs); // Create in memory cloud destination. @@ -139,11 +145,11 @@ public void singleNodeUpDownTestWithoutPersist() throws Exception { assertTrue("Blob count is not correct.", TestUtils.checkAndSleep(numberOfBlobs, () -> vcrNotificationSystemCopy.getBlobIds().size(), 200)); vcrServer.shutdown(); - vcrServer.awaitShutdown(1000); + assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000)); // Error metrics should be zero. - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount()); - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount()); assertEquals("No token is expected.", 0, latchBasedInMemoryCloudDestination.getTokenMap().size()); @@ -157,11 +163,11 @@ public void singleNodeUpDownTestWithoutPersist() throws Exception { // Because same cloud destination is used, getMissingKey() will filter out all keys. assertEquals("Number of blobs doesn't match", 0, vcrNotificationSystem.getBlobIds().size()); vcrServer.shutdown(); - vcrServer.awaitShutdown(1000); + assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000)); // Error metrics should be zero. - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount()); - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount()); // Start VCR again with different cloud destination @@ -179,11 +185,11 @@ public void singleNodeUpDownTestWithoutPersist() throws Exception { assertTrue("Blob count is not correct.", TestUtils.checkAndSleep(numberOfBlobs, () -> vcrNotificationSystemCopy2.getBlobIds().size(), 200)); vcrServer.shutdown(); - vcrServer.awaitShutdown(1000); + assertTrue("VCR shutdown timeout.", vcrServer.awaitShutdown(5000)); // Error metrics should be zero. - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount()); - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount()); } @@ -193,7 +199,8 @@ public void singleNodeUpDownTestWithoutPersist() throws Exception { @Test public void singleNodeUpDownTestWithPersist() throws Exception { StrictMatchExternalViewVerifier helixBalanceVerifier = - new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName, null, null); + new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName, + Collections.singleton(VcrTestUtil.helixResource), null); int numberOfBlobs = 100; sendBlobToDataNode(dataNode, numberOfBlobs); // Create in memory cloud destination. @@ -216,11 +223,11 @@ public void singleNodeUpDownTestWithPersist() throws Exception { assertTrue("Blob count is not correct.", TestUtils.checkAndSleep(numberOfBlobs, () -> vcrNotificationSystemCopy.getBlobIds().size(), 200)); vcrServer.shutdown(); - vcrServer.awaitShutdown(1000); + assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000)); // Error metrics should be zero. - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount()); - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount()); assertTrue("Token is expected.", latchBasedInMemoryCloudDestination.getTokenMap().size() > 0); @@ -234,11 +241,11 @@ public void singleNodeUpDownTestWithPersist() throws Exception { // Because token is reloaded, back up number is 0. assertEquals("Number of blobs doesn't match", 0, vcrNotificationSystem.getBlobIds().size()); vcrServer.shutdown(); - vcrServer.awaitShutdown(1000); + assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000)); // Error metrics should be zero. - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount()); - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount()); // Start VCR again with token. @@ -259,11 +266,11 @@ public void singleNodeUpDownTestWithPersist() throws Exception { assertTrue("Blob count is not correct.", TestUtils.checkAndSleep(0, () -> vcrNotificationSystemCopy2.getBlobIds().size(), 200)); vcrServer.shutdown(); - vcrServer.awaitShutdown(1000); + assertTrue("VCR server shutdown timeout.", vcrServer.awaitShutdown(5000)); // Error metrics should be zero. - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount()); - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServer.getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount()); } @@ -273,7 +280,8 @@ public void singleNodeUpDownTestWithPersist() throws Exception { @Test public void multipleVcrTest() throws Exception { StrictMatchExternalViewVerifier helixBalanceVerifier = - new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName, null, null); + new StrictMatchExternalViewVerifier(zkConnectString, vcrClusterName, + Collections.singleton(VcrTestUtil.helixResource), null); int initialNumOfVcrs = 5; // create a shared in memory destination. LatchBasedInMemoryCloudDestination latchBasedInMemoryCloudDestination = @@ -317,6 +325,7 @@ public void multipleVcrTest() throws Exception { assertEquals("Each VCR should only backup its assigned partitions.", new HashSet<>(vcrServers.get(i).getVirtualReplicatorCluster().getAssignedPartitionIds()), partitionIdSet); } + logger.info("Phase 1 done."); // 2nd phase: Add a new VCR to cluster. Properties props = VcrTestUtil.createVcrProperties(dataNode.getDatacenterName(), vcrClusterName, zkConnectString, @@ -334,20 +343,21 @@ public void multipleVcrTest() throws Exception { sendBlobToDataNode(dataNode, secondNumOfBlobs); Assert.assertTrue("All blobs should be back up.", TestUtils.checkAndSleep(numOfBlobs + secondNumOfBlobs, () -> vcrNotificationSystems.stream().mapToInt(i -> i.getBlobIds().size()).sum(), 5000)); + logger.info("Phase 2 done."); // 3rd phase: Remove last VCR from cluster. vcrServers.get(vcrServers.size() - 1).shutdown(); - vcrServers.get(vcrServers.size() - 1).awaitShutdown(1000); + assertTrue("VCR server shutdown timeout.", vcrServers.get(vcrServers.size() - 1).awaitShutdown(5000)); // Error metrics should be zero. - Assert.assertEquals("Error count shoud be zero", 0, vcrServers.get(vcrServers.size() - 1) + Assert.assertEquals("Error count should be zero", 0, vcrServers.get(vcrServers.size() - 1) .getCloudBackupManager() .getVcrMetrics().addPartitionErrorCount.getCount()); - Assert.assertEquals("Error count shoud be zero", 0, vcrServers.get(vcrServers.size() - 1) + Assert.assertEquals("Error count should be zero", 0, vcrServers.get(vcrServers.size() - 1) .getCloudBackupManager() .getVcrMetrics().removePartitionErrorCount.getCount()); int temp = vcrNotificationSystems.get(vcrNotificationSystems.size() - 1).getBlobIds().size(); - assertTrue("Wait for Helix balance.", helixBalanceVerifier.verify(3000)); + assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000)); int thirdNumOfBlobs = 1000; sendBlobToDataNode(dataNode, thirdNumOfBlobs); Assert.assertTrue("All blobs should be back up.", @@ -355,17 +365,20 @@ public void multipleVcrTest() throws Exception { () -> vcrNotificationSystems.stream().mapToInt(i -> i.getBlobIds().size()).sum(), 5000)); Assert.assertEquals("The removed vcr shouldn't have any change", temp, vcrNotificationSystems.get(vcrNotificationSystems.size() - 1).getBlobIds().size()); + logger.info("Phase 3 done."); // Shutdown all others. for (int i = 0; i < initialNumOfVcrs; i++) { // Error metrics should be zero. - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServers.get(i).getCloudBackupManager().getVcrMetrics().addPartitionErrorCount.getCount()); - Assert.assertEquals("Error count shoud be zero", 0, + Assert.assertEquals("Error count should be zero", 0, vcrServers.get(i).getCloudBackupManager().getVcrMetrics().removePartitionErrorCount.getCount()); vcrServers.get(i).shutdown(); - vcrServers.get(i).awaitShutdown(1000); + assertTrue("VCR server shutdown timeout.", vcrServers.get(i).awaitShutdown(5000)); } + + logger.info("Test done."); } /** @@ -374,9 +387,9 @@ public void multipleVcrTest() throws Exception { * @param helixBalanceVerifier helix balance verifier. */ private void makeSureHelixBalance(VcrServer vcrServer, StrictMatchExternalViewVerifier helixBalanceVerifier) { - Assert.assertTrue("Make sure topology change happen.", TestUtils.checkAndSleep(true, - () -> vcrServer.getVirtualReplicatorCluster().getAssignedPartitionIds().size() > 0, 3000)); - assertTrue("Wait for Helix balance.", helixBalanceVerifier.verify(3000)); + Assert.assertTrue("Helix topology change timeout.", TestUtils.checkAndSleep(true, + () -> vcrServer.getVirtualReplicatorCluster().getAssignedPartitionIds().size() > 0, 10000)); + assertTrue("Helix balance timeout.", helixBalanceVerifier.verify(5000)); } /**