Skip to content

Commit

Permalink
Add ThresholdShedder Strategy for loadbalancer and expose loadbalance…
Browse files Browse the repository at this point in the history
… metric to prometheus (apache#6772)

### Motivation
The Only one overload shedder strategy is `OverloadShedder`, which collects each broker's max resource usage and compare with threshold (default value is 85%). When max resource usage reaches the threshold, it will trigger bundle unloading, which will migrate parts of bundles to other brokers. The overload shedder strategy has some drawbacks as follows:
- Not support configure other overload shedder strategies
- It is hard to determine the threshold value,  the default threshold is 85%. But for a broker, the max resource usage is few to reach 85%, which will lead to unbalanced traffic between brokers. The heavy traffic broker's read cache hit rate will decrease.
- When you restart the most brokers of the pulsar cluster at the same time, the whole traffic in the cluster will goes to the rest brokers. The restarted brokers will have no traffic for a long time, due to the rest brokers max resource usage not reach the threshold.

### Changes
1. Support multiple overload shedder strategy, which only need to configure in `broker.conf`
2. I develop `ThresholdShedder` strategy, the main idea as follow:
    - Calculate the average resource usage of the brokers, and individual broker resource usage will compare with the average value. If it greatter than average value plus threshold, the overload shedder will be triggered.
    `broker resource usage > average resource usage + threshold`
    - Each kind of resources (ie bandwithIn, bandwithOut, CPU, Memory, Direct Memory), has weight(default is 1.0) when calculate broker's resource usage.
    - Record the pulsar broker cluster history average resource usage, new average resource usage will be calculate as follow:
    `new_avg = old_avg * factor + (1-factor) * avg`
    `new_avg`: newest average resoruce usage
    `old_avg`: old average resource usge which is calculate in last round.
    `factor`: the decrease factor, default value is `0.9`
    `avg`: the average resource usage of the brokers
3. expose load balance metric to prometheus
4. fix a bug in `OverloadShedder`, which specify the unloaded bundle in the overload's own broker.

Please help check this implementation, if it is ok, i will add test case.
  • Loading branch information
hangc0276 authored and huangdx0726 committed Aug 24, 2020
1 parent b8955df commit ced5363
Show file tree
Hide file tree
Showing 10 changed files with 481 additions and 3 deletions.
37 changes: 37 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,43 @@ supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally
# Default algorithm name for namespace bundle split
defaultNamespaceBundleSplitAlgorithm=range_equally_divide

# load shedding strategy, support OverloadShedder and ThresholdShedder, default is OverloadShedder
loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.OverloadShedder

# The broker resource usage threshold.
# When the broker resource usage is gratter than the pulsar cluster average resource usge,
# the threshold shedder will be triggered to offload bundles from the broker.
# It only take effect in ThresholdSheddler strategy.
loadBalancerBrokerThresholdShedderPercentage=10

# When calculating new resource usage, the history usage accounts for.
# It only take effect in ThresholdSheddler strategy.
loadBalancerHistoryResourcePercentage=0.9

# The BandWithIn usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0

# The BandWithOut usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0

# The CPU usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=1.0

# The direct memory usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=1.0

# Bundle unload minimum throughput threshold (MB), avoding bundle unload frequently.
# It only take effect in ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=10

### --- Replication --- ###

# Enable replication metrics
Expand Down
34 changes: 34 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,40 @@ loadBalancerNamespaceBundleMaxBandwidthMbytes=100
# maximum number of bundles in a namespace
loadBalancerNamespaceMaximumBundles=128

# The broker resource usage threshold.
# When the broker resource usage is gratter than the pulsar cluster average resource usge,
# the threshold shedder will be triggered to offload bundles from the broker.
# It only take effect in ThresholdSheddler strategy.
loadBalancerBrokerThresholdShedderPercentage=10

# When calculating new resource usage, the history usage accounts for.
# It only take effect in ThresholdSheddler strategy.
loadBalancerHistoryResourcePercentage=0.9

# The BandWithIn usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerBandwithInResourceWeight=1.0

# The BandWithOut usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerBandwithOutResourceWeight=1.0

# The CPU usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerCPUResourceWeight=1.0

# The heap memory usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerMemoryResourceWeight=1.0

# The direct memory usage weight when calculating new resourde usage.
# It only take effect in ThresholdShedder strategy.
loadBalancerDirectMemoryResourceWeight=1.0

# Bundle unload minimum throughput threshold (MB), avoding bundle unload frequently.
# It only take effect in ThresholdShedder strategy.
loadBalancerBundleUnloadMinThroughputThreshold=10

### --- Replication --- ###

# Enable replication metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "load placement strategy[weightedRandomSelection/leastLoadedServer] (only used by SimpleLoadManagerImpl)"
)
private String loadBalancerPlacementStrategy = "leastLoadedServer"; // weighted random selection

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "load balance load shedding strategy"
)
private String loadBalancerLoadSheddingStrategy = "org.apache.pulsar.broker.loadbalance.impl.OverloadShedder";

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
Expand Down Expand Up @@ -1201,6 +1208,63 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Usage threshold to determine a broker as over-loaded"
)
private int loadBalancerBrokerOverloadedThresholdPercentage = 85;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Usage threshold to determine a broker whether to start threshold shedder"
)
private int loadBalancerBrokerThresholdShedderPercentage = 10;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Resource history Usage Percentage When adding new resource usage info"
)
private double loadBalancerHistoryResourcePercentage = 0.9;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "BandwithIn Resource Usage Weight"
)
private double loadBalancerBandwithInResourceWeight = 1.0;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "BandwithOut Resource Usage Weight"
)
private double loadBalancerBandwithOutResourceWeight = 1.0;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "CPU Resource Usage Weight"
)
private double loadBalancerCPUResourceWeight = 1.0;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Memory Resource Usage Weight"
)
private double loadBalancerMemoryResourceWeight = 1.0;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Direct Memory Resource Usage Weight"
)
private double loadBalancerDirectMemoryResourceWeight = 1.0;

