From 8aed71b4709362faeb20e66c4fe1962433d20927 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 14 Mar 2024 11:35:13 +0530 Subject: [PATCH 01/16] Add cluster primary balance contraint for rebalancing with buffer Signed-off-by: Arpit Bandejiya --- .../routing/allocation/ConstraintTypes.java | 13 ++++++ .../allocation/RebalanceConstraints.java | 5 ++- .../allocator/BalancedShardsAllocator.java | 44 ++++++++++++++++--- 3 files changed, 55 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index ae2d4a0926194..f9d8b8a3794dc 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -81,4 +81,17 @@ public static Predicate isPrimaryShardsPerNodeBreac return primaryShardCount >= allowedPrimaryShardCount; }; } + + /** + * Defines a predicate which returns true when a node contains more than average number of primary shards. This + * constraint is used in weight calculation during allocation only. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT} + * is assigned to node resulting in lesser chances of node being selected as allocation target + */ + public static Predicate isPrimaryShardsPerNodeBreached(float buffer) { + return (params) -> { + int primaryShardCount = params.getNode().numPrimaryShards(); + int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer)); + return primaryShardCount >= allowedPrimaryShardCount; + }; + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index a4036ec47ec0e..a01d8da6e72fb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -14,8 +14,10 @@ import java.util.HashMap; import java.util.Map; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPrimaryShardsPerNodeBreached; /** * Constraints applied during rebalancing round; specify conditions which, if breached, reduce the @@ -27,9 +29,10 @@ public class RebalanceConstraints { private Map constraints; - public RebalanceConstraints() { + public RebalanceConstraints(float preferPrimaryBalanceBuffer) { this.constraints = new HashMap<>(); this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); + this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(preferPrimaryBalanceBuffer))); } public void updateRebalanceConstraint(String constraint, boolean enable) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 41ace0e7661fe..ccc85ced729d3 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -145,10 +145,21 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting PREFER_PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting( + "cluster.routing.allocation.balance.prefer_primary_balance_buffer", + 0.05f, + 0.0f, + Property.Dynamic, + Property.NodeScope + ); + private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; private volatile boolean preferPrimaryShardBalance; + private volatile float preferPrimaryShardBalanceBuffer; + private volatile float indexBalanceFactor; + private volatile float shardBalanceFactor; private volatile WeightFunction weightFunction; private volatile float threshold; @@ -158,14 +169,17 @@ public BalancedShardsAllocator(Settings settings) { @Inject public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { - setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings)); + setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings), PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); + setPreferPrimaryShardBalanceBuffer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); - clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction); + clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::setIndexBalanceFactor); + clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::setShardBalanceFactor); + clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER, this::setPreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); } @@ -190,8 +204,23 @@ private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrateg } } - private void setWeightFunction(float indexBalance, float shardBalanceFactor) { - weightFunction = new WeightFunction(indexBalance, shardBalanceFactor); + private void setIndexBalanceFactor(float indexBalanceFactor) { + this.indexBalanceFactor = indexBalanceFactor; + setWeightFunction(indexBalanceFactor, shardBalanceFactor, preferPrimaryShardBalanceBuffer); + } + + private void setShardBalanceFactor(float shardBalanceFactor) { + this.shardBalanceFactor = shardBalanceFactor; + setWeightFunction(indexBalanceFactor, shardBalanceFactor, preferPrimaryShardBalanceBuffer); + } + + private void setPreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalanceBuffer) { + this.preferPrimaryShardBalanceBuffer = preferPrimaryShardBalanceBuffer; + setWeightFunction(indexBalanceFactor, shardBalanceFactor, preferPrimaryShardBalanceBuffer); + } + + private void setWeightFunction(float indexBalance, float shardBalanceFactor, float preferPrimaryShardBalanceDelta) { + weightFunction = new WeightFunction(indexBalance, shardBalanceFactor, preferPrimaryShardBalanceDelta); } /** @@ -203,6 +232,7 @@ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); } private void setThreshold(float threshold) { @@ -347,8 +377,9 @@ static class WeightFunction { private final float theta1; private AllocationConstraints constraints; private RebalanceConstraints rebalanceConstraints; + private final float preferPrimaryBalanceBuffer; - WeightFunction(float indexBalance, float shardBalance) { + WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) { float sum = indexBalance + shardBalance; if (sum <= 0.0f) { throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); @@ -358,7 +389,8 @@ static class WeightFunction { this.indexBalance = indexBalance; this.shardBalance = shardBalance; this.constraints = new AllocationConstraints(); - this.rebalanceConstraints = new RebalanceConstraints(); + this.rebalanceConstraints = new RebalanceConstraints(preferPrimaryBalanceBuffer); + this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer; // Enable index shard per node breach constraint updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); } From a2eaddd33aae128ea8a85b72880598d070799628 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 14 Mar 2024 11:41:39 +0530 Subject: [PATCH 02/16] Fix spotless check Signed-off-by: Arpit Bandejiya --- .../cluster/routing/allocation/RebalanceConstraints.java | 5 ++++- .../allocation/allocator/BalancedShardsAllocator.java | 6 +++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index a01d8da6e72fb..41853514630cb 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -32,7 +32,10 @@ public class RebalanceConstraints { public RebalanceConstraints(float preferPrimaryBalanceBuffer) { this.constraints = new HashMap<>(); this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); - this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(preferPrimaryBalanceBuffer))); + this.constraints.putIfAbsent( + CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, + new Constraint(isPrimaryShardsPerNodeBreached(preferPrimaryBalanceBuffer)) + ); } public void updateRebalanceConstraint(String constraint, boolean enable) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index ccc85ced729d3..c6f5187c03e47 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -169,7 +169,11 @@ public BalancedShardsAllocator(Settings settings) { @Inject public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { - setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings), PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); + setWeightFunction( + INDEX_BALANCE_FACTOR_SETTING.get(settings), + SHARD_BALANCE_FACTOR_SETTING.get(settings), + PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings) + ); setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); From 761adc311e267aa5c321f71ba3f30b33d2b11494 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 14 Mar 2024 14:08:02 +0530 Subject: [PATCH 03/16] Tuning constraints Signed-off-by: Arpit Bandejiya --- .../cluster/routing/allocation/ConstraintTypes.java | 2 +- .../routing/allocation/allocator/BalancedShardsAllocator.java | 4 +++- .../routing/allocation/allocator/LocalShardsBalancer.java | 2 +- .../java/org/opensearch/common/settings/ClusterSettings.java | 1 + 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index f9d8b8a3794dc..42baa34de0632 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -91,7 +91,7 @@ public static Predicate isPrimaryShardsPerNodeBreac return (params) -> { int primaryShardCount = params.getNode().numPrimaryShards(); int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer)); - return primaryShardCount >= allowedPrimaryShardCount; + return primaryShardCount > allowedPrimaryShardCount; }; } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index c6f5187c03e47..798cfaae3a085 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -169,6 +169,9 @@ public BalancedShardsAllocator(Settings settings) { @Inject public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { + setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); + setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings)); + setPreferPrimaryShardBalanceBuffer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); setWeightFunction( INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings), @@ -177,7 +180,6 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); - setPreferPrimaryShardBalanceBuffer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 45f64a5b29b04..2296e6353d16e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -998,7 +998,7 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala // doing such relocation wouldn't help in primary balance. if (preferPrimaryBalance == true && shard.primary() - && maxNode.numPrimaryShards(shard.getIndexName()) - minNode.numPrimaryShards(shard.getIndexName()) < 2) { + && maxNode.numPrimaryShards() - minNode.numPrimaryShards() < 2) { continue; } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8695ff878c8bc..f05c8ca2ed940 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -248,6 +248,7 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, + BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE_BUFFER, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, From f842b41310a810d5431333a84ed64c22518a65d6 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 14 Mar 2024 15:47:34 +0530 Subject: [PATCH 04/16] Fix the spotless checks Signed-off-by: Arpit Bandejiya --- .../routing/allocation/allocator/LocalShardsBalancer.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 2296e6353d16e..6f3098ee683b6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -996,9 +996,7 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala } // This is a safety net which prevents un-necessary primary shard relocations from maxNode to minNode when // doing such relocation wouldn't help in primary balance. - if (preferPrimaryBalance == true - && shard.primary() - && maxNode.numPrimaryShards() - minNode.numPrimaryShards() < 2) { + if (preferPrimaryBalance == true && shard.primary() && maxNode.numPrimaryShards() - minNode.numPrimaryShards() < 2) { continue; } From 55ed855e05ab582518a0d8c33148e2a8323ccb9b Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Mon, 11 Mar 2024 13:28:05 +0530 Subject: [PATCH 05/16] Add random allocation strategy Signed-off-by: Arpit Bandejiya --- .../allocator/BalancedShardsAllocator.java | 24 ++++++++++++++--- .../allocator/LocalShardsBalancer.java | 27 ++++++++++++++++++- 2 files changed, 47 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 798cfaae3a085..da8494616edff 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -153,6 +153,16 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + /** + * This setting governs whether shards should be randomly allocated during assignment. + */ + public static final Setting PREFER_RANDOM_SHARD_ALLOCATION = Setting.boolSetting( + "cluster.routing.allocation.balance.prefer_random_allocation", + false, + Property.Dynamic, + Property.NodeScope + ); + private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; @@ -162,6 +172,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile float shardBalanceFactor; private volatile WeightFunction weightFunction; private volatile float threshold; + private volatile boolean preferRandomShardAllocation; public BalancedShardsAllocator(Settings settings) { this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); @@ -180,6 +191,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); + setPreferRandomShardAllocation(PREFER_RANDOM_SHARD_ALLOCATION.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); @@ -187,6 +199,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::setShardBalanceFactor); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER, this::setPreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); + clusterSettings.addSettingsUpdateConsumer(PREFER_RANDOM_SHARD_ALLOCATION, this::setPreferRandomShardAllocation); } /** @@ -245,6 +258,9 @@ private void setThreshold(float threshold) { this.threshold = threshold; } + private void setPreferRandomShardAllocation(boolean preferRandomShardAllocation) { + this.preferRandomShardAllocation = preferRandomShardAllocation; + } @Override public void allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { @@ -257,7 +273,8 @@ public void allocate(RoutingAllocation allocation) { shardMovementStrategy, weightFunction, threshold, - preferPrimaryShardBalance + preferPrimaryShardBalance, + preferRandomShardAllocation ); localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); @@ -278,7 +295,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f shardMovementStrategy, weightFunction, threshold, - preferPrimaryShardBalance + preferPrimaryShardBalance, + preferRandomShardAllocation ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -533,7 +551,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 6f3098ee683b6..965dacf65aaa6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -28,6 +28,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; +import org.opensearch.common.Randomness; import org.opensearch.common.collect.Tuple; import org.opensearch.gateway.PriorityComparator; @@ -69,6 +70,7 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; + private final boolean preferRandomShardAllocation; public LocalShardsBalancer( Logger logger, @@ -76,7 +78,8 @@ public LocalShardsBalancer( ShardMovementStrategy shardMovementStrategy, BalancedShardsAllocator.WeightFunction weight, float threshold, - boolean preferPrimaryBalance + boolean preferPrimaryBalance, + boolean preferRandomShardAllocation ) { this.logger = logger; this.allocation = allocation; @@ -92,6 +95,7 @@ public LocalShardsBalancer( inEligibleTargetNode = new HashSet<>(); this.preferPrimaryBalance = preferPrimaryBalance; this.shardMovementStrategy = shardMovementStrategy; + this.preferRandomShardAllocation = preferRandomShardAllocation; } /** @@ -888,6 +892,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { /* find an node with minimal weight we can allocate on*/ float minWeight = Float.POSITIVE_INFINITY; BalancedShardsAllocator.ModelNode minNode = null; + List minNodes = new ArrayList<>(); Decision decision = null; /* Don't iterate over an identity hashset here the * iteration order is different for each run and makes testing hard */ @@ -931,11 +936,26 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId)); + minNodes.add(node); } else { updateMinNode = currentDecision.type() == Decision.Type.YES; + /* If updateMinNode is true, it means the earlier nodes had decision type THROTTLE. We will need to clear the list, + * and add new nodes to the list. + */ + if(updateMinNode) { + minNodes.clear(); + minNodes.add(node); + } } } else { updateMinNode = currentWeight < minWeight; + /* Since we have found nodes with less weight. We will need to clear the earlier minNodes list + * and add the new nodes to the list. + */ + if (updateMinNode) { + minNodes.clear(); + minNodes.add(node); + } } if (updateMinNode) { minNode = node; @@ -959,6 +979,11 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking)); } } + + if(preferRandomShardAllocation && !minNodes.isEmpty()){ + minNode = minNodes.get(Randomness.get().nextInt(minNodes.size())); + } + return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.getRoutingNode().node() : null, nodeDecisions); } From 32df4e610c948bc3961799ef94b43b20906bec07 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 14 Mar 2024 12:14:41 +0530 Subject: [PATCH 06/16] Fix spotless errors Signed-off-by: Arpit Bandejiya --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 1 + .../routing/allocation/allocator/LocalShardsBalancer.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index da8494616edff..8193532dabcb2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -261,6 +261,7 @@ private void setThreshold(float threshold) { private void setPreferRandomShardAllocation(boolean preferRandomShardAllocation) { this.preferRandomShardAllocation = preferRandomShardAllocation; } + @Override public void allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 965dacf65aaa6..a20f1a532e51d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -942,7 +942,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { /* If updateMinNode is true, it means the earlier nodes had decision type THROTTLE. We will need to clear the list, * and add new nodes to the list. */ - if(updateMinNode) { + if (updateMinNode) { minNodes.clear(); minNodes.add(node); } @@ -980,7 +980,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { } } - if(preferRandomShardAllocation && !minNodes.isEmpty()){ + if (preferRandomShardAllocation && !minNodes.isEmpty()) { minNode = minNodes.get(Randomness.get().nextInt(minNodes.size())); } From 938ac4ded20ca872c326aa1b6f31bf7a889030c3 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 14 Mar 2024 15:50:38 +0530 Subject: [PATCH 07/16] Testing the allocation changes for all tests Signed-off-by: Arpit Bandejiya --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 8193532dabcb2..d6bec6ac4cba0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -158,7 +158,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { */ public static final Setting PREFER_RANDOM_SHARD_ALLOCATION = Setting.boolSetting( "cluster.routing.allocation.balance.prefer_random_allocation", - false, + true, Property.Dynamic, Property.NodeScope ); From 6cda5cd29855496ceab785dc27c5c60376b32824 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Mon, 18 Mar 2024 11:48:00 +0530 Subject: [PATCH 08/16] Enable the prefer_primary to true Signed-off-by: Arpit Bandejiya --- .../routing/allocation/allocator/BalancedShardsAllocator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index d6bec6ac4cba0..016ca32ec6d05 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -140,7 +140,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { public static final Setting PREFER_PRIMARY_SHARD_BALANCE = Setting.boolSetting( "cluster.routing.allocation.balance.prefer_primary", - false, + true, Property.Dynamic, Property.NodeScope ); @@ -154,7 +154,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); /** - * This setting governs whether shards should be randomly allocated during assignment. + * This setting governs whether shards should be randomly allocated among the eligible nodes during assignment. */ public static final Setting PREFER_RANDOM_SHARD_ALLOCATION = Setting.boolSetting( "cluster.routing.allocation.balance.prefer_random_allocation", From ece3b263061cfa215e73e327652d4a3b458499fe Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Mon, 18 Mar 2024 11:51:28 +0530 Subject: [PATCH 09/16] Add changelog.md Signed-off-by: Arpit Bandejiya --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index cac3b74fe716c..8bd4d2f941817 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957)) - Add Remote Store Migration Experimental flag and allow mixed mode clusters under same ([#11986](https://github.com/opensearch-project/OpenSearch/pull/11986)) - [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880)) +- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 From 3d4d8654a316a548ec71a8eec76c4f2ebb77edb2 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Tue, 19 Mar 2024 10:38:30 +0530 Subject: [PATCH 10/16] Add UTs Signed-off-by: Arpit Bandejiya --- .../routing/allocation/ConstraintTypes.java | 5 + .../allocation/RebalanceConstraints.java | 4 +- .../allocator/BalancedShardsAllocator.java | 29 ++++-- .../common/settings/ClusterSettings.java | 4 +- .../allocation/BalanceConfigurationTests.java | 94 +++++++++++++++++-- 5 files changed, 117 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index 42baa34de0632..0f83ad3898b6e 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -28,6 +28,11 @@ public class ConstraintTypes { */ public final static String CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint"; + /** + * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices + */ + public final static String CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint"; + /** * Defines an index constraint which is breached when a node contains more than avg number of shards for an index */ diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index 41853514630cb..ca714ab1ff557 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -14,7 +14,7 @@ import java.util.HashMap; import java.util.Map; -import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPrimaryShardsPerNodeBreached; @@ -33,7 +33,7 @@ public RebalanceConstraints(float preferPrimaryBalanceBuffer) { this.constraints = new HashMap<>(); this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); this.constraints.putIfAbsent( - CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, + CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(preferPrimaryBalanceBuffer)) ); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 016ca32ec6d05..e580e5c12e9f4 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -61,6 +61,7 @@ import java.util.Set; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; +import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID; @@ -140,13 +141,20 @@ public class BalancedShardsAllocator implements ShardsAllocator { public static final Setting PREFER_PRIMARY_SHARD_BALANCE = Setting.boolSetting( "cluster.routing.allocation.balance.prefer_primary", - true, + false, Property.Dynamic, Property.NodeScope ); - public static final Setting PREFER_PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting( - "cluster.routing.allocation.balance.prefer_primary_balance_buffer", + public static final Setting PREFER_PRIMARY_SHARD_REBALANCE = Setting.boolSetting( + "cluster.routing.allocation.rebalance.prefer_primary", + false, + Property.Dynamic, + Property.NodeScope + ); + + public static final Setting PRIMARY_SHARD_REBALANCE_BUFFER = Setting.floatSetting( + "cluster.routing.allocation.rebalance.primary_buffer", 0.05f, 0.0f, Property.Dynamic, @@ -180,16 +188,17 @@ public BalancedShardsAllocator(Settings settings) { @Inject public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { - setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings)); - setPreferPrimaryShardBalanceBuffer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings)); + setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); + setPreferPrimaryShardBalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings)); setWeightFunction( INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings), - PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings) + PRIMARY_SHARD_REBALANCE_BUFFER.get(settings) ); setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); + setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); setPreferRandomShardAllocation(PREFER_RANDOM_SHARD_ALLOCATION.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); @@ -197,7 +206,8 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::setIndexBalanceFactor); clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::setShardBalanceFactor); - clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER, this::setPreferPrimaryShardBalanceBuffer); + clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); + clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::setPreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); clusterSettings.addSettingsUpdateConsumer(PREFER_RANDOM_SHARD_ALLOCATION, this::setPreferRandomShardAllocation); } @@ -251,7 +261,10 @@ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); - this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); + } + + private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) { + this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance); } private void setThreshold(float threshold) { diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 6526826a9a58e..b6af7e72516cc 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -250,8 +250,10 @@ public void apply(Settings value, Settings current, Settings previous) { AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING, BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING, - BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE_BUFFER, + BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE, + BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE, + BalancedShardsAllocator.PREFER_RANDOM_SHARD_ALLOCATION, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index bbbafb2c20529..e2b64d1d0fc12 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -72,6 +72,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; @@ -91,7 +92,7 @@ public class BalanceConfigurationTests extends OpenSearchAllocationTestCase { public void testIndexBalance() { /* Tests balance over indices only */ final float indexBalance = 1.0f; - final float shardBalance = 0.0f; + final float shardBalance = 0.001f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); @@ -140,10 +141,13 @@ public void testIndexBalance() { } private Settings.Builder getSettingsBuilderForPrimaryBalance() { - return getSettingsBuilderForPrimaryBalance(true); + return getSettingsBuilderForPrimaryBalance(true, false); } - private Settings.Builder getSettingsBuilderForPrimaryBalance(boolean preferPrimaryBalance) { + private Settings.Builder getSettingsBuilderForPrimaryReBalance() { + return getSettingsBuilderForPrimaryBalance(true, true); + } + private Settings.Builder getSettingsBuilderForPrimaryBalance(boolean preferPrimaryBalance, boolean preferPrimaryRebalance) { final float indexBalance = 0.55f; final float shardBalance = 0.45f; final float balanceThreshold = 1.0f; @@ -155,6 +159,7 @@ private Settings.Builder getSettingsBuilderForPrimaryBalance(boolean preferPrima ); settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance); + settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); settings.put(BalancedShardsAllocator.PREFER_RANDOM_SHARD_ALLOCATION.getKey(), randomBoolean()); @@ -202,7 +207,7 @@ public void testPrimaryBalanceWithoutPreferPrimaryBalanceSetting() { int balanceFailed = 0; AllocationService strategy = createAllocationService( - getSettingsBuilderForPrimaryBalance(false).build(), + getSettingsBuilderForPrimaryBalance(false ,false).build(), new TestGatewayAllocator() ); for (int i = 0; i < numberOfRuns; i++) { @@ -245,6 +250,33 @@ public void testPrimaryBalanceWithPreferPrimaryBalanceSetting() { assertTrue(balanceFailed <= 1); } + /** + * This test verifies primary shard balance is attained with PREFER_PRIMARY_SHARD_BALANCE setting. + */ + public void testPrimaryBalanceWithPreferPrimaryReBalanceSetting() { + final int numberOfNodes = 4; + final int numberOfIndices = 4; + final int numberOfShards = 4; + final int numberOfReplicas = 1; + final int numberOfRuns = 5; + final float buffer = 0.05f; + int balanceFailed = 0; + + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryReBalance().build(), new TestGatewayAllocator()); + for (int i = 0; i < numberOfRuns; i++) { + ClusterState clusterState = initCluster(strategy, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + clusterState = removeOneNode(clusterState, strategy); + logger.info(ShardAllocations.printShardDistribution(clusterState)); + try { + verifyPrimaryBalance(clusterState, buffer); + } catch (Exception e) { + balanceFailed++; + logger.info("Unexpected assertion failure"); + } + } + assertTrue(balanceFailed <= 1); + } + /** * This test verifies the allocation logic when nodes breach multiple constraints and ensure node breaching min * constraints chosen for allocation. @@ -368,7 +400,7 @@ public void testPrimaryBalanceWithContrainstBreaching() { */ public void testGlobalPrimaryBalance() throws Exception { AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); - ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .build(); clusterState = addNode(clusterState, strategy); clusterState = addNode(clusterState, strategy); @@ -381,6 +413,33 @@ public void testGlobalPrimaryBalance() throws Exception { verifyPrimaryBalance(clusterState); } + /** + * This test verifies global balance by creating indices iteratively and verify primary shards do not pile up on one + * @throws Exception generic exception + */ + public void testGlobalPrimaryBalanceWithNodeDrops() throws Exception { + final float buffer = 0.05f; + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .build(); + clusterState = addNode(clusterState, strategy); + clusterState = addNode(clusterState, strategy); + clusterState = addNode(clusterState, strategy); + clusterState = addNode(clusterState, strategy); + clusterState = addNode(clusterState, strategy); + + clusterState = addIndex(clusterState, strategy, "test-index1", 5, 1); + clusterState = addIndex(clusterState, strategy, "test-index2", 5, 1); + clusterState = addIndex(clusterState, strategy, "test-index3", 5, 1); + + clusterState = removeOneNode(clusterState, strategy); + + clusterState = applyAllocationUntilNoChange(clusterState, strategy); + + logger.info(ShardAllocations.printShardDistribution(clusterState)); + verifyPrimaryBalance(clusterState, buffer); + } + /** * This test mimics a cluster state which can not be rebalanced due to * {@link org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider} @@ -538,6 +597,25 @@ private void verifyPerIndexPrimaryBalance(ClusterState currentState) { } } + private void verifyPrimaryBalance(ClusterState clusterState, float buffer) throws Exception { + assertBusy(() -> { + RoutingNodes nodes = clusterState.getRoutingNodes(); + int totalPrimaryShards = 0; + for (final IndexRoutingTable index : clusterState.getRoutingTable().indicesRouting().values()) { + totalPrimaryShards += index.primaryShardsActive(); + } + final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / clusterState.getRoutingNodes().size()); + for (RoutingNode node : nodes) { + final int primaryCount = node.shardsWithState(STARTED) + .stream() + .filter(ShardRouting::primary) + .collect(Collectors.toList()) + .size(); + assertTrue(primaryCount < (avgPrimaryShardsPerNode * (1 + buffer))); + } + }, 60, TimeUnit.SECONDS); + } + private void verifyPrimaryBalance(ClusterState clusterState) throws Exception { assertBusy(() -> { RoutingNodes nodes = clusterState.getRoutingNodes(); @@ -568,8 +646,8 @@ public void testShardBalance() { ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), ClusterRebalanceAllocationDecider.ClusterRebalanceType.ALWAYS.toString() ); - settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); + settings.put(BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING.getKey(), indexBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); AllocationService strategy = createAllocationService(settings.build(), new TestGatewayAllocator()); @@ -665,7 +743,7 @@ private ClusterState initCluster( for (int i = 0; i < numberOfNodes; i++) { nodes.add(newNode("node" + i)); } - ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .nodes(nodes) .metadata(metadata) .routingTable(initialRoutingTable) @@ -919,7 +997,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing nodes.add(node); } - ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .nodes(nodes) .metadata(metadata) .routingTable(routingTable) From d7657af80696dfb976119dae38c927c8ff17dbce Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Tue, 19 Mar 2024 19:56:48 +0530 Subject: [PATCH 11/16] Fix spotless check Signed-off-by: Arpit Bandejiya --- .../routing/allocation/BalanceConfigurationTests.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index e2b64d1d0fc12..ba08438cd1b3b 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -147,6 +147,7 @@ private Settings.Builder getSettingsBuilderForPrimaryBalance() { private Settings.Builder getSettingsBuilderForPrimaryReBalance() { return getSettingsBuilderForPrimaryBalance(true, true); } + private Settings.Builder getSettingsBuilderForPrimaryBalance(boolean preferPrimaryBalance, boolean preferPrimaryRebalance) { final float indexBalance = 0.55f; final float shardBalance = 0.45f; @@ -207,7 +208,7 @@ public void testPrimaryBalanceWithoutPreferPrimaryBalanceSetting() { int balanceFailed = 0; AllocationService strategy = createAllocationService( - getSettingsBuilderForPrimaryBalance(false ,false).build(), + getSettingsBuilderForPrimaryBalance(false, false).build(), new TestGatewayAllocator() ); for (int i = 0; i < numberOfRuns; i++) { @@ -400,8 +401,7 @@ public void testPrimaryBalanceWithContrainstBreaching() { */ public void testGlobalPrimaryBalance() throws Exception { AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); - ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); clusterState = addNode(clusterState, strategy); clusterState = addNode(clusterState, strategy); @@ -420,8 +420,7 @@ public void testGlobalPrimaryBalance() throws Exception { public void testGlobalPrimaryBalanceWithNodeDrops() throws Exception { final float buffer = 0.05f; AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); - ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .build(); + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); clusterState = addNode(clusterState, strategy); clusterState = addNode(clusterState, strategy); clusterState = addNode(clusterState, strategy); From 566c7ef01f7669e5081a3b63c4282ab351968014 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 21 Mar 2024 18:08:12 +0530 Subject: [PATCH 12/16] Fix UTs Signed-off-by: Arpit Bandejiya --- .../cluster/routing/allocation/BalanceConfigurationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index ba08438cd1b3b..9bae46d292899 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -419,7 +419,7 @@ public void testGlobalPrimaryBalance() throws Exception { */ public void testGlobalPrimaryBalanceWithNodeDrops() throws Exception { final float buffer = 0.05f; - AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryReBalance().build(), new TestGatewayAllocator()); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); clusterState = addNode(clusterState, strategy); clusterState = addNode(clusterState, strategy); From 0ca76bfa6ee224a0aecc2a9cb3d003fafdc9d5e4 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Fri, 22 Mar 2024 23:09:16 +0530 Subject: [PATCH 13/16] refactoring code Signed-off-by: Arpit Bandejiya --- CHANGELOG.md | 2 +- .../allocation/AllocationConstraints.java | 7 +- .../allocation/AllocationParameter.java | 24 ++ .../routing/allocation/ConstraintTypes.java | 21 +- .../allocation/RebalanceConstraints.java | 4 +- .../allocation/RebalanceParameter.java | 24 ++ .../allocator/BalancedShardsAllocator.java | 74 ++++--- .../allocator/LocalShardsBalancer.java | 206 +++++++++++++----- .../common/settings/ClusterSettings.java | 2 +- .../allocation/BalanceConfigurationTests.java | 2 +- 10 files changed, 253 insertions(+), 113 deletions(-) create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java create mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceParameter.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b85e6f4ea5df1..a476d83e99918 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561)) - [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880)) - Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103)) -- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 @@ -117,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601)) - [Tiered caching] Add policies controlling which values can enter pluggable caches [EXPERIMENTAL] ([#12542](https://github.com/opensearch-project/OpenSearch/pull/12542)) - [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625)) +- Add cluster primary balance contraint for rebalancing with buffer ([#12656](https://github.com/opensearch-project/OpenSearch/pull/12656)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index 5375910c57579..a7732cde0d7f0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -28,11 +28,14 @@ public class AllocationConstraints { private Map constraints; - public AllocationConstraints() { + public AllocationConstraints(AllocationParameter allocationParameter) { this.constraints = new HashMap<>(); this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached())); this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); - this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached())); + this.constraints.putIfAbsent( + CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, + new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceBuffer())) + ); } public void updateAllocationConstraint(String constraint, boolean enable) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java new file mode 100644 index 0000000000000..2444f6405278e --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +/** + * RebalanceConstraint Params + */ +public class AllocationParameter { + private final float preferPrimaryBalanceBuffer; + + public AllocationParameter(float preferPrimaryBalanceBuffer) { + this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer; + } + + public float getPreferPrimaryBalanceBuffer() { + return preferPrimaryBalanceBuffer; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index 0f83ad3898b6e..fa2bff875a6d5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -31,7 +31,7 @@ public class ConstraintTypes { /** * Defines a cluster constraint which is breached when a node contains more than avg primary shards across all indices */ - public final static String CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID = "cluster.primary.shard.balance.constraint"; + public final static String CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID = "cluster.primary.shard.rebalance.constraint"; /** * Defines an index constraint which is breached when a node contains more than avg number of shards for an index @@ -75,22 +75,9 @@ public static Predicate isPerIndexPrimaryShardsPerN } /** - * Defines a predicate which returns true when a node contains more than average number of primary shards. This - * constraint is used in weight calculation during allocation only. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT} - * is assigned to node resulting in lesser chances of node being selected as allocation target - */ - public static Predicate isPrimaryShardsPerNodeBreached() { - return (params) -> { - int primaryShardCount = params.getNode().numPrimaryShards(); - int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode()); - return primaryShardCount >= allowedPrimaryShardCount; - }; - } - - /** - * Defines a predicate which returns true when a node contains more than average number of primary shards. This - * constraint is used in weight calculation during allocation only. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT} - * is assigned to node resulting in lesser chances of node being selected as allocation target + * Defines a predicate which returns true when a node contains more than average number of primary shards with added buffer. This + * constraint is used in weight calculation during allocation/rebalance both. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT} + * is assigned to node resulting in lesser chances of node being selected as allocation/rebalance target */ public static Predicate isPrimaryShardsPerNodeBreached(float buffer) { return (params) -> { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index ca714ab1ff557..6e86dcc706aef 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -29,12 +29,12 @@ public class RebalanceConstraints { private Map constraints; - public RebalanceConstraints(float preferPrimaryBalanceBuffer) { + public RebalanceConstraints(RebalanceParameter rebalanceParameter) { this.constraints = new HashMap<>(); this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); this.constraints.putIfAbsent( CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, - new Constraint(isPrimaryShardsPerNodeBreached(preferPrimaryBalanceBuffer)) + new Constraint(isPrimaryShardsPerNodeBreached(rebalanceParameter.getPreferPrimaryBalanceBuffer())) ); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceParameter.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceParameter.java new file mode 100644 index 0000000000000..35fbaede93ba3 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceParameter.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation; + +/** + * RebalanceConstraint Params + */ +public class RebalanceParameter { + private float preferPrimaryBalanceBuffer; + + public RebalanceParameter(float preferPrimaryBalanceBuffer) { + this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer; + } + + public float getPreferPrimaryBalanceBuffer() { + return preferPrimaryBalanceBuffer; + } +} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index e580e5c12e9f4..ef971d9cd5650 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -43,9 +43,11 @@ import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; +import org.opensearch.cluster.routing.allocation.AllocationParameter; import org.opensearch.cluster.routing.allocation.ConstraintTypes; import org.opensearch.cluster.routing.allocation.MoveDecision; import org.opensearch.cluster.routing.allocation.RebalanceConstraints; +import org.opensearch.cluster.routing.allocation.RebalanceParameter; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.ShardAllocationDecision; import org.opensearch.common.inject.Inject; @@ -147,15 +149,15 @@ public class BalancedShardsAllocator implements ShardsAllocator { ); public static final Setting PREFER_PRIMARY_SHARD_REBALANCE = Setting.boolSetting( - "cluster.routing.allocation.rebalance.prefer_primary", + "cluster.routing.allocation.rebalance.primary.enable", false, Property.Dynamic, Property.NodeScope ); public static final Setting PRIMARY_SHARD_REBALANCE_BUFFER = Setting.floatSetting( - "cluster.routing.allocation.rebalance.primary_buffer", - 0.05f, + "cluster.routing.allocation.rebalance.primary.buffer", + 0.10f, 0.0f, Property.Dynamic, Property.NodeScope @@ -164,9 +166,9 @@ public class BalancedShardsAllocator implements ShardsAllocator { /** * This setting governs whether shards should be randomly allocated among the eligible nodes during assignment. */ - public static final Setting PREFER_RANDOM_SHARD_ALLOCATION = Setting.boolSetting( - "cluster.routing.allocation.balance.prefer_random_allocation", - true, + public static final Setting ALLOW_RANDOM_ALLOCATION = Setting.boolSetting( + "cluster.routing.allocation.allow_random", + false, Property.Dynamic, Property.NodeScope ); @@ -175,7 +177,8 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile ShardMovementStrategy shardMovementStrategy; private volatile boolean preferPrimaryShardBalance; - private volatile float preferPrimaryShardBalanceBuffer; + private volatile boolean preferPrimaryShardRebalance; + private volatile float preferPrimaryShardRebalanceBuffer; private volatile float indexBalanceFactor; private volatile float shardBalanceFactor; private volatile WeightFunction weightFunction; @@ -190,26 +193,22 @@ public BalancedShardsAllocator(Settings settings) { public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings)); setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); - setPreferPrimaryShardBalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings)); - setWeightFunction( - INDEX_BALANCE_FACTOR_SETTING.get(settings), - SHARD_BALANCE_FACTOR_SETTING.get(settings), - PRIMARY_SHARD_REBALANCE_BUFFER.get(settings) - ); + setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings)); + setWeightFunction(); setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); - setPreferRandomShardAllocation(PREFER_RANDOM_SHARD_ALLOCATION.get(settings)); + setPreferRandomShardAllocation(ALLOW_RANDOM_ALLOCATION.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); - clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::setIndexBalanceFactor); - clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::setShardBalanceFactor); + clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::updateIndexBalanceFactor); + clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::updateShardBalanceFactor); + clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); - clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::setPreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); - clusterSettings.addSettingsUpdateConsumer(PREFER_RANDOM_SHARD_ALLOCATION, this::setPreferRandomShardAllocation); + clusterSettings.addSettingsUpdateConsumer(ALLOW_RANDOM_ALLOCATION, this::setPreferRandomShardAllocation); } /** @@ -235,21 +234,33 @@ private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrateg private void setIndexBalanceFactor(float indexBalanceFactor) { this.indexBalanceFactor = indexBalanceFactor; - setWeightFunction(indexBalanceFactor, shardBalanceFactor, preferPrimaryShardBalanceBuffer); } private void setShardBalanceFactor(float shardBalanceFactor) { this.shardBalanceFactor = shardBalanceFactor; - setWeightFunction(indexBalanceFactor, shardBalanceFactor, preferPrimaryShardBalanceBuffer); } - private void setPreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalanceBuffer) { - this.preferPrimaryShardBalanceBuffer = preferPrimaryShardBalanceBuffer; - setWeightFunction(indexBalanceFactor, shardBalanceFactor, preferPrimaryShardBalanceBuffer); + private void setPreferPrimaryShardRebalanceBuffer(float preferPrimaryShardRebalanceBuffer) { + this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardRebalanceBuffer; + } + + private void updateIndexBalanceFactor(float indexBalanceFactor) { + this.indexBalanceFactor = indexBalanceFactor; + setWeightFunction(); + } + + private void updateShardBalanceFactor(float shardBalanceFactor) { + this.shardBalanceFactor = shardBalanceFactor; + setWeightFunction(); + } + + private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalanceBuffer) { + this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardBalanceBuffer; + setWeightFunction(); } - private void setWeightFunction(float indexBalance, float shardBalanceFactor, float preferPrimaryShardBalanceDelta) { - weightFunction = new WeightFunction(indexBalance, shardBalanceFactor, preferPrimaryShardBalanceDelta); + private void setWeightFunction() { + weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer); } /** @@ -264,6 +275,7 @@ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { } private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) { + this.preferPrimaryShardRebalance = preferPrimaryShardRebalance; this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance); } @@ -288,6 +300,7 @@ public void allocate(RoutingAllocation allocation) { weightFunction, threshold, preferPrimaryShardBalance, + preferPrimaryShardRebalance, preferRandomShardAllocation ); localShardsBalancer.allocateUnassigned(); @@ -310,6 +323,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f weightFunction, threshold, preferPrimaryShardBalance, + preferPrimaryShardRebalance, preferRandomShardAllocation ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; @@ -415,7 +429,6 @@ static class WeightFunction { private final float theta1; private AllocationConstraints constraints; private RebalanceConstraints rebalanceConstraints; - private final float preferPrimaryBalanceBuffer; WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) { float sum = indexBalance + shardBalance; @@ -426,9 +439,10 @@ static class WeightFunction { theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; - this.constraints = new AllocationConstraints(); - this.rebalanceConstraints = new RebalanceConstraints(preferPrimaryBalanceBuffer); - this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer; + AllocationParameter allocationParameter = new AllocationParameter(0.0f); + RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); + this.constraints = new AllocationConstraints(allocationParameter); + this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); // Enable index shard per node breach constraint updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); } @@ -565,7 +579,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index a20f1a532e51d..3e654bb003a4d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -62,6 +62,7 @@ public class LocalShardsBalancer extends ShardsBalancer { private final ShardMovementStrategy shardMovementStrategy; private final boolean preferPrimaryBalance; + private final boolean preferPrimaryRebalance; private final BalancedShardsAllocator.WeightFunction weight; private final float threshold; @@ -79,6 +80,7 @@ public LocalShardsBalancer( BalancedShardsAllocator.WeightFunction weight, float threshold, boolean preferPrimaryBalance, + boolean preferPrimaryRebalance, boolean preferRandomShardAllocation ) { this.logger = logger; @@ -94,6 +96,7 @@ public LocalShardsBalancer( sorter = newNodeSorter(); inEligibleTargetNode = new HashSet<>(); this.preferPrimaryBalance = preferPrimaryBalance; + this.preferPrimaryRebalance = preferPrimaryRebalance; this.shardMovementStrategy = shardMovementStrategy; this.preferRandomShardAllocation = preferRandomShardAllocation; } @@ -869,6 +872,130 @@ void allocateUnassigned() { // clear everything we have either added it or moved to ignoreUnassigned } + private class MinWeightedNodeDecision { + public float minWeight; + public Decision decision; + public List nodes; + + public MinWeightedNodeDecision(List nodes, float minWeight, Decision decision) { + this.minWeight = minWeight; + this.decision = decision; + this.nodes = nodes; + } + + public void updateMinNode(BalancedShardsAllocator.ModelNode node, boolean allowRandomAllocation) { + if (allowRandomAllocation) { + nodes.add(node); + } else { + if (nodes.isEmpty()) { + nodes.add(node); + } else { + nodes.set(0, node); + } + } + } + + public void updateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision, boolean allowRandomAllocation) { + if (allowRandomAllocation) { + nodes.add(node); + } else { + nodes.set(0, node); + } + minWeight = weight; + this.decision = decision; + } + + public void clearAndUpdateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision) { + nodes.clear(); + nodes.add(node); + minWeight = weight; + this.decision = decision; + } + + public BalancedShardsAllocator.ModelNode getMinNode(boolean allowRandomAllocation) { + if (allowRandomAllocation) { + return nodes.isEmpty() ? null : nodes.get(Randomness.get().nextInt(nodes.size())); + } else { + return nodes.isEmpty() ? null : nodes.get(0); + } + } + } + + MinWeightedNodeDecision updateEligibleNodes( + MinWeightedNodeDecision minWeightedNodeDecision, + Decision currentDecision, + float currentWeight, + boolean allowRandomAllocation, + ShardRouting shard, + BalancedShardsAllocator.ModelNode node + ) { + if (currentDecision.type() == Decision.Type.YES || currentDecision.type() == Decision.Type.THROTTLE) { + if (allowRandomAllocation) { + /* General Algorithm + * 1. If weights are not equal: + * 1.a Update the list if the new weight is less than minWeight regardless of decision(THROTTLE|YES) + * 2. If weights are equal: + * 2.a If current weight is YES: + * 2.a.1 Append to list if new decision is YES + * 2.a.2 No op in case the new decision is THROTTLE + * 2.b If current decision is THROTTLE: + * 2.b.1 Update the list if new decision is YES + * 2.b.2 Append to list if new decision is THROTTLE + */ + if (currentWeight == minWeightedNodeDecision.minWeight) { + if (currentDecision.type() == minWeightedNodeDecision.decision.type()) { + minWeightedNodeDecision.updateMinNode(node, allowRandomAllocation); + } else { + if (currentDecision.type() == Decision.Type.YES) { + // The decision is throttle in this case, since we prefer YES decision, we will clear the old nodes + minWeightedNodeDecision.clearAndUpdateMinNode(node, currentWeight, currentDecision); + } + } + } else { + if (currentWeight < minWeightedNodeDecision.minWeight) { + minWeightedNodeDecision.clearAndUpdateMinNode(node, currentWeight, currentDecision); + } + } + } else { + final boolean updateMinNode; + BalancedShardsAllocator.ModelNode minNode = null; + if (!minWeightedNodeDecision.nodes.isEmpty()) { + minNode = minWeightedNodeDecision.nodes.get(0); + } + if (currentWeight == minWeightedNodeDecision.minWeight) { + /* we have an equal weight tie breaking: + * 1. if one decision is YES prefer it + * 2. prefer the node that holds the primary for this index with the next id in the ring ie. + * for the 3 shards 2 replica case we try to build up: + * 1 2 0 + * 2 0 1 + * 0 1 2 + * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater + * than the id of the shard we need to assign. This works find when new indices are created since + * primaries are added first and we only add one shard set a time in this algorithm. + */ + if (currentDecision.type() == minWeightedNodeDecision.decision.type()) { + final int repId = shard.id(); + final int nodeHigh = node.highestPrimary(shard.index().getName()); + final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); + updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) + && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId)); + } else { + updateMinNode = currentDecision.type() == Decision.Type.YES; + } + } else { + updateMinNode = currentWeight < minWeightedNodeDecision.minWeight; + } + if (updateMinNode) { + minWeightedNodeDecision.updateMinNode(node, currentWeight, currentDecision, allowRandomAllocation); + } + } + + } + + return minWeightedNodeDecision; + } + /** * Make a decision for allocating an unassigned shard. This method returns a two values in a tuple: the * first value is the {@link Decision} taken to allocate the unassigned shard, the second value is the @@ -890,10 +1017,8 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { } /* find an node with minimal weight we can allocate on*/ - float minWeight = Float.POSITIVE_INFINITY; - BalancedShardsAllocator.ModelNode minNode = null; - List minNodes = new ArrayList<>(); - Decision decision = null; + MinWeightedNodeDecision minWeightedNodeDecision = new MinWeightedNodeDecision(new ArrayList<>(), Float.POSITIVE_INFINITY, null); + /* Don't iterate over an identity hashset here the * iteration order is different for each run and makes testing hard */ Map nodeExplanationMap = explain ? new HashMap<>() : null; @@ -907,7 +1032,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { // weight of this index currently on the node float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName()); // moving the shard would not improve the balance, and we are not in explain mode, so short circuit - if (currentWeight > minWeight && explain == false) { + if (currentWeight > minWeightedNodeDecision.minWeight && explain == false) { continue; } @@ -916,57 +1041,19 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); } - if (currentDecision.type() == Decision.Type.YES || currentDecision.type() == Decision.Type.THROTTLE) { - final boolean updateMinNode; - if (currentWeight == minWeight) { - /* we have an equal weight tie breaking: - * 1. if one decision is YES prefer it - * 2. prefer the node that holds the primary for this index with the next id in the ring ie. - * for the 3 shards 2 replica case we try to build up: - * 1 2 0 - * 2 0 1 - * 0 1 2 - * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater - * than the id of the shard we need to assign. This works find when new indices are created since - * primaries are added first and we only add one shard set a time in this algorithm. - */ - if (currentDecision.type() == decision.type()) { - final int repId = shard.id(); - final int nodeHigh = node.highestPrimary(shard.index().getName()); - final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); - updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) - && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId)); - minNodes.add(node); - } else { - updateMinNode = currentDecision.type() == Decision.Type.YES; - /* If updateMinNode is true, it means the earlier nodes had decision type THROTTLE. We will need to clear the list, - * and add new nodes to the list. - */ - if (updateMinNode) { - minNodes.clear(); - minNodes.add(node); - } - } - } else { - updateMinNode = currentWeight < minWeight; - /* Since we have found nodes with less weight. We will need to clear the earlier minNodes list - * and add the new nodes to the list. - */ - if (updateMinNode) { - minNodes.clear(); - minNodes.add(node); - } - } - if (updateMinNode) { - minNode = node; - minWeight = currentWeight; - decision = currentDecision; - } - } + minWeightedNodeDecision = updateEligibleNodes( + minWeightedNodeDecision, + currentDecision, + currentWeight, + preferRandomShardAllocation, + shard, + node + ); } - if (decision == null) { + + if (minWeightedNodeDecision.decision == null) { // decision was not set and a node was not assigned, so treat it as a NO decision - decision = Decision.NO; + minWeightedNodeDecision.decision = Decision.NO; } List nodeDecisions = null; if (explain) { @@ -980,11 +1067,12 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { } } - if (preferRandomShardAllocation && !minNodes.isEmpty()) { - minNode = minNodes.get(Randomness.get().nextInt(minNodes.size())); - } - - return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.getRoutingNode().node() : null, nodeDecisions); + BalancedShardsAllocator.ModelNode minNode = minWeightedNodeDecision.getMinNode(preferRandomShardAllocation); + return AllocateUnassignedDecision.fromDecision( + minWeightedNodeDecision.decision, + minNode != null ? minNode.getRoutingNode().node() : null, + nodeDecisions + ); } private static final Comparator BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index b6af7e72516cc..3bcd4e9491466 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -253,7 +253,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE, - BalancedShardsAllocator.PREFER_RANDOM_SHARD_ALLOCATION, + BalancedShardsAllocator.ALLOW_RANDOM_ALLOCATION, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index 9bae46d292899..f63294ff2e793 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -163,7 +163,7 @@ private Settings.Builder getSettingsBuilderForPrimaryBalance(boolean preferPrima settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); - settings.put(BalancedShardsAllocator.PREFER_RANDOM_SHARD_ALLOCATION.getKey(), randomBoolean()); + settings.put(BalancedShardsAllocator.ALLOW_RANDOM_ALLOCATION.getKey(), randomBoolean()); return settings; } From 462f286d279d337810a226762310a4dee0fe1a42 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Sun, 24 Mar 2024 20:14:03 +0530 Subject: [PATCH 14/16] address comments Signed-off-by: Arpit Bandejiya --- .../cluster/routing/allocation/BalanceConfigurationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index f63294ff2e793..fc5721a5380ed 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -92,7 +92,7 @@ public class BalanceConfigurationTests extends OpenSearchAllocationTestCase { public void testIndexBalance() { /* Tests balance over indices only */ final float indexBalance = 1.0f; - final float shardBalance = 0.001f; + final float shardBalance = 0.0f; final float balanceThreshold = 1.0f; Settings.Builder settings = Settings.builder(); From c2baa39470d1d3a17001f8444227cc53962b973e Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Wed, 27 Mar 2024 14:06:49 +0530 Subject: [PATCH 15/16] Address comments Signed-off-by: Arpit Bandejiya --- .../SegmentReplicationAllocationIT.java | 87 +++++++- .../allocation/AllocationConstraints.java | 7 +- .../allocation/AllocationParameter.java | 24 -- .../routing/allocation/ConstraintTypes.java | 2 +- .../allocator/BalancedShardsAllocator.java | 29 +-- .../allocator/LocalShardsBalancer.java | 207 +++++------------- .../common/settings/ClusterSettings.java | 1 - .../allocation/BalanceConfigurationTests.java | 100 +++++++-- 8 files changed, 224 insertions(+), 233 deletions(-) delete mode 100644 server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java index 30edea6551067..669e24f9fb555 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationAllocationIT.java @@ -31,6 +31,9 @@ import java.util.stream.Collectors; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE; +import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) @@ -58,6 +61,20 @@ public void enablePreferPrimaryBalance() { ); } + public void setAllocationRelocationStrategy(boolean preferPrimaryBalance, boolean preferPrimaryRebalance, float buffer) { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder() + .put(PREFER_PRIMARY_SHARD_BALANCE.getKey(), preferPrimaryBalance) + .put(PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance) + .put(PRIMARY_SHARD_REBALANCE_BUFFER.getKey(), buffer) + ) + ); + } + /** * This test verifies that the overall primary balance is attained during allocation. This test verifies primary * balance per index and across all indices is maintained. @@ -87,7 +104,7 @@ public void testGlobalPrimaryAllocation() throws Exception { state = client().admin().cluster().prepareState().execute().actionGet().getState(); logger.info(ShardAllocations.printShardDistribution(state)); verifyPerIndexPrimaryBalance(); - verifyPrimaryBalance(); + verifyPrimaryBalance(0.0f); } /** @@ -224,6 +241,70 @@ public void testAllocationWithDisruption() throws Exception { verifyPerIndexPrimaryBalance(); } + /** + * Similar to testSingleIndexShardAllocation test but creates multiple indices, multiple nodes adding in and getting + * removed. The test asserts post each such event that primary shard distribution is balanced for each index as well as across the nodes + * when the PREFER_PRIMARY_SHARD_REBALANCE is set to true + */ + public void testAllocationAndRebalanceWithDisruption() throws Exception { + internalCluster().startClusterManagerOnlyNode(); + final int maxReplicaCount = 2; + final int maxShardCount = 2; + // Create higher number of nodes than number of shards to reduce chances of SameShardAllocationDecider kicking-in + // and preventing primary relocations + final int nodeCount = randomIntBetween(5, 10); + final int numberOfIndices = randomIntBetween(1, 10); + final float buffer = randomIntBetween(1, 4) * 0.10f; + + logger.info("--> Creating {} nodes", nodeCount); + final List nodeNames = new ArrayList<>(); + for (int i = 0; i < nodeCount; i++) { + nodeNames.add(internalCluster().startNode()); + } + setAllocationRelocationStrategy(true, true, buffer); + + int shardCount, replicaCount; + ClusterState state; + for (int i = 0; i < numberOfIndices; i++) { + shardCount = randomIntBetween(1, maxShardCount); + replicaCount = randomIntBetween(1, maxReplicaCount); + logger.info("--> Creating index test{} with primary {} and replica {}", i, shardCount, replicaCount); + createIndex("test" + i, shardCount, replicaCount, i % 2 == 0); + ensureGreen(TimeValue.timeValueSeconds(60)); + if (logger.isTraceEnabled()) { + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + } + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + verifyPrimaryBalance(buffer); + + final int additionalNodeCount = randomIntBetween(1, 5); + logger.info("--> Adding {} nodes", additionalNodeCount); + + internalCluster().startNodes(additionalNodeCount); + ensureGreen(TimeValue.timeValueSeconds(60)); + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + verifyPrimaryBalance(buffer); + + int nodeCountToStop = additionalNodeCount; + while (nodeCountToStop > 0) { + internalCluster().stopRandomDataNode(); + // give replica a chance to promote as primary before terminating node containing the replica + ensureGreen(TimeValue.timeValueSeconds(60)); + nodeCountToStop--; + } + state = client().admin().cluster().prepareState().execute().actionGet().getState(); + logger.info("--> Cluster state post nodes stop {}", state); + logger.info(ShardAllocations.printShardDistribution(state)); + verifyPerIndexPrimaryBalance(); + verifyPrimaryBalance(buffer); + } + /** * Utility method which ensures cluster has balanced primary shard distribution across a single index. * @throws Exception exception @@ -263,7 +344,7 @@ private void verifyPerIndexPrimaryBalance() throws Exception { }, 60, TimeUnit.SECONDS); } - private void verifyPrimaryBalance() throws Exception { + private void verifyPrimaryBalance(float buffer) throws Exception { assertBusy(() -> { final ClusterState currentState = client().admin().cluster().prepareState().execute().actionGet().getState(); RoutingNodes nodes = currentState.getRoutingNodes(); @@ -278,7 +359,7 @@ private void verifyPrimaryBalance() throws Exception { .filter(ShardRouting::primary) .collect(Collectors.toList()) .size(); - assertTrue(primaryCount <= avgPrimaryShardsPerNode); + assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer))); } }, 60, TimeUnit.SECONDS); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index a7732cde0d7f0..fb39ba0053486 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -28,14 +28,11 @@ public class AllocationConstraints { private Map constraints; - public AllocationConstraints(AllocationParameter allocationParameter) { + public AllocationConstraints() { this.constraints = new HashMap<>(); this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached())); this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); - this.constraints.putIfAbsent( - CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, - new Constraint(isPrimaryShardsPerNodeBreached(allocationParameter.getPreferPrimaryBalanceBuffer())) - ); + this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f))); } public void updateAllocationConstraint(String constraint, boolean enable) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java deleted file mode 100644 index 2444f6405278e..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationParameter.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.routing.allocation; - -/** - * RebalanceConstraint Params - */ -public class AllocationParameter { - private final float preferPrimaryBalanceBuffer; - - public AllocationParameter(float preferPrimaryBalanceBuffer) { - this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer; - } - - public float getPreferPrimaryBalanceBuffer() { - return preferPrimaryBalanceBuffer; - } -} diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java index fa2bff875a6d5..08fe8f92d1f80 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ConstraintTypes.java @@ -83,7 +83,7 @@ public static Predicate isPrimaryShardsPerNodeBreac return (params) -> { int primaryShardCount = params.getNode().numPrimaryShards(); int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer)); - return primaryShardCount > allowedPrimaryShardCount; + return primaryShardCount >= allowedPrimaryShardCount; }; } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index ef971d9cd5650..c29759fd90fc4 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -43,7 +43,6 @@ import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.AllocationConstraints; -import org.opensearch.cluster.routing.allocation.AllocationParameter; import org.opensearch.cluster.routing.allocation.ConstraintTypes; import org.opensearch.cluster.routing.allocation.MoveDecision; import org.opensearch.cluster.routing.allocation.RebalanceConstraints; @@ -163,16 +162,6 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); - /** - * This setting governs whether shards should be randomly allocated among the eligible nodes during assignment. - */ - public static final Setting ALLOW_RANDOM_ALLOCATION = Setting.boolSetting( - "cluster.routing.allocation.allow_random", - false, - Property.Dynamic, - Property.NodeScope - ); - private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; @@ -183,7 +172,6 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile float shardBalanceFactor; private volatile WeightFunction weightFunction; private volatile float threshold; - private volatile boolean preferRandomShardAllocation; public BalancedShardsAllocator(Settings settings) { this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); @@ -199,7 +187,6 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); - setPreferRandomShardAllocation(ALLOW_RANDOM_ALLOCATION.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); @@ -208,7 +195,6 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); - clusterSettings.addSettingsUpdateConsumer(ALLOW_RANDOM_ALLOCATION, this::setPreferRandomShardAllocation); } /** @@ -283,10 +269,6 @@ private void setThreshold(float threshold) { this.threshold = threshold; } - private void setPreferRandomShardAllocation(boolean preferRandomShardAllocation) { - this.preferRandomShardAllocation = preferRandomShardAllocation; - } - @Override public void allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { @@ -300,8 +282,7 @@ public void allocate(RoutingAllocation allocation) { weightFunction, threshold, preferPrimaryShardBalance, - preferPrimaryShardRebalance, - preferRandomShardAllocation + preferPrimaryShardRebalance ); localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); @@ -323,8 +304,7 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f weightFunction, threshold, preferPrimaryShardBalance, - preferPrimaryShardRebalance, - preferRandomShardAllocation + preferPrimaryShardRebalance ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; @@ -439,9 +419,8 @@ static class WeightFunction { theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; - AllocationParameter allocationParameter = new AllocationParameter(0.0f); RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); - this.constraints = new AllocationConstraints(allocationParameter); + this.constraints = new AllocationConstraints(); this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); // Enable index shard per node breach constraint updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); @@ -579,7 +558,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, false); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index bac057759fb66..ec25d041bda43 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -28,7 +28,6 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; -import org.opensearch.common.Randomness; import org.opensearch.common.collect.Tuple; import org.opensearch.gateway.PriorityComparator; @@ -71,7 +70,6 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; private final Set inEligibleTargetNode; - private final boolean preferRandomShardAllocation; public LocalShardsBalancer( Logger logger, @@ -80,8 +78,7 @@ public LocalShardsBalancer( BalancedShardsAllocator.WeightFunction weight, float threshold, boolean preferPrimaryBalance, - boolean preferPrimaryRebalance, - boolean preferRandomShardAllocation + boolean preferPrimaryRebalance ) { this.logger = logger; this.allocation = allocation; @@ -98,7 +95,6 @@ public LocalShardsBalancer( this.preferPrimaryBalance = preferPrimaryBalance; this.preferPrimaryRebalance = preferPrimaryRebalance; this.shardMovementStrategy = shardMovementStrategy; - this.preferRandomShardAllocation = preferRandomShardAllocation; } /** @@ -872,134 +868,6 @@ void allocateUnassigned() { // clear everything we have either added it or moved to ignoreUnassigned } - private class MinWeightedNodeDecision { - public float minWeight; - public Decision decision; - public List nodes; - - public MinWeightedNodeDecision(List nodes, float minWeight, Decision decision) { - this.minWeight = minWeight; - this.decision = decision; - this.nodes = nodes; - } - - public void updateMinNode(BalancedShardsAllocator.ModelNode node, boolean allowRandomAllocation) { - if (allowRandomAllocation) { - nodes.add(node); - } else { - if (nodes.isEmpty()) { - nodes.add(node); - } else { - nodes.set(0, node); - } - } - } - - public void updateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision, boolean allowRandomAllocation) { - if (allowRandomAllocation) { - nodes.add(node); - } else { - if (nodes.isEmpty()) { - nodes.add(node); - } else { - nodes.set(0, node); - } - } - minWeight = weight; - this.decision = decision; - } - - public void clearAndUpdateMinNode(BalancedShardsAllocator.ModelNode node, float weight, Decision decision) { - nodes.clear(); - nodes.add(node); - minWeight = weight; - this.decision = decision; - } - - public BalancedShardsAllocator.ModelNode getMinNode(boolean allowRandomAllocation) { - if (allowRandomAllocation) { - return nodes.isEmpty() ? null : nodes.get(Randomness.get().nextInt(nodes.size())); - } else { - return nodes.isEmpty() ? null : nodes.get(0); - } - } - } - - MinWeightedNodeDecision updateEligibleNodes( - MinWeightedNodeDecision minWeightedNodeDecision, - Decision currentDecision, - float currentWeight, - boolean allowRandomAllocation, - ShardRouting shard, - BalancedShardsAllocator.ModelNode node - ) { - if (currentDecision.type() == Decision.Type.YES || currentDecision.type() == Decision.Type.THROTTLE) { - if (allowRandomAllocation) { - /* General Algorithm - * 1. If weights are not equal: - * 1.a Update the list if the new weight is less than minWeight regardless of decision(THROTTLE|YES) - * 2. If weights are equal: - * 2.a If current weight is YES: - * 2.a.1 Append to list if new decision is YES - * 2.a.2 No op in case the new decision is THROTTLE - * 2.b If current decision is THROTTLE: - * 2.b.1 Update the list if new decision is YES - * 2.b.2 Append to list if new decision is THROTTLE - */ - if (currentWeight == minWeightedNodeDecision.minWeight) { - if (currentDecision.type() == minWeightedNodeDecision.decision.type()) { - minWeightedNodeDecision.updateMinNode(node, allowRandomAllocation); - } else { - if (currentDecision.type() == Decision.Type.YES) { - // The decision is throttle in this case, since we prefer YES decision, we will clear the old nodes - minWeightedNodeDecision.clearAndUpdateMinNode(node, currentWeight, currentDecision); - } - } - } else { - if (currentWeight < minWeightedNodeDecision.minWeight) { - minWeightedNodeDecision.clearAndUpdateMinNode(node, currentWeight, currentDecision); - } - } - } else { - final boolean updateMinNode; - BalancedShardsAllocator.ModelNode minNode = null; - if (!minWeightedNodeDecision.nodes.isEmpty()) { - minNode = minWeightedNodeDecision.nodes.get(0); - } - if (currentWeight == minWeightedNodeDecision.minWeight) { - /* we have an equal weight tie breaking: - * 1. if one decision is YES prefer it - * 2. prefer the node that holds the primary for this index with the next id in the ring ie. - * for the 3 shards 2 replica case we try to build up: - * 1 2 0 - * 2 0 1 - * 0 1 2 - * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater - * than the id of the shard we need to assign. This works find when new indices are created since - * primaries are added first and we only add one shard set a time in this algorithm. - */ - if (currentDecision.type() == minWeightedNodeDecision.decision.type()) { - final int repId = shard.id(); - final int nodeHigh = node.highestPrimary(shard.index().getName()); - final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); - updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) - && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId)); - } else { - updateMinNode = currentDecision.type() == Decision.Type.YES; - } - } else { - updateMinNode = currentWeight < minWeightedNodeDecision.minWeight; - } - if (updateMinNode) { - minWeightedNodeDecision.updateMinNode(node, currentWeight, currentDecision, allowRandomAllocation); - } - } - - } - - return minWeightedNodeDecision; - } - /** * Make a decision for allocating an unassigned shard. This method returns a two values in a tuple: the * first value is the {@link Decision} taken to allocate the unassigned shard, the second value is the @@ -1021,8 +889,9 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { } /* find an node with minimal weight we can allocate on*/ - MinWeightedNodeDecision minWeightedNodeDecision = new MinWeightedNodeDecision(new ArrayList<>(), Float.POSITIVE_INFINITY, null); - + float minWeight = Float.POSITIVE_INFINITY; + BalancedShardsAllocator.ModelNode minNode = null; + Decision decision = null; /* Don't iterate over an identity hashset here the * iteration order is different for each run and makes testing hard */ Map nodeExplanationMap = explain ? new HashMap<>() : null; @@ -1036,7 +905,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { // weight of this index currently on the node float currentWeight = weight.weightWithAllocationConstraints(this, node, shard.getIndexName()); // moving the shard would not improve the balance, and we are not in explain mode, so short circuit - if (currentWeight > minWeightedNodeDecision.minWeight && explain == false) { + if (currentWeight > minWeight && explain == false) { continue; } @@ -1045,19 +914,42 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { nodeExplanationMap.put(node.getNodeId(), new NodeAllocationResult(node.getRoutingNode().node(), currentDecision, 0)); nodeWeights.add(Tuple.tuple(node.getNodeId(), currentWeight)); } - minWeightedNodeDecision = updateEligibleNodes( - minWeightedNodeDecision, - currentDecision, - currentWeight, - preferRandomShardAllocation, - shard, - node - ); + if (currentDecision.type() == Decision.Type.YES || currentDecision.type() == Decision.Type.THROTTLE) { + final boolean updateMinNode; + if (currentWeight == minWeight) { + /* we have an equal weight tie breaking: + * 1. if one decision is YES prefer it + * 2. prefer the node that holds the primary for this index with the next id in the ring ie. + * for the 3 shards 2 replica case we try to build up: + * 1 2 0 + * 2 0 1 + * 0 1 2 + * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater + * than the id of the shard we need to assign. This works find when new indices are created since + * primaries are added first and we only add one shard set a time in this algorithm. + */ + if (currentDecision.type() == decision.type()) { + final int repId = shard.id(); + final int nodeHigh = node.highestPrimary(shard.index().getName()); + final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); + updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) + && (nodeHigh < minNodeHigh)) || (nodeHigh > repId && minNodeHigh < repId)); + } else { + updateMinNode = currentDecision.type() == Decision.Type.YES; + } + } else { + updateMinNode = currentWeight < minWeight; + } + if (updateMinNode) { + minNode = node; + minWeight = currentWeight; + decision = currentDecision; + } + } } - - if (minWeightedNodeDecision.decision == null) { + if (decision == null) { // decision was not set and a node was not assigned, so treat it as a NO decision - minWeightedNodeDecision.decision = Decision.NO; + decision = Decision.NO; } List nodeDecisions = null; if (explain) { @@ -1070,13 +962,7 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { nodeDecisions.add(new NodeAllocationResult(current.getNode(), current.getCanAllocateDecision(), ++weightRanking)); } } - - BalancedShardsAllocator.ModelNode minNode = minWeightedNodeDecision.getMinNode(preferRandomShardAllocation); - return AllocateUnassignedDecision.fromDecision( - minWeightedNodeDecision.decision, - minNode != null ? minNode.getRoutingNode().node() : null, - nodeDecisions - ); + return AllocateUnassignedDecision.fromDecision(decision, minNode != null ? minNode.getRoutingNode().node() : null, nodeDecisions); } private static final Comparator BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed(); @@ -1112,11 +998,18 @@ private boolean tryRelocateShard(BalancedShardsAllocator.ModelNode minNode, Bala continue; } // This is a safety net which prevents un-necessary primary shard relocations from maxNode to minNode when - // doing such relocation wouldn't help in primary balance. - if (preferPrimaryBalance == true && shard.primary() && maxNode.numPrimaryShards() - minNode.numPrimaryShards() < 2) { + // doing such relocation wouldn't help in primary balance. The condition won't be applicable when we enable node level + // primary rebalance + if (preferPrimaryBalance == true + && preferPrimaryRebalance == false + && shard.primary() + && maxNode.numPrimaryShards(shard.getIndexName()) - minNode.numPrimaryShards(shard.getIndexName()) < 2) { + continue; + } + // Relax the above condition to per node to allow rebalancing to attain global balance + if (preferPrimaryRebalance == true && shard.primary() && maxNode.numPrimaryShards() - minNode.numPrimaryShards() < 2) { continue; } - final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision); maxNode.removeShard(shard); long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index cbb4588dfd4e4..7d61e6dd49835 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -254,7 +254,6 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.PRIMARY_SHARD_REBALANCE_BUFFER, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE, BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE, - BalancedShardsAllocator.ALLOW_RANDOM_ALLOCATION, BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java index fc5721a5380ed..11cbe89645657 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/BalanceConfigurationTests.java @@ -163,7 +163,6 @@ private Settings.Builder getSettingsBuilderForPrimaryBalance(boolean preferPrima settings.put(BalancedShardsAllocator.PREFER_PRIMARY_SHARD_REBALANCE.getKey(), preferPrimaryRebalance); settings.put(BalancedShardsAllocator.SHARD_BALANCE_FACTOR_SETTING.getKey(), shardBalance); settings.put(BalancedShardsAllocator.THRESHOLD_SETTING.getKey(), balanceThreshold); - settings.put(BalancedShardsAllocator.ALLOW_RANDOM_ALLOCATION.getKey(), randomBoolean()); return settings; } @@ -251,16 +250,43 @@ public void testPrimaryBalanceWithPreferPrimaryBalanceSetting() { assertTrue(balanceFailed <= 1); } + /** + * This test verifies primary shard balance is attained setting. + */ + public void testPrimaryBalanceNotSolvedForNodeDropWithPreferPrimaryBalanceSetting() { + final int numberOfNodes = 4; + final int numberOfIndices = 4; + final int numberOfShards = 4; + final int numberOfReplicas = 1; + final int numberOfRuns = 5; + final float buffer = 0.10f; + int balanceFailed = 0; + + AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryBalance().build(), new TestGatewayAllocator()); + for (int i = 0; i < numberOfRuns; i++) { + ClusterState clusterState = initCluster(strategy, numberOfIndices, numberOfNodes, numberOfShards, numberOfReplicas); + clusterState = removeOneNode(clusterState, strategy); + logger.info(ShardAllocations.printShardDistribution(clusterState)); + try { + verifyPrimaryBalance(clusterState, buffer); + } catch (AssertionError | Exception e) { + balanceFailed++; + logger.info("Unexpected assertion failure"); + } + } + assertTrue(balanceFailed >= 4); + } + /** * This test verifies primary shard balance is attained with PREFER_PRIMARY_SHARD_BALANCE setting. */ - public void testPrimaryBalanceWithPreferPrimaryReBalanceSetting() { + public void testPrimaryBalanceSolvedWithPreferPrimaryRebalanceSetting() { final int numberOfNodes = 4; final int numberOfIndices = 4; final int numberOfShards = 4; final int numberOfReplicas = 1; final int numberOfRuns = 5; - final float buffer = 0.05f; + final float buffer = 0.10f; int balanceFailed = 0; AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryReBalance().build(), new TestGatewayAllocator()); @@ -410,7 +436,7 @@ public void testGlobalPrimaryBalance() throws Exception { clusterState = addIndex(clusterState, strategy, "test-index3", 1, 1); logger.info(ShardAllocations.printShardDistribution(clusterState)); - verifyPrimaryBalance(clusterState); + verifyPrimaryBalance(clusterState, 0.0f); } /** @@ -418,18 +444,15 @@ public void testGlobalPrimaryBalance() throws Exception { * @throws Exception generic exception */ public void testGlobalPrimaryBalanceWithNodeDrops() throws Exception { - final float buffer = 0.05f; + final float buffer = 0.10f; AllocationService strategy = createAllocationService(getSettingsBuilderForPrimaryReBalance().build(), new TestGatewayAllocator()); ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build(); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); - clusterState = addNode(clusterState, strategy); + clusterState = addNodes(clusterState, strategy, 5); + + clusterState = addIndices(clusterState, strategy, 5, 1, 8); - clusterState = addIndex(clusterState, strategy, "test-index1", 5, 1); - clusterState = addIndex(clusterState, strategy, "test-index2", 5, 1); - clusterState = addIndex(clusterState, strategy, "test-index3", 5, 1); + logger.info(ShardAllocations.printShardDistribution(clusterState)); + verifyPrimaryBalance(clusterState, buffer); clusterState = removeOneNode(clusterState, strategy); @@ -596,7 +619,7 @@ private void verifyPerIndexPrimaryBalance(ClusterState currentState) { } } - private void verifyPrimaryBalance(ClusterState clusterState, float buffer) throws Exception { + private void verifySkewedPrimaryBalance(ClusterState clusterState, int delta) throws Exception { assertBusy(() -> { RoutingNodes nodes = clusterState.getRoutingNodes(); int totalPrimaryShards = 0; @@ -604,18 +627,22 @@ private void verifyPrimaryBalance(ClusterState clusterState, float buffer) throw totalPrimaryShards += index.primaryShardsActive(); } final int avgPrimaryShardsPerNode = (int) Math.ceil(totalPrimaryShards * 1f / clusterState.getRoutingNodes().size()); + int maxPrimaryShardOnNode = Integer.MIN_VALUE; + int minPrimaryShardOnNode = Integer.MAX_VALUE; for (RoutingNode node : nodes) { final int primaryCount = node.shardsWithState(STARTED) .stream() .filter(ShardRouting::primary) .collect(Collectors.toList()) .size(); - assertTrue(primaryCount < (avgPrimaryShardsPerNode * (1 + buffer))); + maxPrimaryShardOnNode = Math.max(maxPrimaryShardOnNode, primaryCount); + minPrimaryShardOnNode = Math.min(minPrimaryShardOnNode, primaryCount); } + assertTrue(maxPrimaryShardOnNode - minPrimaryShardOnNode < delta); }, 60, TimeUnit.SECONDS); } - private void verifyPrimaryBalance(ClusterState clusterState) throws Exception { + private void verifyPrimaryBalance(ClusterState clusterState, float buffer) throws Exception { assertBusy(() -> { RoutingNodes nodes = clusterState.getRoutingNodes(); int totalPrimaryShards = 0; @@ -629,7 +656,7 @@ private void verifyPrimaryBalance(ClusterState clusterState) throws Exception { .filter(ShardRouting::primary) .collect(Collectors.toList()) .size(); - assertTrue(primaryCount <= avgPrimaryShardsPerNode); + assertTrue(primaryCount <= (avgPrimaryShardsPerNode * (1 + buffer))); } }, 60, TimeUnit.SECONDS); } @@ -712,6 +739,34 @@ private ClusterState addIndex( return applyAllocationUntilNoChange(clusterState, strategy); } + private ClusterState addIndices( + ClusterState clusterState, + AllocationService strategy, + int numberOfShards, + int numberOfReplicas, + int numberOfIndices + ) { + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata()); + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(clusterState.routingTable()); + + for (int i = 0; i < numberOfIndices; i++) { + IndexMetadata.Builder index = IndexMetadata.builder("test" + i) + .settings(settings(Version.CURRENT)) + .numberOfShards(numberOfShards) + .numberOfReplicas(numberOfReplicas); + + metadataBuilder = metadataBuilder.put(index); + routingTableBuilder.addAsNew(index.build()); + } + + clusterState = ClusterState.builder(clusterState) + .metadata(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) + .build(); + clusterState = strategy.reroute(clusterState, "indices-created"); + return applyAllocationUntilNoChange(clusterState, strategy); + } + private ClusterState initCluster( AllocationService strategy, int numberOfIndices, @@ -751,6 +806,17 @@ private ClusterState initCluster( return applyAllocationUntilNoChange(clusterState, strategy); } + private ClusterState addNodes(ClusterState clusterState, AllocationService strategy, int numberOfNodes) { + logger.info("now, start [{}] more node, check that rebalancing will happen because we set it to always", numberOfNodes); + DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); + for (int i = 0; i < numberOfNodes; i++) { + nodes.add(newNode("node" + (clusterState.nodes().getSize() + i))); + } + clusterState = ClusterState.builder(clusterState).nodes(nodes.build()).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + return applyStartedShardsUntilNoChange(clusterState, strategy); + } + private ClusterState addNode(ClusterState clusterState, AllocationService strategy) { logger.info("now, start 1 more node, check that rebalancing will happen because we set it to always"); clusterState = ClusterState.builder(clusterState) From b28f5e8842231dcde092b84ffff3c2ed9e5a9400 Mon Sep 17 00:00:00 2001 From: Arpit Bandejiya Date: Thu, 28 Mar 2024 17:27:53 +0530 Subject: [PATCH 16/16] Address the comments Signed-off-by: Arpit Bandejiya --- .../routing/allocation/AllocationConstraints.java | 6 +++--- .../routing/allocation/RebalanceConstraints.java | 4 ++-- .../allocation/allocator/BalancedShardsAllocator.java | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java index fb39ba0053486..6702db4b43e91 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationConstraints.java @@ -30,9 +30,9 @@ public class AllocationConstraints { public AllocationConstraints() { this.constraints = new HashMap<>(); - this.constraints.putIfAbsent(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached())); - this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); - this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f))); + this.constraints.put(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, new Constraint(isIndexShardsPerNodeBreached())); + this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); + this.constraints.put(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(0.0f))); } public void updateAllocationConstraint(String constraint, boolean enable) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java index 6e86dcc706aef..2c2138af18abc 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/RebalanceConstraints.java @@ -31,8 +31,8 @@ public class RebalanceConstraints { public RebalanceConstraints(RebalanceParameter rebalanceParameter) { this.constraints = new HashMap<>(); - this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); - this.constraints.putIfAbsent( + this.constraints.put(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached())); + this.constraints.put( CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(rebalanceParameter.getPreferPrimaryBalanceBuffer())) ); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index c29759fd90fc4..b2443490dd973 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -182,7 +182,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings)); setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings)); - setWeightFunction(); + updateWeightFunction(); setThreshold(THRESHOLD_SETTING.get(settings)); setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); @@ -232,20 +232,20 @@ private void setPreferPrimaryShardRebalanceBuffer(float preferPrimaryShardRebala private void updateIndexBalanceFactor(float indexBalanceFactor) { this.indexBalanceFactor = indexBalanceFactor; - setWeightFunction(); + updateWeightFunction(); } private void updateShardBalanceFactor(float shardBalanceFactor) { this.shardBalanceFactor = shardBalanceFactor; - setWeightFunction(); + updateWeightFunction(); } private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalanceBuffer) { this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardBalanceBuffer; - setWeightFunction(); + updateWeightFunction(); } - private void setWeightFunction() { + private void updateWeightFunction() { weightFunction = new WeightFunction(this.indexBalanceFactor, this.shardBalanceFactor, this.preferPrimaryShardRebalanceBuffer); }