Skip to content

Commit

Permalink
Avoid cluster state
Browse files Browse the repository at this point in the history
  • Loading branch information
albertzaharovits committed Sep 6, 2024
1 parent cd39fc8 commit 5acbfb6
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@

package org.elasticsearch.action.support;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -122,21 +123,21 @@ public boolean enoughShardsActive(final int activeShardCount) {
* Returns true iff the given cluster state's routing table contains enough active
* shards for the given indices to meet the required shard count represented by this instance.
*/
public boolean enoughShardsActive(final ClusterState clusterState, final String... indices) {
public boolean enoughShardsActive(final Metadata metadata, final RoutingTable routingTable, final String... indices) {
if (this == ActiveShardCount.NONE) {
// not waiting for any active shards
return true;
}

for (final String indexName : indices) {
final IndexMetadata indexMetadata = clusterState.metadata().index(indexName);
final IndexMetadata indexMetadata = metadata.index(indexName);
if (indexMetadata == null) {
// its possible the index was deleted while waiting for active shard copies,
// in this case, we'll just consider it that we have enough active shard copies
// and we can stop waiting
continue;
}
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexName);
final IndexRoutingTable indexRoutingTable = routingTable.index(indexName);
if (indexRoutingTable == null && indexMetadata.getState() == IndexMetadata.State.CLOSE) {
// its possible the index was closed while waiting for active shard copies,
// in this case, we'll just consider it that we have enough active shard copies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static void waitForActiveShards(
}

final ClusterState state = clusterService.state();
if (activeShardCount.enoughShardsActive(state, indexNames)) {
if (activeShardCount.enoughShardsActive(state.metadata(), state.routingTable(), indexNames)) {
listener.onResponse(true);
return;
}
Expand Down Expand Up @@ -82,7 +82,7 @@ public void onTimeout(TimeValue timeout) {
listener.onResponse(false);
}
},
newState -> activeShardCount.enoughShardsActive(newState, indexNames),
newState -> activeShardCount.enoughShardsActive(newState.metadata(), newState.routingTable(), indexNames),
timeout
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,11 @@ public void testEnoughShardsActiveZero() {
final int numberOfReplicas = randomIntBetween(4, 7);
final ActiveShardCount waitForActiveShards = ActiveShardCount.NONE;
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startPrimaries(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
}

public void testEnoughShardsActiveLevelOne() {
Expand All @@ -109,13 +109,13 @@ public void testEnoughShardsActiveLevelDefaultWithSearchOnlyRole() {
final int numberOfReplicas = randomIntBetween(4, 7);
final ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT;
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas, createCustomRoleStrategy(1));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startPrimaries(clusterState, indexName);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, 1);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
}

public void testEnoughShardsActiveCustomLevelWithSearchOnlyRole() {
Expand All @@ -125,15 +125,15 @@ public void testEnoughShardsActiveCustomLevelWithSearchOnlyRole() {
final int activeShardCount = randomIntBetween(2, numberOfReplicas);
final ActiveShardCount waitForActiveShards = ActiveShardCount.from(activeShardCount);
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas, createCustomRoleStrategy(1));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startPrimaries(clusterState, indexName);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, activeShardCount - 2);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startWaitOnShards(clusterState, indexName, activeShardCount);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
}

public void testEnoughShardsActiveWithNoSearchOnlyRoles() {
Expand All @@ -147,13 +147,13 @@ public void testEnoughShardsActiveWithNoSearchOnlyRoles() {
numberOfReplicas,
createCustomRoleStrategy(numberOfReplicas + 1)
);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startPrimaries(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, 1);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
}

private static ShardRoutingRoleStrategy createCustomRoleStrategy(int indexShardCount) {
Expand All @@ -177,15 +177,15 @@ public void testEnoughShardsActiveRandom() {
final int activeShardCount = randomIntBetween(2, numberOfReplicas);
final ActiveShardCount waitForActiveShards = ActiveShardCount.from(activeShardCount);
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startPrimaries(clusterState, indexName);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, activeShardCount - 2);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startWaitOnShards(clusterState, indexName, activeShardCount - 1);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
}

public void testEnoughShardsActiveLevelAll() {
Expand All @@ -195,13 +195,13 @@ public void testEnoughShardsActiveLevelAll() {
// both values should represent "all"
final ActiveShardCount waitForActiveShards = randomBoolean() ? ActiveShardCount.from(numberOfReplicas + 1) : ActiveShardCount.ALL;
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startPrimaries(clusterState, indexName);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startLessThanWaitOnShards(clusterState, indexName, numberOfReplicas - randomIntBetween(1, numberOfReplicas));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
}

public void testEnoughShardsActiveValueBased() {
Expand Down Expand Up @@ -230,7 +230,7 @@ public void testEnoughShardsActiveWithClosedIndex() {

final ClusterState clusterState = initializeWithClosedIndex(indexName, numberOfShards, numberOfReplicas);
for (ActiveShardCount waitForActiveShards : Arrays.asList(ActiveShardCount.DEFAULT, ActiveShardCount.ALL, ActiveShardCount.ONE)) {
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
}
}

Expand All @@ -240,11 +240,11 @@ private void runTestForOneActiveShard(final ActiveShardCount waitForActiveShards
final int numberOfReplicas = randomIntBetween(4, 7);
assert waitForActiveShards == ActiveShardCount.ONE || waitForActiveShards == ActiveShardCount.DEFAULT;
ClusterState clusterState = initializeWithNewIndex(indexName, numberOfShards, numberOfReplicas);
assertFalse(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertFalse(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startPrimaries(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
clusterState = startAllShards(clusterState, indexName);
assertTrue(waitForActiveShards.enoughShardsActive(clusterState, indexName));
assertTrue(waitForActiveShards.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), indexName));
}

private ClusterState initializeWithNewIndex(String indexName, int numShards, int numReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().action(), index.getName());
return new Result(false, null);
}
if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
if (ActiveShardCount.ALL.enoughShardsActive(clusterState.metadata(), clusterState.routingTable(), index.getName()) == false) {
logger.debug(
"[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
getKey().action(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DesiredNodes;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.index.Index;
import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
Expand Down Expand Up @@ -46,7 +47,8 @@ public boolean isRetryable() {

@Override
public Result isConditionMet(Index index, ClusterState clusterState) {
IndexMetadata idxMeta = clusterState.metadata().index(index);
Metadata metadata = clusterState.metadata();
IndexMetadata idxMeta = metadata.index(index);
if (idxMeta == null) {
// Index must have been since deleted, ignore it
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().action(), index.getName());
Expand All @@ -57,10 +59,10 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
preferredTierConfiguration,
clusterState.getNodes(),
DesiredNodes.latestFromClusterState(clusterState),
clusterState.metadata().nodeShutdowns()
metadata.nodeShutdowns()
);

if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {
if (ActiveShardCount.ALL.enoughShardsActive(metadata, clusterState.routingTable(), index.getName()) == false) {
if (preferredTierConfiguration.isEmpty()) {
logger.debug(
"[{}] lifecycle action for index [{}] cannot make progress because not all shards are active",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.Index;
import org.elasticsearch.xcontent.ConstructingObjectParser;
Expand Down Expand Up @@ -43,7 +45,9 @@ public boolean isRetryable() {

@Override
public Result isConditionMet(Index index, ClusterState clusterState) {
IndexMetadata indexMetadata = clusterState.metadata().index(index);
Metadata metadata = clusterState.metadata();
RoutingTable routingTable = clusterState.routingTable();
IndexMetadata indexMetadata = metadata.index(index);
if (indexMetadata == null) {
// Index must have been since deleted, ignore it
logger.debug("[{}] lifecycle action for index [{}] executed but index no longer exists", getKey().action(), index.getName());
Expand All @@ -55,12 +59,12 @@ public Result isConditionMet(Index index, ClusterState clusterState) {

// We only want to make progress if all shards of the shrunk index are
// active
boolean indexExists = clusterState.metadata().index(shrunkenIndexName) != null;
boolean indexExists = metadata.index(shrunkenIndexName) != null;
if (indexExists == false) {
return new Result(false, new Info(false, -1, false));
}
boolean allShardsActive = ActiveShardCount.ALL.enoughShardsActive(clusterState, shrunkenIndexName);
int numShrunkIndexShards = clusterState.metadata().index(shrunkenIndexName).getNumberOfShards();
boolean allShardsActive = ActiveShardCount.ALL.enoughShardsActive(metadata, routingTable, shrunkenIndexName);
int numShrunkIndexShards = metadata.index(shrunkenIndexName).getNumberOfShards();
if (allShardsActive) {
return new Result(true, null);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.Index;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -51,6 +52,7 @@ public boolean isRetryable() {
@Override
public Result isConditionMet(Index index, ClusterState clusterState) {
Metadata metadata = clusterState.metadata();
RoutingTable routingTable = clusterState.routingTable();
IndexMetadata originalIndexMeta = metadata.index(index);

if (originalIndexMeta == null) {
Expand Down Expand Up @@ -137,9 +139,9 @@ public Result isConditionMet(Index index, ClusterState clusterState) {
}

ActiveShardCount activeShardCount = ActiveShardCount.parseString(waitForActiveShardsSettingValue);
boolean enoughShardsActive = activeShardCount.enoughShardsActive(clusterState, rolledIndexName);
boolean enoughShardsActive = activeShardCount.enoughShardsActive(metadata, routingTable, rolledIndexName);

IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(rolledIndexName);
IndexRoutingTable indexRoutingTable = routingTable.index(rolledIndexName);
int currentActiveShards = 0;
for (int i = 0; i < indexRoutingTable.size(); i++) {
currentActiveShards += indexRoutingTable.shard(i).activeShards().size();
Expand Down

0 comments on commit 5acbfb6

Please sign in to comment.