Skip to content

Commit

Permalink
Add test for move
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Aug 21, 2024
1 parent e426ffb commit c67486a
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;

public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase {

private final DiscoveryNode node1 = newNode("node1");
private final DiscoveryNode node2 = newNode("node2");
private final DiscoveryNode node3 = newNode("node3");
private final DiscoveryNode node1 = newNode("node1", "node1", Collections.singletonMap("zone", "1a"));
private final DiscoveryNode node2 = newNode("node2", "node2", Collections.singletonMap("zone", "1b"));
private final DiscoveryNode node3 = newNode("node3", "node3", Collections.singletonMap("zone", "1c"));

public void testAllUnassignedShardsAllocatedWhenNoTimeOut() {
int numberOfIndices = 2;
Expand All @@ -43,14 +45,23 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOut() {
int totalPrimaryCount = numberOfIndices * numberOfShards;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Settings.Builder settings = Settings.builder();
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
// passing total shard count for timed out latch such that no shard times out
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(
settings.build(),
clusterSettings,
new CountDownLatch(totalShardCount)
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(totalShardCount));
Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas);
RoutingTable routingTable = buildRoutingTable(metadata);
ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
.build();
RoutingAllocation allocation = new RoutingAllocation(
yesAllocationDeciders(),
new RoutingNodes(state, false),
state,
ClusterInfo.EMPTY,
null,
System.nanoTime()
);
RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas);
allocator.allocate(allocation);
List<ShardRouting> initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId());
Expand All @@ -67,10 +78,23 @@ public void testAllUnassignedShardsIgnoredWhenTimedOut() {
int numberOfReplicas = 1;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Settings.Builder settings = Settings.builder();
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
// passing 0 for timed out latch such that all shard times out
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, new CountDownLatch(0));
RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas);
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0));
Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas);
RoutingTable routingTable = buildRoutingTable(metadata);
ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
.build();
RoutingAllocation allocation = new RoutingAllocation(
yesAllocationDeciders(),
new RoutingNodes(state, false),
state,
ClusterInfo.EMPTY,
null,
System.nanoTime()
);
allocator.allocate(allocation);
List<ShardRouting> initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId());
Expand All @@ -87,15 +111,24 @@ public void testAllocatePartialPrimaryShardsUntilTimedOut() {
int numberOfReplicas = 1;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Settings.Builder settings = Settings.builder();
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int shardsToAllocate = randomIntBetween(1, numberOfShards * numberOfIndices);
// passing shards to allocate for timed out latch such that only few primary shards are allocated in this reroute round
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(
settings.build(),
clusterSettings,
new CountDownLatch(shardsToAllocate)
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(shardsToAllocate));
Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas);
RoutingTable routingTable = buildRoutingTable(metadata);
ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
.build();
RoutingAllocation allocation = new RoutingAllocation(
yesAllocationDeciders(),
new RoutingNodes(state, false),
state,
ClusterInfo.EMPTY,
null,
System.nanoTime()
);
RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas);
allocator.allocate(allocation);
List<ShardRouting> initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId());
Expand All @@ -112,16 +145,25 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() {
int numberOfReplicas = 1;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Settings.Builder settings = Settings.builder();
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
int shardsToAllocate = randomIntBetween(numberOfShards * numberOfIndices, totalShardCount);
// passing shards to allocate for timed out latch such that all primary shards and few replica shards are allocated in this reroute
// round
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(
settings.build(),
clusterSettings,
new CountDownLatch(shardsToAllocate)
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(shardsToAllocate));
Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas);
RoutingTable routingTable = buildRoutingTable(metadata);
ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
.build();
RoutingAllocation allocation = new RoutingAllocation(
yesAllocationDeciders(),
new RoutingNodes(state, false),
state,
ClusterInfo.EMPTY,
null,
System.nanoTime()
);
RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas);
allocator.allocate(allocation);
List<ShardRouting> initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId());
Expand All @@ -132,20 +174,73 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOut() {
assertEquals(numberOfShards * numberOfIndices, node1Recoveries + node2Recoveries + node3Recoveries);
}