@FieldContext(
dynamic = true,
category = CATEGORY_LOAD_BALANCER,
doc = "Bundle unload minimum throughput threshold (MB)"
)
private double loadBalancerBundleUnloadMinThroughputThreshold = 10;

@FieldContext(
category = CATEGORY_LOAD_BALANCER,
doc = "Interval to flush dynamic resource quota to ZooKeeper"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
*/
package org.apache.pulsar.broker.loadbalance;

import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.pulsar.zookeeper.ZooKeeperCache.Deserializer;
Expand Down Expand Up @@ -119,4 +121,11 @@ public interface ModularLoadManager {
* @return
*/
LocalBrokerData getBrokerLocalData(String broker);

/**
* Fetch load balancing metrics.
*
* @return List of LoadBalancing Metrics
*/
List<Metrics> getLoadBalancingMetrics();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.web.PulsarWebResource.path;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;

Expand All @@ -35,10 +36,12 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -67,6 +70,7 @@
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
Expand Down Expand Up @@ -185,6 +189,17 @@ public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCach
private static final Deserializer<LocalBrokerData> loadReportDeserializer = (key, content) -> jsonMapper()
.readValue(content, LocalBrokerData.class);

// record load balancing metrics
private AtomicReference<List<Metrics>> loadBalancingMetrics = new AtomicReference<>();
// record bundle unload metrics
private AtomicReference<List<Metrics>> bundleUnloadMetrics = new AtomicReference<>();
// record bundle split metrics
private AtomicReference<List<Metrics>> bundleSplitMetrics = new AtomicReference<>();

private long bundleSplitCount = 0;
private long unloadBrokerCount = 0;
private long unloadBundleCount = 0;

/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
*/
Expand All @@ -195,7 +210,6 @@ public ModularLoadManagerImpl() {
filterPipeline = new ArrayList<>();
loadData = new LoadData();
loadSheddingPipeline = new ArrayList<>();
loadSheddingPipeline.add(new OverloadShedder());
preallocatedBundleToBroker = new ConcurrentHashMap<>();
scheduler = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = Maps.newHashMap();
Expand Down Expand Up @@ -275,6 +289,25 @@ public LocalBrokerData deserialize(String key, byte[] content) throws Exception
.registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));
pulsar.getConfigurationCache().failureDomainCache()
.registerListener((path, data, stat) -> scheduler.execute(() -> refreshBrokerToFailureDomainMap()));

loadSheddingPipeline.add(createLoadSheddingStrategy());
}

private LoadSheddingStrategy createLoadSheddingStrategy() {
try {
Class<?> loadSheddingClass = Class.forName(conf.getLoadBalancerLoadSheddingStrategy());
Object loadSheddingInstance = loadSheddingClass.newInstance();
if (loadSheddingInstance instanceof LoadSheddingStrategy) {
return (LoadSheddingStrategy) loadSheddingInstance;
} else {
log.error("create load shedding strategy failed. using OverloadShedder instead.");
return new OverloadShedder();
}
} catch (Exception e) {
log.error("Error when trying to create load shedding strategy: ", e);
}

return new OverloadShedder();
}

