From fcdfca8ff172886c10db52557af507297fbd8a30 Mon Sep 17 00:00:00 2001 From: Yingyi Zhang Date: Thu, 23 Jan 2020 15:55:17 -0800 Subject: [PATCH] Make writable partition selection account for partitions with more than 3 replicas (#1349) Currently, selecting a writable partition for a PUT operation only picks partitions with the highest local replica count. This will become a problem when "move replica" is rolled out. In the intermediate state of "move replica", a particular partition may have 6 replicas and, if we stick with the current logic, all the write traffic will be routed to this particular partition. Hence, in this PR we make changes to allow partition selection to pick partitions with replica count >= min replica count (specified in ClusterMapConfig). In most cases, min replica count = 3. --- .../config/ClusterMapConfig.java | 10 ++ .../ClusterMapUtils.java | 23 ++-- .../HelixBootstrapUpgradeUtil.java | 2 +- .../HelixClusterManager.java | 3 +- .../PartitionLayout.java | 19 ++-- .../StaticClusterAgentsFactory.java | 2 +- .../ClusterMapUtilsTest.java | 100 ++++++++++++++---- .../MockClusterMap.java | 15 ++- .../StaticClusterManagerTest.java | 31 +++--- .../TestUtils.java | 22 ++-- .../PutManagerTest.java | 6 +- .../com.github.ambry.server/MockCluster.java | 1 - .../clustermap/PartitionManager.java | 2 +- 13 files changed, 167 insertions(+), 69 deletions(-) diff --git a/ambry-api/src/main/java/com.github.ambry/config/ClusterMapConfig.java b/ambry-api/src/main/java/com.github.ambry/config/ClusterMapConfig.java index d96f1cca66..b0bc502c9c 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/ClusterMapConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/ClusterMapConfig.java @@ -216,6 +216,14 @@ public class ClusterMapConfig { @Config("clustermap.replica.catchup.target") public final int clustermapReplicaCatchupTarget; + /** + * The minimum number of replicas in local datacenter required for a partition to serve PUT request. 
This is used to + * get writable partitions for PUT operation. Any partition with replica count larger than or equal to this number is + * acceptable to be considered as a candidate. + */ + @Config("clustermap.writable.partition.min.replica.count") + public final int clustermapWritablePartitionMinReplicaCount; + public ClusterMapConfig(VerifiableProperties verifiableProperties) { clusterMapFixedTimeoutDatanodeErrorThreshold = verifiableProperties.getIntInRange("clustermap.fixedtimeout.datanode.error.threshold", 3, 1, 100); @@ -261,5 +269,7 @@ public ClusterMapConfig(VerifiableProperties verifiableProperties) { verifiableProperties.getLongInRange("clustermap.replica.catchup.acceptable.lag.bytes", 0L, 0L, Long.MAX_VALUE); clustermapReplicaCatchupTarget = verifiableProperties.getIntInRange("clustermap.replica.catchup.target", 0, 0, Integer.MAX_VALUE); + clustermapWritablePartitionMinReplicaCount = + verifiableProperties.getIntInRange("clustermap.writable.partition.min.replica.count", 3, 0, Integer.MAX_VALUE); } } diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapUtils.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapUtils.java index 2b0d42aff8..cb627b79ea 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapUtils.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapUtils.java @@ -364,6 +364,7 @@ static boolean areAllReplicasForPartitionUp(PartitionId partition) { * Not thread safe. */ static class PartitionSelectionHelper { + private final int minimumLocalReplicaCount; private Collection allPartitions; private Map>> partitionIdsByClassAndLocalReplicaCount; private Map> partitionIdToLocalReplicas; @@ -372,10 +373,13 @@ static class PartitionSelectionHelper { /** * @param allPartitions the list of all {@link PartitionId}s * @param localDatacenterName the name of the local datacenter. Can be null if datacenter specific replica counts - * are not required. 
+ * @param minimumLocalReplicaCount the minimum number of replicas in local datacenter. This is used when selecting + * writable partitions. */ - PartitionSelectionHelper(Collection allPartitions, String localDatacenterName) { + PartitionSelectionHelper(Collection allPartitions, String localDatacenterName, + int minimumLocalReplicaCount) { this.localDatacenterName = localDatacenterName; + this.minimumLocalReplicaCount = minimumLocalReplicaCount; updatePartitions(allPartitions, localDatacenterName); } @@ -387,6 +391,7 @@ static class PartitionSelectionHelper { */ void updatePartitions(Collection allPartitions, String localDatacenterName) { this.allPartitions = allPartitions; + // todo when new partitions added into clustermap, dynamically update these two maps. partitionIdsByClassAndLocalReplicaCount = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); partitionIdToLocalReplicas = new HashMap<>(); for (PartitionId partition : allPartitions) { @@ -455,7 +460,6 @@ List getWritablePartitions(String partitionClass) { PartitionId getRandomWritablePartition(String partitionClass, List partitionsToExclude) { PartitionId anyWritablePartition = null; List partitionsInClass = getPartitionsInClass(partitionClass, true); - int workingSize = partitionsInClass.size(); while (workingSize > 0) { int randomIndex = ThreadLocalRandom.current().nextInt(workingSize); @@ -510,20 +514,23 @@ private boolean areAllLocalReplicasForPartitionUp(PartitionId partitionId) { * Returns the partitions belonging to the {@code partitionClass}. Returns all partitions if {@code partitionClass} * is {@code null}. * @param partitionClass the class of the partitions desired. - * @param highestReplicaCountOnly if {@code true}, returns only the partitions with the highest number of replicas - * in the local datacenter. + * @param minimumReplicaCountRequired if {@code true}, returns only the partitions with the number of replicas in + * local datacenter that is larger than or equal to minimum required count. 
* @return the partitions belonging to the {@code partitionClass}. Returns all partitions if {@code partitionClass} * is {@code null}. */ - private List getPartitionsInClass(String partitionClass, boolean highestReplicaCountOnly) { + private List getPartitionsInClass(String partitionClass, boolean minimumReplicaCountRequired) { List toReturn = new ArrayList<>(); if (partitionClass == null) { toReturn.addAll(allPartitions); } else if (partitionIdsByClassAndLocalReplicaCount.containsKey(partitionClass)) { SortedMap> partitionsByReplicaCount = partitionIdsByClassAndLocalReplicaCount.get(partitionClass); - if (highestReplicaCountOnly) { - toReturn.addAll(partitionsByReplicaCount.get(partitionsByReplicaCount.lastKey())); + if (minimumReplicaCountRequired) { + // get partitions with replica count >= min replica count specified in ClusterMapConfig + for (List partitionIds : partitionsByReplicaCount.tailMap(minimumLocalReplicaCount).values()) { + toReturn.addAll(partitionIds); + } } else { for (List partitionIds : partitionIdsByClassAndLocalReplicaCount.get(partitionClass).values()) { toReturn.addAll(partitionIds); diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixBootstrapUpgradeUtil.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixBootstrapUpgradeUtil.java index 9c8b8156be..10c6d26e5f 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixBootstrapUpgradeUtil.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixBootstrapUpgradeUtil.java @@ -329,7 +329,7 @@ private HelixBootstrapUpgradeUtil(String hardwareLayoutPath, String partitionLay } else { staticClusterMap = (new StaticClusterAgentsFactory(clusterMapConfig, new PartitionLayout( new HardwareLayout(new JSONObject(Utils.readStringFromFile(hardwareLayoutPath)), clusterMapConfig), - null))).getClusterMap(); + clusterMapConfig))).getClusterMap(); } String clusterNameInStaticClusterMap = 
staticClusterMap.partitionLayout.getClusterName(); clusterName = clusterNamePrefix + clusterNameInStaticClusterMap; diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java index daef0fb084..db575ea58c 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java @@ -218,7 +218,8 @@ public HelixClusterManager(ClusterMapConfig clusterMapConfig, String instanceNam } localDatacenterId = dcToDcZkInfo.get(clusterMapConfig.clusterMapDatacenterName).dcZkInfo.getDcId(); partitionSelectionHelper = - new PartitionSelectionHelper(partitionMap.values(), clusterMapConfig.clusterMapDatacenterName); + new PartitionSelectionHelper(partitionMap.values(), clusterMapConfig.clusterMapDatacenterName, + clusterMapConfig.clustermapWritablePartitionMinReplicaCount); } /** diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/PartitionLayout.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/PartitionLayout.java index 786340dfeb..8b8424dc57 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/PartitionLayout.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/PartitionLayout.java @@ -13,6 +13,7 @@ */ package com.github.ambry.clustermap; +import com.github.ambry.config.ClusterMapConfig; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -52,16 +53,16 @@ class PartitionLayout { * Create a PartitionLayout * @param hardwareLayout the {@link HardwareLayout} to use. * @param jsonObject the {@link JSONObject} that represents the partition layout - * @param localDatacenterName the name of the local datacenter. Can be {@code null}. + * @param clusterMapConfig the {@link ClusterMapConfig} to use. 
* @throws JSONException */ - public PartitionLayout(HardwareLayout hardwareLayout, JSONObject jsonObject, String localDatacenterName) + public PartitionLayout(HardwareLayout hardwareLayout, JSONObject jsonObject, ClusterMapConfig clusterMapConfig) throws JSONException { if (logger.isTraceEnabled()) { logger.trace("PartitionLayout " + hardwareLayout + ", " + jsonObject.toString()); } this.hardwareLayout = hardwareLayout; - this.localDatacenterName = localDatacenterName; + this.localDatacenterName = clusterMapConfig.clusterMapDatacenterName; this.clusterName = jsonObject.getString("clusterName"); this.version = jsonObject.getLong("version"); this.partitionMap = new HashMap<>(); @@ -70,26 +71,28 @@ public PartitionLayout(HardwareLayout hardwareLayout, JSONObject jsonObject, Str } validate(); - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitionMap.values(), localDatacenterName); + partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitionMap.values(), localDatacenterName, + clusterMapConfig.clustermapWritablePartitionMinReplicaCount); } /** * Constructor for initial PartitionLayout * @param hardwareLayout the {@link JSONObject} that represents the partition layout - * @param localDatacenterName the name of the local datacenter. Can be {@code null}. + * @param clusterMapConfig the {@link ClusterMapConfig} to use. 
*/ - public PartitionLayout(HardwareLayout hardwareLayout, String localDatacenterName) { + public PartitionLayout(HardwareLayout hardwareLayout, ClusterMapConfig clusterMapConfig) { if (logger.isTraceEnabled()) { logger.trace("PartitionLayout " + hardwareLayout); } this.hardwareLayout = hardwareLayout; - this.localDatacenterName = localDatacenterName; + this.localDatacenterName = clusterMapConfig.clusterMapDatacenterName; this.clusterName = hardwareLayout.getClusterName(); this.version = 1; this.maxPartitionId = MinPartitionId; this.partitionMap = new HashMap<>(); validate(); - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitionMap.values(), localDatacenterName); + partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitionMap.values(), localDatacenterName, + clusterMapConfig.clustermapWritablePartitionMinReplicaCount); } public HardwareLayout getHardwareLayout() { diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/StaticClusterAgentsFactory.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/StaticClusterAgentsFactory.java index 5799270aa7..c403ef9228 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/StaticClusterAgentsFactory.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/StaticClusterAgentsFactory.java @@ -52,7 +52,7 @@ public StaticClusterAgentsFactory(ClusterMapConfig clusterMapConfig, String hard String partitionLayoutFilePath) throws JSONException, IOException { this(clusterMapConfig, new PartitionLayout( new HardwareLayout(new JSONObject(readStringFromFile(hardwareLayoutFilePath)), clusterMapConfig), - new JSONObject(readStringFromFile(partitionLayoutFilePath)), clusterMapConfig.clusterMapDatacenterName)); + new JSONObject(readStringFromFile(partitionLayoutFilePath)), clusterMapConfig)); } /** diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/ClusterMapUtilsTest.java 
b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/ClusterMapUtilsTest.java index 3c2a674420..29f461dfad 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/ClusterMapUtilsTest.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/ClusterMapUtilsTest.java @@ -60,10 +60,12 @@ public void partitionSelectionHelperTest() { // 2 partitions with 3 replicas in two datacenters "DC1" and "DC2" (class "max-replicas-all-sites") // 2 partitions with 3 replicas in "DC1" and 1 replica in "DC2" (class "max-local-one-remote") // 2 partitions with 3 replicas in "DC2" and 1 replica in "DC1" (class "max-local-one-remote") + // minimum number of replicas required for choosing writable partition is 3. final String dc1 = "DC1"; final String dc2 = "DC2"; final String maxReplicasAllSites = "max-replicas-all-sites"; final String maxLocalOneRemote = "max-local-one-remote"; + final int minimumLocalReplicaCount = 3; MockDataNodeId dc1Dn1 = getDataNodeId("dc1dn1", dc1); MockDataNodeId dc1Dn2 = getDataNodeId("dc1dn2", dc1); @@ -86,7 +88,7 @@ public void partitionSelectionHelperTest() { Collection allPartitionIdsMain = Collections.unmodifiableSet( new HashSet<>(Arrays.asList(everywhere1, everywhere2, majorDc11, majorDc12, majorDc21, majorDc22))); ClusterMapUtils.PartitionSelectionHelper psh = - new ClusterMapUtils.PartitionSelectionHelper(allPartitionIdsMain, null); + new ClusterMapUtils.PartitionSelectionHelper(allPartitionIdsMain, null, minimumLocalReplicaCount); String[] dcsToTry = {null, "", dc1, dc2}; for (String dc : dcsToTry) { @@ -143,14 +145,51 @@ public void partitionSelectionHelperTest() { // expected. Nothing to do. 
} verifyWritablePartitionsReturned(psh, allPartitionIds, maxReplicasAllSites, everywhere1, everywhere2, - maxLocalOneRemote, expectedWritableForMaxLocalOneRemote); + maxLocalOneRemote, expectedWritableForMaxLocalOneRemote, dc); if (candidate1 != null && candidate2 != null) { verifyWritablePartitionsReturned(psh, allPartitionIds, maxLocalOneRemote, candidate1, candidate2, - maxReplicasAllSites, new HashSet<>(Arrays.asList(everywhere1, everywhere2))); + maxReplicasAllSites, new HashSet<>(Arrays.asList(everywhere1, everywhere2)), dc); } } } + /** + * Test partition with different number of replicas in local datacenter. + */ + @Test + public void partitionWithDifferentReplicaCntTest() { + // set up partitions in local dc: + // partition1 has 2 replicas; partition2 has 3 replicas; partition3 has 4 replicas + final String dc1 = "DC1"; + final String partitionClass = "default-partition-class"; + List dataNodeIdList = new ArrayList<>(); + for (int i = 1; i <= 4; ++i) { + dataNodeIdList.add(getDataNodeId("node" + i, dc1)); + } + MockPartitionId partition1 = new MockPartitionId(1, partitionClass, dataNodeIdList.subList(0, 2), 0); + MockPartitionId partition2 = new MockPartitionId(2, partitionClass, dataNodeIdList.subList(0, 3), 0); + MockPartitionId partition3 = new MockPartitionId(3, partitionClass, dataNodeIdList.subList(0, 4), 0); + List allPartitions = Arrays.asList(partition1, partition2, partition3); + int minimumLocalReplicaCount = 3; + ClusterMapUtils.PartitionSelectionHelper psh = + new ClusterMapUtils.PartitionSelectionHelper(allPartitions, dc1, minimumLocalReplicaCount); + // verify get all partitions return correct result + assertEquals("Returned partitions are not expected", allPartitions, psh.getPartitions(null)); + // verify get writable partitions return partition2 and partition3 only + assertEquals("Returned writable partitions are not expected", Arrays.asList(partition2, partition3), + psh.getWritablePartitions(partitionClass)); + assertNotSame("Get random 
writable partition shouldn't return partition1", partition1, + psh.getRandomWritablePartition(partitionClass, null)); + + // create another partition selection helper with minimumLocalReplicaCount = 4 + minimumLocalReplicaCount = 4; + psh = new ClusterMapUtils.PartitionSelectionHelper(allPartitions, dc1, minimumLocalReplicaCount); + assertEquals("Returned writable partitions are not expected", Arrays.asList(partition3), + psh.getWritablePartitions(partitionClass)); + assertEquals("Get random writable partition should return partition3 only", partition3, + psh.getRandomWritablePartition(partitionClass, null)); + } + /** * @param hostname the host name of the {@link MockDataNodeId}. * @param dc the name of the dc of the {@link MockDataNodeId}. @@ -230,13 +269,19 @@ private void checkCaseInsensitivityForPartitionSelectionHelper(ClusterMapUtils.P * @param classBeingTested the partition class being tested * @param expectedReturnForClassBeingTested the list of partitions that can expected to be returned for * {@code classBeingTested}. + * @param localDc the local dc name. 
*/ private void verifyGetWritablePartition(ClusterMapUtils.PartitionSelectionHelper psh, Set allPartitionIds, String classBeingTested, - Set expectedReturnForClassBeingTested) { + Set expectedReturnForClassBeingTested, String localDc) { assertCollectionEquals("Partitions returned not as expected", allPartitionIds, psh.getWritablePartitions(null)); - assertCollectionEquals("Partitions returned not as expected", expectedReturnForClassBeingTested, - psh.getWritablePartitions(classBeingTested)); + if (localDc == null || localDc.isEmpty()) { + assertCollectionEquals("Partitions returned not as expected", Collections.emptyList(), + psh.getWritablePartitions(classBeingTested)); + } else { + assertCollectionEquals("Partitions returned not as expected", expectedReturnForClassBeingTested, + psh.getWritablePartitions(classBeingTested)); + } } /** @@ -247,14 +292,19 @@ private void verifyGetWritablePartition(ClusterMapUtils.PartitionSelectionHelper * @param classBeingTested the partition class being tested * @param expectedReturnForClassBeingTested the list of partitions that can expected to be returned for * {@code classBeingTested}. + * @param localDc the local dc name. 
*/ private void verifyGetRandomWritablePartition(ClusterMapUtils.PartitionSelectionHelper psh, Set allPartitionIds, String classBeingTested, - Set expectedReturnForClassBeingTested) { + Set expectedReturnForClassBeingTested, String localDc) { assertInCollection("Random partition returned not as expected", allPartitionIds, psh.getRandomWritablePartition(null, null)); - assertInCollection("Random partition returned not as expected", expectedReturnForClassBeingTested, - psh.getRandomWritablePartition(classBeingTested, null)); + if (localDc == null || localDc.isEmpty()) { + assertNull("Partitions returned not as expected", psh.getRandomWritablePartition(classBeingTested, null)); + } else { + assertInCollection("Random partition returned not as expected", expectedReturnForClassBeingTested, + psh.getRandomWritablePartition(classBeingTested, null)); + } } /** @@ -269,37 +319,44 @@ private void verifyGetRandomWritablePartition(ClusterMapUtils.PartitionSelection * {@code classBeingTested} aren't affected). * @param expectedReturnForClassNotBeingTested the list of partitions that can expected to be returned for * {@code classsNotBeingTested}. + * @param localDc the local dc name. 
*/ private void verifyWritablePartitionsReturned(ClusterMapUtils.PartitionSelectionHelper psh, Set allPartitionIds, String classBeingTested, MockPartitionId testedPart1, MockPartitionId testedPart2, String classsNotBeingTested, - Set expectedReturnForClassNotBeingTested) { + Set expectedReturnForClassNotBeingTested, String localDc) { Set expectedReturnForClassBeingTested = new HashSet<>(Arrays.asList(testedPart1, testedPart2)); // no problematic scenarios - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); - verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, + localDc); //verify excluded partitions behavior in getRandomPartition assertNull(psh.getRandomWritablePartition(null, new ArrayList<>(allPartitionIds))); - checkCaseInsensitivityForPartitionSelectionHelper(psh, false, classBeingTested, expectedReturnForClassBeingTested); + if (localDc != null && !localDc.isEmpty()) { + checkCaseInsensitivityForPartitionSelectionHelper(psh, false, classBeingTested, + expectedReturnForClassBeingTested); + } // one replica of one partition of "classBeingTested" down ((MockReplicaId) testedPart1.getReplicaIds().get(0)).markReplicaDownStatus(true); allPartitionIds.remove(testedPart1); expectedReturnForClassBeingTested.remove(testedPart1); - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); //for getRandomWritablePartition one replica being down doesnt change anything unless its a local replica expectedReturnForClassBeingTested.add(testedPart1); - verifyGetRandomWritablePartition(psh, 
allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, + localDc); // one replica of other partition of "classBeingTested" down too ((MockReplicaId) testedPart2.getReplicaIds().get(0)).markReplicaDownStatus(true); allPartitionIds.remove(testedPart2); // if both have a replica down, then even though both are unhealthy, they are both returned. expectedReturnForClassBeingTested.add(testedPart1); - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); - verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, + localDc); if (expectedReturnForClassNotBeingTested != null) { assertCollectionEquals("Partitions returned not as expected", expectedReturnForClassNotBeingTested, @@ -315,15 +372,16 @@ private void verifyWritablePartitionsReturned(ClusterMapUtils.PartitionSelection ((MockReplicaId) testedPart1.getReplicaIds().get(0)).setSealedState(true); allPartitionIds.remove(testedPart1); expectedReturnForClassBeingTested.remove(testedPart1); - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); - verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, + localDc); // all READ_ONLY ((MockReplicaId) testedPart2.getReplicaIds().get(0)).setSealedState(true); allPartitionIds.remove(testedPart2); 
expectedReturnForClassBeingTested.remove(testedPart2); - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); - verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, null); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, null, localDc); if (expectedReturnForClassNotBeingTested != null) { assertCollectionEquals("Partitions returned not as expected", expectedReturnForClassNotBeingTested, diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java index 661e3c1e85..ec74d8b710 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java @@ -156,7 +156,14 @@ public MockClusterMap(boolean enableSSLPorts, int numNodes, int numMountPointsPe } else { specialPartition = null; } - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName); + // find a partition belong to DEFAULT_PARTITION_CLASS + PartitionId defaultPartition = partitions.values() + .stream() + .filter(p -> p.getPartitionClass().equals(DEFAULT_PARTITION_CLASS)) + .findFirst() + .get(); + partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName, + Math.min(defaultPartition.getReplicaIds().size(), 3)); } /** @@ -175,7 +182,8 @@ public MockClusterMap(boolean enableSSLPorts, List datanodes, in partitions = new HashMap<>(); partitionIdList.forEach(p -> partitions.put(Long.valueOf(p.toPathString()), p)); this.localDatacenterName = localDatacenterName; - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName); + 
partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName, + Math.min(partitionIdList.get(0).getReplicaIds().size(), 3)); Set dcNames = new HashSet<>(); datanodes.forEach(node -> dcNames.add(node.getDatacenterName())); dataCentersInClusterMap.addAll(dcNames); @@ -217,7 +225,8 @@ private MockClusterMap(MockDataNodeId recoveryNode, MockDataNodeId vcrNode, Stri recoveryReplica.setPeerReplicas(Collections.singletonList(vcrReplica)); partitions.put(mockPartitionId.partition, mockPartitionId); - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName); + partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName, + Math.min(mockPartitionId.getReplicaIds().size(), 3)); specialPartition = null; } diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/StaticClusterManagerTest.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/StaticClusterManagerTest.java index 361e04a0f3..ffc0545404 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/StaticClusterManagerTest.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/StaticClusterManagerTest.java @@ -13,9 +13,9 @@ */ package com.github.ambry.clustermap; -import com.github.ambry.server.ServerErrorCode; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Utils; import java.io.DataInputStream; @@ -46,9 +46,18 @@ * Tests {@link StaticClusterManager} class. 
*/ public class StaticClusterManagerTest { + private final ClusterMapConfig clusterMapConfig; @Rule public org.junit.rules.TemporaryFolder folder = new TemporaryFolder(); + public StaticClusterManagerTest() { + Properties props = new Properties(); + props.setProperty("clustermap.host.name", "localhost"); + props.setProperty("clustermap.cluster.name", "cluster"); + props.setProperty("clustermap.datacenter.name", "dc1"); + clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); + } + // Useful for understanding partition layout affect on free capacity across all hardware. public String freeCapacityDump(StaticClusterManager clusterMapManager, HardwareLayout hardwareLayout) { StringBuilder sb = new StringBuilder(); @@ -159,7 +168,7 @@ public void findDatacenter() { @Test public void addNewPartition() { TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -181,7 +190,7 @@ public void nonRackAwareAllocationTest() { long replicaCapacityInBytes = 100 * 1024 * 1024 * 1024L; TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -223,7 +232,7 @@ public void rackAwareAllocationTest() { long replicaCapacityInBytes = 100 * 1024 * 1024 * 1024L; TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha", true); - PartitionLayout 
partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -259,7 +268,7 @@ public void rackAwareOverAllocationTest() { long replicaCapacityInBytes = 100 * 1024 * 1024 * 1024L; TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha", true); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -284,7 +293,7 @@ public void rackAwareOverAllocationTest() { @Test public void capacities() { TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -347,10 +356,9 @@ public void persistAndReadBack() throws Exception { String hardwareLayoutDe = tmpDir + "/hardwareLayoutDe.json"; String partitionLayoutDe = tmpDir + "/partitionLayoutDe.json"; - StaticClusterManager clusterMapManagerSer = getTestClusterMap(); - clusterMapManagerSer.persist(hardwareLayoutSer, partitionLayoutSer); - ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); + StaticClusterManager clusterMapManagerSer = getTestClusterMap(clusterMapConfig); + clusterMapManagerSer.persist(hardwareLayoutSer, partitionLayoutSer); StaticClusterManager 
clusterMapManagerDe = (new StaticClusterAgentsFactory(clusterMapConfig, hardwareLayoutSer, partitionLayoutSer)).getClusterMap(); @@ -481,11 +489,6 @@ public void getPartitionsTest() throws IOException, JSONException { */ @Test public void onReplicaEventTest() { - Properties props = new Properties(); - props.setProperty("clustermap.host.name", "localhost"); - props.setProperty("clustermap.cluster.name", "cluster"); - props.setProperty("clustermap.datacenter.name", "dc1"); - ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); TestPartitionLayout testPartitionLayout = new TestPartitionLayout(testHardwareLayout, null); ClusterMap clusterMapManager = diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/TestUtils.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/TestUtils.java index d805543dbe..54c20eebf1 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/TestUtils.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/TestUtils.java @@ -15,9 +15,9 @@ import com.codahale.metrics.MetricRegistry; import com.github.ambry.commons.ResponseHandler; -import com.github.ambry.server.ServerErrorCode; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.server.ServerErrorCode; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -804,6 +804,7 @@ public static class TestPartitionLayout { protected PartitionLayout partitionLayout; private JSONArray jsonPartitions; private long version; + private ClusterMapConfig clusterMapConfig; protected JSONObject makeJsonPartitionLayout() throws JSONException { return makeJsonPartitionLayout(DEFAULT_PARTITION_CLASS); @@ -842,9 +843,14 @@ public TestPartitionLayout(TestHardwareLayout testHardwareLayout, int partitionC this.replicaCountPerDc =
replicaCountPerDc; this.testHardwareLayout = testHardwareLayout; - this.partitionLayout = - new PartitionLayout(testHardwareLayout.getHardwareLayout(), makeJsonPartitionLayout(), localDc); this.dcCount = testHardwareLayout.getHardwareLayout().getDatacenterCount(); + Properties props = new Properties(); + props.setProperty("clustermap.host.name", "localhost"); + props.setProperty("clustermap.cluster.name", "cluster"); + props.setProperty("clustermap.datacenter.name", localDc == null ? "" : localDc); + clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); + this.partitionLayout = + new PartitionLayout(testHardwareLayout.getHardwareLayout(), makeJsonPartitionLayout(), clusterMapConfig); } public TestPartitionLayout(TestHardwareLayout testHardwareLayout, String localDc) throws JSONException { @@ -856,7 +862,7 @@ void addNewPartitions(int i, String partitionClass, PartitionState partitionStat throws JSONException { this.partitionCount += i; this.partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), - updateJsonPartitionLayout(partitionClass, partitionState), localDc); + updateJsonPartitionLayout(partitionClass, partitionState), clusterMapConfig); } public PartitionLayout getPartitionLayout() { @@ -922,10 +928,10 @@ public TestPartitionLayoutWithDuplicateReplicas(TestHardwareLayout testHardwareL } public static StaticClusterManager getTestClusterMap(int partitionCount, int replicaCountPerDatacenter, - long replicaCapacityInBytes) throws JSONException { + long replicaCapacityInBytes, ClusterMapConfig clusterMapConfig) throws JSONException { TestUtils.TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = new StaticClusterManager(partitionLayout, null, new
MetricRegistry()); List allocatedPartitions; @@ -937,12 +943,12 @@ public static StaticClusterManager getTestClusterMap(int partitionCount, int rep return clusterMapManager; } - public static StaticClusterManager getTestClusterMap() throws JSONException { + public static StaticClusterManager getTestClusterMap(ClusterMapConfig clusterMapConfig) throws JSONException { int numPartitions = 5; int replicaCountPerDatacenter = 2; long replicaCapacityInBytes = 100 * 1024 * 1024 * 1024L; - return getTestClusterMap(numPartitions, replicaCountPerDatacenter, replicaCapacityInBytes); + return getTestClusterMap(numPartitions, replicaCountPerDatacenter, replicaCapacityInBytes, clusterMapConfig); } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java b/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java index a7caa71b36..8da290ad01 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java @@ -23,7 +23,6 @@ import com.github.ambry.commons.BlobIdFactory; import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.commons.LoggingNotificationSystem; -import com.github.ambry.server.ServerErrorCode; import com.github.ambry.config.CryptoServiceConfig; import com.github.ambry.config.KMSConfig; import com.github.ambry.config.RouterConfig; @@ -35,6 +34,7 @@ import com.github.ambry.messageformat.MetadataContentSerDe; import com.github.ambry.notification.NotificationBlobType; import com.github.ambry.protocol.PutRequest; +import com.github.ambry.server.ServerErrorCode; import com.github.ambry.store.StoreKey; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.MockTime; @@ -108,7 +108,9 @@ public class PutManagerTest { private static final int MAX_PORTS_PLAIN_TEXT = 3; private static final int MAX_PORTS_SSL = 3; private static final int CHECKOUT_TIMEOUT_MS = 1000; - private static
final String LOCAL_DC = "DC1"; + // here we set local dc to "DC3" because MockClusterMap uses DC3 as default local dc (where special class partition + // has 3 replicas) + private static final String LOCAL_DC = "DC3"; private static final String EXTERNAL_ASSET_TAG = "ExternalAssetTag"; /** diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java index 590ebbc206..eea298e21a 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java @@ -144,7 +144,6 @@ private MockCluster(MockClusterMap mockClusterMap, Properties serverSslProps, bo * @param recoveryNode The data node. * @param dcName Name of the datacenter. * @return {@link MockCluster} object. - * @throws IOException if an exception happens during cluster creation. */ public static MockCluster createOneNodeRecoveryCluster(MockDataNodeId vcrNode, MockDataNodeId recoveryNode, String dcName) { diff --git a/ambry-tools/src/main/java/com.github.ambry/clustermap/PartitionManager.java b/ambry-tools/src/main/java/com.github.ambry/clustermap/PartitionManager.java index 400a0e5e41..7297041b84 100644 --- a/ambry-tools/src/main/java/com.github.ambry/clustermap/PartitionManager.java +++ b/ambry-tools/src/main/java/com.github.ambry/clustermap/PartitionManager.java @@ -127,7 +127,7 @@ public static void main(String args[]) { if (fileString == null) { manager = (new StaticClusterAgentsFactory(clusterMapConfig, new PartitionLayout( new HardwareLayout(new JSONObject(Utils.readStringFromFile(hardwareLayoutPath)), clusterMapConfig), - null))).getClusterMap(); + clusterMapConfig))).getClusterMap(); } else { manager = (new StaticClusterAgentsFactory(clusterMapConfig, hardwareLayoutPath, partitionLayoutPath)).getClusterMap();