Skip to content

Commit

Permalink
Fix responsibility check for existing shards allocator when timed out (
Browse files Browse the repository at this point in the history
…opensearch-project#15223)

* Fix responsibility check for existing shards allocator when timed out

Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Sep 4, 2024
1 parent 438cfc4 commit ada6cfd
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,20 @@ protected void allocateUnassignedBatchOnTimeout(Set<ShardId> shardIds, RoutingAl
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;
if (unassignedShard.primary() == primary && shardIds.contains(unassignedShard.shardId())) {
if (isResponsibleFor(unassignedShard) == false) {
continue;
}
allocationDecision = AllocateUnassignedDecision.throttle(null);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

/**
* Is the allocator responsible for allocating the given {@link ShardRouting}?
*/
protected abstract boolean isResponsibleFor(ShardRouting shardRouting);

protected void executeDecision(
ShardRouting shardRouting,
AllocateUnassignedDecision allocateUnassignedDecision,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public abstract class PrimaryShardAllocator extends BaseGatewayShardAllocator {
/**
* Is the allocator responsible for allocating the given {@link ShardRouting}?
*/
protected static boolean isResponsibleFor(final ShardRouting shard) {
protected boolean isResponsibleFor(final ShardRouting shard) {
return shard.primary() // must be primary
&& shard.unassigned() // must be unassigned
// only handle either an existing store or a snapshot recovery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public void processExistingRecoveries(RoutingAllocation allocation) {
/**
* Is the allocator responsible for allocating the given {@link ShardRouting}?
*/
protected static boolean isResponsibleFor(final ShardRouting shard) {
protected boolean isResponsibleFor(final ShardRouting shard) {
return shard.primary() == false // must be a replica
&& shard.unassigned() // must be unassigned
// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
RoutingAllocation allocation,
Supplier<Map<DiscoveryNode, StoreFilesMetadata>> nodeStoreFileMetaDataMapSupplier
) {
if (!isResponsibleFor(shardRouting)) {
if (isResponsibleFor(shardRouting) == false) {
return AllocateUnassignedDecision.NOT_TAKEN;
}
Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shardRouting, allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.stream.Collectors;

import static org.opensearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED;
import static org.opensearch.cluster.routing.UnassignedInfo.Reason.INDEX_CREATED;

public class PrimaryShardBatchAllocatorTests extends OpenSearchAllocationTestCase {

Expand Down Expand Up @@ -283,40 +284,16 @@ public void testAllocateUnassignedBatchOnTimeoutWithNoMatchingPrimaryShards() {
assertEquals(0, ignoredShards.size());
}

public void testAllocateUnassignedBatchOnTimeoutWithNonPrimaryShards() {
public void testAllocateUnassignedBatchOnTimeoutSkipIgnoringNewPrimaryShards() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
setUpShards(1);
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0");
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, INDEX_CREATED);
ShardRouting shardRouting = routingAllocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard();

ShardRouting shardRouting = routingAllocation.routingTable()
.getIndicesRouting()
.get("test")
.shard(shardId.id())
.replicaShards()
.get(0);
Set<ShardId> shardIds = new HashSet<>();
shardIds.add(shardRouting.shardId());
batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, false);

List<ShardRouting> ignoredShards = routingAllocation.routingNodes().unassigned().ignored();
assertEquals(1, ignoredShards.size());
}

public void testAllocateUnassignedBatchOnTimeoutWithNoShards() {
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
AllocationDeciders allocationDeciders = randomAllocationDeciders(Settings.builder().build(), clusterSettings, random());
setUpShards(1);
final RoutingAllocation routingAllocation = routingAllocationWithOnePrimary(allocationDeciders, CLUSTER_RECOVERED, "allocId-0");

ShardRouting shardRouting = routingAllocation.routingTable()
.getIndicesRouting()
.get("test")
.shard(shardId.id())
.replicaShards()
.get(0);
Set<ShardId> shardIds = new HashSet<>();
batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, false);
batchAllocator.allocateUnassignedBatchOnTimeout(shardIds, routingAllocation, true);

List<ShardRouting> ignoredShards = routingAllocation.routingNodes().unassigned().ignored();
assertEquals(0, ignoredShards.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -744,6 +744,24 @@ public void testAllocateUnassignedBatchOnTimeoutWithAlreadyRecoveringReplicaShar
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
}

public void testAllocateUnassignedBatchOnTimeoutSkipIgnoringNewReplicaShards() {
RoutingAllocation allocation = onePrimaryOnNode1And1Replica(
yesAllocationDeciders(),
Settings.EMPTY,
UnassignedInfo.Reason.INDEX_CREATED
);
final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
Set<ShardId> shards = new HashSet<>();
while (iterator.hasNext()) {
ShardRouting sr = iterator.next();
if (sr.primary() == false) {
shards.add(sr.shardId());
}
}
testBatchAllocator.allocateUnassignedBatchOnTimeout(shards, allocation, false);
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
}

private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders) {
return onePrimaryOnNode1And1Replica(deciders, Settings.EMPTY, UnassignedInfo.Reason.CLUSTER_RECOVERED);
}
Expand Down

0 comments on commit ada6cfd

Please sign in to comment.