From b06e78e30b5579bd1729f6df67b6472290fdd5d4 Mon Sep 17 00:00:00 2001 From: Qiang Huang Date: Fri, 24 Jun 2022 14:06:42 +0800 Subject: [PATCH] add switch for enable/disable distribute bundles evenly (#16059) --- .../apache/pulsar/broker/ServiceConfiguration.java | 8 ++++++++ .../loadbalance/impl/ModularLoadManagerImpl.java | 13 ++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 2a0ee8356e1cb..c8c3a9aa02356 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2002,6 +2002,14 @@ public class ServiceConfiguration implements PulsarConfiguration { + " should be offload from some over-loaded broker to other under-loaded brokers" ) private int loadBalancerSheddingIntervalMinutes = 1; + + @FieldContext( + dynamic = true, + category = CATEGORY_LOAD_BALANCER, + doc = "enable/disable distribute bundles evenly" + ) + private boolean loadBalancerDistributeBundlesEvenlyEnabled = true; + @FieldContext( category = CATEGORY_LOAD_BALANCER, dynamic = true, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index b6e7a7dde199d..a32b52ea84425 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -828,10 +828,17 @@ public Optional selectBrokerForAssignment(final ServiceUnitId serviceUni LoadManagerShared.filterAntiAffinityGroupOwnedBrokers(pulsar, serviceUnit.toString(), brokerCandidateCache, brokerToNamespaceToBundleRange, brokerToFailureDomainMap); - // distribute bundles evenly to candidate-brokers - LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), brokerCandidateCache, - brokerToNamespaceToBundleRange); + // distribute bundles evenly to candidate-brokers if enable + if (conf.isLoadBalancerDistributeBundlesEvenlyEnabled()) { + LoadManagerShared.removeMostServicingBrokersForNamespace(serviceUnit.toString(), + brokerCandidateCache, + brokerToNamespaceToBundleRange); + if (log.isDebugEnabled()) { + log.debug("enable distribute bundles evenly to candidate-brokers, broker candidate count={}", + brokerCandidateCache.size()); + } + } log.info("{} brokers being considered for assignment of {}", brokerCandidateCache.size(), bundle); // Use the filter pipeline to finalize broker candidates.