Skip to content

Commit

Permalink
Modified _stop replication API to remove any stale replication settin…
Browse files Browse the repository at this point in the history
…gs on existing index

Signed-off-by: Sai Kumar <karanas@amazon.com>
  • Loading branch information
saikaranam-amazon committed Jun 2, 2022
1 parent 043d860 commit ebb1570
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import org.opensearch.replication.metadata.UpdateMetadataAction
import org.opensearch.replication.metadata.UpdateMetadataRequest
import org.opensearch.replication.metadata.state.REPLICATION_LAST_KNOWN_OVERALL_STATE
import org.opensearch.replication.metadata.state.getReplicationStateParamsForIndex
import org.opensearch.replication.metadata.store.ReplicationMetadata
import org.opensearch.replication.seqno.RemoteClusterRetentionLeaseHelper
import org.opensearch.replication.task.index.IndexReplicationParams
import org.opensearch.replication.util.completeWith
import org.opensearch.replication.util.coroutineContext
import org.opensearch.replication.util.suspendExecute
import org.opensearch.replication.util.suspending
Expand All @@ -39,7 +36,6 @@ import org.opensearch.OpenSearchException
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.open.OpenIndexRequest
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.action.support.master.TransportMasterNodeAction
import org.opensearch.client.Client
Expand All @@ -57,8 +53,6 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.settings.Settings
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.shard.ShardId
import org.opensearch.replication.util.stackTraceToString
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
Expand Down Expand Up @@ -99,7 +93,7 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
throw OpenSearchException("Failed to remove index block on ${request.indexName}")
}

validateStopReplicationRequest(request)
validateReplicationStateOfIndex(request)

// Index will be deleted if replication is stopped while it is restoring. So no need to close/reopen
val restoring = clusterService.state().custom<RestoreInProgress>(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).any { entry ->
Expand All @@ -117,8 +111,9 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
throw OpenSearchException("Unable to close index: ${request.indexName}")
}
}
val replMetadata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName)

try {
val replMetadata = replicationMetadataManager.getIndexReplicationMetadata(request.indexName)
val remoteClient = client.getRemoteClusterClient(replMetadata.connectionName)
val retentionLeaseHelper = RemoteClusterRetentionLeaseHelper(clusterService.clusterName.value(), remoteClient)
retentionLeaseHelper.attemptRemoveRetentionLease(clusterService, replMetadata, request.indexName)
Expand All @@ -127,12 +122,12 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
}

val clusterStateUpdateResponse : AcknowledgedResponse =
clusterService.waitForClusterStateUpdate("stop_replication") { l -> StopReplicationTask(request, l)}
clusterService.waitForClusterStateUpdate("stop_replication") { l -> StopReplicationTask(request, l)}
if (!clusterStateUpdateResponse.isAcknowledged) {
throw OpenSearchException("Failed to update cluster state")
}

// Index will be deleted if stop is called while it is restoring. So no need to reopen
// Index will be deleted if stop is called while it is restoring. So no need to reopen
if (!restoring &&
state.routingTable.hasIndex(request.indexName)) {
val reopenResponse = client.suspending(client.admin().indices()::open, injectSecurityContext = true)(OpenIndexRequest(request.indexName))
Expand All @@ -149,7 +144,15 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
}
}

