diff --git a/ambry-api/src/main/java/com.github.ambry/config/ClusterMapConfig.java b/ambry-api/src/main/java/com.github.ambry/config/ClusterMapConfig.java index d96f1cca66..b0bc502c9c 100644 --- a/ambry-api/src/main/java/com.github.ambry/config/ClusterMapConfig.java +++ b/ambry-api/src/main/java/com.github.ambry/config/ClusterMapConfig.java @@ -216,6 +216,14 @@ public class ClusterMapConfig { @Config("clustermap.replica.catchup.target") public final int clustermapReplicaCatchupTarget; + /** + * The minimum number of replicas in local datacenter required for a partition to serve PUT request. This is used to + * get writable partitions for PUT operation. Any partition with replica count larger than or equal to this number is + * acceptable to be considered as a candidate. + */ + @Config("clustermap.writable.partition.min.replica.count") + public final int clustermapWritablePartitionMinReplicaCount; + public ClusterMapConfig(VerifiableProperties verifiableProperties) { clusterMapFixedTimeoutDatanodeErrorThreshold = verifiableProperties.getIntInRange("clustermap.fixedtimeout.datanode.error.threshold", 3, 1, 100); @@ -261,5 +269,7 @@ public ClusterMapConfig(VerifiableProperties verifiableProperties) { verifiableProperties.getLongInRange("clustermap.replica.catchup.acceptable.lag.bytes", 0L, 0L, Long.MAX_VALUE); clustermapReplicaCatchupTarget = verifiableProperties.getIntInRange("clustermap.replica.catchup.target", 0, 0, Integer.MAX_VALUE); + clustermapWritablePartitionMinReplicaCount = + verifiableProperties.getIntInRange("clustermap.writable.partition.min.replica.count", 3, 0, Integer.MAX_VALUE); } } diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapUtils.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapUtils.java index 4f91c5309c..6c34469dd4 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapUtils.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/ClusterMapUtils.java @@ -341,6 +341,7 @@ static boolean areAllReplicasForPartitionUp(PartitionId partition) { * Not thread safe. */ static class PartitionSelectionHelper { + private final int minimumLocalReplicaCount; private Collection allPartitions; private Map>> partitionIdsByClassAndLocalReplicaCount; private Map> partitionIdToLocalReplicas; @@ -349,10 +350,13 @@ static class PartitionSelectionHelper { /** * @param allPartitions the list of all {@link PartitionId}s * @param localDatacenterName the name of the local datacenter. Can be null if datacenter specific replica counts - * are not required. + * @param minimumLocalReplicaCount the minimum number of replicas in local datacenter. This is used when selecting + * writable partitions. */ - PartitionSelectionHelper(Collection allPartitions, String localDatacenterName) { + PartitionSelectionHelper(Collection allPartitions, String localDatacenterName, + int minimumLocalReplicaCount) { this.localDatacenterName = localDatacenterName; + this.minimumLocalReplicaCount = minimumLocalReplicaCount; updatePartitions(allPartitions, localDatacenterName); } @@ -364,6 +368,7 @@ static class PartitionSelectionHelper { */ void updatePartitions(Collection allPartitions, String localDatacenterName) { this.allPartitions = allPartitions; + // todo when new partitions added into clustermap, dynamically update these two maps. partitionIdsByClassAndLocalReplicaCount = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); partitionIdToLocalReplicas = new HashMap<>(); for (PartitionId partition : allPartitions) { @@ -432,7 +437,6 @@ List getWritablePartitions(String partitionClass) { PartitionId getRandomWritablePartition(String partitionClass, List partitionsToExclude) { PartitionId anyWritablePartition = null; List partitionsInClass = getPartitionsInClass(partitionClass, true); - int workingSize = partitionsInClass.size(); while (workingSize > 0) { int randomIndex = ThreadLocalRandom.current().nextInt(workingSize); @@ -487,20 +491,23 @@ private boolean areAllLocalReplicasForPartitionUp(PartitionId partitionId) { * Returns the partitions belonging to the {@code partitionClass}. Returns all partitions if {@code partitionClass} * is {@code null}. * @param partitionClass the class of the partitions desired. - * @param highestReplicaCountOnly if {@code true}, returns only the partitions with the highest number of replicas - * in the local datacenter. + * @param minimumReplicaCountRequired if {@code true}, returns only the partitions with the number of replicas in + * local datacenter that is larger than or equal to minimum required count. * @return the partitions belonging to the {@code partitionClass}. Returns all partitions if {@code partitionClass} * is {@code null}. */ - private List getPartitionsInClass(String partitionClass, boolean highestReplicaCountOnly) { + private List getPartitionsInClass(String partitionClass, boolean minimumReplicaCountRequired) { List toReturn = new ArrayList<>(); if (partitionClass == null) { toReturn.addAll(allPartitions); } else if (partitionIdsByClassAndLocalReplicaCount.containsKey(partitionClass)) { SortedMap> partitionsByReplicaCount = partitionIdsByClassAndLocalReplicaCount.get(partitionClass); - if (highestReplicaCountOnly) { - toReturn.addAll(partitionsByReplicaCount.get(partitionsByReplicaCount.lastKey())); + if (minimumReplicaCountRequired) { + // get partitions with replica count >= min replica count specified in ClusterMapConfig + for (List partitionIds : partitionsByReplicaCount.tailMap(minimumLocalReplicaCount).values()) { + toReturn.addAll(partitionIds); + } } else { for (List partitionIds : partitionIdsByClassAndLocalReplicaCount.get(partitionClass).values()) { toReturn.addAll(partitionIds); diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixBootstrapUpgradeUtil.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixBootstrapUpgradeUtil.java index 9c8b8156be..10c6d26e5f 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixBootstrapUpgradeUtil.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixBootstrapUpgradeUtil.java @@ -329,7 +329,7 @@ private HelixBootstrapUpgradeUtil(String hardwareLayoutPath, String partitionLay } else { staticClusterMap = (new StaticClusterAgentsFactory(clusterMapConfig, new PartitionLayout( new HardwareLayout(new JSONObject(Utils.readStringFromFile(hardwareLayoutPath)), clusterMapConfig), - null))).getClusterMap(); + clusterMapConfig))).getClusterMap(); } String clusterNameInStaticClusterMap = staticClusterMap.partitionLayout.getClusterName(); clusterName = clusterNamePrefix + clusterNameInStaticClusterMap; diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java index 446367918a..dd39bdfa6e 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/HelixClusterManager.java @@ -232,7 +232,8 @@ public HelixClusterManager(ClusterMapConfig clusterMapConfig, String instanceNam } localDatacenterId = dcToDcZkInfo.get(clusterMapConfig.clusterMapDatacenterName).dcZkInfo.getDcId(); partitionSelectionHelper = - new PartitionSelectionHelper(partitionMap.values(), clusterMapConfig.clusterMapDatacenterName); + new PartitionSelectionHelper(partitionMap.values(), clusterMapConfig.clusterMapDatacenterName, + clusterMapConfig.clustermapWritablePartitionMinReplicaCount); } /** diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/PartitionLayout.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/PartitionLayout.java index 786340dfeb..8b8424dc57 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/PartitionLayout.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/PartitionLayout.java @@ -13,6 +13,7 @@ */ package com.github.ambry.clustermap; +import com.github.ambry.config.ClusterMapConfig; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -52,16 +53,16 @@ class PartitionLayout { * Create a PartitionLayout * @param hardwareLayout the {@link HardwareLayout} to use. * @param jsonObject the {@link JSONObject} that represents the partition layout - * @param localDatacenterName the name of the local datacenter. Can be {@code null}. + * @param clusterMapConfig the {@link ClusterMapConfig} to use. * @throws JSONException */ - public PartitionLayout(HardwareLayout hardwareLayout, JSONObject jsonObject, String localDatacenterName) + public PartitionLayout(HardwareLayout hardwareLayout, JSONObject jsonObject, ClusterMapConfig clusterMapConfig) throws JSONException { if (logger.isTraceEnabled()) { logger.trace("PartitionLayout " + hardwareLayout + ", " + jsonObject.toString()); } this.hardwareLayout = hardwareLayout; - this.localDatacenterName = localDatacenterName; + this.localDatacenterName = clusterMapConfig.clusterMapDatacenterName; this.clusterName = jsonObject.getString("clusterName"); this.version = jsonObject.getLong("version"); this.partitionMap = new HashMap<>(); @@ -70,26 +71,28 @@ public PartitionLayout(HardwareLayout hardwareLayout, JSONObject jsonObject, Str } validate(); - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitionMap.values(), localDatacenterName); + partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitionMap.values(), localDatacenterName, + clusterMapConfig.clustermapWritablePartitionMinReplicaCount); } /** * Constructor for initial PartitionLayout * @param hardwareLayout the {@link JSONObject} that represents the partition layout - * @param localDatacenterName the name of the local datacenter. Can be {@code null}. + * @param clusterMapConfig the {@link ClusterMapConfig} to use. */ - public PartitionLayout(HardwareLayout hardwareLayout, String localDatacenterName) { + public PartitionLayout(HardwareLayout hardwareLayout, ClusterMapConfig clusterMapConfig) { if (logger.isTraceEnabled()) { logger.trace("PartitionLayout " + hardwareLayout); } this.hardwareLayout = hardwareLayout; - this.localDatacenterName = localDatacenterName; + this.localDatacenterName = clusterMapConfig.clusterMapDatacenterName; this.clusterName = hardwareLayout.getClusterName(); this.version = 1; this.maxPartitionId = MinPartitionId; this.partitionMap = new HashMap<>(); validate(); - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitionMap.values(), localDatacenterName); + partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitionMap.values(), localDatacenterName, + clusterMapConfig.clustermapWritablePartitionMinReplicaCount); } public HardwareLayout getHardwareLayout() { diff --git a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/StaticClusterAgentsFactory.java b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/StaticClusterAgentsFactory.java index 5799270aa7..c403ef9228 100644 --- a/ambry-clustermap/src/main/java/com.github.ambry.clustermap/StaticClusterAgentsFactory.java +++ b/ambry-clustermap/src/main/java/com.github.ambry.clustermap/StaticClusterAgentsFactory.java @@ -52,7 +52,7 @@ public StaticClusterAgentsFactory(ClusterMapConfig clusterMapConfig, String hard String partitionLayoutFilePath) throws JSONException, IOException { this(clusterMapConfig, new PartitionLayout( new HardwareLayout(new JSONObject(readStringFromFile(hardwareLayoutFilePath)), clusterMapConfig), - new JSONObject(readStringFromFile(partitionLayoutFilePath)), clusterMapConfig.clusterMapDatacenterName)); + new JSONObject(readStringFromFile(partitionLayoutFilePath)), clusterMapConfig)); } /** diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/ClusterMapUtilsTest.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/ClusterMapUtilsTest.java index 3c2a674420..29f461dfad 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/ClusterMapUtilsTest.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/ClusterMapUtilsTest.java @@ -60,10 +60,12 @@ public void partitionSelectionHelperTest() { // 2 partitions with 3 replicas in two datacenters "DC1" and "DC2" (class "max-replicas-all-sites") // 2 partitions with 3 replicas in "DC1" and 1 replica in "DC2" (class "max-local-one-remote") // 2 partitions with 3 replicas in "DC2" and 1 replica in "DC1" (class "max-local-one-remote") + // minimum number of replicas required for choosing writable partition is 3. final String dc1 = "DC1"; final String dc2 = "DC2"; final String maxReplicasAllSites = "max-replicas-all-sites"; final String maxLocalOneRemote = "max-local-one-remote"; + final int minimumLocalReplicaCount = 3; MockDataNodeId dc1Dn1 = getDataNodeId("dc1dn1", dc1); MockDataNodeId dc1Dn2 = getDataNodeId("dc1dn2", dc1); @@ -86,7 +88,7 @@ public void partitionSelectionHelperTest() { Collection allPartitionIdsMain = Collections.unmodifiableSet( new HashSet<>(Arrays.asList(everywhere1, everywhere2, majorDc11, majorDc12, majorDc21, majorDc22))); ClusterMapUtils.PartitionSelectionHelper psh = - new ClusterMapUtils.PartitionSelectionHelper(allPartitionIdsMain, null); + new ClusterMapUtils.PartitionSelectionHelper(allPartitionIdsMain, null, minimumLocalReplicaCount); String[] dcsToTry = {null, "", dc1, dc2}; for (String dc : dcsToTry) { @@ -143,14 +145,51 @@ public void partitionSelectionHelperTest() { // expected. Nothing to do. } verifyWritablePartitionsReturned(psh, allPartitionIds, maxReplicasAllSites, everywhere1, everywhere2, - maxLocalOneRemote, expectedWritableForMaxLocalOneRemote); + maxLocalOneRemote, expectedWritableForMaxLocalOneRemote, dc); if (candidate1 != null && candidate2 != null) { verifyWritablePartitionsReturned(psh, allPartitionIds, maxLocalOneRemote, candidate1, candidate2, - maxReplicasAllSites, new HashSet<>(Arrays.asList(everywhere1, everywhere2))); + maxReplicasAllSites, new HashSet<>(Arrays.asList(everywhere1, everywhere2)), dc); } } } + /** + * Test partition with different number of replicas in local datacenter. + */ + @Test + public void partitionWithDifferentReplicaCntTest() { + // set up partitions in local dc: + // partition1 has 2 replicas; partition2 has 3 replicas; partition3 has 4 replicas + final String dc1 = "DC1"; + final String partitionClass = "default-partition-class"; + List dataNodeIdList = new ArrayList<>(); + for (int i = 1; i <= 4; ++i) { + dataNodeIdList.add(getDataNodeId("node" + i, dc1)); + } + MockPartitionId partition1 = new MockPartitionId(1, partitionClass, dataNodeIdList.subList(0, 2), 0); + MockPartitionId partition2 = new MockPartitionId(2, partitionClass, dataNodeIdList.subList(0, 3), 0); + MockPartitionId partition3 = new MockPartitionId(3, partitionClass, dataNodeIdList.subList(0, 4), 0); + List allPartitions = Arrays.asList(partition1, partition2, partition3); + int minimumLocalReplicaCount = 3; + ClusterMapUtils.PartitionSelectionHelper psh = + new ClusterMapUtils.PartitionSelectionHelper(allPartitions, dc1, minimumLocalReplicaCount); + // verify get all partitions return correct result + assertEquals("Returned partitions are not expected", allPartitions, psh.getPartitions(null)); + // verify get writable partitions return partition2 and partition3 only + assertEquals("Returned writable partitions are not expected", Arrays.asList(partition2, partition3), + psh.getWritablePartitions(partitionClass)); + assertNotSame("Get random writable partition shouldn't return partition1", partition1, + psh.getRandomWritablePartition(partitionClass, null)); + + // create another partition selection helper with minimumLocalReplicaCount = 4 + minimumLocalReplicaCount = 4; + psh = new ClusterMapUtils.PartitionSelectionHelper(allPartitions, dc1, minimumLocalReplicaCount); + assertEquals("Returned writable partitions are not expected", Arrays.asList(partition3), + psh.getWritablePartitions(partitionClass)); + assertEquals("Get random writable partition should return partition3 only", partition3, + psh.getRandomWritablePartition(partitionClass, null)); + } + /** * @param hostname the host name of the {@link MockDataNodeId}. * @param dc the name of the dc of the {@link MockDataNodeId}. @@ -230,13 +269,19 @@ private void checkCaseInsensitivityForPartitionSelectionHelper(ClusterMapUtils.P * @param classBeingTested the partition class being tested * @param expectedReturnForClassBeingTested the list of partitions that can expected to be returned for * {@code classBeingTested}. + * @param localDc the local dc name. */ private void verifyGetWritablePartition(ClusterMapUtils.PartitionSelectionHelper psh, Set allPartitionIds, String classBeingTested, - Set expectedReturnForClassBeingTested) { + Set expectedReturnForClassBeingTested, String localDc) { assertCollectionEquals("Partitions returned not as expected", allPartitionIds, psh.getWritablePartitions(null)); - assertCollectionEquals("Partitions returned not as expected", expectedReturnForClassBeingTested, - psh.getWritablePartitions(classBeingTested)); + if (localDc == null || localDc.isEmpty()) { + assertCollectionEquals("Partitions returned not as expected", Collections.emptyList(), + psh.getWritablePartitions(classBeingTested)); + } else { + assertCollectionEquals("Partitions returned not as expected", expectedReturnForClassBeingTested, + psh.getWritablePartitions(classBeingTested)); + } } /** @@ -247,14 +292,19 @@ private void verifyGetWritablePartition(ClusterMapUtils.PartitionSelectionHelper * @param classBeingTested the partition class being tested * @param expectedReturnForClassBeingTested the list of partitions that can expected to be returned for * {@code classBeingTested}. + * @param localDc the local dc name. */ private void verifyGetRandomWritablePartition(ClusterMapUtils.PartitionSelectionHelper psh, Set allPartitionIds, String classBeingTested, - Set expectedReturnForClassBeingTested) { + Set expectedReturnForClassBeingTested, String localDc) { assertInCollection("Random partition returned not as expected", allPartitionIds, psh.getRandomWritablePartition(null, null)); - assertInCollection("Random partition returned not as expected", expectedReturnForClassBeingTested, - psh.getRandomWritablePartition(classBeingTested, null)); + if (localDc == null || localDc.isEmpty()) { + assertNull("Partitions returned not as expected", psh.getRandomWritablePartition(classBeingTested, null)); + } else { + assertInCollection("Random partition returned not as expected", expectedReturnForClassBeingTested, + psh.getRandomWritablePartition(classBeingTested, null)); + } } /** @@ -269,37 +319,44 @@ private void verifyGetRandomWritablePartition(ClusterMapUtils.PartitionSelection * {@code classBeingTested} aren't affected). * @param expectedReturnForClassNotBeingTested the list of partitions that can expected to be returned for * {@code classsNotBeingTested}. + * @param localDc the local dc name. */ private void verifyWritablePartitionsReturned(ClusterMapUtils.PartitionSelectionHelper psh, Set allPartitionIds, String classBeingTested, MockPartitionId testedPart1, MockPartitionId testedPart2, String classsNotBeingTested, - Set expectedReturnForClassNotBeingTested) { + Set expectedReturnForClassNotBeingTested, String localDc) { Set expectedReturnForClassBeingTested = new HashSet<>(Arrays.asList(testedPart1, testedPart2)); // no problematic scenarios - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); - verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, + localDc); //verify excluded partitions behavior in getRandomPartition assertNull(psh.getRandomWritablePartition(null, new ArrayList<>(allPartitionIds))); - checkCaseInsensitivityForPartitionSelectionHelper(psh, false, classBeingTested, expectedReturnForClassBeingTested); + if (localDc != null && !localDc.isEmpty()) { + checkCaseInsensitivityForPartitionSelectionHelper(psh, false, classBeingTested, + expectedReturnForClassBeingTested); + } // one replica of one partition of "classBeingTested" down ((MockReplicaId) testedPart1.getReplicaIds().get(0)).markReplicaDownStatus(true); allPartitionIds.remove(testedPart1); expectedReturnForClassBeingTested.remove(testedPart1); - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); //for getRandomWritablePartition one replica being down doesnt change anything unless its a local replica expectedReturnForClassBeingTested.add(testedPart1); - verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, + localDc); // one replica of other partition of "classBeingTested" down too ((MockReplicaId) testedPart2.getReplicaIds().get(0)).markReplicaDownStatus(true); allPartitionIds.remove(testedPart2); // if both have a replica down, then even though both are unhealthy, they are both returned. expectedReturnForClassBeingTested.add(testedPart1); - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); - verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, + localDc); if (expectedReturnForClassNotBeingTested != null) { assertCollectionEquals("Partitions returned not as expected", expectedReturnForClassNotBeingTested, @@ -315,15 +372,16 @@ private void verifyWritablePartitionsReturned(ClusterMapUtils.PartitionSelection ((MockReplicaId) testedPart1.getReplicaIds().get(0)).setSealedState(true); allPartitionIds.remove(testedPart1); expectedReturnForClassBeingTested.remove(testedPart1); - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); - verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, + localDc); // all READ_ONLY ((MockReplicaId) testedPart2.getReplicaIds().get(0)).setSealedState(true); allPartitionIds.remove(testedPart2); expectedReturnForClassBeingTested.remove(testedPart2); - verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested); - verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, null); + verifyGetWritablePartition(psh, allPartitionIds, classBeingTested, expectedReturnForClassBeingTested, localDc); + verifyGetRandomWritablePartition(psh, allPartitionIds, classBeingTested, null, localDc); if (expectedReturnForClassNotBeingTested != null) { assertCollectionEquals("Partitions returned not as expected", expectedReturnForClassNotBeingTested, diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java index 661e3c1e85..ec74d8b710 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/MockClusterMap.java @@ -156,7 +156,14 @@ public MockClusterMap(boolean enableSSLPorts, int numNodes, int numMountPointsPe } else { specialPartition = null; } - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName); + // find a partition belong to DEFAULT_PARTITION_CLASS + PartitionId defaultPartition = partitions.values() + .stream() + .filter(p -> p.getPartitionClass().equals(DEFAULT_PARTITION_CLASS)) + .findFirst() + .get(); + partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName, + Math.min(defaultPartition.getReplicaIds().size(), 3)); } /** @@ -175,7 +182,8 @@ public MockClusterMap(boolean enableSSLPorts, List datanodes, in partitions = new HashMap<>(); partitionIdList.forEach(p -> partitions.put(Long.valueOf(p.toPathString()), p)); this.localDatacenterName = localDatacenterName; - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName); + partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName, + Math.min(partitionIdList.get(0).getReplicaIds().size(), 3)); Set dcNames = new HashSet<>(); datanodes.forEach(node -> dcNames.add(node.getDatacenterName())); dataCentersInClusterMap.addAll(dcNames); @@ -217,7 +225,8 @@ private MockClusterMap(MockDataNodeId recoveryNode, MockDataNodeId vcrNode, Stri recoveryReplica.setPeerReplicas(Collections.singletonList(vcrReplica)); partitions.put(mockPartitionId.partition, mockPartitionId); - partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName); + partitionSelectionHelper = new ClusterMapUtils.PartitionSelectionHelper(partitions.values(), localDatacenterName, + Math.min(mockPartitionId.getReplicaIds().size(), 3)); specialPartition = null; } diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/StaticClusterManagerTest.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/StaticClusterManagerTest.java index 361e04a0f3..ffc0545404 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/StaticClusterManagerTest.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/StaticClusterManagerTest.java @@ -13,9 +13,9 @@ */ package com.github.ambry.clustermap; -import com.github.ambry.server.ServerErrorCode; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.server.ServerErrorCode; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.Utils; import java.io.DataInputStream; @@ -46,9 +46,18 @@ * Tests {@link StaticClusterManager} class. */ public class StaticClusterManagerTest { + private final ClusterMapConfig clusterMapConfig; @Rule public org.junit.rules.TemporaryFolder folder = new TemporaryFolder(); + public StaticClusterManagerTest() { + Properties props = new Properties(); + props.setProperty("clustermap.host.name", "localhost"); + props.setProperty("clustermap.cluster.name", "cluster"); + props.setProperty("clustermap.datacenter.name", "dc1"); + clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); + } + // Useful for understanding partition layout affect on free capacity across all hardware. public String freeCapacityDump(StaticClusterManager clusterMapManager, HardwareLayout hardwareLayout) { StringBuilder sb = new StringBuilder(); @@ -159,7 +168,7 @@ public void findDatacenter() { @Test public void addNewPartition() { TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -181,7 +190,7 @@ public void nonRackAwareAllocationTest() { long replicaCapacityInBytes = 100 * 1024 * 1024 * 1024L; TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -223,7 +232,7 @@ public void rackAwareAllocationTest() { long replicaCapacityInBytes = 100 * 1024 * 1024 * 1024L; TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha", true); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -259,7 +268,7 @@ public void rackAwareOverAllocationTest() { long replicaCapacityInBytes = 100 * 1024 * 1024 * 1024L; TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha", true); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -284,7 +293,7 @@ public void rackAwareOverAllocationTest() { @Test public void capacities() { TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = (new StaticClusterAgentsFactory(getDummyConfig(), partitionLayout)).getClusterMap(); @@ -347,10 +356,9 @@ public void persistAndReadBack() throws Exception { String hardwareLayoutDe = tmpDir + "/hardwareLayoutDe.json"; String partitionLayoutDe = tmpDir + "/partitionLayoutDe.json"; - StaticClusterManager clusterMapManagerSer = getTestClusterMap(); - clusterMapManagerSer.persist(hardwareLayoutSer, partitionLayoutSer); - ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); + StaticClusterManager clusterMapManagerSer = getTestClusterMap(clusterMapConfig); + clusterMapManagerSer.persist(hardwareLayoutSer, partitionLayoutSer); StaticClusterManager clusterMapManagerDe = (new StaticClusterAgentsFactory(clusterMapConfig, hardwareLayoutSer, partitionLayoutSer)).getClusterMap(); @@ -481,11 +489,6 @@ public void getPartitionsTest() throws IOException, JSONException { */ @Test public void onReplicaEventTest() { - Properties props = new Properties(); - props.setProperty("clustermap.host.name", "localhost"); - props.setProperty("clustermap.cluster.name", "cluster"); - props.setProperty("clustermap.datacenter.name", "dc1"); - ClusterMapConfig clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); TestPartitionLayout testPartitionLayout = new TestPartitionLayout(testHardwareLayout, null); ClusterMap clusterMapManager = diff --git a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/TestUtils.java b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/TestUtils.java index 0121a9276d..d0c48839e7 100644 --- a/ambry-clustermap/src/test/java/com.github.ambry.clustermap/TestUtils.java +++ b/ambry-clustermap/src/test/java/com.github.ambry.clustermap/TestUtils.java @@ -15,9 +15,9 @@ import com.codahale.metrics.MetricRegistry; import com.github.ambry.commons.ResponseHandler; -import com.github.ambry.server.ServerErrorCode; import com.github.ambry.config.ClusterMapConfig; import com.github.ambry.config.VerifiableProperties; +import com.github.ambry.server.ServerErrorCode; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -792,6 +792,7 @@ public static class TestPartitionLayout { protected PartitionLayout partitionLayout; private JSONArray jsonPartitions; private long version; + private ClusterMapConfig clusterMapConfig; protected JSONObject makeJsonPartitionLayout() throws JSONException { return makeJsonPartitionLayout(DEFAULT_PARTITION_CLASS); @@ -830,9 +831,14 @@ public TestPartitionLayout(TestHardwareLayout testHardwareLayout, int partitionC this.replicaCountPerDc = replicaCountPerDc; this.testHardwareLayout = testHardwareLayout; - this.partitionLayout = - new PartitionLayout(testHardwareLayout.getHardwareLayout(), makeJsonPartitionLayout(), localDc); this.dcCount = testHardwareLayout.getHardwareLayout().getDatacenterCount(); + Properties props = new Properties(); + props.setProperty("clustermap.host.name", "localhost"); + props.setProperty("clustermap.cluster.name", "cluster"); + props.setProperty("clustermap.datacenter.name", localDc == null ? "" : localDc); + clusterMapConfig = new ClusterMapConfig(new VerifiableProperties(props)); + this.partitionLayout = + new PartitionLayout(testHardwareLayout.getHardwareLayout(), makeJsonPartitionLayout(), clusterMapConfig); } public TestPartitionLayout(TestHardwareLayout testHardwareLayout, String localDc) throws JSONException { @@ -844,7 +850,7 @@ void addNewPartitions(int i, String partitionClass, PartitionState partitionStat throws JSONException { this.partitionCount += i; this.partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), - updateJsonPartitionLayout(partitionClass, partitionState), localDc); + updateJsonPartitionLayout(partitionClass, partitionState), clusterMapConfig); } public PartitionLayout getPartitionLayout() { @@ -910,10 +916,10 @@ public TestPartitionLayoutWithDuplicateReplicas(TestHardwareLayout testHardwareL } public static StaticClusterManager getTestClusterMap(int partitionCount, int replicaCountPerDatacenter, - long replicaCapacityInBytes) throws JSONException { + long replicaCapacityInBytes, ClusterMapConfig clusterMapConfig) throws JSONException { TestUtils.TestHardwareLayout testHardwareLayout = new TestHardwareLayout("Alpha"); - PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), null); + PartitionLayout partitionLayout = new PartitionLayout(testHardwareLayout.getHardwareLayout(), clusterMapConfig); StaticClusterManager clusterMapManager = new StaticClusterManager(partitionLayout, null, new MetricRegistry()); List allocatedPartitions; @@ -925,12 +931,12 @@ public static StaticClusterManager getTestClusterMap(int partitionCount, int rep return clusterMapManager; } - public static StaticClusterManager getTestClusterMap() throws JSONException { + public static StaticClusterManager getTestClusterMap(ClusterMapConfig clusterMapConfig) throws JSONException { int numPartitions = 5; int replicaCountPerDatacenter = 2; long replicaCapacityInBytes = 100 * 1024 * 1024 * 1024L; - return getTestClusterMap(numPartitions, replicaCountPerDatacenter, replicaCapacityInBytes); + return getTestClusterMap(numPartitions, replicaCountPerDatacenter, replicaCapacityInBytes, clusterMapConfig); } /** diff --git a/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java b/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java index a7caa71b36..8da290ad01 100644 --- a/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java +++ b/ambry-router/src/test/java/com.github.ambry.router/PutManagerTest.java @@ -23,7 +23,6 @@ import com.github.ambry.commons.BlobIdFactory; import com.github.ambry.commons.ByteBufferReadableStreamChannel; import com.github.ambry.commons.LoggingNotificationSystem; -import com.github.ambry.server.ServerErrorCode; import com.github.ambry.config.CryptoServiceConfig; import com.github.ambry.config.KMSConfig; import com.github.ambry.config.RouterConfig; @@ -35,6 +34,7 @@ import com.github.ambry.messageformat.MetadataContentSerDe; import com.github.ambry.notification.NotificationBlobType; import com.github.ambry.protocol.PutRequest; +import com.github.ambry.server.ServerErrorCode; import com.github.ambry.store.StoreKey; import com.github.ambry.utils.ByteBufferInputStream; import com.github.ambry.utils.MockTime; @@ -108,7 +108,9 @@ public class PutManagerTest { private static final int MAX_PORTS_PLAIN_TEXT = 3; private static final int MAX_PORTS_SSL = 3; private static final int CHECKOUT_TIMEOUT_MS = 1000; - private static final String LOCAL_DC = "DC1"; + // here we set local dc to "DC3" because MockClusterMap uses DC3 as default local dc (where special class partition + // has 3 replicas) + private static final String LOCAL_DC = "DC3"; private static final String EXTERNAL_ASSET_TAG = "ExternalAssetTag"; /** diff --git a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java index 590ebbc206..eea298e21a 100644 --- a/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java +++ b/ambry-server/src/integration-test/java/com.github.ambry.server/MockCluster.java @@ -144,7 +144,6 @@ private MockCluster(MockClusterMap mockClusterMap, Properties serverSslProps, bo * @param recoveryNode The data node. * @param dcName Name of the datacenter. * @return {@link MockCluster} object. - * @throws IOException if an exception happens during cluster creation. */ public static MockCluster createOneNodeRecoveryCluster(MockDataNodeId vcrNode, MockDataNodeId recoveryNode, String dcName) { diff --git a/ambry-tools/src/main/java/com.github.ambry/clustermap/PartitionManager.java b/ambry-tools/src/main/java/com.github.ambry/clustermap/PartitionManager.java index 400a0e5e41..7297041b84 100644 --- a/ambry-tools/src/main/java/com.github.ambry/clustermap/PartitionManager.java +++ b/ambry-tools/src/main/java/com.github.ambry/clustermap/PartitionManager.java @@ -127,7 +127,7 @@ public static void main(String args[]) { if (fileString == null) { manager = (new StaticClusterAgentsFactory(clusterMapConfig, new PartitionLayout( new HardwareLayout(new JSONObject(Utils.readStringFromFile(hardwareLayoutPath)), clusterMapConfig), - null))).getClusterMap(); + clusterMapConfig))).getClusterMap(); } else { manager = (new StaticClusterAgentsFactory(clusterMapConfig, hardwareLayoutPath, partitionLayoutPath)).getClusterMap();