Skip to content

Commit

Permalink
Add cluster primary balance contraint for rebalancing with buffer
Browse files Browse the repository at this point in the history
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
  • Loading branch information
Arpit-Bandejiya committed Mar 14, 2024
1 parent 2b17902 commit 8aed71b
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,17 @@ public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreac
return primaryShardCount >= allowedPrimaryShardCount;
};
}

/**
* Defines a predicate which returns true when a node contains more than average number of primary shards. This
* constraint is used in weight calculation during allocation only. When breached a high weight {@link ConstraintTypes#CONSTRAINT_WEIGHT}
* is assigned to node resulting in lesser chances of node being selected as allocation target
*/
public static Predicate<Constraint.ConstraintParams> isPrimaryShardsPerNodeBreached(float buffer) {
return (params) -> {
int primaryShardCount = params.getNode().numPrimaryShards();
int allowedPrimaryShardCount = (int) Math.ceil(params.getBalancer().avgPrimaryShardsPerNode() * (1 + buffer));
return primaryShardCount >= allowedPrimaryShardCount;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPerIndexPrimaryShardsPerNodeBreached;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.isPrimaryShardsPerNodeBreached;

/**
* Constraints applied during rebalancing round; specify conditions which, if breached, reduce the
Expand All @@ -27,9 +29,10 @@ public class RebalanceConstraints {

private Map<String, Constraint> constraints;

public RebalanceConstraints() {
public RebalanceConstraints(float preferPrimaryBalanceBuffer) {
this.constraints = new HashMap<>();
this.constraints.putIfAbsent(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPerIndexPrimaryShardsPerNodeBreached()));
this.constraints.putIfAbsent(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, new Constraint(isPrimaryShardsPerNodeBreached(preferPrimaryBalanceBuffer)));
}

public void updateRebalanceConstraint(String constraint, boolean enable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,21 @@ public class BalancedShardsAllocator implements ShardsAllocator {
Property.NodeScope
);

public static final Setting<Float> PREFER_PRIMARY_SHARD_BALANCE_BUFFER = Setting.floatSetting(
"cluster.routing.allocation.balance.prefer_primary_balance_buffer",
0.05f,
0.0f,
Property.Dynamic,
Property.NodeScope
);

private volatile boolean movePrimaryFirst;
private volatile ShardMovementStrategy shardMovementStrategy;

private volatile boolean preferPrimaryShardBalance;
private volatile float preferPrimaryShardBalanceBuffer;
private volatile float indexBalanceFactor;
private volatile float shardBalanceFactor;
private volatile WeightFunction weightFunction;
private volatile float threshold;

Expand All @@ -158,14 +169,17 @@ public BalancedShardsAllocator(Settings settings) {

@Inject
public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings), PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings));
setThreshold(THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings));
setPreferPrimaryShardBalanceBuffer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER.get(settings));
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::setIndexBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::setShardBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE_BUFFER, this::setPreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
}

Expand All @@ -190,8 +204,23 @@ private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrateg
}
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
private void setIndexBalanceFactor(float indexBalanceFactor) {
this.indexBalanceFactor = indexBalanceFactor;
setWeightFunction(indexBalanceFactor, shardBalanceFactor, preferPrimaryShardBalanceBuffer);
}

private void setShardBalanceFactor(float shardBalanceFactor) {
this.shardBalanceFactor = shardBalanceFactor;
setWeightFunction(indexBalanceFactor, shardBalanceFactor, preferPrimaryShardBalanceBuffer);
}

private void setPreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalanceBuffer) {
this.preferPrimaryShardBalanceBuffer = preferPrimaryShardBalanceBuffer;
setWeightFunction(indexBalanceFactor, shardBalanceFactor, preferPrimaryShardBalanceBuffer);
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor, float preferPrimaryShardBalanceDelta) {
weightFunction = new WeightFunction(indexBalance, shardBalanceFactor, preferPrimaryShardBalanceDelta);
}

/**
Expand All @@ -203,6 +232,7 @@ private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) {
this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
}

private void setThreshold(float threshold) {
Expand Down Expand Up @@ -347,8 +377,9 @@ static class WeightFunction {
private final float theta1;
private AllocationConstraints constraints;
private RebalanceConstraints rebalanceConstraints;
private final float preferPrimaryBalanceBuffer;

WeightFunction(float indexBalance, float shardBalance) {
WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
Expand All @@ -358,7 +389,8 @@ static class WeightFunction {
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
this.constraints = new AllocationConstraints();
this.rebalanceConstraints = new RebalanceConstraints();
this.rebalanceConstraints = new RebalanceConstraints(preferPrimaryBalanceBuffer);
this.preferPrimaryBalanceBuffer = preferPrimaryBalanceBuffer;
// Enable index shard per node breach constraint
updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true);
}
Expand Down

0 comments on commit 8aed71b

Please sign in to comment.