/**
Expand Down Expand Up @@ -593,9 +626,32 @@ public synchronized void doLoadShedding() {
}
});
});

updateBundleUnloadingMetrics(bundlesToUnload);
}
}

/**
* As leader broker, update bundle unloading metrics.
*
* @param bundlesToUnload
*/
private void updateBundleUnloadingMetrics(Multimap<String, String> bundlesToUnload) {
unloadBrokerCount += bundlesToUnload.keySet().size();
unloadBundleCount += bundlesToUnload.values().size();

List<Metrics> metrics = Lists.newArrayList();
Map<String, String> dimensions = new HashMap<>();

dimensions.put("metric", "bundleUnloading");

Metrics m = Metrics.create(dimensions);
m.put("brk_lb_unload_broker_count", unloadBrokerCount);
m.put("brk_lb_unload_bundle_count", unloadBundleCount);
metrics.add(m);
this.bundleUnloadMetrics.set(metrics);
}

public boolean shouldAntiAffinityNamespaceUnload(String namespace, String bundle, String currentBroker) {
try {
Optional<Policies> nsPolicies = pulsar.getConfigurationCache().policiesCache()
Expand Down Expand Up @@ -659,10 +715,31 @@ public void checkNamespaceBundleSplit() {
log.error("Failed to split namespace bundle {}", bundleName, e);
}
}

updateBundleSplitMetrics(bundlesToBeSplit);
}

}

/**
* As leader broker, update bundle split metrics.
*
* @param bundlesToBeSplit
*/
private void updateBundleSplitMetrics(Set<String> bundlesToBeSplit) {
bundleSplitCount += bundlesToBeSplit.size();

List<Metrics> metrics = Lists.newArrayList();
Map<String, String> dimensions = new HashMap<>();

dimensions.put("metric", "bundlesSplit");

Metrics m = Metrics.create(dimensions);
m.put("brk_lb_bundles_split_count", bundleSplitCount);
metrics.add(m);
this.bundleSplitMetrics.set(metrics);
}

/**
* When the broker data ZooKeeper nodes are updated, update the broker data map.
*/
Expand Down Expand Up @@ -851,12 +928,35 @@ public LocalBrokerData updateLocalBrokerData() {
try {
final SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
localData.update(systemResourceUsage, getBundleStats());
updateLoadBalancingMetrics(systemResourceUsage);
} catch (Exception e) {
log.warn("Error when attempting to update local broker data", e);
}
return localData;
}

/**
* As any broker, update System Resource Usage Percentage.
*
* @param systemResourceUsage
*/
private void updateLoadBalancingMetrics(final SystemResourceUsage systemResourceUsage) {
List<Metrics> metrics = Lists.newArrayList();
Map<String, String> dimensions = new HashMap<>();

dimensions.put("broker", conf.getAdvertisedAddress());
dimensions.put("metric", "loadBalancing");

Metrics m = Metrics.create(dimensions);
m.put("brk_lb_cpu_usage", systemResourceUsage.getCpu().percentUsage());
m.put("brk_lb_memory_usage", systemResourceUsage.getMemory().percentUsage());
m.put("brk_lb_directMemory_usage", systemResourceUsage.getDirectMemory().percentUsage());
m.put("brk_lb_bandwidth_in_usage", systemResourceUsage.getBandwidthIn().percentUsage());
m.put("brk_lb_bandwidth_out_usage", systemResourceUsage.getBandwidthOut().percentUsage());
metrics.add(m);
this.loadBalancingMetrics.set(metrics);
}

/**
* As any broker, write the local broker data to ZooKeeper.
*/
Expand Down Expand Up @@ -987,4 +1087,23 @@ public LocalBrokerData getBrokerLocalData(String broker) {
return null;
}
}

@Override
public List<Metrics> getLoadBalancingMetrics() {
List<Metrics> metricsCollection = new ArrayList<>();

if (this.loadBalancingMetrics.get() != null) {
metricsCollection.addAll(this.loadBalancingMetrics.get());
}

if (this.bundleUnloadMetrics.get() != null) {
metricsCollection.addAll(this.bundleUnloadMetrics.get());
}

if (this.bundleSplitMetrics.get() != null) {
metricsCollection.addAll(this.bundleSplitMetrics.get());
}

return metricsCollection;
}
}
Loading

0 comments on commit ced5363

Please sign in to comment.