diff --git a/conf/broker.conf b/conf/broker.conf index 5c5d8d42817e9..b715c4e515bc8 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1414,6 +1414,25 @@ loadBalancerBundleUnloadMinThroughputThreshold=10 # Time to wait for the unloading of a namespace bundle namespaceBundleUnloadingTimeoutMs=60000 +# configuration for AvgShedder, a new shedding and placement strategy +# The low threshold for the difference between the highest and lowest loaded brokers. +loadBalancerAvgShedderLowThreshold = 15 + +# The high threshold for the difference between the highest and lowest loaded brokers. +loadBalancerAvgShedderHighThreshold = 40 + +# The number of times the low threshold is triggered before the bundle is unloaded. +loadBalancerAvgShedderHitCountLowThreshold = 8 + +# The number of times the high threshold is triggered before the bundle is unloaded. +loadBalancerAvgShedderHitCountHighThreshold = 2 + +# In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio. +# For AvgShedder, recommend to set to 0.5, so that it will distribute the load evenly +# between the highest and lowest brokers. +maxUnloadPercentage = 0.2 + + ### --- Load balancer extension --- ### # Option to enable the debug mode for the load balancer logics. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 73bf2316b8287..aba3ad3a669f5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2395,21 +2395,51 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, - doc = "In the UniformLoadShedder strategy, the minimum message that triggers unload." + doc = "The low threshold for the difference between the highest and lowest loaded brokers." + ) + private int loadBalancerAvgShedderLowThreshold = 15; + + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "The high threshold for the difference between the highest and lowest loaded brokers." + ) + private int loadBalancerAvgShedderHighThreshold = 40; + + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "The number of times the low threshold is triggered before the bundle is unloaded." + ) + private int loadBalancerAvgShedderHitCountLowThreshold = 8; + + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "The number of times the high threshold is triggered before the bundle is unloaded." + ) + private int loadBalancerAvgShedderHitCountHighThreshold = 2; + + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "In the UniformLoadShedder and AvgShedder strategy, the minimum message that triggers unload." ) private int minUnloadMessage = 1000; @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, - doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload." + doc = "In the UniformLoadShedder and AvgShedder strategy, the minimum throughput that triggers unload." ) private int minUnloadMessageThroughput = 1 * 1024 * 1024; @FieldContext( dynamic = true, category = CATEGORY_LOAD_BALANCER, - doc = "In the UniformLoadShedder strategy, the maximum unload ratio." + doc = "In the UniformLoadShedder and AvgShedder strategy, the maximum unload ratio." + + "For AvgShedder, recommend to set to 0.5, so that it will distribute the load " + + "evenly between the highest and lowest brokers." ) private double maxUnloadPercentage = 0.2; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java new file mode 100644 index 0000000000000..39ff242fc6c17 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedder.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.hash.Hashing; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Random; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.mutable.MutableDouble; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.broker.loadbalance.LoadSheddingStrategy; +import org.apache.pulsar.broker.loadbalance.ModularLoadManagerStrategy; +import org.apache.pulsar.policies.data.loadbalancer.BrokerData; +import org.apache.pulsar.policies.data.loadbalancer.BundleData; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData; + +@Slf4j +public class AvgShedder implements LoadSheddingStrategy, ModularLoadManagerStrategy { + // map bundle to broker. + private final Map bundleBrokerMap = new HashMap<>(); + // map broker to Scores. scores:0-100 + private final Map brokerScoreMap = new HashMap<>(); + // map broker hit count for high threshold/low threshold + private final Map brokerHitCountForHigh = new HashMap<>(); + private final Map brokerHitCountForLow = new HashMap<>(); + private static final double MB = 1024 * 1024; + + @Override + public Multimap findBundlesForUnloading(LoadData loadData, ServiceConfiguration conf) { + // result returned by shedding, map broker to bundles. + Multimap selectedBundlesCache = ArrayListMultimap.create(); + + // configuration for shedding. + final double minThroughputThreshold = conf.getMinUnloadMessageThroughput(); + final double minMsgThreshold = conf.getMinUnloadMessage(); + final double maxUnloadPercentage = conf.getMaxUnloadPercentage(); + final double lowThreshold = conf.getLoadBalancerAvgShedderLowThreshold(); + final double highThreshold = conf.getLoadBalancerAvgShedderHighThreshold(); + final int hitCountHighThreshold = conf.getLoadBalancerAvgShedderHitCountHighThreshold(); + final int hitCountLowThreshold = conf.getLoadBalancerAvgShedderHitCountLowThreshold(); + if (log.isDebugEnabled()) { + log.debug("highThreshold:{}, lowThreshold:{}, hitCountHighThreshold:{}, hitCountLowThreshold:{}, " + + "minMsgThreshold:{}, minThroughputThreshold:{}", + highThreshold, lowThreshold, hitCountHighThreshold, hitCountLowThreshold, + minMsgThreshold, minThroughputThreshold); + } + + List brokers = calculateScoresAndSort(loadData, conf); + log.info("sorted broker list:{}", brokers); + + // find broker pairs for shedding. + List> pairs = findBrokerPairs(brokers, lowThreshold, highThreshold); + log.info("brokerHitCountForHigh:{}, brokerHitCountForLow:{}", brokerHitCountForHigh, brokerHitCountForLow); + if (pairs.isEmpty()) { + if (log.isDebugEnabled()) { + log.debug("there is no any overload broker, no need to shedding bundles."); + } + brokerHitCountForHigh.clear(); + brokerHitCountForLow.clear(); + return selectedBundlesCache; + } + + // choosing bundles to unload. + for (Pair pair : pairs) { + String overloadedBroker = pair.getRight(); + String underloadedBroker = pair.getLeft(); + + // check hit count for high threshold and low threshold. + if (!(brokerHitCountForHigh.computeIfAbsent(underloadedBroker, __ -> new MutableInt(0)) + .intValue() >= hitCountHighThreshold) + && !(brokerHitCountForHigh.computeIfAbsent(overloadedBroker, __ -> new MutableInt(0)) + .intValue() >= hitCountHighThreshold) + && !(brokerHitCountForLow.computeIfAbsent(underloadedBroker, __ -> new MutableInt(0)) + .intValue() >= hitCountLowThreshold) + && !(brokerHitCountForLow.computeIfAbsent(overloadedBroker, __ -> new MutableInt(0)) + .intValue() >= hitCountLowThreshold)) { + continue; + } + + // if hit, remove entry. + brokerHitCountForHigh.remove(underloadedBroker); + brokerHitCountForHigh.remove(overloadedBroker); + brokerHitCountForLow.remove(underloadedBroker); + brokerHitCountForLow.remove(overloadedBroker); + + // select bundle for unloading. + selectBundleForUnloading(loadData, overloadedBroker, underloadedBroker, minThroughputThreshold, + minMsgThreshold, maxUnloadPercentage, selectedBundlesCache); + } + return selectedBundlesCache; + } + + private void selectBundleForUnloading(LoadData loadData, String overloadedBroker, String underloadedBroker, + double minThroughputThreshold, double minMsgThreshold, + double maxUnloadPercentage, Multimap selectedBundlesCache) { + // calculate how much throughput to unload. + LocalBrokerData minLocalBrokerData = loadData.getBrokerData().get(underloadedBroker).getLocalData(); + LocalBrokerData maxLocalBrokerData = loadData.getBrokerData().get(overloadedBroker).getLocalData(); + + double minMsgRate = minLocalBrokerData.getMsgRateIn() + minLocalBrokerData.getMsgRateOut(); + double maxMsgRate = maxLocalBrokerData.getMsgRateIn() + maxLocalBrokerData.getMsgRateOut(); + + double minThroughput = minLocalBrokerData.getMsgThroughputIn() + minLocalBrokerData.getMsgThroughputOut(); + double maxThroughput = maxLocalBrokerData.getMsgThroughputIn() + maxLocalBrokerData.getMsgThroughputOut(); + + double msgRequiredFromUnloadedBundles = (maxMsgRate - minMsgRate) * maxUnloadPercentage; + double throughputRequiredFromUnloadedBundles = (maxThroughput - minThroughput) * maxUnloadPercentage; + + boolean isMsgRateToOffload; + MutableDouble trafficMarkedToOffload = new MutableDouble(0); + + if (msgRequiredFromUnloadedBundles > minMsgThreshold) { + isMsgRateToOffload = true; + trafficMarkedToOffload.setValue(msgRequiredFromUnloadedBundles); + } else if (throughputRequiredFromUnloadedBundles > minThroughputThreshold) { + isMsgRateToOffload = false; + trafficMarkedToOffload.setValue(throughputRequiredFromUnloadedBundles); + } else { + log.info( + "broker:[{}] is planning to shed bundles to broker:[{}],but the throughput {} MByte/s is " + + "less than minimumThroughputThreshold {} MByte/s, and the msgRate {} rate/s" + + " is also less than minimumMsgRateThreshold {} rate/s, skipping bundle unload.", + overloadedBroker, underloadedBroker, throughputRequiredFromUnloadedBundles / MB, + minThroughputThreshold / MB, msgRequiredFromUnloadedBundles, minMsgThreshold); + return; + } + + if (maxLocalBrokerData.getBundles().size() == 1) { + log.warn("HIGH USAGE WARNING : Sole namespace bundle {} is overloading broker {}. " + + "No Load Shedding will be done on this broker", + maxLocalBrokerData.getBundles().iterator().next(), overloadedBroker); + } else if (maxLocalBrokerData.getBundles().isEmpty()) { + log.warn("Broker {} is overloaded despite having no bundles", overloadedBroker); + } + + // do shedding + log.info( + "broker:[{}] is planning to shed bundles to broker:[{}]. " + + "maxBroker stat:scores:{}, throughput:{}, msgRate:{}. " + + "minBroker stat:scores:{}, throughput:{}, msgRate:{}. " + + "isMsgRateToOffload:{}, trafficMarkedToOffload:{}", + overloadedBroker, underloadedBroker, brokerScoreMap.get(overloadedBroker), maxThroughput, + maxMsgRate, brokerScoreMap.get(underloadedBroker), minThroughput, minMsgRate, + isMsgRateToOffload, trafficMarkedToOffload); + + loadData.getBundleDataForLoadShedding().entrySet().stream().filter(e -> + maxLocalBrokerData.getBundles().contains(e.getKey()) + ).filter(e -> + !loadData.getRecentlyUnloadedBundles().containsKey(e.getKey()) + ).map((e) -> { + BundleData bundleData = e.getValue(); + TimeAverageMessageData shortTermData = bundleData.getShortTermData(); + double traffic = isMsgRateToOffload + ? shortTermData.getMsgRateIn() + shortTermData.getMsgRateOut() + : shortTermData.getMsgThroughputIn() + shortTermData.getMsgThroughputOut(); + return Pair.of(e, traffic); + }).sorted((e1, e2) -> + Double.compare(e2.getRight(), e1.getRight()) + ).forEach(e -> { + Map.Entry bundle = e.getLeft(); + double traffic = e.getRight(); + if (traffic > 0 && traffic <= trafficMarkedToOffload.getValue()) { + selectedBundlesCache.put(overloadedBroker, bundle.getKey()); + bundleBrokerMap.put(bundle.getValue(), underloadedBroker); + trafficMarkedToOffload.add(-traffic); + if (log.isDebugEnabled()) { + log.debug("Found bundle to unload:{}, isMsgRateToOffload:{}, traffic:{}", + bundle, isMsgRateToOffload, traffic); + } + } + }); + } + + @Override + public void onActiveBrokersChange(Set activeBrokers) { + LoadSheddingStrategy.super.onActiveBrokersChange(activeBrokers); + } + + private List calculateScoresAndSort(LoadData loadData, ServiceConfiguration conf) { + brokerScoreMap.clear(); + + // calculate scores of brokers. + for (Map.Entry entry : loadData.getBrokerData().entrySet()) { + LocalBrokerData localBrokerData = entry.getValue().getLocalData(); + String broker = entry.getKey(); + Double score = calculateScores(localBrokerData, conf); + brokerScoreMap.put(broker, score); + if (log.isDebugEnabled()) { + log.info("broker:{}, scores:{}, throughput:{}, messageRate:{}", broker, score, + localBrokerData.getMsgThroughputIn() + localBrokerData.getMsgThroughputOut(), + localBrokerData.getMsgRateIn() + localBrokerData.getMsgRateOut()); + } + } + + // sort brokers by scores. + return brokerScoreMap.entrySet().stream().sorted((o1, o2) -> (int) (o1.getValue() - o2.getValue())) + .map(Map.Entry::getKey).toList(); + } + + private Double calculateScores(LocalBrokerData localBrokerData, final ServiceConfiguration conf) { + return localBrokerData.getMaxResourceUsageWithWeight( + conf.getLoadBalancerCPUResourceWeight(), + conf.getLoadBalancerDirectMemoryResourceWeight(), + conf.getLoadBalancerBandwidthInResourceWeight(), + conf.getLoadBalancerBandwidthOutResourceWeight()) * 100; + } + + private List> findBrokerPairs(List brokers, + double lowThreshold, double highThreshold) { + List> pairs = new LinkedList<>(); + int i = 0, j = brokers.size() - 1; + while (i <= j) { + String maxBroker = brokers.get(j); + String minBroker = brokers.get(i); + if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < lowThreshold) { + brokerHitCountForHigh.remove(maxBroker); + brokerHitCountForHigh.remove(minBroker); + + brokerHitCountForLow.remove(maxBroker); + brokerHitCountForLow.remove(minBroker); + } else { + pairs.add(Pair.of(minBroker, maxBroker)); + if (brokerScoreMap.get(maxBroker) - brokerScoreMap.get(minBroker) < highThreshold) { + brokerHitCountForLow.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment(); + brokerHitCountForLow.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment(); + + brokerHitCountForHigh.remove(maxBroker); + brokerHitCountForHigh.remove(minBroker); + } else { + brokerHitCountForLow.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment(); + brokerHitCountForLow.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment(); + + brokerHitCountForHigh.computeIfAbsent(minBroker, k -> new MutableInt(0)).increment(); + brokerHitCountForHigh.computeIfAbsent(maxBroker, k -> new MutableInt(0)).increment(); + } + } + i++; + j--; + } + return pairs; + } + + @Override + public Optional selectBroker(Set candidates, BundleData bundleToAssign, LoadData loadData, + ServiceConfiguration conf) { + final var brokerToUnload = bundleBrokerMap.getOrDefault(bundleToAssign, null); + if (brokerToUnload == null || !candidates.contains(bundleBrokerMap.get(bundleToAssign))) { + // cluster initializing or broker is shutdown + if (log.isDebugEnabled()) { + if (!bundleBrokerMap.containsKey(bundleToAssign)) { + log.debug("cluster is initializing"); + } else { + log.debug("expected broker:{} is shutdown, candidates:{}", bundleBrokerMap.get(bundleToAssign), + candidates); + } + } + String broker = getExpectedBroker(candidates, bundleToAssign); + bundleBrokerMap.put(bundleToAssign, broker); + return Optional.of(broker); + } else { + return Optional.of(brokerToUnload); + } + } + + private static String getExpectedBroker(Collection brokers, BundleData bundle) { + List sortedBrokers = new ArrayList<>(brokers); + Collections.sort(sortedBrokers); + + try { + // use random number as input of hashing function to avoid special case that, + // if there is 4 brokers running in the cluster,and add broker5,and shutdown broker3, + // then all bundles belonging to broker3 will be loaded on the same broker. + final long hashcode = Hashing.crc32().hashString(String.valueOf(new Random().nextInt()), + StandardCharsets.UTF_8).padToLong(); + final int index = (int) (Math.abs(hashcode) % sortedBrokers.size()); + if (log.isDebugEnabled()) { + log.debug("Assignment details: brokers={}, bundle={}, hashcode={}, index={}", + sortedBrokers, bundle, hashcode, index); + } + return sortedBrokers.get(index); + } catch (Throwable e) { + // theoretically this logic branch should not be executed + log.error("Bundle format of {} is invalid", bundle, e); + return sortedBrokers.get(Math.abs(bundle.hashCode()) % sortedBrokers.size()); + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 08c9483e87063..8f095b7d84df8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -270,7 +270,21 @@ public void initialize(final PulsarService pulsar) { () -> LoadManagerShared.refreshBrokerToFailureDomainMap(pulsar, brokerToFailureDomainMap)); }); - loadSheddingStrategy = createLoadSheddingStrategy(); + if (placementStrategy instanceof LoadSheddingStrategy) { + // if the placement strategy is also a load shedding strategy + // we need to check two strategies are the same + if (!conf.getLoadBalancerLoadSheddingStrategy().equals( + conf.getLoadBalancerPlacementStrategy())) { + throw new IllegalArgumentException("The load shedding strategy: " + + conf.getLoadBalancerLoadSheddingStrategy() + + " can't work with the placement strategy: " + + conf.getLoadBalancerPlacementStrategy()); + } + // bind the load shedding strategy and the placement strategy + loadSheddingStrategy = (LoadSheddingStrategy) placementStrategy; + } else { + loadSheddingStrategy = createLoadSheddingStrategy(); + } } public void handleDataNotification(Notification t) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java index f5bd0f46a5ec1..53ddde8856c63 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerStrategyTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.loadbalance; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; import java.util.Arrays; @@ -34,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.impl.AvgShedder; import org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate; import org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight; import org.apache.pulsar.broker.loadbalance.impl.RoundRobinBrokerSelector; @@ -47,6 +49,47 @@ @Test(groups = "broker") public class ModularLoadManagerStrategyTest { + public void testAvgShedderWithPreassignedBroker() throws Exception { + ModularLoadManagerStrategy strategy = new AvgShedder(); + Field field = AvgShedder.class.getDeclaredField("bundleBrokerMap"); + field.setAccessible(true); + Map bundleBrokerMap = (Map) field.get(strategy); + BundleData bundleData = new BundleData(); + // assign bundle to broker1 in bundleBrokerMap. + bundleBrokerMap.put(bundleData, "1"); + assertEquals(strategy.selectBroker(Set.of("1", "2", "3"), bundleData, null, null), Optional.of("1")); + assertEquals(bundleBrokerMap.get(bundleData), "1"); + + // remove broker1 in candidates, only broker2 is candidate. + assertEquals(strategy.selectBroker(Set.of("2"), bundleData, null, null), Optional.of("2")); + assertEquals(bundleBrokerMap.get(bundleData), "2"); + } + + public void testAvgShedderWithoutPreassignedBroker() throws Exception { + ModularLoadManagerStrategy strategy = new AvgShedder(); + Field field = AvgShedder.class.getDeclaredField("bundleBrokerMap"); + field.setAccessible(true); + Map bundleBrokerMap = (Map) field.get(strategy); + BundleData bundleData = new BundleData(); + Set candidates = new HashSet<>(); + candidates.add("1"); + candidates.add("2"); + candidates.add("3"); + + // select broker from candidates randomly. + Optional selectedBroker = strategy.selectBroker(candidates, bundleData, null, null); + assertTrue(selectedBroker.isPresent()); + assertTrue(candidates.contains(selectedBroker.get())); + assertEquals(bundleBrokerMap.get(bundleData), selectedBroker.get()); + + // remove original broker in candidates + candidates.remove(selectedBroker.get()); + selectedBroker = strategy.selectBroker(candidates, bundleData, null, null); + assertTrue(selectedBroker.isPresent()); + assertTrue(candidates.contains(selectedBroker.get())); + assertEquals(bundleBrokerMap.get(bundleData), selectedBroker.get()); + } + // Test that least long term message rate works correctly. public void testLeastLongTermMessageRate() { BundleData bundleData = new BundleData(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java new file mode 100644 index 0000000000000..215e3d766a927 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/impl/AvgShedderTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance.impl; + +import com.google.common.collect.Multimap; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.loadbalance.LoadData; +import org.apache.pulsar.policies.data.loadbalancer.BrokerData; +import org.apache.pulsar.policies.data.loadbalancer.BundleData; +import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage; +import org.apache.pulsar.policies.data.loadbalancer.TimeAverageBrokerData; +import org.apache.pulsar.policies.data.loadbalancer.TimeAverageMessageData; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +@Test(groups = "broker") +public class AvgShedderTest { + private AvgShedder avgShedder; + private final ServiceConfiguration conf; + + public AvgShedderTest() { + conf = new ServiceConfiguration(); + } + + @BeforeMethod + public void setup() { + avgShedder = new AvgShedder(); + } + + private BrokerData initBrokerData() { + LocalBrokerData localBrokerData = new LocalBrokerData(); + localBrokerData.setCpu(new ResourceUsage()); + localBrokerData.setMemory(new ResourceUsage()); + localBrokerData.setBandwidthIn(new ResourceUsage()); + localBrokerData.setBandwidthOut(new ResourceUsage()); + BrokerData brokerData = new BrokerData(localBrokerData); + TimeAverageBrokerData timeAverageBrokerData = new TimeAverageBrokerData(); + brokerData.setTimeAverageData(timeAverageBrokerData); + return brokerData; + } + + @Test + public void testHitHighThreshold() { + LoadData loadData = new LoadData(); + BrokerData brokerData1 = initBrokerData(); + BrokerData brokerData2 = initBrokerData(); + BrokerData brokerData3 = initBrokerData(); + loadData.getBrokerData().put("broker1", brokerData1); + loadData.getBrokerData().put("broker2", brokerData2); + loadData.getBrokerData().put("broker3", brokerData3); + // AvgShedder will distribute the load evenly between the highest and lowest brokers + conf.setMaxUnloadPercentage(0.5); + + // Set the high threshold to 40% and hit count high threshold to 2 + int hitCountForHighThreshold = 2; + conf.setLoadBalancerAvgShedderHighThreshold(40); + conf.setLoadBalancerAvgShedderHitCountHighThreshold(hitCountForHighThreshold); + brokerData1.getLocalData().setCpu(new ResourceUsage(80, 100)); + brokerData2.getLocalData().setCpu(new ResourceUsage(30, 100)); + brokerData1.getLocalData().setMsgRateIn(10000); + brokerData1.getLocalData().setMsgRateOut(10000); + brokerData2.getLocalData().setMsgRateIn(1000); + brokerData2.getLocalData().setMsgRateOut(1000); + + // broker3 is in the middle + brokerData3.getLocalData().setCpu(new ResourceUsage(50, 100)); + brokerData3.getLocalData().setMsgRateIn(5000); + brokerData3.getLocalData().setMsgRateOut(5000); + + // expect to shed bundles with message rate(in+out) ((10000+10000)-(1000+1000))/2 = 9000 + // each bundle with 450 msg rate in and 450 msg rate out + // so 9000/(450+450)=10 bundles will be shed + for (int i = 0; i < 11; i++) { + brokerData1.getLocalData().getBundles().add("bundle-" + i); + BundleData bundle = new BundleData(); + TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData(); + timeAverageMessageData.setMsgRateIn(450); + timeAverageMessageData.setMsgRateOut(450); + // as AvgShedder map BundleData to broker, the hashCode of different BundleData should be different + // so we need to set some different fields to make the hashCode different + timeAverageMessageData.setNumSamples(i); + bundle.setShortTermData(timeAverageMessageData); + loadData.getBundleData().put("bundle-" + i, bundle); + } + + // do shedding for the first time, expect to shed nothing because hit count is not enough + Multimap bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 0); + + // do shedding for the second time, expect to shed 10 bundles + bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 10); + + // assert that all the bundles are shed from broker1 + for (String broker : bundlesToUnload.keys()) { + assertEquals(broker, "broker1"); + } + // assert that all the bundles are shed to broker2 + for (String bundle : bundlesToUnload.values()) { + BundleData bundleData = loadData.getBundleData().get(bundle); + assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), bundleData, loadData, conf).get(), "broker2"); + } + } + + @Test + public void testHitLowThreshold() { + LoadData loadData = new LoadData(); + BrokerData brokerData1 = initBrokerData(); + BrokerData brokerData2 = initBrokerData(); + BrokerData brokerData3 = initBrokerData(); + loadData.getBrokerData().put("broker1", brokerData1); + loadData.getBrokerData().put("broker2", brokerData2); + loadData.getBrokerData().put("broker3", brokerData3); + // AvgShedder will distribute the load evenly between the highest and lowest brokers + conf.setMaxUnloadPercentage(0.5); + + // Set the low threshold to 20% and hit count low threshold to 6 + int hitCountForLowThreshold = 6; + conf.setLoadBalancerAvgShedderLowThreshold(20); + conf.setLoadBalancerAvgShedderHitCountLowThreshold(hitCountForLowThreshold); + brokerData1.getLocalData().setCpu(new ResourceUsage(60, 100)); + brokerData2.getLocalData().setCpu(new ResourceUsage(40, 100)); + brokerData1.getLocalData().setMsgRateIn(10000); + brokerData1.getLocalData().setMsgRateOut(10000); + brokerData2.getLocalData().setMsgRateIn(1000); + brokerData2.getLocalData().setMsgRateOut(1000); + + // broker3 is in the middle + brokerData3.getLocalData().setCpu(new ResourceUsage(50, 100)); + brokerData3.getLocalData().setMsgRateIn(5000); + brokerData3.getLocalData().setMsgRateOut(5000); + + // expect to shed bundles with message rate(in+out) ((10000+10000)-(1000+1000))/2 = 9000 + // each bundle with 450 msg rate in and 450 msg rate out + // so 9000/(450+450)=10 bundles will be shed + for (int i = 0; i < 11; i++) { + brokerData1.getLocalData().getBundles().add("bundle-" + i); + BundleData bundle = new BundleData(); + TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData(); + timeAverageMessageData.setMsgRateIn(450); + timeAverageMessageData.setMsgRateOut(450); + // as AvgShedder map BundleData to broker, the hashCode of different BundleData should be different + // so we need to set some different fields to make the hashCode different + timeAverageMessageData.setNumSamples(i); + bundle.setShortTermData(timeAverageMessageData); + loadData.getBundleData().put("bundle-" + i, bundle); + } + + // do shedding for (lowCountForHighThreshold - 1) times, expect to shed nothing because hit count is not enough + Multimap bundlesToUnload; + for (int i = 0; i < hitCountForLowThreshold - 1; i++) { + bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 0); + } + + // do shedding for the last time, expect to shed 10 bundles + bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 10); + + // assert that all the bundles are shed from broker1 + for (String broker : bundlesToUnload.keys()) { + assertEquals(broker, "broker1"); + } + // assert that all the bundles are shed to broker2 + for (String bundle : bundlesToUnload.values()) { + BundleData bundleData = loadData.getBundleData().get(bundle); + assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), bundleData, loadData, conf).get(), "broker2"); + } + } + + @Test + public void testSheddingMultiplePairs() { + LoadData loadData = new LoadData(); + BrokerData brokerData1 = initBrokerData(); + BrokerData brokerData2 = initBrokerData(); + BrokerData brokerData3 = initBrokerData(); + BrokerData brokerData4 = initBrokerData(); + loadData.getBrokerData().put("broker1", brokerData1); + loadData.getBrokerData().put("broker2", brokerData2); + loadData.getBrokerData().put("broker3", brokerData3); + loadData.getBrokerData().put("broker4", brokerData4); + // AvgShedder will distribute the load evenly between the highest and lowest brokers + conf.setMaxUnloadPercentage(0.5); + + // Set the high threshold to 40% and hit count high threshold to 2 + int hitCountForHighThreshold = 2; + conf.setLoadBalancerAvgShedderHighThreshold(40); + conf.setLoadBalancerAvgShedderHitCountHighThreshold(hitCountForHighThreshold); + + // pair broker1 and broker2 + brokerData1.getLocalData().setCpu(new ResourceUsage(80, 100)); + brokerData2.getLocalData().setCpu(new ResourceUsage(30, 100)); + brokerData1.getLocalData().setMsgRateIn(10000); + brokerData1.getLocalData().setMsgRateOut(10000); + brokerData2.getLocalData().setMsgRateIn(1000); + brokerData2.getLocalData().setMsgRateOut(1000); + + // pair broker3 and broker4 + brokerData3.getLocalData().setCpu(new ResourceUsage(75, 100)); + brokerData3.getLocalData().setMsgRateIn(10000); + brokerData3.getLocalData().setMsgRateOut(10000); + brokerData4.getLocalData().setCpu(new ResourceUsage(35, 100)); + brokerData4.getLocalData().setMsgRateIn(1000); + brokerData4.getLocalData().setMsgRateOut(1000); + + // expect to shed bundles with message rate(in+out) ((10000+10000)-(1000+1000))/2 = 9000 + // each bundle with 450 msg rate in and 450 msg rate out + // so 9000/(450+450)=10 bundles will be shed + for (int i = 0; i < 11; i++) { + brokerData1.getLocalData().getBundles().add("bundle1-" + i); + brokerData3.getLocalData().getBundles().add("bundle3-" + i); + + BundleData bundle = new BundleData(); + TimeAverageMessageData timeAverageMessageData = new TimeAverageMessageData(); + timeAverageMessageData.setMsgRateIn(450); + timeAverageMessageData.setMsgRateOut(450); + // as AvgShedder map BundleData to broker, the hashCode of different BundleData should be different + // so we need to set some different fields to make the hashCode different + timeAverageMessageData.setNumSamples(i); + bundle.setShortTermData(timeAverageMessageData); + loadData.getBundleData().put("bundle1-" + i, bundle); + + bundle = new BundleData(); + timeAverageMessageData = new TimeAverageMessageData(); + timeAverageMessageData.setMsgRateIn(450); + timeAverageMessageData.setMsgRateOut(450); + timeAverageMessageData.setNumSamples(i+11); + bundle.setShortTermData(timeAverageMessageData); + loadData.getBundleData().put("bundle3-" + i, bundle); + } + + // do shedding for the first time, expect to shed nothing because hit count is not enough + Multimap bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 0); + + // do shedding for the second time, expect to shed 10*2=20 bundles + bundlesToUnload = avgShedder.findBundlesForUnloading(loadData, conf); + assertEquals(bundlesToUnload.size(), 20); + + // assert that half of the bundles are shed from broker1, and the other half are shed from broker3 + for (String broker : bundlesToUnload.keys()) { + if (broker.equals("broker1")) { + assertEquals(bundlesToUnload.get(broker).size(), 10); + } else if (broker.equals("broker3")) { + assertEquals(bundlesToUnload.get(broker).size(), 10); + } else { + fail(); + } + } + + // assert that all the bundles from broker1 are shed to broker2, and all the bundles from broker3 are shed to broker4 + for (String bundle : bundlesToUnload.values()) { + BundleData bundleData = loadData.getBundleData().get(bundle); + if (bundle.startsWith("bundle1-")) { + assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), bundleData, loadData, conf).get(), "broker2"); + } else if (bundle.startsWith("bundle3-")) { + assertEquals(avgShedder.selectBroker(loadData.getBrokerData().keySet(), bundleData, loadData, conf).get(), "broker4"); + } else { + fail(); + } + } + } +}