Skip to content

Commit

Permalink
Use count down latch to mark partial shards timed out
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Aug 21, 2024
1 parent 9ab5a29 commit 31f02cf
Showing 1 changed file with 27 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.CountDownLatch;

public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase {

Expand All @@ -37,38 +37,47 @@ public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationT
private final DiscoveryNode node3 = newNode("node3");

public void testAllUnassignedShardsAllocatedWhenNoTimeOut() {
Settings.Builder settings = Settings.builder();
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, false);
int numberOfIndices = 2;
int numberOfShards = 5;
int numberOfReplicas = 1;
int totalPrimaryCount = numberOfIndices * numberOfShards;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Settings.Builder settings = Settings.builder();
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
// passing total shard count for timed out latch such that no shard times out
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(
settings.build(),
clusterSettings,
new CountDownLatch(totalShardCount)
);
RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas);
allocator.allocate(allocation);
List<ShardRouting> initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId());
int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId());
int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId());
assertEquals(numberOfIndices * (numberOfShards * (numberOfReplicas + 1)), initializingShards.size());
assertEquals(totalShardCount, initializingShards.size());
assertEquals(0, allocation.routingNodes().unassigned().ignored().size());
assertEquals(numberOfIndices * numberOfShards, node1Recoveries + node2Recoveries + node3Recoveries);
assertEquals(totalPrimaryCount, node1Recoveries + node2Recoveries + node3Recoveries);
}

public void testAllUnassignedShardsIgnoredWhenTimedOut() {
Settings.Builder settings = Settings.builder();
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, true);
int numberOfIndices = 2;
int numberOfShards = 5;
int numberOfReplicas = 1;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Settings.Builder settings = Settings.builder();
ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
// passing 0 for timed out latch such that all shard times out
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), clusterSettings, new CountDownLatch(0));
RoutingAllocation allocation = buildRoutingAllocation(yesAllocationDeciders(), numberOfIndices, numberOfShards, numberOfReplicas);
allocator.allocate(allocation);
List<ShardRouting> initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId());
int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId());
int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId());
assertEquals(0, initializingShards.size());
assertEquals(numberOfIndices * (numberOfShards * (numberOfReplicas + 1)), allocation.routingNodes().unassigned().ignored().size());
assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size());
assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries);
}

Expand Down Expand Up @@ -109,16 +118,20 @@ private Metadata buildMetadata(Metadata.Builder mb, int numberOfIndices, int num
}

static class TestBalancedShardsAllocator extends BalancedShardsAllocator {
private final AtomicBoolean timedOut;
private final CountDownLatch timedOutLatch;

public TestBalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings, boolean timedOut) {
public TestBalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings, CountDownLatch timedOutLatch) {
super(settings, clusterSettings);
this.timedOut = new AtomicBoolean(timedOut);
this.timedOutLatch = timedOutLatch;
}

@Override
protected boolean allocatorTimedOut(long currentTime) {
return timedOut.get();
if (timedOutLatch.getCount() == 0) {
return true;
}
timedOutLatch.countDown();
return false;
}
}
}

0 comments on commit 31f02cf

Please sign in to comment.