private RoutingAllocation buildRoutingAllocation(
AllocationDeciders deciders,
int numberOfIndices,
int numberOfShards,
int numberOfReplicas
) {
public void testAllShardsMoveWhenExcludedAndTimeoutNotBreached() {
int numberOfIndices = 3;
int numberOfShards = 5;
int numberOfReplicas = 1;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas);
RoutingTable routingTable = buildRoutingTable(metadata);
ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
.build();
MockAllocationService allocationService = createAllocationService();
state = applyStartedShardsUntilNoChange(state, allocationService);
// check all shards allocated
assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size());
assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size());
int node1ShardCount = state.getRoutingNodes().node("node1").size();
Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build();
int shardsToMove = 10 + 1000; // such that time out is never breached
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove));
RoutingAllocation allocation = new RoutingAllocation(
allocationDecidersForExcludeAPI(settings),
new RoutingNodes(state, false),
state,
ClusterInfo.EMPTY,
null,
System.nanoTime()
);
allocator.allocate(allocation);
List<ShardRouting> relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING);
assertEquals(node1ShardCount, relocatingShards.size());
assertEquals(node1ShardCount, allocation.routingNodes().getRelocatingShardCount());
}

public void testNoShardsMoveWhenExcludedAndTimeoutNotBreached() {
int numberOfIndices = 3;
int numberOfShards = 5;
int numberOfReplicas = 1;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas);
RoutingTable routingTable = buildRoutingTable(metadata);
ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3))
.build();
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, ClusterInfo.EMPTY, null, System.nanoTime());
MockAllocationService allocationService = createAllocationService();
state = applyStartedShardsUntilNoChange(state, allocationService);
// check all shards allocated
assertEquals(0, state.getRoutingNodes().shardsWithState(INITIALIZING).size());
assertEquals(totalShardCount, state.getRoutingNodes().shardsWithState(STARTED).size());
Settings settings = Settings.builder().put("cluster.routing.allocation.exclude.zone", "1a").build();
int shardsToMove = 0; // such that time out is never breached
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings, new CountDownLatch(shardsToMove));
RoutingAllocation allocation = new RoutingAllocation(
allocationDecidersForExcludeAPI(settings),
new RoutingNodes(state, false),
state,
ClusterInfo.EMPTY,
null,
System.nanoTime()
);
allocator.allocate(allocation);
List<ShardRouting> relocatingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.RELOCATING);
assertEquals(0, relocatingShards.size());
assertEquals(0, allocation.routingNodes().getRelocatingShardCount());
}

private RoutingTable buildRoutingTable(Metadata metadata) {
Expand All @@ -171,8 +266,8 @@ private Metadata buildMetadata(Metadata.Builder mb, int numberOfIndices, int num
static class TestBalancedShardsAllocator extends BalancedShardsAllocator {
private final CountDownLatch timedOutLatch;

public TestBalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings, CountDownLatch timedOutLatch) {
super(settings, clusterSettings);
public TestBalancedShardsAllocator(Settings settings, CountDownLatch timedOutLatch) {
super(settings);
this.timedOutLatch = timedOutLatch;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -213,6 +214,16 @@ protected static AllocationDeciders throttleAllocationDeciders() {
);
}

protected static AllocationDeciders allocationDecidersForExcludeAPI(Settings settings) {
return new AllocationDeciders(
Arrays.asList(
new TestAllocateDecision(Decision.YES),
new SameShardAllocationDecider(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
new FilterAllocationDecider(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS))
)
);
}

protected ClusterState applyStartedShardsUntilNoChange(ClusterState clusterState, AllocationService service) {
ClusterState lastClusterState;
do {
Expand Down

0 comments on commit c67486a

Please sign in to comment.