diff --git a/CHANGELOG.md b/CHANGELOG.md index 36415d25f8302..a06c6fd3c824f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -171,6 +171,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix per request latency last phase not tracked ([#10934](https://github.com/opensearch-project/OpenSearch/pull/10934)) - Fix for stuck update action in a bulk with `retry_on_conflict` property ([#11152](https://github.com/opensearch-project/OpenSearch/issues/11152)) - Remove shadowJar from `lang-painless` module publication ([#11369](https://github.com/opensearch-project/OpenSearch/issues/11369)) +- Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167)) ### Security diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 75448520a499c..45f64a5b29b04 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -65,7 +65,6 @@ public class LocalShardsBalancer extends ShardsBalancer { private final float threshold; private final Metadata metadata; - private final float avgShardsPerNode; private final float avgPrimaryShardsPerNode; private final BalancedShardsAllocator.NodeSorter sorter; @@ -85,7 +84,6 @@ public LocalShardsBalancer( this.threshold = threshold; this.routingNodes = allocation.routingNodes(); this.metadata = allocation.metadata(); - avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); avgPrimaryShardsPerNode = (float) (StreamSupport.stream(metadata.spliterator(), false) .mapToInt(IndexMetadata::getNumberOfShards) .sum()) / routingNodes.size(); @@ -663,7 +661,6 @@ MoveDecision decideMove(final ShardRouting shardRouting) { RoutingNode targetNode = null; final List nodeExplanationMap = explain ? new ArrayList<>() : null; int weightRanking = 0; - int targetNodeProcessed = 0; for (BalancedShardsAllocator.ModelNode currentNode : sorter.modelNodes) { if (currentNode != sourceNode) { RoutingNode target = currentNode.getRoutingNode(); @@ -677,7 +674,6 @@ MoveDecision decideMove(final ShardRouting shardRouting) { continue; } } - targetNodeProcessed++; // don't use canRebalance as we want hard filtering rules to apply. See #17698 Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); if (explain) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java index cbbcd61fc5295..8a14ce3f1a288 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -406,7 +406,7 @@ private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouti allocation.metadata(), allocation.routingTable() ); - ShardRouting initShard = routingNodes.initializeShard(shard, node.nodeId(), null, shardSize, allocation.changes()); + routingNodes.initializeShard(shard, node.nodeId(), null, shardSize, allocation.changes()); nodeQueue.offer(node); allocated = true; break; @@ -444,7 +444,6 @@ private void tryAllocateUnassignedShard(Queue nodeQueue, ShardRouti // Break out if all nodes in the queue have been checked for this shard if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) { - throttled = true; break; } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java index 22c3156fb3537..1263efd19ac46 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/AllocationDeciders.java @@ -257,7 +257,7 @@ public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation al Decision.Multi ret = new Decision.Multi(); for (AllocationDecider decider : allocations) { Decision decision = decider.canAllocateAnyShardToNode(node, allocation); - if (decision.type().canPremptivelyReturn()) { + if (decision.type().canPreemptivelyReturn()) { if (logger.isTraceEnabled()) { logger.trace("Shard can not be allocated on node [{}] due to [{}]", node.nodeId(), decider.getClass().getSimpleName()); } @@ -279,7 +279,7 @@ public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocat for (AllocationDecider decider : allocations) { Decision decision = decider.canMoveAway(shardRouting, allocation); // short track if a NO is returned. - if (decision.type().canPremptivelyReturn()) { + if (decision.type().canPreemptivelyReturn()) { if (logger.isTraceEnabled()) { logger.trace("Shard [{}] can not be moved away due to [{}]", shardRouting, decider.getClass().getSimpleName()); } @@ -301,7 +301,7 @@ public Decision canMoveAnyShard(RoutingAllocation allocation) { for (AllocationDecider decider : allocations) { Decision decision = decider.canMoveAnyShard(allocation); // short track if a NO is returned. - if (decision.type().canPremptivelyReturn()) { + if (decision.type().canPreemptivelyReturn()) { if (allocation.debugDecision() == false) { return decision; } else { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/Decision.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/Decision.java index ac5a18c3fcb21..938c457606c79 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/Decision.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/Decision.java @@ -144,7 +144,7 @@ public boolean higherThan(Type other) { return false; } - public boolean canPremptivelyReturn() { + public boolean canPreemptivelyReturn() { return this == THROTTLE || this == NO; } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java index ed178ed7e1526..0be1a1f36118d 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsAllocateUnassignedTests.java @@ -13,11 +13,16 @@ import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingPool; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.allocator.RemoteShardsBalancer; import java.util.HashMap; import java.util.Map; +import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.DECIDERS_NO; +import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED; +import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.NO_ATTEMPT; + public class RemoteShardsAllocateUnassignedTests extends RemoteShardsBalancerBaseTestCase { /** @@ -89,6 +94,38 @@ public void testPrimaryAllocation() { } } + /** + * Test remote unassigned shard allocation when deciders make NO or THROTTLED decision. + */ + public void testNoRemoteAllocation() { + final int localOnlyNodes = 10; + final int remoteCapableNodes = 5; + final int localIndices = 2; + final int remoteIndices = 1; + final ClusterState oldState = createInitialCluster(localOnlyNodes, remoteCapableNodes, localIndices, remoteIndices); + final boolean throttle = randomBoolean(); + final AllocationService service = this.createRejectRemoteAllocationService(throttle); + final ClusterState newState = allocateShardsAndBalance(oldState, service); + final RoutingNodes routingNodes = newState.getRoutingNodes(); + final RoutingAllocation allocation = getRoutingAllocation(newState, routingNodes); + + assertEquals(totalShards(remoteIndices), routingNodes.unassigned().size()); + + for (ShardRouting shard : newState.getRoutingTable().allShards()) { + if (RoutingPool.getShardPool(shard, allocation) == RoutingPool.REMOTE_CAPABLE) { + assertTrue(shard.unassigned()); + if (shard.primary()) { + final UnassignedInfo.AllocationStatus expect = throttle ? DECIDERS_THROTTLED : DECIDERS_NO; + assertEquals(expect, shard.unassignedInfo().getLastAllocationStatus()); + } else { + assertEquals(NO_ATTEMPT, shard.unassignedInfo().getLastAllocationStatus()); + } + } else { + assertFalse(shard.unassigned()); + } + } + } + /** * Test remote unassigned shard allocation when remote capable nodes fail to come up. */ diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java index dbb08a999877d..a1db6cd83ab6c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RemoteShardsBalancerBaseTestCase.java @@ -20,7 +20,9 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.RoutingNodes; +import org.opensearch.cluster.routing.RoutingPool; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; @@ -28,6 +30,7 @@ import org.opensearch.cluster.routing.allocation.allocator.ShardsAllocator; import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; +import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -201,6 +204,36 @@ public AllocationService createRemoteCapableAllocationService(String excludeNode ); } + public AllocationService createRejectRemoteAllocationService(boolean throttle) { + Settings settings = Settings.Builder.EMPTY_SETTINGS; + return new OpenSearchAllocationTestCase.MockAllocationService( + createRejectRemoteAllocationDeciders(throttle), + new TestGatewayAllocator(), + createShardAllocator(settings), + EmptyClusterInfoService.INSTANCE, + SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES + ); + } + + public AllocationDeciders createRejectRemoteAllocationDeciders(boolean throttle) { + Settings settings = Settings.Builder.EMPTY_SETTINGS; + List deciders = new ArrayList<>( + ClusterModule.createAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, Collections.emptyList()) + ); + deciders.add(new AllocationDecider() { + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + if (RoutingPool.REMOTE_CAPABLE.equals(RoutingPool.getShardPool(shardRouting, allocation))) { + return throttle ? Decision.THROTTLE : Decision.NO; + } else { + return Decision.ALWAYS; + } + } + }); + Collections.shuffle(deciders, random()); + return new AllocationDeciders(deciders); + } + public AllocationDeciders createAllocationDeciders() { Settings settings = Settings.Builder.EMPTY_SETTINGS; return randomAllocationDeciders(settings, EMPTY_CLUSTER_SETTINGS, random());