From a3c6f000cc3a46f06c4e6189da02eb18d52dc27b Mon Sep 17 00:00:00 2001 From: rajiv-kv <157019998+rajiv-kv@users.noreply.github.com> Date: Mon, 28 Oct 2024 17:46:04 +0530 Subject: [PATCH] create publication repos during join task execution (#16383) * create publication repos during join task Signed-off-by: Rajiv Kumar Vaidyanathan --- .../RemotePublicationConfigurationIT.java | 10 +- .../coordination/JoinTaskExecutor.java | 42 +++- .../remotestore/RemoteStoreNodeService.java | 2 +- .../coordination/JoinTaskExecutorTests.java | 236 +++++++++++++++++- 4 files changed, 275 insertions(+), 15 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java index 98859f517aad4..57bf9eccbf5b4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemotePublicationConfigurationIT.java @@ -14,6 +14,7 @@ import org.opensearch.remotemigration.MigrationBaseTestCase; import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.repositories.fs.FsRepository; import org.opensearch.repositories.fs.ReloadableFsRepository; import org.opensearch.test.InternalSettingsPlugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -97,23 +98,26 @@ public Settings.Builder remotePublishConfiguredNodeSetting() { .put(stateRepoSettingsAttributeKeyPrefix + prefixModeVerificationSuffix, true) .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true) .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, ROUTING_TABLE_REPO_NAME) - .put(routingTableRepoTypeAttributeKey, ReloadableFsRepository.TYPE) + .put(routingTableRepoTypeAttributeKey, FsRepository.TYPE) .put(routingTableRepoSettingsAttributeKeyPrefix + "location", segmentRepoPath); return builder; } public Settings.Builder remoteWithRoutingTableNodeSetting() { // Remote Cluster with Routing table + return Settings.builder() .put( - buildRemoteStoreNodeAttributes( + remoteStoreClusterSettings( REPOSITORY_NAME, segmentRepoPath, + ReloadableFsRepository.TYPE, REPOSITORY_2_NAME, translogRepoPath, + ReloadableFsRepository.TYPE, REPOSITORY_NAME, segmentRepoPath, - false + ReloadableFsRepository.TYPE ) ) .put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true); diff --git a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java index 395807d2fed3e..e52ac11e07532 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/JoinTaskExecutor.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RerouteService; @@ -59,6 +60,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; @@ -190,11 +192,30 @@ public ClusterTasksResult execute(ClusterState currentState, List jo // for every set of node join task which we can optimize to not compute if cluster state already has // repository information. Optional remoteDN = currentNodes.getNodes().values().stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst(); - DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get()); - RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( - dn, - currentState.getMetadata().custom(RepositoriesMetadata.TYPE) - ); + Optional remotePublicationDN = currentNodes.getNodes() + .values() + .stream() + .filter(DiscoveryNode::isRemoteStatePublicationEnabled) + .findFirst(); + RepositoriesMetadata existingRepositoriesMetadata = currentState.getMetadata().custom(RepositoriesMetadata.TYPE); + Map repositories = new LinkedHashMap<>(); + if (existingRepositoriesMetadata != null) { + existingRepositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); + } + if (remoteDN.isPresent()) { + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + remoteDN.get(), + existingRepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); + } + if (remotePublicationDN.isPresent()) { + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + remotePublicationDN.get(), + existingRepositoriesMetadata + ); + repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); + } assert nodesBuilder.isLocalNodeElectedClusterManager(); @@ -224,15 +245,16 @@ public ClusterTasksResult execute(ClusterState currentState, List jo ensureNodeCommissioned(node, currentState.metadata()); nodesBuilder.add(node); - if (remoteDN.isEmpty() && node.isRemoteStoreNode()) { + if ((remoteDN.isEmpty() && node.isRemoteStoreNode()) + || (remotePublicationDN.isEmpty() && node.isRemoteStatePublicationEnabled())) { // This is hit only on cases where we encounter first remote node logger.info("Updating system repository now for remote store"); - repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( + RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata( node, - currentState.getMetadata().custom(RepositoriesMetadata.TYPE) + existingRepositoriesMetadata ); + repositoriesMetadata.repositories().forEach(r -> repositories.putIfAbsent(r.name(), r)); } - nodesChanged = true; minClusterNodeVersion = Version.min(minClusterNodeVersion, node.getVersion()); maxClusterNodeVersion = Version.max(maxClusterNodeVersion, node.getVersion()); @@ -246,7 +268,7 @@ public ClusterTasksResult execute(ClusterState currentState, List jo } results.success(joinTask); } - + RepositoriesMetadata repositoriesMetadata = new RepositoriesMetadata(new ArrayList<>(repositories.values())); if (nodesChanged) { rerouteService.reroute( "post-join reroute", diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java index cc5d8b0e62e90..c1c041ce01198 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java @@ -168,7 +168,7 @@ public void createAndVerifyRepositories(DiscoveryNode localNode) { * node repository metadata an exception will be thrown and the node will not be allowed to join the cluster. */ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode, RepositoriesMetadata existingRepositories) { - if (joiningNode.isRemoteStoreNode()) { + if (joiningNode.isRemoteStoreNode() || joiningNode.isRemoteStatePublicationEnabled()) { List updatedRepositoryMetadataList = new ArrayList<>(); List newRepositoryMetadataList = new RemoteStoreNodeAttribute(joiningNode).getRepositoriesMetadata() .repositories(); diff --git a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java index e5b8d407af868..8ee944646a413 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java @@ -1041,6 +1041,201 @@ public void testUpdatesClusterStateWithMultiNodeClusterAndSameRepository() throw validateRepositoryMetadata(result.resultingState, clusterManagerNode, 2); } + public void testUpdatesRepoRemoteNodeJoinPublicationCluster() throws Exception { + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService( + new SetOnce<>(mock(RepositoriesService.class))::get, + null + ); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor( + Settings.EMPTY, + allocationService, + logger, + rerouteService, + null, + remoteStoreNodeService + ); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remotePublicationNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .build(); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(clusterManagerNode, "elect leader")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode); + + final Settings settings = Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .build(); + final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + Metadata metadata = Metadata.builder().persistentSettings(settings).build(); + + ClusterState currentState = ClusterState.builder(result.resultingState).metadata(metadata).build(); + + final DiscoveryNode remoteStoreNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult resultAfterRemoteNodeJoin = joinTaskExecutor.execute( + currentState, + List.of(new JoinTaskExecutor.Task(remoteStoreNode, "test")) + ); + assertThat(resultAfterRemoteNodeJoin.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult1 = resultAfterRemoteNodeJoin.executionResults.values().iterator().next(); + assertTrue(taskResult1.isSuccess()); + validateRepositoriesMetadata(resultAfterRemoteNodeJoin.resultingState, remoteStoreNode, clusterManagerNode); + } + + public void testUpdatesRepoPublicationNodeJoinRemoteCluster() throws Exception { + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService( + new SetOnce<>(mock(RepositoriesService.class))::get, + null + ); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor( + Settings.EMPTY, + allocationService, + logger, + rerouteService, + null, + remoteStoreNodeService + ); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes(SEGMENT_REPO, TRANSLOG_REPO), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .build(); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(clusterManagerNode, "elect leader")) + ); + final Settings settings = Settings.builder() + .put(MIGRATION_DIRECTION_SETTING.getKey(), RemoteStoreNodeService.Direction.REMOTE_STORE) + .put(REMOTE_STORE_COMPATIBILITY_MODE_SETTING.getKey(), "mixed") + .build(); + final Settings nodeSettings = Settings.builder().put(REMOTE_STORE_MIGRATION_EXPERIMENTAL, "true").build(); + FeatureFlags.initializeFeatureFlags(nodeSettings); + Metadata metadata = Metadata.builder().persistentSettings(settings).build(); + + ClusterState currentState = ClusterState.builder(result.resultingState).metadata(metadata).build(); + + final DiscoveryNode remotePublicationNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remotePublicationNodeAttributes(), + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult resultAfterRemoteNodeJoin = joinTaskExecutor.execute( + currentState, + List.of(new JoinTaskExecutor.Task(remotePublicationNode, "test")) + ); + assertThat(resultAfterRemoteNodeJoin.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult1 = resultAfterRemoteNodeJoin.executionResults.values().iterator().next(); + assertTrue(taskResult1.isSuccess()); + validateRepositoriesMetadata(resultAfterRemoteNodeJoin.resultingState, clusterManagerNode, remotePublicationNode); + } + + public void testUpdatesClusterStateWithMultiplePublicationNodeJoin() throws Exception { + Map remoteStoreNodeAttributes = remotePublicationNodeAttributes(); + final AllocationService allocationService = mock(AllocationService.class); + when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]); + final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null); + final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService( + new SetOnce<>(mock(RepositoriesService.class))::get, + null + ); + + final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor( + Settings.EMPTY, + allocationService, + logger, + rerouteService, + null, + remoteStoreNodeService + ); + + final DiscoveryNode clusterManagerNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + List repositoriesMetadata = new ArrayList<>(); + + final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .nodes( + DiscoveryNodes.builder() + .add(clusterManagerNode) + .localNodeId(clusterManagerNode.getId()) + .clusterManagerNodeId(clusterManagerNode.getId()) + ) + .metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata))) + .build(); + + final DiscoveryNode joiningNode = new DiscoveryNode( + UUIDs.base64UUID(), + buildNewFakeTransportAddress(), + remoteStoreNodeAttributes, + DiscoveryNodeRole.BUILT_IN_ROLES, + Version.CURRENT + ); + + final ClusterStateTaskExecutor.ClusterTasksResult result = joinTaskExecutor.execute( + clusterState, + List.of(new JoinTaskExecutor.Task(joiningNode, "test")) + ); + assertThat(result.executionResults.entrySet(), hasSize(1)); + final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next(); + assertTrue(taskResult.isSuccess()); + validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode); + } + public void testNodeJoinInMixedMode() { List versions = allOpenSearchVersions(); assert versions.size() >= 2 : "test requires at least two open search versions"; @@ -1273,7 +1468,9 @@ private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE); assertTrue(repositoriesMetadata.repositories().size() == expectedRepositories); - if (repositoriesMetadata.repositories().size() == 2 || repositoriesMetadata.repositories().size() == 3) { + if (repositoriesMetadata.repositories().size() == 2 + || repositoriesMetadata.repositories().size() == 3 + || repositoriesMetadata.repositories().size() == 4) { final RepositoryMetadata segmentRepositoryMetadata = buildRepositoryMetadata(existingNode, SEGMENT_REPO); final RepositoryMetadata translogRepositoryMetadata = buildRepositoryMetadata(existingNode, TRANSLOG_REPO); for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) { @@ -1294,6 +1491,43 @@ private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode } } + private void validatePublicationRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode) throws Exception { + final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE); + assertTrue(repositoriesMetadata.repositories().size() == 2); + final RepositoryMetadata clusterStateRepoMetadata = buildRepositoryMetadata(existingNode, CLUSTER_STATE_REPO); + final RepositoryMetadata routingTableRepoMetadata = buildRepositoryMetadata(existingNode, ROUTING_TABLE_REPO); + for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) { + if (repositoryMetadata.name().equals(clusterStateRepoMetadata.name())) { + assertTrue(clusterStateRepoMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } else if (repositoryMetadata.name().equals(routingTableRepoMetadata.name())) { + assertTrue(routingTableRepoMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } + } + } + + private void validateRepositoriesMetadata(ClusterState updatedState, DiscoveryNode remoteNode, DiscoveryNode publicationNode) + throws Exception { + + final RepositoriesMetadata repositoriesMetadata = updatedState.metadata().custom(RepositoriesMetadata.TYPE); + assertEquals(4, repositoriesMetadata.repositories().size()); + final RepositoryMetadata segmentRepositoryMetadata = buildRepositoryMetadata(remoteNode, SEGMENT_REPO); + final RepositoryMetadata translogRepositoryMetadata = buildRepositoryMetadata(remoteNode, TRANSLOG_REPO); + final RepositoryMetadata clusterStateRepositoryMetadata = buildRepositoryMetadata(remoteNode, CLUSTER_STATE_REPO); + final RepositoryMetadata routingTableRepositoryMetadata = buildRepositoryMetadata(publicationNode, ROUTING_TABLE_REPO); + + for (RepositoryMetadata repositoryMetadata : repositoriesMetadata.repositories()) { + if (repositoryMetadata.name().equals(segmentRepositoryMetadata.name())) { + assertTrue(segmentRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } else if (repositoryMetadata.name().equals(translogRepositoryMetadata.name())) { + assertTrue(translogRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } else if (repositoryMetadata.name().equals(clusterStateRepositoryMetadata.name())) { + assertTrue(clusterStateRepositoryMetadata.equalsIgnoreGenerations(repositoryMetadata)); + } else if (repositoryMetadata.name().equals(routingTableRepositoryMetadata.name())) { + assertTrue(repositoryMetadata.equalsIgnoreGenerations(routingTableRepositoryMetadata)); + } + } + } + private DiscoveryNode newDiscoveryNode(Map attributes) { return new DiscoveryNode( randomAlphaOfLength(10),