diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java new file mode 100644 index 0000000000000..5f65d6647f26d --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.allocation; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; +import org.opensearch.indices.replication.common.ReplicationType; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.List; +import java.util.stream.Collectors; + +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class SearchReplicaFilteringAllocationIT extends OpenSearchIntegTestCase { + + @Override + protected Settings featureFlagSettings() { + return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build(); + } + + public void testSearchReplicaDedicatedIncludes() { + List nodesIds = internalCluster().startNodes(3); + final String node_0 = nodesIds.get(0); + final String node_1 = nodesIds.get(1); + final String node_2 = nodesIds.get(2); + assertEquals(3, cluster().size()); + + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0) + ) + .execute() + .actionGet(); + + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + ensureGreen("test"); + // ensure primary is not on node 0 or 1, + IndexShardRoutingTable routingTable = getRoutingTable(); + assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId())); + + String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()); + String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0; + + // set the included nodes to the other open node, search replica should relocate to that node. + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode)) + .execute() + .actionGet(); + ensureGreen("test"); + + routingTable = getRoutingTable(); + assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId())); + assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId())); + } + + public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() { + List nodesIds = internalCluster().startNodes(3); + final String node_0 = nodesIds.get(0); + final String node_1 = nodesIds.get(1); + final String node_2 = nodesIds.get(2); + assertEquals(3, cluster().size()); + + // set filter on 1 node and set search replica count to 2 - should leave 1 unassigned + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1)) + .execute() + .actionGet(); + + logger.info("--> creating an index with no replicas"); + createIndex( + "test", + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 2) + .put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) + .build() + ); + ensureYellowAndNoInitializingShards("test"); + IndexShardRoutingTable routingTable = getRoutingTable(); + assertEquals(2, routingTable.searchOnlyReplicas().size()); + List assignedSearchShards = routingTable.searchOnlyReplicas() + .stream() + .filter(ShardRouting::assignedToNode) + .collect(Collectors.toList()); + assertEquals(1, assignedSearchShards.size()); + assertEquals(node_1, getNodeName(assignedSearchShards.get(0).currentNodeId())); + assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count()); + } + + private IndexShardRoutingTable getRoutingTable() { + IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0); + return routingTable; + } + + private String getNodeName(String id) { + return getClusterState().nodes().get(id).getName(); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java index 374743a8e4a17..40341a210d472 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/settings/SearchOnlyReplicaFeatureFlagIT.java @@ -16,6 +16,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE; +import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1) public class SearchOnlyReplicaFeatureFlagIT extends OpenSearchIntegTestCase { @@ -52,4 +53,15 @@ public void testUpdateFeatureFlagDisabled() { }); assertTrue(exception.getMessage().contains("unknown setting")); } + + public void testFilterAllocationSettingNotRegistered() { + expectThrows(IllegalArgumentException.class, () -> { + client().admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", "node")) + .execute() + .actionGet(); + }); + } } diff --git a/server/src/main/java/org/opensearch/cluster/ClusterModule.java b/server/src/main/java/org/opensearch/cluster/ClusterModule.java index 87dde796b6a0b..0d15158f31e34 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterModule.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterModule.java @@ -74,6 +74,7 @@ import org.opensearch.cluster.routing.allocation.decider.ResizeAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.RestoreInProgressAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SnapshotInProgressAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.TargetPoolAllocationDecider; @@ -84,6 +85,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.util.set.Sets; import org.opensearch.core.ParseField; @@ -376,6 +378,9 @@ public static Collection createAllocationDeciders( addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); addAllocationDecider(deciders, new RestoreInProgressAllocationDecider()); addAllocationDecider(deciders, new FilterAllocationDecider(settings, clusterSettings)); + if (FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(settings)) { + addAllocationDecider(deciders, new SearchReplicaAllocationDecider(settings, clusterSettings)); + } addAllocationDecider(deciders, new SameShardAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new DiskThresholdDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ThrottlingAllocationDecider(settings, clusterSettings)); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java new file mode 100644 index 0000000000000..955c396bee4da --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDecider.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeFilters; +import org.opensearch.cluster.routing.RoutingNode; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Setting.Property; +import org.opensearch.common.settings.Settings; +import org.opensearch.node.remotestore.RemoteStoreNodeService; + +import java.util.Map; + +import static org.opensearch.cluster.node.DiscoveryNodeFilters.IP_VALIDATOR; +import static org.opensearch.cluster.node.DiscoveryNodeFilters.OpType.OR; + +/** + * This allocation decider is similar to FilterAllocationDecider but provides + * the option to filter specifically for search replicas. + * The filter behaves similar to an include for any defined node attribute. + * A search replica can be allocated to only nodes with one of the specified attributes while + * other shard types will be rejected from nodes with any othe attributes. + * @opensearch.internal + */ +public class SearchReplicaAllocationDecider extends AllocationDecider { + + public static final String NAME = "filter"; + private static final String SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX = "cluster.routing.allocation.search.replica.dedicated.include"; + public static final Setting.AffixSetting SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING = Setting.prefixKeySetting( + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX + ".", + key -> Setting.simpleString(key, value -> IP_VALIDATOR.accept(key, value), Property.Dynamic, Property.NodeScope) + ); + + private volatile DiscoveryNodeFilters searchReplicaIncludeFilters; + + private volatile RemoteStoreNodeService.Direction migrationDirection; + private volatile RemoteStoreNodeService.CompatibilityMode compatibilityMode; + + public SearchReplicaAllocationDecider(Settings settings, ClusterSettings clusterSettings) { + setSearchReplicaIncludeFilters(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getAsMap(settings)); + clusterSettings.addAffixMapUpdateConsumer( + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING, + this::setSearchReplicaIncludeFilters, + (a, b) -> {} + ); + } + + @Override + public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(shardRouting, node.node(), allocation); + } + + @Override + public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return shouldFilter(shardRouting, node.node(), allocation); + } + + private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { + if (searchReplicaIncludeFilters != null) { + final boolean match = searchReplicaIncludeFilters.match(node); + if (match == false && shardRouting.isSearchOnly()) { + return allocation.decision( + Decision.NO, + NAME, + "node does not match shard setting [%s] filters [%s]", + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, + searchReplicaIncludeFilters + ); + } + // filter will only apply to search replicas + if (shardRouting.isSearchOnly() == false && match) { + return allocation.decision( + Decision.NO, + NAME, + "only search replicas can be allocated to node with setting [%s] filters [%s]", + SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_PREFIX, + searchReplicaIncludeFilters + ); + } + } + return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); + } + + private void setSearchReplicaIncludeFilters(Map filters) { + searchReplicaIncludeFilters = DiscoveryNodeFilters.trimTier( + DiscoveryNodeFilters.buildOrUpdateFromKeyValue(searchReplicaIncludeFilters, OR, filters) + ); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 04c5c1008eb5a..d73e769147e48 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -77,6 +77,7 @@ import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.service.ClusterApplierService; @@ -816,6 +817,8 @@ public void apply(Settings value, Settings current, Settings previous) { OpenSearchOnHeapCacheSettings.EXPIRE_AFTER_ACCESS_SETTING.getConcreteSettingForNamespace( CacheType.INDICES_REQUEST_CACHE.getSettingPrefix() ) - ) + ), + List.of(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL), + List.of(SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING) ); } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index 195cc65d414fd..bfb143b09881b 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -138,12 +138,10 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_ROUTING_SHARDS_SETTING; -import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_READ_ONLY_BLOCK; import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_REPLICATION_TYPE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; -import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REMOTE_SEGMENT_STORE_REPOSITORY; @@ -157,7 +155,6 @@ import static org.opensearch.cluster.metadata.MetadataCreateIndexService.getIndexNumberOfRoutingShards; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.parseV1Mappings; import static org.opensearch.cluster.metadata.MetadataCreateIndexService.resolveAndValidateAliases; -import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL; import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; import static org.opensearch.index.IndexModule.INDEX_STORE_TYPE_SETTING; import static org.opensearch.index.IndexSettings.INDEX_MERGE_POLICY; @@ -2254,71 +2251,6 @@ public void testIndexCreationWithIndexStoreTypeRemoteStoreThrowsException() { ); } - public void testDefaultSearchReplicasSetting() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build()); - Settings templateSettings = Settings.EMPTY; - request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); - final Settings.Builder requestSettings = Settings.builder(); - request.settings(requestSettings.build()); - Settings indexSettings = aggregateIndexSettings( - ClusterState.EMPTY_STATE, - request, - templateSettings, - null, - Settings.EMPTY, - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, - randomShardLimitService(), - Collections.emptySet(), - clusterSettings - ); - assertFalse(INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING.exists(indexSettings)); - } - - public void testSearchReplicasValidationWithSegmentReplication() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build()); - Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); - request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); - final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2); - request.settings(requestSettings.build()); - Settings indexSettings = aggregateIndexSettings( - ClusterState.EMPTY_STATE, - request, - templateSettings, - null, - Settings.EMPTY, - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, - randomShardLimitService(), - Collections.emptySet(), - clusterSettings - ); - assertEquals("2", indexSettings.get(SETTING_NUMBER_OF_SEARCH_REPLICAS)); - assertEquals(ReplicationType.SEGMENT.toString(), indexSettings.get(SETTING_REPLICATION_TYPE)); - } - - public void testSearchReplicasValidationWithDocumentReplication() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build()); - Settings templateSettings = Settings.builder().put(SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build(); - request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); - final Settings.Builder requestSettings = Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2); - request.settings(requestSettings.build()); - - IllegalArgumentException exception = expectThrows( - IllegalArgumentException.class, - () -> aggregateIndexSettings( - ClusterState.EMPTY_STATE, - request, - templateSettings, - null, - Settings.EMPTY, - IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, - randomShardLimitService(), - Collections.emptySet(), - clusterSettings - ) - ); - assertEquals("To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT", exception.getMessage()); - } - public void testCreateIndexWithContextDisabled() throws Exception { request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test").context(new Context(randomAlphaOfLength(5))); withTemporaryClusterService((clusterService, threadPool) -> { diff --git a/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java index b1dd397c97218..3d11193a07884 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/SearchOnlyReplicaTests.java @@ -24,9 +24,11 @@ import org.opensearch.indices.ShardLimitValidator; import org.opensearch.indices.cluster.ClusterStateChanges; import org.opensearch.indices.replication.common.ReplicationType; -import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; import java.util.ArrayList; import java.util.Collections; @@ -40,157 +42,194 @@ import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS; import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS; -import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL; -public class SearchOnlyReplicaTests extends OpenSearchTestCase { +public class SearchOnlyReplicaTests extends OpenSearchSingleNodeTestCase { - public void testUpdateSearchReplicaCount() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, "true").build()); - final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private ThreadPool threadPool; + + @Before + public void setUp() throws Exception { + super.setUp(); + this.threadPool = new TestThreadPool(getClass().getName()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + terminate(threadPool); + } + + @Override + protected Settings featureFlagSettings() { + return Settings.builder() + .put(super.featureFlagSettings()) + .put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.getKey(), true) + .build(); + } + + public void testCreateWithDefaultSearchReplicasSetting() { final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + ClusterState state = createIndexWithSettings(cluster, Settings.builder().build()); + IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(1, indexShardRoutingTable.replicaShards().size()); + assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(1, indexShardRoutingTable.writerReplicas().size()); + } - try { - List allNodes = new ArrayList<>(); - // node for primary/local - DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); - allNodes.add(localNode); - // node for search replicas - we'll start with 1 and add another - for (int i = 0; i < 2; i++) { - allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); - } - ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); - - CreateIndexRequest request = new CreateIndexRequest( - "index", + public void testSearchReplicasValidationWithDocumentReplication() { + final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); + RuntimeException exception = expectThrows( + RuntimeException.class, + () -> createIndexWithSettings( + cluster, Settings.builder() .put(SETTING_NUMBER_OF_SHARDS, 1) .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT) .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) .build() - ).waitForActiveShards(ActiveShardCount.NONE); - state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); - IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); - assertEquals(1, indexShardRoutingTable.replicaShards().size()); - assertEquals(1, indexShardRoutingTable.searchOnlyReplicas().size()); - assertEquals(0, indexShardRoutingTable.writerReplicas().size()); - - // add another replica - state = cluster.updateSettings( - state, - new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build()) - ); - rerouteUntilActive(state, cluster); - indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); - assertEquals(2, indexShardRoutingTable.replicaShards().size()); - assertEquals(2, indexShardRoutingTable.searchOnlyReplicas().size()); - assertEquals(0, indexShardRoutingTable.writerReplicas().size()); - - // remove all replicas - state = cluster.updateSettings( - state, - new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0).build()) - ); - rerouteUntilActive(state, cluster); - indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); - assertEquals(0, indexShardRoutingTable.replicaShards().size()); - assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); - assertEquals(0, indexShardRoutingTable.writerReplicas().size()); - } finally { - terminate(threadPool); - } + ) + ); + assertEquals( + "To set index.number_of_search_only_replicas, index.replication.type must be set to SEGMENT", + exception.getCause().getMessage() + ); } - public void testUpdateSearchReplicasOverShardLimit() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, "true").build()); - final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + public void testUpdateSearchReplicaCount() { final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); - try { - List allNodes = new ArrayList<>(); - // node for primary/local - DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); - allNodes.add(localNode); + ClusterState state = createIndexWithSettings( + cluster, + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build() + ); + assertTrue(state.metadata().hasIndex("index")); + rerouteUntilActive(state, cluster); + IndexShardRoutingTable indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(1, indexShardRoutingTable.replicaShards().size()); + assertEquals(1, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(0, indexShardRoutingTable.writerReplicas().size()); + + // add another replica + state = cluster.updateSettings( + state, + new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 2).build()) + ); + rerouteUntilActive(state, cluster); + indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(2, indexShardRoutingTable.replicaShards().size()); + assertEquals(2, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(0, indexShardRoutingTable.writerReplicas().size()); + + // remove all replicas + state = cluster.updateSettings( + state, + new UpdateSettingsRequest("index").settings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 0).build()) + ); + rerouteUntilActive(state, cluster); + indexShardRoutingTable = state.getRoutingTable().index("index").getShards().get(0); + assertEquals(0, indexShardRoutingTable.replicaShards().size()); + assertEquals(0, indexShardRoutingTable.searchOnlyReplicas().size()); + assertEquals(0, indexShardRoutingTable.writerReplicas().size()); + } + private ClusterState createIndexWithSettings(ClusterStateChanges cluster, Settings settings) { + List allNodes = new ArrayList<>(); + // node for primary/local + DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + allNodes.add(localNode); + // node for search replicas - we'll start with 1 and add another + for (int i = 0; i < 2; i++) { allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + } + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); - ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + CreateIndexRequest request = new CreateIndexRequest("index", settings).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + return state; + } - CreateIndexRequest request = new CreateIndexRequest( - "index", - Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) - .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) - .build() - ).waitForActiveShards(ActiveShardCount.NONE); - state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); - - // add another replica - ClusterState finalState = state; - Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); - expectThrows( - RuntimeException.class, - () -> cluster.updateSettings( - finalState, - new UpdateSettingsRequest("index").settings( - Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() - ) - ) - ); + public void testUpdateSearchReplicasOverShardLimit() { + final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); - } finally { - terminate(threadPool); - } + List allNodes = new ArrayList<>(); + // node for primary/local + DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + allNodes.add(localNode); + + allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + + CreateIndexRequest request = new CreateIndexRequest( + "index", + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.SEGMENT) + .put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1) + .build() + ).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metadata().hasIndex("index")); + rerouteUntilActive(state, cluster); + + // add another replica + ClusterState finalState = state; + Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); + expectThrows( + RuntimeException.class, + () -> cluster.updateSettings( + finalState, + new UpdateSettingsRequest("index").settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() + ) + ) + ); } public void testUpdateSearchReplicasOnDocrepCluster() { - FeatureFlags.initializeFeatureFlags(Settings.builder().put(READER_WRITER_SPLIT_EXPERIMENTAL, "true").build()); - final ThreadPool threadPool = new TestThreadPool(getClass().getName()); final ClusterStateChanges cluster = new ClusterStateChanges(xContentRegistry(), threadPool); - try { - List allNodes = new ArrayList<>(); - // node for primary/local - DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); - allNodes.add(localNode); - - allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); - - ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); - - CreateIndexRequest request = new CreateIndexRequest( - "index", - Settings.builder() - .put(SETTING_NUMBER_OF_SHARDS, 1) - .put(SETTING_NUMBER_OF_REPLICAS, 0) - .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT) - .build() - ).waitForActiveShards(ActiveShardCount.NONE); - state = cluster.createIndex(state, request); - assertTrue(state.metadata().hasIndex("index")); - rerouteUntilActive(state, cluster); - - // add another replica - ClusterState finalState = state; - Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); - expectThrows( - RuntimeException.class, - () -> cluster.updateSettings( - finalState, - new UpdateSettingsRequest("index").settings( - Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() - ) + List allNodes = new ArrayList<>(); + // node for primary/local + DiscoveryNode localNode = createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE); + allNodes.add(localNode); + + allNodes.add(createNode(Version.CURRENT, DiscoveryNodeRole.CLUSTER_MANAGER_ROLE, DiscoveryNodeRole.DATA_ROLE)); + + ClusterState state = ClusterStateCreationUtils.state(localNode, localNode, allNodes.toArray(new DiscoveryNode[0])); + + CreateIndexRequest request = new CreateIndexRequest( + "index", + Settings.builder() + .put(SETTING_NUMBER_OF_SHARDS, 1) + .put(SETTING_NUMBER_OF_REPLICAS, 0) + .put(INDEX_REPLICATION_TYPE_SETTING.getKey(), ReplicationType.DOCUMENT) + .build() + ).waitForActiveShards(ActiveShardCount.NONE); + state = cluster.createIndex(state, request); + assertTrue(state.metadata().hasIndex("index")); + rerouteUntilActive(state, cluster); + + // add another replica + ClusterState finalState = state; + Integer maxShardPerNode = ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getDefault(Settings.EMPTY); + expectThrows( + RuntimeException.class, + () -> cluster.updateSettings( + finalState, + new UpdateSettingsRequest("index").settings( + Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, maxShardPerNode * 2).build() ) - ); - } finally { - terminate(threadPool); - } + ) + ); + } private static void rerouteUntilActive(ClusterState state, ClusterStateChanges cluster) { diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java index 2e303887e0f1b..f226c45553d57 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/FilterAllocationDeciderTests.java @@ -323,7 +323,7 @@ private ClusterState createInitialClusterState(AllocationService service, Settin return createInitialClusterState(service, indexSettings, Settings.EMPTY); } - private ClusterState createInitialClusterState(AllocationService service, Settings idxSettings, Settings clusterSettings) { + static ClusterState createInitialClusterState(AllocationService service, Settings idxSettings, Settings clusterSettings) { Metadata.Builder metadata = Metadata.builder(); metadata.persistentSettings(clusterSettings); final Settings.Builder indexSettings = settings(Version.CURRENT).put(idxSettings); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java new file mode 100644 index 0000000000000..8d4f4cdee26cc --- /dev/null +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/decider/SearchReplicaAllocationDeciderTests.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.routing.allocation.decider; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.EmptyClusterInfoService; +import org.opensearch.cluster.OpenSearchAllocationTestCase; +import org.opensearch.cluster.routing.RecoverySource; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.cluster.routing.UnassignedInfo; +import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.RoutingAllocation; +import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.snapshots.EmptySnapshotsInfoService; +import org.opensearch.test.gateway.TestGatewayAllocator; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING; + +public class SearchReplicaAllocationDeciderTests extends OpenSearchAllocationTestCase { + + public void testSearchReplicaRoutingDedicatedIncludes() { + // we aren't using a settingsModule here so we need to set feature flag gated setting + Set> settings = new HashSet<>(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + settings.add(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING); + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), settings); + Settings initialSettings = Settings.builder() + .put("cluster.routing.allocation.search.replica.dedicated.include._id", "node1,node2") + .build(); + + SearchReplicaAllocationDecider filterAllocationDecider = new SearchReplicaAllocationDecider(initialSettings, clusterSettings); + AllocationDeciders allocationDeciders = new AllocationDeciders( + Arrays.asList( + filterAllocationDecider, + new SameShardAllocationDecider(Settings.EMPTY, clusterSettings), + new ReplicaAfterPrimaryActiveAllocationDecider() + ) + ); + AllocationService service = new AllocationService( + allocationDeciders, + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + ClusterState state = FilterAllocationDeciderTests.createInitialClusterState(service, Settings.EMPTY, Settings.EMPTY); + RoutingTable routingTable = state.routingTable(); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state.getRoutingNodes(), state, null, null, 0); + allocation.debugDecision(true); + + ShardRouting searchReplica = ShardRouting.newUnassigned( + routingTable.index("sourceIndex").shard(0).shardId(), + false, + true, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "") + ); + + ShardRouting regularReplica = ShardRouting.newUnassigned( + routingTable.index("sourceIndex").shard(0).shardId(), + false, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") + ); + + ShardRouting primary = ShardRouting.newUnassigned( + routingTable.index("sourceIndex").shard(0).shardId(), + true, + false, + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "") + ); + + Decision.Single decision = (Decision.Single) filterAllocationDecider.canAllocate( + searchReplica, + state.getRoutingNodes().node("node2"), + allocation + ); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + + Settings updatedSettings = Settings.builder() + .put("cluster.routing.allocation.search.replica.dedicated.include._id", "node2") + .build(); + clusterSettings.applySettings(updatedSettings); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(searchReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canRemain(searchReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(regularReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canRemain(regularReplica, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canAllocate(primary, state.getRoutingNodes().node("node2"), allocation); + assertEquals(decision.toString(), Decision.Type.NO, decision.type()); + decision = (Decision.Single) filterAllocationDecider.canRemain(primary, state.getRoutingNodes().node("node1"), allocation); + assertEquals(decision.toString(), Decision.Type.YES, decision.type()); + } +}