diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java index 0493bcf800c97..b65f6f056aae6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/MigrationBaseTestCase.java @@ -186,10 +186,11 @@ private Thread getIndexingThread() { indexSingleDoc(indexName); long currentDocCount = indexedDocs.incrementAndGet(); if (currentDocCount > 0 && currentDocCount % refreshFrequency == 0) { - logger.info("--> [iteration {}] flushing index", currentDocCount); if (rarely()) { + logger.info("--> [iteration {}] flushing index", currentDocCount); client().admin().indices().prepareFlush(indexName).get(); } else { + logger.info("--> [iteration {}] refreshing index", currentDocCount); client().admin().indices().prepareRefresh(indexName).get(); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java index 293691ace2edd..cea653c0ead4b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemotePrimaryRelocationIT.java @@ -8,14 +8,11 @@ package org.opensearch.remotemigration; -import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.index.IndexResponse; import org.opensearch.client.Client; import org.opensearch.client.Requests; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; @@ -66,8 +63,8 @@ public void testRemotePrimaryRelocation() throws Exception { AtomicInteger numAutoGenDocs = new AtomicInteger(); final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); - + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); + asyncIndexingService.startIndexing(); refresh("test"); // add remote node in mixed mode cluster @@ -141,17 +138,19 @@ public void testRemotePrimaryRelocation() throws Exception { logger.info("--> relocation from remote to remote complete"); finished.set(true); - indexingThread.join(); + asyncIndexingService.stopIndexing(); refresh("test"); - OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); + OpenSearchAssertions.assertHitCount( + client().prepareSearch("test").setTrackTotalHits(true).get(), + asyncIndexingService.getIndexedDocs() + ); OpenSearchAssertions.assertHitCount( client().prepareSearch("test") .setTrackTotalHits(true)// extra paranoia ;) .setQuery(QueryBuilders.termQuery("auto", true)) .get(), - numAutoGenDocs.get() + asyncIndexingService.getIndexedDocs() ); - } public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { @@ -165,9 +164,8 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); ensureGreen("test"); - AtomicInteger numAutoGenDocs = new AtomicInteger(); - final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = getIndexingThread(finished, numAutoGenDocs); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); + asyncIndexingService.startIndexing(); refresh("test"); @@ -209,27 +207,11 @@ public void testMixedModeRelocation_RemoteSeedingFail() throws Exception { assertEquals(actionGet.getRelocatingShards(), 0); assertEquals(docRepNode, primaryNodeName("test")); - finished.set(true); - indexingThread.join(); + asyncIndexingService.stopIndexing(); client().admin() .cluster() .prepareUpdateSettings() .setTransientSettings(Settings.builder().put(RecoverySettings.INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.getKey(), (String) null)) .get(); } - - private static Thread getIndexingThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { - Thread indexingThread = new Thread(() -> { - while (finished.get() == false && numAutoGenDocs.get() < 10_000) { - IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); - assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); - DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - client().prepareIndex("test").setSource("auto", true).get(); - numAutoGenDocs.incrementAndGet(); - } - }); - indexingThread.start(); - return indexingThread; - } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java index 196ecb991bbc0..7270341202990 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotemigration/RemoteReplicaRecoveryIT.java @@ -8,32 +8,27 @@ package org.opensearch.remotemigration; -import com.carrotsearch.randomizedtesting.generators.RandomNumbers; - -import org.opensearch.action.DocWriteResponse; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.opensearch.action.delete.DeleteResponse; -import org.opensearch.action.index.IndexResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.hamcrest.OpenSearchAssertions; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; import static org.opensearch.node.remotestore.RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING; import static org.opensearch.node.remotestore.RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) - public class RemoteReplicaRecoveryIT extends MigrationBaseTestCase { protected int maximumNumberOfShards() { @@ -63,10 +58,8 @@ public void testReplicaRecovery() throws Exception { client().admin().indices().prepareCreate("test").setSettings(indexSettings()).setMapping("field", "type=text").get(); String replicaNode = internalCluster().startNode(); ensureGreen("test"); - - AtomicInteger numAutoGenDocs = new AtomicInteger(); - final AtomicBoolean finished = new AtomicBoolean(false); - Thread indexingThread = getThread(finished, numAutoGenDocs); + AsyncIndexingService asyncIndexingService = new AsyncIndexingService("test"); + asyncIndexingService.startIndexing(); refresh("test"); @@ -78,12 +71,10 @@ public void testReplicaRecovery() throws Exception { updateSettingsRequest.persistentSettings(Settings.builder().put(MIGRATION_DIRECTION_SETTING.getKey(), "remote_store")); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - String remoteNode2 = internalCluster().startNode(); + internalCluster().startNode(); internalCluster().validateClusterFormed(); // identify the primary - - Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); logger.info("--> relocating primary from {} to {} ", primaryNode, remoteNode); client().admin() .cluster() @@ -102,7 +93,6 @@ public void testReplicaRecovery() throws Exception { assertEquals(0, clusterHealthResponse.getRelocatingShards()); logger.info("--> relocation of primary from docrep to remote complete"); - Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); logger.info("--> getting up the new replicas now to doc rep node as well as remote node "); // Increase replica count to 3 @@ -129,52 +119,33 @@ public void testReplicaRecovery() throws Exception { logger.info("--> replica is up now on another docrep now as well as remote node"); assertEquals(0, clusterHealthResponse.getRelocatingShards()); + asyncIndexingService.stopIndexing(); + refresh("test"); - Thread.sleep(RandomNumbers.randomIntBetween(random(), 0, 2000)); + // segrep lag should be zero + assertBusy(() -> { + SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin() + .indices() + .prepareSegmentReplicationStats("test") + .setDetailed(true) + .execute() + .actionGet(); + SegmentReplicationPerGroupStats perGroupStats = segmentReplicationStatsResponse.getReplicationStats().get("test").get(0); + assertEquals(segmentReplicationStatsResponse.getReplicationStats().size(), 1); + perGroupStats.getReplicaStats().stream().forEach(e -> assertEquals(e.getCurrentReplicationLagMillis(), 0)); + }, 20, TimeUnit.SECONDS); - // Stop replicas on docrep now. - // ToDo : Remove once we have dual replication enabled - client().admin() - .indices() - .updateSettings( - new UpdateSettingsRequest("test").settings( - Settings.builder() - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) - .put("index.routing.allocation.exclude._name", primaryNode + "," + replicaNode) - .build() - ) - ) - .get(); - - finished.set(true); - indexingThread.join(); - refresh("test"); - OpenSearchAssertions.assertHitCount(client().prepareSearch("test").setTrackTotalHits(true).get(), numAutoGenDocs.get()); + OpenSearchAssertions.assertHitCount( + client().prepareSearch("test").setTrackTotalHits(true).get(), + asyncIndexingService.getIndexedDocs() + ); OpenSearchAssertions.assertHitCount( client().prepareSearch("test") .setTrackTotalHits(true)// extra paranoia ;) .setQuery(QueryBuilders.termQuery("auto", true)) - // .setPreference("_prefer_nodes:" + (remoteNode+ "," + remoteNode2)) .get(), - numAutoGenDocs.get() + asyncIndexingService.getIndexedDocs() ); } - - private Thread getThread(AtomicBoolean finished, AtomicInteger numAutoGenDocs) { - Thread indexingThread = new Thread(() -> { - while (finished.get() == false && numAutoGenDocs.get() < 100) { - IndexResponse indexResponse = client().prepareIndex("test").setId("id").setSource("field", "value").get(); - assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); - DeleteResponse deleteResponse = client().prepareDelete("test", "id").get(); - assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); - client().prepareIndex("test").setSource("auto", true).get(); - numAutoGenDocs.incrementAndGet(); - logger.info("Indexed {} docs here", numAutoGenDocs.get()); - } - }); - indexingThread.start(); - return indexingThread; - } - } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 96e9473663385..794535361f626 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -525,7 +525,6 @@ public boolean shouldSeedRemoteStore() { public Function isShardOnRemoteEnabledNode = nodeId -> { DiscoveryNode node = discoveryNodes.get(nodeId); if (node != null) { - logger.trace("Node {} has remote_enabled as {}", nodeId, node.isRemoteStoreNode()); return node.isRemoteStoreNode(); } return false; diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index fbd7ab7cea346..f6ed113019897 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -384,7 +384,7 @@ private void logReplicationFailure(SegmentReplicationState state, ReplicationFai protected void updateVisibleCheckpoint(long replicationId, IndexShard replicaShard) { // Update replication checkpoint on source via transport call only supported for remote store integration. For node- // node communication, checkpoint update is piggy-backed to GET_SEGMENT_FILES transport call - if (replicaShard.indexSettings().isRemoteStoreEnabled() == false) { + if (replicaShard.indexSettings().isAssignedOnRemoteNode() == false) { return; } ShardRouting primaryShard = clusterService.state().routingTable().shardRoutingTable(replicaShard.shardId()).primaryShard(); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index aaac8998aad37..d25b3f0110b10 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -91,7 +91,7 @@ public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIn // make sure the store is not released until we are done. this.cancellableThreads = new CancellableThreads(); store.incRef(); - if (indexShard.indexSettings().isRemoteStoreEnabled()) { + if (indexShard.indexSettings().isAssignedOnRemoteNode()) { indexShard.remoteStore().incRef(); } } @@ -284,7 +284,7 @@ protected void closeInternal() { try { store.decRef(); } finally { - if (indexShard.indexSettings().isRemoteStoreEnabled()) { + if (indexShard.indexSettings().isAssignedOnRemoteNode()) { indexShard.remoteStore().decRef(); } }