private fun validateStopReplicationRequest(request: StopIndexReplicationRequest) {
private fun validateReplicationStateOfIndex(request: StopIndexReplicationRequest) {
// If replication blocks/settings are present, Stop action should proceed with the clean-up
// This can happen during settings of follower index are carried over in the snapshot and the restore is
// performed using this snapshot.
if (clusterService.state().blocks.hasIndexBlock(request.indexName, INDEX_REPLICATION_BLOCK)
|| clusterService.state().metadata.index(request.indexName)?.settings?.get(REPLICATED_INDEX_SETTING.key) != null) {
return
}

val replicationStateParams = getReplicationStateParamsForIndex(clusterService, request.indexName)
?:
throw IllegalArgumentException("No replication in progress for index:${request.indexName}")
Expand Down Expand Up @@ -187,13 +190,15 @@ class TransportStopIndexReplicationAction @Inject constructor(transportService:
val mdBuilder = Metadata.builder(currentState.metadata)
// remove replicated index setting
val currentIndexMetadata = currentState.metadata.index(request.indexName)
if (currentIndexMetadata != null) {
if (currentIndexMetadata != null &&
currentIndexMetadata.settings[REPLICATED_INDEX_SETTING.key] != null) {
val newIndexMetadata = IndexMetadata.builder(currentIndexMetadata)
.settings(Settings.builder().put(currentIndexMetadata.settings).putNull(REPLICATED_INDEX_SETTING.key))
.settingsVersion(1 + currentIndexMetadata.settingsVersion)
mdBuilder.put(newIndexMetadata)
}
newState.metadata(mdBuilder)

return newState.build()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,33 @@ abstract class MultiClusterRestTestCase : OpenSearchTestCase() {
*/
@Before
fun setup() {
testClusters.values.forEach { if(it.securityEnabled && !it.defaultSecuritySetupCompleted) setupDefaultSecurityRoles(it) }
testClusters.values.forEach {
registerSnapshotRepository(it)
if(it.securityEnabled && !it.defaultSecuritySetupCompleted)
setupDefaultSecurityRoles(it)
}
}

/**
* Register snapshot repo - "fs" type on all the clusters
*/
private fun registerSnapshotRepository(testCluster: TestCluster) {
val getResponse: Map<String, Any> = OpenSearchRestTestCase.entityAsMap(testCluster.lowLevelClient.performRequest(
Request("GET", "/_cluster/settings?include_defaults=true&flat_settings=true")))
val configuredRepositories = (getResponse["defaults"] as Map<*, *>)["path.repo"] as List<*>
if(configuredRepositories.isEmpty()) {
return
}
val repo = configuredRepositories[0] as String
val repoConfig = """
{
"type": "fs",
"settings": {
"location": "$repo"
}
}
""".trimIndent()
triggerRequest(testCluster.lowLevelClient, "PUT", "_snapshot/repl_repo", repoConfig)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,30 @@ import org.opensearch.replication.MultiClusterRestTestCase
import org.opensearch.replication.StartReplicationRequest
import org.opensearch.replication.startReplication
import org.opensearch.replication.stopReplication
import org.opensearch.replication.replicationStatus
import org.opensearch.replication.getShardReplicationTasks
import org.opensearch.replication.`validate status syncing response`
import org.apache.http.util.EntityUtils
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.Assert
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.client.Request
import org.opensearch.client.RequestOptions
import org.opensearch.client.ResponseException
import org.opensearch.client.indices.CreateIndexRequest
import org.opensearch.client.indices.GetIndexRequest
import org.opensearch.cluster.SnapshotsInProgress
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.index.mapper.MapperService
import org.opensearch.test.OpenSearchTestCase.assertBusy
import java.util.Random
import java.util.concurrent.TimeUnit


Expand Down Expand Up @@ -234,4 +242,78 @@ class StopReplicationIT: MultiClusterRestTestCase() {
val sourceMap = mapOf("name" to randomAlphaOfLength(5))
followerClient.index(IndexRequest(followerIndexName).id("2").source(sourceMap), RequestOptions.DEFAULT)
}

fun `test stop replication with stale replication settings at leader cluster`() {
val followerClient = getClientForCluster(FOLLOWER)
val leaderClient = getClientForCluster(LEADER)
createConnectionBetweenClusters(FOLLOWER, LEADER, "source")

val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName), RequestOptions.DEFAULT)
assertThat(createIndexResponse.isAcknowledged).isTrue()
val snapshotSuffix = Random().nextInt(1000).toString()

try {
followerClient.startReplication(
StartReplicationRequest("source", leaderIndexName, followerIndexName),
TimeValue.timeValueSeconds(10),
true
)

assertBusy({
var statusResp = followerClient.replicationStatus(followerIndexName)
`validate status syncing response`(statusResp)
assertThat(followerClient.getShardReplicationTasks(followerIndexName)).isNotEmpty()
}, 60, TimeUnit.SECONDS)

// Trigger snapshot on the follower cluster
val createSnapshotRequest = CreateSnapshotRequest("repl_repo", "test-$snapshotSuffix")
createSnapshotRequest.waitForCompletion(true)
followerClient.snapshot().create(createSnapshotRequest, RequestOptions.DEFAULT)

assertBusy {
var snapshotStatusResponse = followerClient.snapshot().status(SnapshotsStatusRequest("repl_repo",
arrayOf("test-$snapshotSuffix")), RequestOptions.DEFAULT)
for (snapshotStatus in snapshotStatusResponse.snapshots) {
Assert.assertEquals(SnapshotsInProgress.State.SUCCESS, snapshotStatus.state)
}
}

// Restore follower index on leader cluster
val restoreSnapshotRequest = RestoreSnapshotRequest("repl_repo", "test-$snapshotSuffix")
restoreSnapshotRequest.indices(followerIndexName)
restoreSnapshotRequest.waitForCompletion(true)
restoreSnapshotRequest.renamePattern("(.+)")
restoreSnapshotRequest.renameReplacement("restored-\$1")
leaderClient.snapshot().restore(restoreSnapshotRequest, RequestOptions.DEFAULT)

assertBusy {
assertThat(leaderClient.indices().exists(GetIndexRequest("restored-$followerIndexName"), RequestOptions.DEFAULT)).isEqualTo(true)
}

// Invoke stop on the new leader cluster index
assertThatThrownBy { leaderClient.stopReplication("restored-$followerIndexName") }
.isInstanceOf(ResponseException::class.java)
.hasMessageContaining("Metadata for restored-$followerIndexName doesn't exist")

// stop replicaton on the original index
followerClient.stopReplication(followerIndexName)

// Start replication on the new leader index
followerClient.startReplication(
StartReplicationRequest("source", "restored-$followerIndexName", "restored-$followerIndexName"),
TimeValue.timeValueSeconds(10),
true, true
)

assertBusy({
var statusResp = followerClient.replicationStatus("restored-$followerIndexName")
`validate status syncing response`(statusResp)
assertThat(followerClient.getShardReplicationTasks("restored-$followerIndexName")).isNotEmpty()
}, 60, TimeUnit.SECONDS)

} finally {
followerClient.stopReplication("restored-$followerIndexName")
}

}
}

0 comments on commit ebb1570

Please sign in to comment.