From a16a1dedee672d0221ad0b54c467d03c486385ef Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Wed, 17 Jul 2024 18:02:15 +0530 Subject: [PATCH 1/4] Reconcile remote index settings on switch to strict mode Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- .../MigrationBaseTestCase.java | 27 ++ .../RemoteMigrationIndexMetadataUpdateIT.java | 68 +++++ .../TransportClusterUpdateSettingsAction.java | 100 ++++++- .../RemoteMigrationIndexMetadataUpdater.java | 3 +- ...ransportClusterManagerNodeActionTests.java | 250 ++++++++++++------ 5 files changed, 352 insertions(+), 96 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 2bea36ed80c9f..0b7a6e90b5cb6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS; import static org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING; import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; @@ -277,4 +278,30 @@ protected IndexShard getIndexShard(String dataNode, String indexName) throws Exe IndexService indexService = indicesService.indexService(new Index(indexName, uuid)); return indexService.getShard(0); } + + public void changeReplicaCountAndEnsureGreen(int replicaCount, String indexName) { + assertAcked( + client().admin() + .indices() + .prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount)) + ); + ensureGreen(indexName); + } + + public void completeDocRepToRemoteMigration() { + assertTrue( + internalCluster().client() + .admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings( + Settings.builder() + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict") + .put(MIGRATION_DIRECTION_SETTING.getKey(), "none") + ) + .get() + .isAcknowledged() + ); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java index 216c104dfecc1..2cf33f47ce0e6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java @@ -546,6 +546,74 @@ public void testRemoteIndexPathFileExistsAfterMigration() throws Exception { assertTrue(Arrays.stream(files).anyMatch(file -> file.toString().contains(fileNamePrefix))); } + /** + * Scenario: + * Creates an index with 1 pri 1 rep setup with 3 docrep nodes (1 cluster manager + 2 data nodes), + * initiate migration and create 3 remote nodes (1 cluster manager + 2 data nodes) and moves over + * only primary shard copy of the index + * After the primary shard copy is relocated, decrease replica count to 0, stop all docrep nodes + * and conclude migration. Remote store index settings should be applied to the index at this point. + */ + public void testIndexSettingsUpdateDuringReplicaCountDecrement() throws Exception { + String indexName = "migration-index-replica-decrement"; + String docrepClusterManager = internalCluster().startClusterManagerOnlyNode(); + + logger.info("---> Starting 2 docrep nodes"); + List docrepNodeNames = internalCluster().startDataOnlyNodes(2); + internalCluster().validateClusterFormed(); + + logger.info("---> Creating index with 1 primary and 1 replica"); + Settings oneReplica = Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + createIndexAndAssertDocrepProperties(indexName, oneReplica); + + int docsToIndex = randomIntBetween(10, 100); + logger.info("---> Indexing {} on both indices", docsToIndex); + indexBulk(indexName, docsToIndex); + + logger.info( + "---> Stopping shard rebalancing to ensure shards do not automatically move over to newer nodes after they are launched" + ); + stopShardRebalancing(); + + logger.info("---> Starting 3 remote store enabled nodes"); + initDocRepToRemoteMigration(); + setAddRemote(true); + internalCluster().startClusterManagerOnlyNode(); + List remoteNodeNames = internalCluster().startDataOnlyNodes(2); + internalCluster().validateClusterFormed(); + + String primaryNode = primaryNodeName(indexName); + + logger.info("---> Moving over primary to remote store enabled nodes"); + assertAcked( + client().admin() + .cluster() + .prepareReroute() + .add(new MoveAllocationCommand(indexName, 0, primaryNode, remoteNodeNames.get(0))) + .execute() + .actionGet() + ); + waitForRelocation(); + waitNoPendingTasksOnAll(); + + logger.info("---> Reducing replica count to 0 for the index"); + changeReplicaCountAndEnsureGreen(0, indexName); + assertDocrepProperties(indexName); + + logger.info("---> Stopping all docrep nodes"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(docrepClusterManager)); + for (String node : docrepNodeNames) { + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node)); + } + internalCluster().validateClusterFormed(); + completeDocRepToRemoteMigration(); + waitNoPendingTasksOnAll(); + assertRemoteProperties(indexName); + } + private void createIndexAndAssertDocrepProperties(String index, Settings settings) { createIndexAssertHealthAndDocrepProperties(index, settings, this::ensureGreen); } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 216e1fb2ed1cc..5a164435fb9d0 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -66,10 +67,13 @@ import java.io.IOException; import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; -import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasAllRemoteStoreRelatedMetadata; +import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings; /** * Transport action for updating cluster settings @@ -270,6 +274,13 @@ public ClusterState execute(final ClusterState currentState) { logger ); changed = clusterState != currentState; + // Remote Store migration: Checks if the applied cluster settings + // has switched the cluster to STRICT mode. If so, checks and applies + // appropriate index settings depending on the current set of node types + // in the cluster + if (isSwitchToStrictCompatibilityMode(clusterState.metadata().settings())) { + return finalizeMigration(clusterState); + } return clusterState; } } @@ -284,11 +295,9 @@ public ClusterState execute(final ClusterState currentState) { public void validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) { Settings settings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.exists(settings)) { - String value = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).mode; validateAllNodesOfSameVersion(clusterState.nodes()); - if (RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals(value)) { + if (isSwitchToStrictCompatibilityMode(settings)) { validateAllNodesOfSameType(clusterState.nodes()); - validateIndexSettings(clusterState); } } } @@ -323,18 +332,83 @@ private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) { } /** - * Verifies that while trying to switch to STRICT compatibility mode, - * all indices in the cluster have {@link RemoteMigrationIndexMetadataUpdater#indexHasAllRemoteStoreRelatedMetadata(IndexMetadata)} as true. - * If not, throws {@link SettingsException} - * @param clusterState current cluster state + * Finalizes the docrep to remote-store migration process by applying remote store based index settings + * on indices that are missing them. No-Op if all indices already have the settings applied through + * IndexMetadataUpdater + * + * @param incomingState mutated cluster state after cluster settings were applied + * @return new cluster state with index settings updated + */ + public ClusterState finalizeMigration(ClusterState incomingState) { + Map discoveryNodeMap = incomingState.nodes().getNodes(); + if (discoveryNodeMap.isEmpty() == false) { + boolean allNodesRemote = discoveryNodeMap.values().stream().allMatch(discoNode -> discoNode.isRemoteStoreNode() == true); + if (allNodesRemote == true) { + List indicesWithoutRemoteStoreSettings = getIndicesWithoutRemoteStoreSettings(incomingState); + if (indicesWithoutRemoteStoreSettings.isEmpty() == true) { + logger.info("All indices in the cluster has remote store based index settings"); + } else { + Metadata mutatedMetadata = applyRemoteStoreSettings(incomingState, indicesWithoutRemoteStoreSettings); + return ClusterState.builder(incomingState).metadata(mutatedMetadata).build(); + } + } else { + // TODO: Revert all remote store settings for remote store -> docrep migration + logger.debug("All nodes in the cluster are not remote nodes. Skipping."); + } + } + return incomingState; + } + + /** + * Filters out indices which does not have remote store based + * index settings applied even after all shard copies have + * migrated to remote store enabled nodes */ - private void validateIndexSettings(ClusterState clusterState) { + private List getIndicesWithoutRemoteStoreSettings(ClusterState clusterState) { Collection allIndicesMetadata = clusterState.metadata().indices().values(); - if (allIndicesMetadata.isEmpty() == false - && allIndicesMetadata.stream().anyMatch(indexMetadata -> indexHasAllRemoteStoreRelatedMetadata(indexMetadata) == false)) { - throw new SettingsException( - "can not switch to STRICT compatibility mode since all indices in the cluster does not have remote store based index settings" + if (allIndicesMetadata.isEmpty() == false) { + List indicesWithoutRemoteSettings = allIndicesMetadata.stream() + .filter(idxMd -> indexHasRemoteStoreSettings(idxMd.getSettings()) == false) + .collect(Collectors.toList()); + logger.debug( + "Attempting to switch to strict mode. Count of indices without remote store settings {}", + indicesWithoutRemoteSettings.size() + ); + return indicesWithoutRemoteSettings; + } + return Collections.emptyList(); + } + + /** + * Applies remote store index settings through {@link RemoteMigrationIndexMetadataUpdater} + */ + private Metadata applyRemoteStoreSettings(ClusterState clusterState, List indicesWithRemoteStoreSettings) { + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata()); + RoutingTable currentRoutingTable = clusterState.getRoutingTable(); + DiscoveryNodes currentDiscoveryNodes = clusterState.getNodes(); + for (IndexMetadata indexMetadata : indicesWithRemoteStoreSettings) { + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata); + RemoteMigrationIndexMetadataUpdater indexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( + currentDiscoveryNodes, + currentRoutingTable, + indexMetadata, + null, + logger ); + indexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexMetadata.getIndex().getName()); + metadataBuilder.put(indexMetadataBuilder); } + return metadataBuilder.build(); + } + + /** + * Checks if the incoming cluster settings payload is attempting to switch + * the cluster to `STRICT` compatibility mode + * Visible only for tests + */ + public boolean isSwitchToStrictCompatibilityMode(Settings settings) { + return RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals( + RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).mode + ); } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java index cc51fcd2f18f6..2db4616a7180e 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteMigrationIndexMetadataUpdater.java @@ -15,6 +15,7 @@ import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.common.Nullable; import org.opensearch.common.settings.Settings; import org.opensearch.index.remote.RemoteStoreEnums.PathType; import org.opensearch.indices.replication.common.ReplicationType; @@ -49,7 +50,7 @@ public RemoteMigrationIndexMetadataUpdater( DiscoveryNodes discoveryNodes, RoutingTable routingTable, IndexMetadata indexMetadata, - Settings clusterSettings, + @Nullable Settings clusterSettings, Logger logger ) { diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index 35c5c5e605b4d..8ce5d5a4cec6b 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -31,11 +31,17 @@ import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; +import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -47,14 +53,16 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.rest.RestStatus; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.snapshots.EmptySnapshotsInfoService; @@ -79,13 +87,13 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.opensearch.common.util.FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL; import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithDocrepSettings; import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -718,9 +726,6 @@ protected void masterOperation(Task task, Request request, ClusterState state, A } public void testDontAllowSwitchingToStrictCompatibilityModeForMixedCluster() { - Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); - FeatureFlags.initializeFeatureFlags(nodeSettings); - // request to change cluster compatibility mode to STRICT Settings currentCompatibilityModeSettings = Settings.builder() .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED) @@ -809,84 +814,7 @@ public void testDontAllowSwitchingToStrictCompatibilityModeForMixedCluster() { transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameTypeClusterState); } - public void testDontAllowSwitchingToStrictCompatibilityModeWithoutRemoteIndexSettings() { - Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); - FeatureFlags.initializeFeatureFlags(nodeSettings); - Settings currentCompatibilityModeSettings = Settings.builder() - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.MIXED) - .build(); - Settings intendedCompatibilityModeSettings = Settings.builder() - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), RemoteStoreNodeService.CompatibilityMode.STRICT) - .build(); - ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); - request.persistentSettings(intendedCompatibilityModeSettings); - DiscoveryNode remoteNode1 = new DiscoveryNode( - UUIDs.base64UUID(), - buildNewFakeTransportAddress(), - getRemoteStoreNodeAttributes(), - DiscoveryNodeRole.BUILT_IN_ROLES, - Version.CURRENT - ); - DiscoveryNode remoteNode2 = new DiscoveryNode( - UUIDs.base64UUID(), - buildNewFakeTransportAddress(), - getRemoteStoreNodeAttributes(), - DiscoveryNodeRole.BUILT_IN_ROLES, - Version.CURRENT - ); - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() - .add(remoteNode1) - .localNodeId(remoteNode1.getId()) - .add(remoteNode2) - .localNodeId(remoteNode2.getId()) - .build(); - AllocationService allocationService = new AllocationService( - new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), - new TestGatewayAllocator(), - new BalancedShardsAllocator(Settings.EMPTY), - EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE - ); - TransportClusterUpdateSettingsAction transportClusterUpdateSettingsAction = new TransportClusterUpdateSettingsAction( - transportService, - clusterService, - threadPool, - allocationService, - new ActionFilters(Collections.emptySet()), - new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), - clusterService.getClusterSettings() - ); - - Metadata nonRemoteIndexMd = Metadata.builder(createIndexMetadataWithDocrepSettings("test")) - .persistentSettings(currentCompatibilityModeSettings) - .build(); - final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .metadata(nonRemoteIndexMd) - .nodes(discoveryNodes) - .build(); - final SettingsException exception = expectThrows( - SettingsException.class, - () -> transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, clusterState) - ); - assertEquals( - "can not switch to STRICT compatibility mode since all indices in the cluster does not have remote store based index settings", - exception.getMessage() - ); - - Metadata remoteIndexMd = Metadata.builder(createIndexMetadataWithRemoteStoreSettings("test")) - .persistentSettings(currentCompatibilityModeSettings) - .build(); - ClusterState clusterStateWithRemoteIndices = ClusterState.builder(ClusterName.DEFAULT) - .metadata(remoteIndexMd) - .nodes(discoveryNodes) - .build(); - transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, clusterStateWithRemoteIndices); - } - public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersions() { - Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); - FeatureFlags.initializeFeatureFlags(nodeSettings); - // request to change cluster compatibility mode boolean toStrictMode = randomBoolean(); Settings currentCompatibilityModeSettings = Settings.builder() @@ -982,6 +910,124 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameVersionClusterState); } + public void testIsSwitchToStrictCompatibilityMode() { + Settings mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict").build(); + AllocationService allocationService = new AllocationService( + new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + TransportClusterUpdateSettingsAction transportClusterUpdateSettingsAction = new TransportClusterUpdateSettingsAction( + transportService, + clusterService, + threadPool, + allocationService, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), + clusterService.getClusterSettings() + ); + assertTrue(transportClusterUpdateSettingsAction.isSwitchToStrictCompatibilityMode(mockSettings)); + + mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed").build(); + assertFalse(transportClusterUpdateSettingsAction.isSwitchToStrictCompatibilityMode(mockSettings)); + } + + public void testFinalizeMigrationWithAllRemoteNodes() { + AllocationService allocationService = new AllocationService( + new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + TransportClusterUpdateSettingsAction transportClusterUpdateSettingsAction = new TransportClusterUpdateSettingsAction( + transportService, + clusterService, + threadPool, + allocationService, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), + clusterService.getClusterSettings() + ); + + String migratedIndex = "migrated-index"; + Settings mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict").build(); + DiscoveryNode remoteNode1 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + getRemoteStoreNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNode remoteNode2 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + getRemoteStoreNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(remoteNode1) + .localNodeId(remoteNode1.getId()) + .add(remoteNode2) + .localNodeId(remoteNode2.getId()) + .build(); + Metadata docrepIdxMetadata = createIndexMetadataWithDocrepSettings(migratedIndex); + assertDocrepSettingsApplied(docrepIdxMetadata.index(migratedIndex)); + Metadata remoteIndexMd = Metadata.builder(docrepIdxMetadata).persistentSettings(mockSettings).build(); + ClusterState clusterStateWithDocrepIndexSettings = ClusterState.builder(ClusterName.DEFAULT) + .metadata(remoteIndexMd) + .nodes(discoveryNodes) + .routingTable(createRoutingTableAllShardsStarted(migratedIndex, 1, 1, remoteNode1, remoteNode2)) + .build(); + Metadata mutatedMetadata = transportClusterUpdateSettingsAction.finalizeMigration(clusterStateWithDocrepIndexSettings).metadata(); + assertTrue(mutatedMetadata.index(migratedIndex).getVersion() > docrepIdxMetadata.index(migratedIndex).getVersion()); + assertRemoteSettingsApplied(mutatedMetadata.index(migratedIndex)); + } + + public void testFinalizeMigrationWithAllDocrepNodes() { + AllocationService allocationService = new AllocationService( + new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), + new TestGatewayAllocator(), + new BalancedShardsAllocator(Settings.EMPTY), + EmptyClusterInfoService.INSTANCE, + EmptySnapshotsInfoService.INSTANCE + ); + TransportClusterUpdateSettingsAction transportClusterUpdateSettingsAction = new TransportClusterUpdateSettingsAction( + transportService, + clusterService, + threadPool, + allocationService, + new ActionFilters(Collections.emptySet()), + new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), + clusterService.getClusterSettings() + ); + + String docrepIndex = "docrep-index"; + Settings mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict").build(); + DiscoveryNode docrepNode1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode docrepNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(docrepNode1) + .localNodeId(docrepNode1.getId()) + .add(docrepNode2) + .localNodeId(docrepNode2.getId()) + .build(); + Metadata docrepIdxMetadata = createIndexMetadataWithDocrepSettings(docrepIndex); + assertDocrepSettingsApplied(docrepIdxMetadata.index(docrepIndex)); + Metadata remoteIndexMd = Metadata.builder(docrepIdxMetadata).persistentSettings(mockSettings).build(); + ClusterState clusterStateWithDocrepIndexSettings = ClusterState.builder(ClusterName.DEFAULT) + .metadata(remoteIndexMd) + .nodes(discoveryNodes) + .routingTable(createRoutingTableAllShardsStarted(docrepIndex, 1, 1, docrepNode1, docrepNode2)) + .build(); + Metadata mutatedMetadata = transportClusterUpdateSettingsAction.finalizeMigration(clusterStateWithDocrepIndexSettings).metadata(); + assertEquals(docrepIdxMetadata.index(docrepIndex).getVersion(), mutatedMetadata.index(docrepIndex).getVersion()); + assertDocrepSettingsApplied(mutatedMetadata.index(docrepIndex)); + } + private Map getRemoteStoreNodeAttributes() { Map remoteStoreNodeAttributes = new HashMap<>(); remoteStoreNodeAttributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1"); @@ -989,4 +1035,44 @@ private Map getRemoteStoreNodeAttributes() { return remoteStoreNodeAttributes; } + private void assertRemoteSettingsApplied(IndexMetadata indexMetadata) { + assertTrue(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings())); + assertTrue(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertTrue(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertEquals(ReplicationType.SEGMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings())); + } + + private void assertDocrepSettingsApplied(IndexMetadata indexMetadata) { + assertFalse(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings())); + assertFalse(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertFalse(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertEquals(ReplicationType.DOCUMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings())); + } + + private RoutingTable createRoutingTableAllShardsStarted( + String indexName, + int numberOfShards, + int numberOfReplicas, + DiscoveryNode primaryHostingNode, + DiscoveryNode replicaHostingNode + ) { + RoutingTable.Builder builder = RoutingTable.builder(); + Index index = new Index(indexName, UUID.randomUUID().toString()); + + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + for (int i = 0; i < numberOfShards; i++) { + ShardId shardId = new ShardId(index, i); + IndexShardRoutingTable.Builder indexShardRoutingTable = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingTable.addShard( + TestShardRouting.newShardRouting(shardId, primaryHostingNode.getId(), true, ShardRoutingState.STARTED) + ); + for (int j = 0; j < numberOfReplicas; j++) { + indexShardRoutingTable.addShard( + TestShardRouting.newShardRouting(shardId, replicaHostingNode.getId(), false, ShardRoutingState.STARTED) + ); + } + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable.build()); + } + return builder.add(indexRoutingTableBuilder.build()).build(); + } } From d39227b3c6cd1c7c134d1e9d55a708442b38f18b Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Wed, 17 Jul 2024 20:32:05 +0530 Subject: [PATCH 2/4] Addressing comments Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- .../RemoteMigrationIndexMetadataUpdateIT.java | 1 - .../TransportClusterUpdateSettingsAction.java | 50 +++++++++++++------ .../RemoteMigrationIndexMetadataUpdater.java | 3 +- ...ransportClusterManagerNodeActionTests.java | 7 ++- 4 files changed, 40 insertions(+), 21 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java index 2cf33f47ce0e6..b55219e1cb37f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteMigrationIndexMetadataUpdateIT.java @@ -601,7 +601,6 @@ public void testIndexSettingsUpdateDuringReplicaCountDecrement() throws Exceptio logger.info("---> Reducing replica count to 0 for the index"); changeReplicaCountAndEnsureGreen(0, indexName); - assertDocrepProperties(indexName); logger.info("---> Stopping all docrep nodes"); internalCluster().stopRandomNode(InternalTestCluster.nameFilter(docrepClusterManager)); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 5a164435fb9d0..26c2962fc8e1f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -159,6 +159,8 @@ protected void clusterManagerOperation( private volatile boolean changed = false; + private volatile boolean isSwitchingToStrictMode = false; + @Override public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { return clusterUpdateSettingTaskKey; @@ -266,21 +268,29 @@ public void onFailure(String source, Exception e) { @Override public ClusterState execute(final ClusterState currentState) { - validateCompatibilityModeSettingRequest(request, state); + boolean isCompatibilityModeChanging = validateCompatibilityModeSettingRequest(request, state); final ClusterState clusterState = updater.updateSettings( currentState, clusterSettings.upgradeSettings(request.transientSettings()), clusterSettings.upgradeSettings(request.persistentSettings()), logger ); - changed = clusterState != currentState; - // Remote Store migration: Checks if the applied cluster settings - // has switched the cluster to STRICT mode. If so, checks and applies - // appropriate index settings depending on the current set of node types - // in the cluster - if (isSwitchToStrictCompatibilityMode(clusterState.metadata().settings())) { - return finalizeMigration(clusterState); + /* + Remote Store migration: Checks if the applied cluster settings + has switched the cluster to STRICT mode. If so, checks and applies + appropriate index settings depending on the current set of node types + in the cluster + + This has been intentionally done after the cluster settings update + flow. That way we are not interfering with the usual settings update + and the cluster state mutation that comes along with it + */ + if (isCompatibilityModeChanging == true && isSwitchToStrictCompatibilityMode(request) == true) { + ClusterState newStateAfterIndexMdChanges = finalizeMigration(clusterState); + changed = newStateAfterIndexMdChanges != currentState; + return newStateAfterIndexMdChanges; } + changed = clusterState != currentState; return clusterState; } } @@ -289,17 +299,22 @@ public ClusterState execute(final ClusterState currentState) { /** * Runs various checks associated with changing cluster compatibility mode + * * @param request cluster settings update request, for settings to be updated and new values * @param clusterState current state of cluster, for information on nodes + * @return true if the incoming cluster settings update request is switching compatibility modes */ - public void validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) { + public boolean validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) { Settings settings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.exists(settings)) { + String value = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).mode; validateAllNodesOfSameVersion(clusterState.nodes()); - if (isSwitchToStrictCompatibilityMode(settings)) { + if (RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals(value)) { validateAllNodesOfSameType(clusterState.nodes()); } + return true; } + return false; } /** @@ -342,8 +357,10 @@ private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) { public ClusterState finalizeMigration(ClusterState incomingState) { Map discoveryNodeMap = incomingState.nodes().getNodes(); if (discoveryNodeMap.isEmpty() == false) { - boolean allNodesRemote = discoveryNodeMap.values().stream().allMatch(discoNode -> discoNode.isRemoteStoreNode() == true); - if (allNodesRemote == true) { + // At this point, we have already validated that all nodes in the cluster are of uniform type. + // Either all of them are remote store enabled, or all of them are docrep enabled + boolean remoteStoreEnabledNodePresent = discoveryNodeMap.values().stream().findFirst().get().isRemoteStoreNode(); + if (remoteStoreEnabledNodePresent == true) { List indicesWithoutRemoteStoreSettings = getIndicesWithoutRemoteStoreSettings(incomingState); if (indicesWithoutRemoteStoreSettings.isEmpty() == true) { logger.info("All indices in the cluster has remote store based index settings"); @@ -352,7 +369,6 @@ public ClusterState finalizeMigration(ClusterState incomingState) { return ClusterState.builder(incomingState).metadata(mutatedMetadata).build(); } } else { - // TODO: Revert all remote store settings for remote store -> docrep migration logger.debug("All nodes in the cluster are not remote nodes. Skipping."); } } @@ -386,13 +402,14 @@ private Metadata applyRemoteStoreSettings(ClusterState clusterState, List Date: Thu, 18 Jul 2024 14:11:17 +0530 Subject: [PATCH 3/4] Addressing comments Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- .../org/opensearch/remotemigration/MigrationBaseTestCase.java | 4 ++-- .../settings/TransportClusterUpdateSettingsAction.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 0b7a6e90b5cb6..e4e681a5433b5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -297,8 +297,8 @@ public void completeDocRepToRemoteMigration() { .prepareUpdateSettings() .setPersistentSettings( Settings.builder() - .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict") - .put(MIGRATION_DIRECTION_SETTING.getKey(), "none") + .putNull(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey()) + .putNull(MIGRATION_DIRECTION_SETTING.getKey()) ) .get() .isAcknowledged() diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 26c2962fc8e1f..574b013e64c92 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -285,7 +285,7 @@ public ClusterState execute(final ClusterState currentState) { flow. That way we are not interfering with the usual settings update and the cluster state mutation that comes along with it */ - if (isCompatibilityModeChanging == true && isSwitchToStrictCompatibilityMode(request) == true) { + if (isCompatibilityModeChanging && isSwitchToStrictCompatibilityMode(request)) { ClusterState newStateAfterIndexMdChanges = finalizeMigration(clusterState); changed = newStateAfterIndexMdChanges != currentState; return newStateAfterIndexMdChanges; From d77cc2f6053e3ab93228e7825d47fe2efb8b0ce9 Mon Sep 17 00:00:00 2001 From: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> Date: Thu, 18 Jul 2024 15:52:30 +0530 Subject: [PATCH 4/4] Addressing comments Signed-off-by: Shourya Dutta Biswas <114977491+shourya035@users.noreply.github.com> --- .../TransportClusterUpdateSettingsAction.java | 131 ++----------- .../index/remote/RemoteStoreUtils.java | 123 +++++++++++++ ...ransportClusterManagerNodeActionTests.java | 173 ------------------ .../index/remote/RemoteStoreUtilsTests.java | 139 ++++++++++++++ 4 files changed, 275 insertions(+), 291 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 574b013e64c92..3988d50b2ce1e 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -42,12 +42,10 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.service.ClusterManagerTaskKeys; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; @@ -60,20 +58,13 @@ import org.opensearch.common.settings.SettingsException; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings; +import static org.opensearch.index.remote.RemoteStoreUtils.checkAndFinalizeRemoteStoreMigration; /** * Transport action for updating cluster settings @@ -159,8 +150,6 @@ protected void clusterManagerOperation( private volatile boolean changed = false; - private volatile boolean isSwitchingToStrictMode = false; - @Override public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { return clusterUpdateSettingTaskKey; @@ -269,27 +258,13 @@ public void onFailure(String source, Exception e) { @Override public ClusterState execute(final ClusterState currentState) { boolean isCompatibilityModeChanging = validateCompatibilityModeSettingRequest(request, state); - final ClusterState clusterState = updater.updateSettings( + ClusterState clusterState = updater.updateSettings( currentState, clusterSettings.upgradeSettings(request.transientSettings()), clusterSettings.upgradeSettings(request.persistentSettings()), logger ); - /* - Remote Store migration: Checks if the applied cluster settings - has switched the cluster to STRICT mode. If so, checks and applies - appropriate index settings depending on the current set of node types - in the cluster - - This has been intentionally done after the cluster settings update - flow. That way we are not interfering with the usual settings update - and the cluster state mutation that comes along with it - */ - if (isCompatibilityModeChanging && isSwitchToStrictCompatibilityMode(request)) { - ClusterState newStateAfterIndexMdChanges = finalizeMigration(clusterState); - changed = newStateAfterIndexMdChanges != currentState; - return newStateAfterIndexMdChanges; - } + clusterState = checkAndFinalizeRemoteStoreMigration(isCompatibilityModeChanging, request, clusterState, logger); changed = clusterState != currentState; return clusterState; } @@ -307,9 +282,10 @@ public ClusterState execute(final ClusterState currentState) { public boolean validateCompatibilityModeSettingRequest(ClusterUpdateSettingsRequest request, ClusterState clusterState) { Settings settings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.exists(settings)) { - String value = RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(settings).mode; validateAllNodesOfSameVersion(clusterState.nodes()); - if (RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals(value)) { + if (RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get( + settings + ) == RemoteStoreNodeService.CompatibilityMode.STRICT) { validateAllNodesOfSameType(clusterState.nodes()); } return true; @@ -334,99 +310,18 @@ private void validateAllNodesOfSameVersion(DiscoveryNodes discoveryNodes) { * @param discoveryNodes current discovery nodes in the cluster */ private void validateAllNodesOfSameType(DiscoveryNodes discoveryNodes) { - Set nodeTypes = discoveryNodes.getNodes() + boolean allNodesDocrepEnabled = discoveryNodes.getNodes() + .values() + .stream() + .allMatch(discoveryNode -> discoveryNode.isRemoteStoreNode() == false); + boolean allNodesRemoteStoreEnabled = discoveryNodes.getNodes() .values() .stream() - .map(DiscoveryNode::isRemoteStoreNode) - .collect(Collectors.toSet()); - if (nodeTypes.size() != 1) { + .allMatch(discoveryNode -> discoveryNode.isRemoteStoreNode()); + if (allNodesDocrepEnabled == false && allNodesRemoteStoreEnabled == false) { throw new SettingsException( "can not switch to STRICT compatibility mode when the cluster contains both remote and non-remote nodes" ); } } - - /** - * Finalizes the docrep to remote-store migration process by applying remote store based index settings - * on indices that are missing them. No-Op if all indices already have the settings applied through - * IndexMetadataUpdater - * - * @param incomingState mutated cluster state after cluster settings were applied - * @return new cluster state with index settings updated - */ - public ClusterState finalizeMigration(ClusterState incomingState) { - Map discoveryNodeMap = incomingState.nodes().getNodes(); - if (discoveryNodeMap.isEmpty() == false) { - // At this point, we have already validated that all nodes in the cluster are of uniform type. - // Either all of them are remote store enabled, or all of them are docrep enabled - boolean remoteStoreEnabledNodePresent = discoveryNodeMap.values().stream().findFirst().get().isRemoteStoreNode(); - if (remoteStoreEnabledNodePresent == true) { - List indicesWithoutRemoteStoreSettings = getIndicesWithoutRemoteStoreSettings(incomingState); - if (indicesWithoutRemoteStoreSettings.isEmpty() == true) { - logger.info("All indices in the cluster has remote store based index settings"); - } else { - Metadata mutatedMetadata = applyRemoteStoreSettings(incomingState, indicesWithoutRemoteStoreSettings); - return ClusterState.builder(incomingState).metadata(mutatedMetadata).build(); - } - } else { - logger.debug("All nodes in the cluster are not remote nodes. Skipping."); - } - } - return incomingState; - } - - /** - * Filters out indices which does not have remote store based - * index settings applied even after all shard copies have - * migrated to remote store enabled nodes - */ - private List getIndicesWithoutRemoteStoreSettings(ClusterState clusterState) { - Collection allIndicesMetadata = clusterState.metadata().indices().values(); - if (allIndicesMetadata.isEmpty() == false) { - List indicesWithoutRemoteSettings = allIndicesMetadata.stream() - .filter(idxMd -> indexHasRemoteStoreSettings(idxMd.getSettings()) == false) - .collect(Collectors.toList()); - logger.debug( - "Attempting to switch to strict mode. Count of indices without remote store settings {}", - indicesWithoutRemoteSettings.size() - ); - return indicesWithoutRemoteSettings; - } - return Collections.emptyList(); - } - - /** - * Applies remote store index settings through {@link RemoteMigrationIndexMetadataUpdater} - */ - private Metadata applyRemoteStoreSettings(ClusterState clusterState, List indicesWithRemoteStoreSettings) { - Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata()); - RoutingTable currentRoutingTable = clusterState.getRoutingTable(); - DiscoveryNodes currentDiscoveryNodes = clusterState.getNodes(); - Settings currentClusterSettings = clusterState.metadata().settings(); - for (IndexMetadata indexMetadata : indicesWithRemoteStoreSettings) { - IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata); - RemoteMigrationIndexMetadataUpdater indexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( - currentDiscoveryNodes, - currentRoutingTable, - indexMetadata, - currentClusterSettings, - logger - ); - indexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexMetadata.getIndex().getName()); - metadataBuilder.put(indexMetadataBuilder); - } - return metadataBuilder.build(); - } - - /** - * Checks if the incoming cluster settings payload is attempting to switch - * the cluster to `STRICT` compatibility mode - * Visible only for tests - */ - public boolean isSwitchToStrictCompatibilityMode(ClusterUpdateSettingsRequest request) { - Settings incomingSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); - return RemoteStoreNodeService.CompatibilityMode.STRICT.mode.equals( - RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get(incomingSettings).mode - ); - } } diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java index 9a9de6c819424..cfa2d546314d4 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStoreUtils.java @@ -11,17 +11,23 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.Version; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; +import org.opensearch.node.remotestore.RemoteStoreNodeService; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Base64; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -29,7 +35,9 @@ import java.util.Objects; import java.util.Optional; import java.util.function.Function; +import java.util.stream.Collectors; +import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING; import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA; @@ -250,4 +258,119 @@ public static Map getRemoteStoreRepoName(DiscoveryNodes discover .findFirst(); return remoteNode.map(RemoteStoreNodeAttribute::getDataRepoNames).orElseGet(HashMap::new); } + + /** + * Invoked after a cluster settings update. + * Checks if the applied cluster settings has switched the cluster to STRICT mode. + * If so, checks and applies appropriate index settings depending on the current set + * of node types in the cluster + * This has been intentionally done after the cluster settings update + * flow. That way we are not interfering with the usual settings update + * and the cluster state mutation that comes along with it + * + * @param isCompatibilityModeChanging flag passed from cluster settings update call to denote if a compatibility mode change has been done + * @param request request payload passed from cluster settings update + * @param currentState cluster state generated after changing cluster settings were applied + * @param logger Logger reference + * @return Mutated cluster state with remote store index settings applied, no-op if the cluster is not switching to `STRICT` compatibility mode + */ + public static ClusterState checkAndFinalizeRemoteStoreMigration( + boolean isCompatibilityModeChanging, + ClusterUpdateSettingsRequest request, + ClusterState currentState, + Logger logger + ) { + if (isCompatibilityModeChanging && isSwitchToStrictCompatibilityMode(request)) { + return finalizeMigration(currentState, logger); + } + return currentState; + } + + /** + * Finalizes the docrep to remote-store migration process by applying remote store based index settings + * on indices that are missing them. No-Op if all indices already have the settings applied through + * IndexMetadataUpdater + * + * @param incomingState mutated cluster state after cluster settings were applied + * @return new cluster state with index settings updated + */ + public static ClusterState finalizeMigration(ClusterState incomingState, Logger logger) { + Map discoveryNodeMap = incomingState.nodes().getNodes(); + if (discoveryNodeMap.isEmpty() == false) { + // At this point, we have already validated that all nodes in the cluster are of uniform type. + // Either all of them are remote store enabled, or all of them are docrep enabled + boolean remoteStoreEnabledNodePresent = discoveryNodeMap.values().stream().findFirst().get().isRemoteStoreNode(); + if (remoteStoreEnabledNodePresent == true) { + List indicesWithoutRemoteStoreSettings = getIndicesWithoutRemoteStoreSettings(incomingState, logger); + if (indicesWithoutRemoteStoreSettings.isEmpty() == true) { + logger.info("All indices in the cluster has remote store based index settings"); + } else { + Metadata mutatedMetadata = applyRemoteStoreSettings(incomingState, indicesWithoutRemoteStoreSettings, logger); + return ClusterState.builder(incomingState).metadata(mutatedMetadata).build(); + } + } else { + logger.debug("All nodes in the cluster are not remote nodes. Skipping."); + } + } + return incomingState; + } + + /** + * Filters out indices which does not have remote store based + * index settings applied even after all shard copies have + * migrated to remote store enabled nodes + */ + private static List getIndicesWithoutRemoteStoreSettings(ClusterState clusterState, Logger logger) { + Collection allIndicesMetadata = clusterState.metadata().indices().values(); + if (allIndicesMetadata.isEmpty() == false) { + List indicesWithoutRemoteSettings = allIndicesMetadata.stream() + .filter(idxMd -> indexHasRemoteStoreSettings(idxMd.getSettings()) == false) + .collect(Collectors.toList()); + logger.debug( + "Attempting to switch to strict mode. Count of indices without remote store settings {}", + indicesWithoutRemoteSettings.size() + ); + return indicesWithoutRemoteSettings; + } + return Collections.emptyList(); + } + + /** + * Applies remote store index settings through {@link RemoteMigrationIndexMetadataUpdater} + */ + private static Metadata applyRemoteStoreSettings( + ClusterState clusterState, + List indicesWithoutRemoteStoreSettings, + Logger logger + ) { + Metadata.Builder metadataBuilder = Metadata.builder(clusterState.getMetadata()); + RoutingTable currentRoutingTable = clusterState.getRoutingTable(); + DiscoveryNodes currentDiscoveryNodes = clusterState.getNodes(); + Settings currentClusterSettings = clusterState.metadata().settings(); + for (IndexMetadata indexMetadata : indicesWithoutRemoteStoreSettings) { + IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexMetadata); + RemoteMigrationIndexMetadataUpdater indexMetadataUpdater = new RemoteMigrationIndexMetadataUpdater( + currentDiscoveryNodes, + currentRoutingTable, + indexMetadata, + currentClusterSettings, + logger + ); + indexMetadataUpdater.maybeAddRemoteIndexSettings(indexMetadataBuilder, indexMetadata.getIndex().getName()); + metadataBuilder.put(indexMetadataBuilder); + } + return metadataBuilder.build(); + } + + /** + * Checks if the incoming cluster settings payload is attempting to switch + * the cluster to `STRICT` compatibility mode + * Visible only for tests + */ + public static boolean isSwitchToStrictCompatibilityMode(ClusterUpdateSettingsRequest request) { + Settings incomingSettings = Settings.builder().put(request.persistentSettings()).put(request.transientSettings()).build(); + return RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING.get( + incomingSettings + ) == RemoteStoreNodeService.CompatibilityMode.STRICT; + } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index a10ed966aa899..37e884502b613 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -31,17 +31,11 @@ import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; -import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.IndexRoutingTable; -import org.opensearch.cluster.routing.IndexShardRoutingTable; -import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.cluster.routing.TestShardRouting; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -58,11 +52,8 @@ import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; -import org.opensearch.core.index.Index; -import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.rest.RestStatus; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; -import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.NodeClosedException; import org.opensearch.node.remotestore.RemoteStoreNodeService; import org.opensearch.snapshots.EmptySnapshotsInfoService; @@ -87,14 +78,12 @@ import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.UUID; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithDocrepSettings; import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithRemoteStoreSettings; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; @@ -910,172 +899,10 @@ public void testDontAllowSwitchingCompatibilityModeForClusterWithMultipleVersion transportClusterUpdateSettingsAction.validateCompatibilityModeSettingRequest(request, sameVersionClusterState); } - public void testIsSwitchToStrictCompatibilityMode() { - Settings mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict").build(); - ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); - request.persistentSettings(mockSettings); - AllocationService allocationService = new AllocationService( - new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), - new TestGatewayAllocator(), - new BalancedShardsAllocator(Settings.EMPTY), - EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE - ); - TransportClusterUpdateSettingsAction transportClusterUpdateSettingsAction = new TransportClusterUpdateSettingsAction( - transportService, - clusterService, - threadPool, - allocationService, - new ActionFilters(Collections.emptySet()), - new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), - clusterService.getClusterSettings() - ); - assertTrue(transportClusterUpdateSettingsAction.isSwitchToStrictCompatibilityMode(request)); - - mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed").build(); - request.persistentSettings(mockSettings); - assertFalse(transportClusterUpdateSettingsAction.isSwitchToStrictCompatibilityMode(request)); - } - - public void testFinalizeMigrationWithAllRemoteNodes() { - AllocationService allocationService = new AllocationService( - new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), - new TestGatewayAllocator(), - new BalancedShardsAllocator(Settings.EMPTY), - EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE - ); - TransportClusterUpdateSettingsAction transportClusterUpdateSettingsAction = new TransportClusterUpdateSettingsAction( - transportService, - clusterService, - threadPool, - allocationService, - new ActionFilters(Collections.emptySet()), - new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), - clusterService.getClusterSettings() - ); - - String migratedIndex = "migrated-index"; - Settings mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict").build(); - DiscoveryNode remoteNode1 = new DiscoveryNode( - UUIDs.base64UUID(), - buildNewFakeTransportAddress(), - getRemoteStoreNodeAttributes(), - DiscoveryNodeRole.BUILT_IN_ROLES, - Version.CURRENT - ); - DiscoveryNode remoteNode2 = new DiscoveryNode( - UUIDs.base64UUID(), - buildNewFakeTransportAddress(), - getRemoteStoreNodeAttributes(), - DiscoveryNodeRole.BUILT_IN_ROLES, - Version.CURRENT - ); - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() - .add(remoteNode1) - .localNodeId(remoteNode1.getId()) - .add(remoteNode2) - .localNodeId(remoteNode2.getId()) - .build(); - Metadata docrepIdxMetadata = createIndexMetadataWithDocrepSettings(migratedIndex); - assertDocrepSettingsApplied(docrepIdxMetadata.index(migratedIndex)); - Metadata remoteIndexMd = Metadata.builder(docrepIdxMetadata).persistentSettings(mockSettings).build(); - ClusterState clusterStateWithDocrepIndexSettings = ClusterState.builder(ClusterName.DEFAULT) - .metadata(remoteIndexMd) - .nodes(discoveryNodes) - .routingTable(createRoutingTableAllShardsStarted(migratedIndex, 1, 1, remoteNode1, remoteNode2)) - .build(); - Metadata mutatedMetadata = transportClusterUpdateSettingsAction.finalizeMigration(clusterStateWithDocrepIndexSettings).metadata(); - assertTrue(mutatedMetadata.index(migratedIndex).getVersion() > docrepIdxMetadata.index(migratedIndex).getVersion()); - assertRemoteSettingsApplied(mutatedMetadata.index(migratedIndex)); - } - - public void testFinalizeMigrationWithAllDocrepNodes() { - AllocationService allocationService = new AllocationService( - new AllocationDeciders(Collections.singleton(new MaxRetryAllocationDecider())), - new TestGatewayAllocator(), - new BalancedShardsAllocator(Settings.EMPTY), - EmptyClusterInfoService.INSTANCE, - EmptySnapshotsInfoService.INSTANCE - ); - TransportClusterUpdateSettingsAction transportClusterUpdateSettingsAction = new TransportClusterUpdateSettingsAction( - transportService, - clusterService, - threadPool, - allocationService, - new ActionFilters(Collections.emptySet()), - new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)), - clusterService.getClusterSettings() - ); - - String docrepIndex = "docrep-index"; - Settings mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict").build(); - DiscoveryNode docrepNode1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); - DiscoveryNode docrepNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); - DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() - .add(docrepNode1) - .localNodeId(docrepNode1.getId()) - .add(docrepNode2) - .localNodeId(docrepNode2.getId()) - .build(); - Metadata docrepIdxMetadata = createIndexMetadataWithDocrepSettings(docrepIndex); - assertDocrepSettingsApplied(docrepIdxMetadata.index(docrepIndex)); - Metadata remoteIndexMd = Metadata.builder(docrepIdxMetadata).persistentSettings(mockSettings).build(); - ClusterState clusterStateWithDocrepIndexSettings = ClusterState.builder(ClusterName.DEFAULT) - .metadata(remoteIndexMd) - .nodes(discoveryNodes) - .routingTable(createRoutingTableAllShardsStarted(docrepIndex, 1, 1, docrepNode1, docrepNode2)) - .build(); - Metadata mutatedMetadata = transportClusterUpdateSettingsAction.finalizeMigration(clusterStateWithDocrepIndexSettings).metadata(); - assertEquals(docrepIdxMetadata.index(docrepIndex).getVersion(), mutatedMetadata.index(docrepIndex).getVersion()); - assertDocrepSettingsApplied(mutatedMetadata.index(docrepIndex)); - } - private Map getRemoteStoreNodeAttributes() { Map remoteStoreNodeAttributes = new HashMap<>(); remoteStoreNodeAttributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1"); remoteStoreNodeAttributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1"); return remoteStoreNodeAttributes; } - - private void assertRemoteSettingsApplied(IndexMetadata indexMetadata) { - assertTrue(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings())); - assertTrue(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); - assertTrue(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); - assertEquals(ReplicationType.SEGMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings())); - } - - private void assertDocrepSettingsApplied(IndexMetadata indexMetadata) { - assertFalse(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings())); - assertFalse(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); - assertFalse(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); - assertEquals(ReplicationType.DOCUMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings())); - } - - private RoutingTable createRoutingTableAllShardsStarted( - String indexName, - int numberOfShards, - int numberOfReplicas, - DiscoveryNode primaryHostingNode, - DiscoveryNode replicaHostingNode - ) { - RoutingTable.Builder builder = RoutingTable.builder(); - Index index = new Index(indexName, UUID.randomUUID().toString()); - - IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); - for (int i = 0; i < numberOfShards; i++) { - ShardId shardId = new ShardId(index, i); - IndexShardRoutingTable.Builder indexShardRoutingTable = new IndexShardRoutingTable.Builder(shardId); - indexShardRoutingTable.addShard( - TestShardRouting.newShardRouting(shardId, primaryHostingNode.getId(), true, ShardRoutingState.STARTED) - ); - for (int j = 0; j < numberOfReplicas; j++) { - indexShardRoutingTable.addShard( - TestShardRouting.newShardRouting(shardId, replicaHostingNode.getId(), false, ShardRoutingState.STARTED) - ); - } - indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable.build()); - } - return builder.add(indexRoutingTableBuilder.build()).build(); - } } diff --git a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java index 15915ee431972..ec48032df4a15 100644 --- a/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java +++ b/server/src/test/java/org/opensearch/index/remote/RemoteStoreUtilsTests.java @@ -9,15 +9,29 @@ package org.opensearch.index.remote; import org.opensearch.Version; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.IndexShardRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.ShardRoutingState; +import org.opensearch.cluster.routing.TestShardRouting; +import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.support.PlainBlobMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.core.index.Index; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShardTestUtils; import org.opensearch.index.store.RemoteSegmentStoreDirectory; import org.opensearch.index.translog.transfer.TranslogTransferMetadata; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.test.OpenSearchTestCase; @@ -28,11 +42,15 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; import static org.opensearch.cluster.metadata.IndexMetadata.REMOTE_STORE_CUSTOM_KEY; +import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdaterTests.createIndexMetadataWithDocrepSettings; import static org.opensearch.index.remote.RemoteStoreUtils.URL_BASE64_CHARSET; import static org.opensearch.index.remote.RemoteStoreUtils.determineTranslogMetadataEnabled; +import static org.opensearch.index.remote.RemoteStoreUtils.finalizeMigration; +import static org.opensearch.index.remote.RemoteStoreUtils.isSwitchToStrictCompatibilityMode; import static org.opensearch.index.remote.RemoteStoreUtils.longToCompositeBase64AndBinaryEncoding; import static org.opensearch.index.remote.RemoteStoreUtils.longToUrlBase64; import static org.opensearch.index.remote.RemoteStoreUtils.urlBase64ToLong; @@ -42,6 +60,9 @@ import static org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils.METADATA_PREFIX; import static org.opensearch.index.store.RemoteSegmentStoreDirectory.MetadataFilenameUtils.SEPARATOR; import static org.opensearch.index.translog.transfer.TranslogTransferMetadata.METADATA_SEPARATOR; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY; +import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; public class RemoteStoreUtilsTests extends OpenSearchTestCase { @@ -398,4 +419,122 @@ private static Map getCustomDataMap(int option) { ); } + public void testFinalizeMigrationWithAllRemoteNodes() { + String migratedIndex = "migrated-index"; + Settings mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict").build(); + DiscoveryNode remoteNode1 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + getRemoteStoreNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNode remoteNode2 = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + getRemoteStoreNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(remoteNode1) + .localNodeId(remoteNode1.getId()) + .add(remoteNode2) + .localNodeId(remoteNode2.getId()) + .build(); + Metadata docrepIdxMetadata = createIndexMetadataWithDocrepSettings(migratedIndex); + assertDocrepSettingsApplied(docrepIdxMetadata.index(migratedIndex)); + Metadata remoteIndexMd = Metadata.builder(docrepIdxMetadata).persistentSettings(mockSettings).build(); + ClusterState clusterStateWithDocrepIndexSettings = ClusterState.builder(ClusterName.DEFAULT) + .metadata(remoteIndexMd) + .nodes(discoveryNodes) + .routingTable(createRoutingTableAllShardsStarted(migratedIndex, 1, 1, remoteNode1, remoteNode2)) + .build(); + Metadata mutatedMetadata = finalizeMigration(clusterStateWithDocrepIndexSettings, logger).metadata(); + assertTrue(mutatedMetadata.index(migratedIndex).getVersion() > docrepIdxMetadata.index(migratedIndex).getVersion()); + assertRemoteSettingsApplied(mutatedMetadata.index(migratedIndex)); + } + + public void testFinalizeMigrationWithAllDocrepNodes() { + String docrepIndex = "docrep-index"; + Settings mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict").build(); + DiscoveryNode docrepNode1 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNode docrepNode2 = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT); + DiscoveryNodes discoveryNodes = DiscoveryNodes.builder() + .add(docrepNode1) + .localNodeId(docrepNode1.getId()) + .add(docrepNode2) + .localNodeId(docrepNode2.getId()) + .build(); + Metadata docrepIdxMetadata = createIndexMetadataWithDocrepSettings(docrepIndex); + assertDocrepSettingsApplied(docrepIdxMetadata.index(docrepIndex)); + Metadata remoteIndexMd = Metadata.builder(docrepIdxMetadata).persistentSettings(mockSettings).build(); + ClusterState clusterStateWithDocrepIndexSettings = ClusterState.builder(ClusterName.DEFAULT) + .metadata(remoteIndexMd) + .nodes(discoveryNodes) + .routingTable(createRoutingTableAllShardsStarted(docrepIndex, 1, 1, docrepNode1, docrepNode2)) + .build(); + Metadata mutatedMetadata = finalizeMigration(clusterStateWithDocrepIndexSettings, logger).metadata(); + assertEquals(docrepIdxMetadata.index(docrepIndex).getVersion(), mutatedMetadata.index(docrepIndex).getVersion()); + assertDocrepSettingsApplied(mutatedMetadata.index(docrepIndex)); + } + + public void testIsSwitchToStrictCompatibilityMode() { + Settings mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "strict").build(); + ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest(); + request.persistentSettings(mockSettings); + assertTrue(isSwitchToStrictCompatibilityMode(request)); + + mockSettings = Settings.builder().put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed").build(); + request.persistentSettings(mockSettings); + assertFalse(isSwitchToStrictCompatibilityMode(request)); + } + + private void assertRemoteSettingsApplied(IndexMetadata indexMetadata) { + assertTrue(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings())); + assertTrue(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertTrue(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertEquals(ReplicationType.SEGMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings())); + } + + private void assertDocrepSettingsApplied(IndexMetadata indexMetadata) { + assertFalse(IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING.get(indexMetadata.getSettings())); + assertFalse(IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertFalse(IndexMetadata.INDEX_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING.exists(indexMetadata.getSettings())); + assertEquals(ReplicationType.DOCUMENT, IndexMetadata.INDEX_REPLICATION_TYPE_SETTING.get(indexMetadata.getSettings())); + } + + private RoutingTable createRoutingTableAllShardsStarted( + String indexName, + int numberOfShards, + int numberOfReplicas, + DiscoveryNode primaryHostingNode, + DiscoveryNode replicaHostingNode + ) { + RoutingTable.Builder builder = RoutingTable.builder(); + Index index = new Index(indexName, UUID.randomUUID().toString()); + + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(index); + for (int i = 0; i < numberOfShards; i++) { + ShardId shardId = new ShardId(index, i); + IndexShardRoutingTable.Builder indexShardRoutingTable = new IndexShardRoutingTable.Builder(shardId); + indexShardRoutingTable.addShard( + TestShardRouting.newShardRouting(shardId, primaryHostingNode.getId(), true, ShardRoutingState.STARTED) + ); + for (int j = 0; j < numberOfReplicas; j++) { + indexShardRoutingTable.addShard( + TestShardRouting.newShardRouting(shardId, replicaHostingNode.getId(), false, ShardRoutingState.STARTED) + ); + } + indexRoutingTableBuilder.addIndexShard(indexShardRoutingTable.build()); + } + return builder.add(indexRoutingTableBuilder.build()).build(); + } + + private Map getRemoteStoreNodeAttributes() { + Map remoteStoreNodeAttributes = new HashMap<>(); + remoteStoreNodeAttributes.put(REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-segment-repo-1"); + remoteStoreNodeAttributes.put(REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY, "my-translog-repo-1"); + return remoteStoreNodeAttributes; + } }