diff --git a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt index 33a311c0..156c2732 100644 --- a/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt +++ b/src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt @@ -518,13 +518,18 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript for (alias in toAdd) { log.info("Adding alias ${alias.alias} from $followerIndexName") // Copying writeIndex from leader doesn't cause any issue as writes will be blocked anyways - request.addAliasAction(AliasActions.add().index(followerIndexName) - .alias(alias.alias) - .indexRouting(alias.indexRouting) - .searchRouting(alias.searchRouting) - .writeIndex(alias.writeIndex()) - .isHidden(alias.isHidden) - ) + var aliasAction = AliasActions.add().index(followerIndexName) + .alias(alias.alias) + .indexRouting(alias.indexRouting) + .searchRouting(alias.searchRouting) + .writeIndex(alias.writeIndex()) + .isHidden(alias.isHidden) + + if (alias.filteringRequired()) { + aliasAction = aliasAction.filter(alias.filter.string()) + } + + request.addAliasAction(aliasAction) } var toRemove = followerAliases - leaderAliases diff --git a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt index 954b637d..f989859c 100644 --- a/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt +++ b/src/test/kotlin/org/opensearch/replication/integ/rest/StartReplicationIT.kt @@ -65,6 +65,7 @@ import org.opensearch.index.mapper.MapperService import org.opensearch.repositories.fs.FsRepository import org.opensearch.test.OpenSearchTestCase.assertBusy import org.junit.Assert +import org.opensearch.cluster.metadata.AliasMetadata import org.opensearch.common.xcontent.DeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry import org.opensearch.replication.ReplicationPlugin.Companion.REPLICATION_INDEX_TRANSLOG_PRUNING_ENABLED_SETTING @@ -344,8 +345,10 @@ class StartReplicationIT: MultiClusterRestTestCase() { createConnectionBetweenClusters(FOLLOWER, LEADER) - val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName).alias(Alias("leaderAlias")), RequestOptions.DEFAULT) - assertThat(createIndexResponse.isAcknowledged).isTrue() + val createIndexResponse = leaderClient.indices().create(CreateIndexRequest(leaderIndexName) + .alias(Alias("leaderAlias").filter("{\"term\":{\"year\":2016}}").routing("1")) + , RequestOptions.DEFAULT) + assertThat(createIndexResponse.isAcknowledged).isTrue try { followerClient.startReplication(StartReplicationRequest("source", leaderIndexName, followerIndexName), waitForRestore = true) @@ -361,6 +364,7 @@ class StartReplicationIT: MultiClusterRestTestCase() { followerClient.indices().getAlias(GetAliasesRequest().indices(followerIndexName), RequestOptions.DEFAULT).aliases[followerIndexName] ) + }, 30L, TimeUnit.SECONDS) } finally { followerClient.stopReplication(followerIndexName) @@ -541,7 +545,7 @@ class StartReplicationIT: MultiClusterRestTestCase() { var indicesAliasesRequest = IndicesAliasesRequest() var aliasAction = IndicesAliasesRequest.AliasActions.add() .index(leaderIndexName) - .alias("alias1") + .alias("alias1").filter("{\"term\":{\"year\":2016}}").routing("1") indicesAliasesRequest.addAliasAction(aliasAction) leaderClient.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT)