From 0bad3074ea75527af227cd9228e088b1877dae95 Mon Sep 17 00:00:00 2001 From: Sai Kumar Date: Wed, 24 Aug 2022 12:21:13 +0530 Subject: [PATCH] Modified _stop replication API to remove any stale replication settings on existing index (#410) Signed-off-by: Sai Kumar --- .../TransportStopIndexReplicationAction.kt | 29 ++++--- .../replication/MultiClusterRestTestCase.kt | 31 ++++++- .../integ/rest/StopReplicationIT.kt | 82 ++++++++++++++++++- 3 files changed, 128 insertions(+), 14 deletions(-) diff --git a/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt b/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt index dcc5e9e5..fd82bba4 100644 --- a/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt +++ b/src/main/kotlin/org/opensearch/replication/action/stop/TransportStopIndexReplicationAction.kt @@ -22,10 +22,7 @@ import org.opensearch.replication.metadata.UpdateMetadataAction import org.opensearch.replication.metadata.UpdateMetadataRequest import org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE import org.opensearch.replication.metadata.state.getReplicationStateParamsForIndex -import org.opensearch.replication.metadata.store.ReplicationMetadata import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper -import org.opensearch.replication.task.index.IndexReplicationParams -import org.opensearch.replication.util.completeWith import org.opensearch.replication.util.coroutineContext import org.opensearch.replication.util.suspendExecute import org.opensearch.replication.util.suspending @@ -39,7 +36,6 @@ import org.opensearch.OpenSearchException import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.open.OpenIndexRequest import org.opensearch.action.support.ActionFilters -import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.action.support.master.TransportMasterNodeAction import org.opensearch.client.Client @@ -57,8 +53,6 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.settings.Settings -import org.opensearch.index.IndexNotFoundException -import org.opensearch.index.shard.ShardId import org.opensearch.replication.util.stackTraceToString import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService @@ -99,7 +93,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: throw OpenSearchException("Failed to remove index block on ${request.indexName}") } - validateStopReplicationRequest(request) + validateReplicationStateOfIndex(request) // Index will be deleted if replication is stopped while it is restoring. So no need to close/reopen val restoring = clusterService.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).any { entry -> @@ -117,8 +111,9 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: throw OpenSearchException("Unable to close index: ${request.indexName}") } } - val replMetadata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName) + try { + val replMetadata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName) val remoteClient = client.getRemoteClusterClient(replMetadata.connectionName) val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient) retentionLeaseHelper.attemptRemoveRetentionLease(clusterService, replMetadata, request.indexName) @@ -127,12 +122,12 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: } val clusterStateUpdateResponse : AcknowledgedResponse = - clusterService.waitForClusterStateUpdate("stop_replication") { l -> StopReplicationTask(request, l)} + clusterService.waitForClusterStateUpdate("stop_replication") { l -> StopReplicationTask(request, l)} if (!clusterStateUpdateResponse.isAcknowledged) { throw OpenSearchException("Failed to update cluster state") } - // Index will be deleted if stop is called while it is restoring. So no need to reopen + // Index will be deleted if stop is called while it is restoring. So no need to reopen if (!restoring && state.routingTable.hasIndex(request.indexName)) { val reopenResponse = client.suspending(client.admin().indices()::open, injectSecurityContext = true)(OpenIndexRequest(request.indexName)) @@ -149,7 +144,15 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: } } - private fun validateStopReplicationRequest(request: StopIndexReplicationRequest) { + private fun validateReplicationStateOfIndex(request: StopIndexReplicationRequest) { + // If replication blocks/settings are present, Stop action should proceed with the clean-up + // This can happen during settings of follower index are carried over in the snapshot and the restore is + // performed using this snapshot. + if (clusterService.state().blocks.hasIndexBlock(request.indexName, INDEX_REPLICATION_BLOCK) + || clusterService.state().metadata.index(request.indexName)?.settings?.get(REPLICATED_INDEX_SETTING.key) != null) { + return + } + val replicationStateParams = getReplicationStateParamsForIndex(clusterService, request.indexName) ?: throw IllegalArgumentException("No replication in progress for index:${request.indexName}") @@ -187,13 +190,15 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService: val mdBuilder = Metadata.builder(currentState.metadata) // remove replicated index setting val currentIndexMetadata = currentState.metadata.index(request.indexName) - if (currentIndexMetadata != null) { + if (currentIndexMetadata != null && + currentIndexMetadata.settings[REPLICATED_INDEX_SETTING.key] != null) { val newIndexMetadata = IndexMetadata.builder(currentIndexMetadata) .settings(Settings.builder().put(currentIndexMetadata.settings).putNull(REPLICATED_INDEX_SETTING.key)) .settingsVersion(1 + currentIndexMetadata.settingsVersion) mdBuilder.put(newIndexMetadata) } newState.metadata(mdBuilder) + return newState.build() } diff --git a/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt b/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt index 8f09b766..888d8af9 100644 --- a/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/replication/MultiClusterRestTestCase.kt @@ -108,6 +108,9 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() { val lowLevelClient = restClient.lowLevelClient!! var defaultSecuritySetupCompleted = false + companion object { + const val FS_SNAPSHOT_REPO = "repl_repo" + } } companion object { @@ -253,7 +256,33 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() { */ @Before fun setup() { - testClusters.values.forEach { if(it.securityEnabled && !it.defaultSecuritySetupCompleted) setupDefaultSecurityRoles(it) } + testClusters.values.forEach { + registerSnapshotRepository(it) + if(it.securityEnabled && !it.defaultSecuritySetupCompleted) + setupDefaultSecurityRoles(it) + } + } + + /** + * Register snapshot repo - "fs" type on all the clusters + */ + private fun registerSnapshotRepository(testCluster: TestCluster) { + val getResponse: Map = OpenSearchRestTestCase.entityAsMap(testCluster.lowLevelClient.performRequest( + Request("GET", "/_cluster/settings?include_defaults=true&flat_settings=true"))) + val configuredRepositories = (getResponse["defaults"] as Map<*, *>)["path.repo"] as List<*> + if(configuredRepositories.isEmpty()) { + return + } + val repo = configuredRepositories[0] as String + val repoConfig = """ + { + "type": "fs", + "settings": { + "location": "$repo" + } + } + """.trimIndent() + triggerRequest(testCluster.lowLevelClient, "PUT", "_snapshot/${TestCluster.FS_SNAPSHOT_REPO}", repoConfig) } /** diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StopReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StopReplicationIT.kt index 970d6109..09b797ae 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StopReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StopReplicationIT.kt @@ -17,22 +17,30 @@ import org.opensearch.replication.MultiClusterRestTestCase import org.opensearch.replication.StartReplicationRequest import org.opensearch.replication.startReplication import org.opensearch.replication.stopReplication +import org.opensearch.replication.replicationStatus +import org.opensearch.replication.getShardReplicationTasks +import org.opensearch.replication.`validate status syncing response` import org.apache.http.util.EntityUtils import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatThrownBy +import org.junit.Assert import org.opensearch.OpenSearchStatusException import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest +import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest +import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest +import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest import org.opensearch.action.index.IndexRequest import org.opensearch.client.Request import org.opensearch.client.RequestOptions import org.opensearch.client.ResponseException import org.opensearch.client.indices.CreateIndexRequest import org.opensearch.client.indices.GetIndexRequest +import org.opensearch.cluster.SnapshotsInProgress import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.index.mapper.MapperService -import org.opensearch.test.OpenSearchTestCase.assertBusy +import java.util.Random import java.util.concurrent.TimeUnit @@ -234,4 +242,76 @@ class StopReplicationIT: MultiClusterRestTestCase() { val sourceMap = mapOf("name" to randomAlphaOfLength(5)) followerClient.index(IndexRequest(followerIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT) } + + fun `test stop replication with stale replication settings at leader cluster`() { + val followerClient = getClientForCluster(FOLLOWER) + val leaderClient = getClientForCluster(LEADER) + createConnectionBetweenClusters(FOLLOWER, LEADER, "source") + + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue() + val snapshotSuffix = Random().nextInt(1000).toString() + + try { + followerClient.startReplication( + StartReplicationRequest("source", leaderIndexName, followerIndexName), + TimeValue.timeValueSeconds(10), + true + ) + + assertBusy({ + var statusResp = followerClient.replicationStatus(followerIndexName) + `validate status syncing response`(statusResp) + assertThat(followerClient.getShardReplicationTasks(followerIndexName)).isNotEmpty() + }, 60, TimeUnit.SECONDS) + + // Trigger snapshot on the follower cluster + val createSnapshotRequest = CreateSnapshotRequest(TestCluster.FS_SNAPSHOT_REPO, "test-$snapshotSuffix") + createSnapshotRequest.waitForCompletion(true) + followerClient.snapshot().create(createSnapshotRequest, RequestOptions.DEFAULT) + + assertBusy { + var snapshotStatusResponse = followerClient.snapshot().status(SnapshotsStatusRequest(TestCluster.FS_SNAPSHOT_REPO, + arrayOf("test-$snapshotSuffix")), RequestOptions.DEFAULT) + for (snapshotStatus in snapshotStatusResponse.snapshots) { + Assert.assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatus.state) + } + } + + // Restore follower index on leader cluster + val restoreSnapshotRequest = RestoreSnapshotRequest(TestCluster.FS_SNAPSHOT_REPO, "test-$snapshotSuffix") + restoreSnapshotRequest.indices(followerIndexName) + restoreSnapshotRequest.waitForCompletion(true) + restoreSnapshotRequest.renamePattern("(.+)") + restoreSnapshotRequest.renameReplacement("restored-\$1") + leaderClient.snapshot().restore(restoreSnapshotRequest, RequestOptions.DEFAULT) + + assertBusy { + assertThat(leaderClient.indices().exists(GetIndexRequest("restored-$followerIndexName"), RequestOptions.DEFAULT)).isEqualTo(true) + } + + // Invoke stop on the new leader cluster index + assertThatThrownBy { leaderClient.stopReplication("restored-$followerIndexName") } + .isInstanceOf(ResponseException::class.java) + .hasMessageContaining("Metadata for restored-$followerIndexName doesn't exist") + + // Start replication on the new leader index + followerClient.startReplication( + StartReplicationRequest("source", "restored-$followerIndexName", "restored-$followerIndexName"), + TimeValue.timeValueSeconds(10), + true, true + ) + + assertBusy({ + var statusResp = followerClient.replicationStatus("restored-$followerIndexName") + `validate status syncing response`(statusResp) + assertThat(followerClient.getShardReplicationTasks("restored-$followerIndexName")).isNotEmpty() + }, 60, TimeUnit.SECONDS) + + } finally { + followerClient.stopReplication("restored-$followerIndexName") + followerClient.stopReplication(followerIndexName) + } + + } }