diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 26a04de31ce39..61e7aaed5ecff 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -44,7 +44,6 @@ import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; -import java.util.Locale; import java.util.function.BiFunction; import static org.opensearch.cluster.routing.allocation.decider.Decision.THROTTLE; @@ -211,20 +210,9 @@ private Decision allocateInitialShardCopies(ShardRouting shardRouting, RoutingNo allocation, currentInRecoveries, replicasInitialRecoveries, - (x, y) -> getInitialPrimaryNodeOutgoingRecoveries(x, y), + this::getInitialPrimaryNodeOutgoingRecoveries, replicasInitialRecoveries, - String.format( - Locale.ROOT, - "[%s=%d]", - CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), - replicasInitialRecoveries - ), - String.format( - Locale.ROOT, - "[%s=%d]", - CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), - replicasInitialRecoveries - ) + true ); } @@ -238,22 +226,9 @@ private Decision allocateNonInitialShardCopies(ShardRouting shardRouting, Routin allocation, currentInRecoveries, concurrentIncomingRecoveries, - (x, y) -> getPrimaryNodeOutgoingRecoveries(x, y), + this::getPrimaryNodeOutgoingRecoveries, concurrentOutgoingRecoveries, - String.format( - Locale.ROOT, - "[%s=%d] (can also be set via [%s])", - CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), - concurrentIncomingRecoveries, - CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey() - ), - String.format( - Locale.ROOT, - "[%s=%d] (can also be set via [%s])", - CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), - concurrentOutgoingRecoveries, - CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey() - ) + false ); } @@ -274,18 +249,30 @@ private Decision allocateShardCopies( int inRecoveriesLimit, BiFunction primaryNodeOutRecoveriesFunc, int outRecoveriesLimit, - String incomingRecoveriesSettingMsg, - String outGoingRecoveriesSettingMsg + boolean isInitialShardCopies ) { // Allocating a shard to this node will increase the incoming recoveries if (currentInRecoveries >= inRecoveriesLimit) { - return allocation.decision( - THROTTLE, - NAME, - "reached the limit of incoming shard recoveries [%d], cluster setting %s", - currentInRecoveries, - incomingRecoveriesSettingMsg - ); + if (isInitialShardCopies) { + return allocation.decision( + THROTTLE, + NAME, + "reached the limit of incoming shard recoveries [%d], cluster setting [%s=%d]", + currentInRecoveries, + CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), + inRecoveriesLimit + ); + } else { + return allocation.decision( + THROTTLE, + NAME, + "reached the limit of incoming shard recoveries [%d], cluster setting [%s=%d] (can also be set via [%s])", + currentInRecoveries, + CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), + inRecoveriesLimit, + CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey() + ); + } } else { // search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); @@ -294,14 +281,30 @@ private Decision allocateShardCopies( } int primaryNodeOutRecoveries = primaryNodeOutRecoveriesFunc.apply(shardRouting, allocation); if (primaryNodeOutRecoveries >= outRecoveriesLimit) { - return allocation.decision( - THROTTLE, - NAME, - "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " + "cluster setting %s", - primaryNodeOutRecoveries, - primaryShard.currentNodeId(), - outGoingRecoveriesSettingMsg - ); + if (isInitialShardCopies) { + return allocation.decision( + THROTTLE, + NAME, + "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " + + "cluster setting [%s=%d]", + primaryNodeOutRecoveries, + primaryShard.currentNodeId(), + CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), + inRecoveriesLimit + ); + } else { + return allocation.decision( + THROTTLE, + NAME, + "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " + + "cluster setting [%s=%d] (can also be set via [%s])", + primaryNodeOutRecoveries, + primaryShard.currentNodeId(), + CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), + outRecoveriesLimit, + CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey() + ); + } } else { return allocation.decision( YES,