diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index cf753e3360c39..b7392c1f34bb0 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -54,6 +54,8 @@ public class NRTReplicationEngine extends Engine { private final LocalCheckpointTracker localCheckpointTracker; private final WriteOnlyTranslogManager translogManager; + private volatile long lastReceivedGen = SequenceNumbers.NO_OPS_PERFORMED; + private static final int SI_COUNTER_INCREMENT = 10; public NRTReplicationEngine(EngineConfig engineConfig) { @@ -120,14 +122,16 @@ public TranslogManager translogManager() { public synchronized void updateSegments(final SegmentInfos infos, long seqNo) throws IOException { // Update the current infos reference on the Engine's reader. + long incomingGeneration = infos.getGeneration(); readerManager.updateSegments(infos); - // only update the persistedSeqNo and "lastCommitted" infos reference if the incoming segments have a higher - // generation. We can still refresh with incoming SegmentInfos that are not part of a commit point. - if (infos.getGeneration() > lastCommittedSegmentInfos.getGeneration()) { - this.lastCommittedSegmentInfos = infos; + // Commit and roll the xlog when we receive a different generation than what was last received. + // lower/higher gens are possible from a new primary that was just elected. + if (incomingGeneration != lastReceivedGen) { + commitSegmentInfos(); translogManager.rollTranslogGeneration(); } + lastReceivedGen = incomingGeneration; localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } @@ -141,20 +145,16 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th * * @throws IOException - When there is an IO error committing the SegmentInfos. */ - public void commitSegmentInfos() throws IOException { - // TODO: This method should wait for replication events to finalize. - final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); - /* - This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied - from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is - used to generate new segment file names. The ideal solution is to identify the counter from previous primary. - */ - latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; - latestSegmentInfos.changed(); - store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); + private void commitSegmentInfos(SegmentInfos infos) throws IOException { + store.commitSegmentInfos(infos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint()); + this.lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo(); translogManager.syncTranslog(); } + protected void commitSegmentInfos() throws IOException { + commitSegmentInfos(getLatestSegmentInfos()); + } + @Override public String getHistoryUUID() { return loadHistoryUUID(lastCommittedSegmentInfos.userData); @@ -350,10 +350,20 @@ public SafeCommitInfo getSafeCommitInfo() { @Override protected final void closeNoLock(String reason, CountDownLatch closedLatch) { + logger.info("Closing"); if (isClosed.compareAndSet(false, true)) { assert rwl.isWriteLockedByCurrentThread() || failEngineLock.isHeldByCurrentThread() : "Either the write lock must be held or the engine must be currently be failing itself"; try { + final SegmentInfos latestSegmentInfos = getLatestSegmentInfos(); + /* + This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied + from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is + used to generate new segment file names. The ideal solution is to identify the counter from previous primary. + */ + latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT; + latestSegmentInfos.changed(); + commitSegmentInfos(latestSegmentInfos); IOUtils.close(readerManager, translogManager, store::decRef); } catch (Exception e) { logger.warn("failed to close engine", e); diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 16e615672a26f..8fbb24720aedc 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -74,6 +74,9 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re * @throws IOException - When Refresh fails with an IOException. */ public synchronized void updateSegments(SegmentInfos infos) throws IOException { + // roll over the currentInfo's generation, this ensures the on-disk gen + // is always increased. + infos.updateGeneration(currentInfos); currentInfos = infos; maybeRefresh(); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 670af1f1c6fd9..28dc0ad49d4ec 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -623,7 +623,7 @@ public void updateShardState( if (indexSettings.isSegRepEnabled()) { // this Shard's engine was read only, we need to update its engine before restoring local history from xlog. assert newRouting.primary() && currentRouting.primary() == false; - promoteNRTReplicaToPrimary(); + resetEngineToGlobalCheckpoint(); } replicationTracker.activatePrimaryMode(getLocalCheckpoint()); ensurePeerRecoveryRetentionLeasesExist(); @@ -3557,7 +3557,9 @@ private void innerAcquireReplicaOperationPermit( currentGlobalCheckpoint, maxSeqNo ); - if (currentGlobalCheckpoint < maxSeqNo) { + // With Segment Replication enabled, we never want to reset a replica's engine unless + // it is promoted to primary. + if (currentGlobalCheckpoint < maxSeqNo && indexSettings.isSegRepEnabled() == false) { resetEngineToGlobalCheckpoint(); } else { getEngine().translogManager().rollTranslogGeneration(); @@ -4120,26 +4122,4 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() { public GatedCloseable getSegmentInfosSnapshot() { return getEngine().getSegmentInfosSnapshot(); } - - /** - * With segment replication enabled - prepare the shard's engine to be promoted as the new primary. - * - * If this shard is currently using a replication engine, this method: - * 1. Invokes {@link NRTReplicationEngine#commitSegmentInfos()} to ensure the engine can be reopened as writeable from the latest refresh point. - * InternalEngine opens its IndexWriter from an on-disk commit point, but this replica may have recently synced from a primary's refresh point, meaning it has documents searchable in its in-memory SegmentInfos - * that are not part of a commit point. This ensures that those documents are made part of a commit and do not need to be reindexed after promotion. - * 2. Invokes resetEngineToGlobalCheckpoint - This call performs the engine swap, opening up as a writeable engine and replays any operations in the xlog. The operations indexed from xlog here will be - * any ack'd writes that were not copied to this replica before promotion. - */ - private void promoteNRTReplicaToPrimary() { - assert shardRouting.primary() && indexSettings.isSegRepEnabled(); - getReplicationEngine().ifPresentOrElse(engine -> { - try { - engine.commitSegmentInfos(); - resetEngineToGlobalCheckpoint(); - } catch (IOException e) { - throw new EngineException(shardId, "Unable to update replica to writeable engine, failing shard", e); - } - }, () -> { throw new EngineException(shardId, "Expected replica engine to be of type NRTReplicationEngine"); }); - } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java index 3509615052707..ec3986017afac 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java +++ b/server/src/main/java/org/opensearch/indices/recovery/MultiFileWriter.java @@ -156,7 +156,10 @@ private void innerWriteFileChunk(StoreFileMetadata fileMetadata, long position, + temporaryFileName + "] in " + Arrays.toString(store.directory().listAll()); - store.directory().sync(Collections.singleton(temporaryFileName)); + // With Segment Replication, we will fsync after a full commit has been received. + if (store.indexSettings().isSegRepEnabled() == false) { + store.directory().sync(Collections.singleton(temporaryFileName)); + } IndexOutput remove = removeOpenIndexOutputs(name); assert remove == null || remove == indexOutput; // remove maybe null if we got finished } diff --git a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java index 540054782133a..96d5573621683 100644 --- a/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java @@ -11,14 +11,11 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.SegmentInfos; -import org.hamcrest.MatcherAssert; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lucene.Lucene; -import org.opensearch.common.lucene.search.Queries; import org.opensearch.common.settings.Settings; import org.opensearch.index.IndexSettings; -import org.opensearch.index.mapper.ParsedDocument; import org.opensearch.index.seqno.LocalCheckpointTracker; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.store.Store; @@ -36,17 +33,21 @@ import java.util.stream.Collectors; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.notNullValue; import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED; import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO; public class NRTReplicationEngineTests extends EngineTestCase { + private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings( + "index", + Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() + ); + public void testCreateEngine() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( - final Store nrtEngineStore = createStore(); + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos(); @@ -70,7 +71,7 @@ public void testEngineWritesOpsToTranslog() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( - final Store nrtEngineStore = createStore(); + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { List operations = generateHistoryOnReplica( @@ -93,6 +94,9 @@ public void testEngineWritesOpsToTranslog() throws Exception { // we don't index into nrtEngine, so get the doc ids from the regular engine. final List docs = getDocIds(engine, true); + // close the NRTEngine, it will commit on close and we'll reuse its store for an IE. + nrtEngine.close(); + // recover a new engine from the nrtEngine's xlog. nrtEngine.translogManager().syncTranslog(); try (InternalEngine engine = new InternalEngine(nrtEngine.config())) { @@ -104,88 +108,77 @@ public void testEngineWritesOpsToTranslog() throws Exception { } } - public void testUpdateSegments() throws Exception { + public void testUpdateSegments_replicaReceivesSISWithHigherGen() throws IOException { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( - final Store nrtEngineStore = createStore(); + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { - // add docs to the primary engine. - List operations = generateHistoryOnReplica( - between(1, 500), - randomBoolean(), - randomBoolean(), - randomBoolean(), - Engine.Operation.TYPE.INDEX - ); - - for (Engine.Operation op : operations) { - applyOperation(engine, op); - applyOperation(nrtEngine, op); - } - - engine.refresh("test"); - - final SegmentInfos latestPrimaryInfos = engine.getLatestSegmentInfos(); - nrtEngine.updateSegments(latestPrimaryInfos, engine.getProcessedLocalCheckpoint()); - assertMatchingSegmentsAndCheckpoints(nrtEngine, latestPrimaryInfos); - - // assert a doc from the operations exists. - final ParsedDocument parsedDoc = createParsedDoc(operations.stream().findFirst().get().id(), null); - try (Engine.GetResult getResult = engine.get(newGet(true, parsedDoc), engine::acquireSearcher)) { - assertThat(getResult.exists(), equalTo(true)); - assertThat(getResult.docIdAndVersion(), notNullValue()); - } - - try (Engine.GetResult getResult = nrtEngine.get(newGet(true, parsedDoc), nrtEngine::acquireSearcher)) { - assertThat(getResult.exists(), equalTo(true)); - assertThat(getResult.docIdAndVersion(), notNullValue()); - } - - // Flush the primary and update the NRTEngine with the latest committed infos. - engine.flush(); - nrtEngine.translogManager().syncTranslog(); // to advance persisted checkpoint + // assume we start at the same gen. + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(nrtEngine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(engine.getLatestSegmentInfos().getGeneration(), nrtEngine.getLatestSegmentInfos().getGeneration()); + + // flush the primary engine - we don't need any segments, just force a new commit point. + engine.flush(true, true); + assertEquals(3, engine.getLatestSegmentInfos().getGeneration()); + nrtEngine.updateSegments(engine.getLatestSegmentInfos(), engine.getProcessedLocalCheckpoint()); + assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); + } + } - Set seqNos = operations.stream().map(Engine.Operation::seqNo).collect(Collectors.toSet()); + public void testUpdateSegments_replicaReceivesSISWithLowerGen() throws IOException { + // if the replica is already at segments_N that is received, it will commit segments_N+1. + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - nrtEngine.ensureOpen(); - try ( - Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot() - ) { - assertThat(snapshot.totalOperations(), equalTo(operations.size())); - assertThat( - TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), - equalTo(seqNos) - ); - } + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + ) { + nrtEngine.getLatestSegmentInfos().changed(); + nrtEngine.getLatestSegmentInfos().changed(); + // commit the infos to push us to segments_3. + nrtEngine.commitSegmentInfos(); + assertEquals(3, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(3, nrtEngine.getLatestSegmentInfos().getGeneration()); - final SegmentInfos primaryInfos = engine.getLastCommittedSegmentInfos(); + // update the replica with segments_2 from the primary. + final SegmentInfos primaryInfos = engine.getLatestSegmentInfos(); + assertEquals(2, primaryInfos.getGeneration()); nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint()); - assertMatchingSegmentsAndCheckpoints(nrtEngine, primaryInfos); + assertEquals(4, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(4, nrtEngine.getLatestSegmentInfos().getGeneration()); + assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); + assertEquals(primaryInfos.getVersion(), nrtEngine.getLastCommittedSegmentInfos().getVersion()); - assertEquals( - assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().getGeneration().translogFileGeneration, - assertAndGetInternalTranslogManager(engine.translogManager()).getTranslog().getGeneration().translogFileGeneration - ); + nrtEngine.close(); + assertEquals(5, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + } + } - try ( - Translog.Snapshot snapshot = assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getTranslog().newSnapshot() - ) { - assertThat(snapshot.totalOperations(), equalTo(operations.size())); - assertThat( - TestTranslog.drainSnapshot(snapshot, false).stream().map(Translog.Operation::seqNo).collect(Collectors.toSet()), - equalTo(seqNos) - ); - } + public void testUpdateSegments_replicaCommitsFirstReceivedInfos() throws IOException { + final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - // Ensure the same hit count between engines. - int expectedDocCount; - try (final Engine.Searcher test = engine.acquireSearcher("test")) { - expectedDocCount = test.count(Queries.newMatchAllQuery()); - assertSearcherHits(nrtEngine, expectedDocCount); - } - assertEngineCleanedUp(nrtEngine, assertAndGetInternalTranslogManager(nrtEngine.translogManager()).getDeletionPolicy()); + try ( + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); + final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) + ) { + assertEquals(2, nrtEngine.getLastCommittedSegmentInfos().getGeneration()); + assertEquals(2, nrtEngine.getLatestSegmentInfos().getGeneration()); + // bump the latest infos version a couple of times so that we can assert the correct version after commit. + engine.getLatestSegmentInfos().changed(); + engine.getLatestSegmentInfos().changed(); + assertNotEquals(nrtEngine.getLatestSegmentInfos().getVersion(), engine.getLatestSegmentInfos().getVersion()); + + // update replica with the latest primary infos, it will be the same gen, segments_2, ensure it is also committed. + final SegmentInfos primaryInfos = engine.getLatestSegmentInfos(); + assertEquals(2, primaryInfos.getGeneration()); + nrtEngine.updateSegments(primaryInfos, engine.getProcessedLocalCheckpoint()); + final SegmentInfos lastCommittedSegmentInfos = nrtEngine.getLastCommittedSegmentInfos(); + assertEquals(primaryInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); + assertEquals(primaryInfos.getVersion(), lastCommittedSegmentInfos.getVersion()); } } @@ -193,7 +186,7 @@ public void testTrimTranslogOps() throws Exception { final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); try ( - final Store nrtEngineStore = createStore(); + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore); ) { List operations = generateHistoryOnReplica( @@ -227,12 +220,9 @@ public void testCommitSegmentInfos() throws Exception { // This test asserts that NRTReplication#commitSegmentInfos creates a new commit point with the latest checkpoints // stored in user data. final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED); - final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings( - "index", - Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build() - ); + try ( - final Store nrtEngineStore = createStore(indexSettings, newDirectory()); + final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory()); final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore) ) { List operations = generateHistoryOnReplica(between(1, 500), randomBoolean(), randomBoolean(), randomBoolean()) @@ -268,22 +258,6 @@ public void testCommitSegmentInfos() throws Exception { } } - private void assertMatchingSegmentsAndCheckpoints(NRTReplicationEngine nrtEngine, SegmentInfos expectedSegmentInfos) - throws IOException { - assertEquals(engine.getPersistedLocalCheckpoint(), nrtEngine.getPersistedLocalCheckpoint()); - assertEquals(engine.getProcessedLocalCheckpoint(), nrtEngine.getProcessedLocalCheckpoint()); - assertEquals(engine.getLocalCheckpointTracker().getMaxSeqNo(), nrtEngine.getLocalCheckpointTracker().getMaxSeqNo()); - assertEquals(expectedSegmentInfos.files(true), nrtEngine.getLatestSegmentInfos().files(true)); - assertEquals(expectedSegmentInfos.getUserData(), nrtEngine.getLatestSegmentInfos().getUserData()); - assertEquals(expectedSegmentInfos.getVersion(), nrtEngine.getLatestSegmentInfos().getVersion()); - } - - private void assertSearcherHits(Engine engine, int hits) { - try (final Engine.Searcher test = engine.acquireSearcher("test")) { - MatcherAssert.assertThat(test, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(hits)); - } - } - private NRTReplicationEngine buildNrtReplicaEngine(AtomicLong globalCheckpoint, Store store) throws IOException { Lucene.cleanLuceneIndex(store.directory()); final Path translogDir = createTempDir(); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 3af882a8087ec..007317f6e71cd 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -8,6 +8,8 @@ package org.opensearch.index.shard; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; @@ -15,6 +17,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -48,6 +51,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -66,7 +70,7 @@ public class SegmentReplicationIndexShardTests extends OpenSearchIndexLevelRepli .build(); /** - * Test that latestReplicationCheckpoint returns null only for docrep enabled indices + * Test that latestReplicationCheckpoint returns null only for docrep enabled indices */ public void testReplicationCheckpointNullForDocRep() throws IOException { Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "DOCUMENT").put(Settings.EMPTY).build(); @@ -76,11 +80,10 @@ public void testReplicationCheckpointNullForDocRep() throws IOException { } /** - * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices + * Test that latestReplicationCheckpoint returns ReplicationCheckpoint for segrep enabled indices */ - public void testReplicationCheckpointNotNullForSegReb() throws IOException { - Settings indexSettings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, "SEGMENT").put(Settings.EMPTY).build(); - final IndexShard indexShard = newStartedShard(indexSettings); + public void testReplicationCheckpointNotNullForSegRep() throws IOException { + final IndexShard indexShard = newStartedShard(randomBoolean(), settings, new NRTReplicationEngineFactory()); final ReplicationCheckpoint replicationCheckpoint = indexShard.getLatestReplicationCheckpoint(); assertNotNull(replicationCheckpoint); closeShards(indexShard); @@ -205,6 +208,132 @@ public void testPublishCheckpointAfterRelocationHandOff() throws IOException { closeShards(shard); } + public void testReplicaReceivesGenIncrease() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + final int numDocs = randomIntBetween(10, 100); + shards.indexDocs(numDocs); + flushShard(primary, true); + replicateSegments(primary, shards.getReplicas()); + + final int totalDocs = numDocs + shards.indexDocs(randomIntBetween(numDocs + 1, numDocs + 10)); + flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + + assertEqualCommittedSegments(primary, replica); + assertDocCount(primary, totalDocs); + assertDocCount(replica, totalDocs); + } + } + + public void testReplicaReceivesLowerGeneration() throws Exception { + // when a replica gets incoming segments that are lower than what it currently has on disk. + + // start 3 nodes Gens: P [2], R [2], R[2] + // index some docs and flush twice, push to only 1 replica. + // State Gens: P [4], R-1 [3], R-2 [2] + // Promote R-2 as the new primary and demote the old primary. + // State Gens: R[4], R-1 [3], P [4] - *commit on close of NRTEngine, xlog replayed and commit made. + // index docs on new primary and flush + // replicate to all. + // Expected result: State Gens: P[4], R-1 [4], R-2 [4] + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica_1 = shards.getReplicas().get(0); + final IndexShard replica_2 = shards.getReplicas().get(1); + int numDocs = randomIntBetween(10, 100); + shards.indexDocs(numDocs); + flushShard(primary, false); + replicateSegments(primary, List.of(replica_1)); + numDocs = randomIntBetween(numDocs + 1, numDocs + 10); + shards.indexDocs(numDocs); + flushShard(primary, false); + assertLatestCommitGen(4, primary); + replicateSegments(primary, List.of(replica_1)); + + assertEqualCommittedSegments(primary, replica_1); + assertLatestCommitGen(4, primary, replica_1); + assertLatestCommitGen(2, replica_2); + + shards.promoteReplicaToPrimary(replica_2).get(); + primary.close("demoted", false); + primary.store().close(); + IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); + shards.recoverReplica(oldPrimary); + assertLatestCommitGen(4, oldPrimary); + assertEqualCommittedSegments(oldPrimary, replica_1); + + assertLatestCommitGen(4, replica_2); + + numDocs = randomIntBetween(numDocs + 1, numDocs + 10); + shards.indexDocs(numDocs); + flushShard(replica_2, false); + replicateSegments(replica_2, shards.getReplicas()); + assertEqualCommittedSegments(replica_2, oldPrimary, replica_1); + } + } + + public void testReplicaRestarts() throws Exception { + try (ReplicationGroup shards = createGroup(3, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + IndexShard primary = shards.getPrimary(); + // 1. Create ops that are in the index and xlog of both shards but not yet part of a commit point. + final int numDocs = shards.indexDocs(randomInt(10)); + + // refresh and copy the segments over. + if (randomBoolean()) { + flushShard(primary); + } + primary.refresh("Test"); + replicateSegments(primary, shards.getReplicas()); + + // at this point both shards should have numDocs persisted and searchable. + assertDocCounts(primary, numDocs, numDocs); + for (IndexShard shard : shards.getReplicas()) { + assertDocCounts(shard, numDocs, numDocs); + } + + final int i1 = randomInt(5); + for (int i = 0; i < i1; i++) { + shards.indexDocs(randomInt(10)); + + // randomly resetart a replica + final IndexShard replicaToRestart = getRandomReplica(shards); + replicaToRestart.close("restart", false); + replicaToRestart.store().close(); + shards.removeReplica(replicaToRestart); + final IndexShard newReplica = shards.addReplicaWithExistingPath( + replicaToRestart.shardPath(), + replicaToRestart.routingEntry().currentNodeId() + ); + shards.recoverReplica(newReplica); + + // refresh and push segments to our other replicas. + if (randomBoolean()) { + failAndPromoteRandomReplica(shards); + } + flushShard(shards.getPrimary()); + replicateSegments(shards.getPrimary(), shards.getReplicas()); + } + primary = shards.getPrimary(); + + // refresh and push segments to our other replica. + flushShard(primary); + replicateSegments(primary, shards.getReplicas()); + + for (IndexShard shard : shards) { + assertConsistentHistoryBetweenTranslogAndLucene(shard); + } + final List docsAfterReplication = getDocIdAndSeqNos(shards.getPrimary()); + for (IndexShard shard : shards.getReplicas()) { + assertThat(shard.routingEntry().toString(), getDocIdAndSeqNos(shard), equalTo(docsAfterReplication)); + } + } + } + public void testNRTReplicaPromotedAsPrimary() throws Exception { try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { shards.startAll(); @@ -523,4 +652,39 @@ public void onReplicationFailure(SegmentReplicationState state, OpenSearchExcept assertEquals("Should have resolved listener with failure", 0, latch.getCount()); assertNull(targetService.get(target.getId())); } + + private IndexShard getRandomReplica(ReplicationGroup shards) { + return shards.getReplicas().get(randomInt(shards.getReplicas().size() - 1)); + } + + private IndexShard failAndPromoteRandomReplica(ReplicationGroup shards) throws IOException { + IndexShard primary = shards.getPrimary(); + final IndexShard newPrimary = getRandomReplica(shards); + shards.promoteReplicaToPrimary(newPrimary); + primary.close("demoted", true); + primary.store().close(); + primary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); + shards.recoverReplica(primary); + return newPrimary; + } + + private void assertLatestCommitGen(long expected, IndexShard... shards) throws IOException { + for (IndexShard indexShard : shards) { + try (final GatedCloseable commit = indexShard.acquireLastIndexCommit(false)) { + assertEquals(expected, commit.get().getGeneration()); + } + } + } + + private void assertEqualCommittedSegments(IndexShard primary, IndexShard... replicas) throws IOException { + for (IndexShard replica : replicas) { + final SegmentInfos replicaInfos = replica.store().readLastCommittedSegmentsInfo(); + final SegmentInfos primaryInfos = primary.store().readLastCommittedSegmentsInfo(); + final Map latestReplicaMetadata = replica.store().getSegmentMetadataMap(replicaInfos); + final Map latestPrimaryMetadata = primary.store().getSegmentMetadataMap(primaryInfos); + final Store.RecoveryDiff diff = Store.segmentReplicationDiff(latestPrimaryMetadata, latestReplicaMetadata); + assertTrue(diff.different.isEmpty()); + assertTrue(diff.missing.isEmpty()); + } + } } diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 89b11d604d7a1..4dea6ca7face0 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -658,16 +658,16 @@ public void testRecoveryDiff() throws IOException, InterruptedException { Store.RecoveryDiff newCommitDiff = newCommitMetadata.recoveryDiff(metadata); if (delFile != null) { assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetadata.size() - 5)); // segments_N, del file, cfs, cfe, si for the - // new segment + // new segment assertThat(newCommitDiff.different.size(), equalTo(1)); // the del file must be different assertThat(newCommitDiff.different.get(0).name(), endsWith(".liv")); assertThat(newCommitDiff.missing.size(), equalTo(4)); // segments_N,cfs, cfe, si for the new segment } else { assertThat(newCommitDiff.identical.size(), equalTo(newCommitMetadata.size() - 4)); // segments_N, cfs, cfe, si for the new - // segment + // segment assertThat(newCommitDiff.different.size(), equalTo(0)); assertThat(newCommitDiff.missing.size(), equalTo(4)); // an entire segment must be missing (single doc segment got dropped) plus - // the commit is different + // the commit is different } deleteContent(store.directory()); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index 3ea74dbf38919..cc5100fba9010 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -61,6 +61,7 @@ import org.opensearch.index.engine.EngineFactory; import org.opensearch.index.engine.InternalEngineFactory; import org.opensearch.index.engine.InternalEngineTests; +import org.opensearch.index.engine.NRTReplicationEngineFactory; import org.opensearch.index.mapper.SourceToParse; import org.opensearch.index.replication.OpenSearchIndexLevelReplicationTestCase; import org.opensearch.index.replication.RecoveryDuringReplicationTests; @@ -106,7 +107,7 @@ public void testTranslogHistoryTransferred() throws Exception { public void testWithSegmentReplication_ReplicaUsesPrimaryTranslogUUID() throws Exception { Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build(); - try (ReplicationGroup shards = createGroup(2, settings)) { + try (ReplicationGroup shards = createGroup(2, settings, new NRTReplicationEngineFactory())) { shards.startAll(); final String expectedUUID = getTranslog(shards.getPrimary()).getTranslogUUID(); assertTrue( diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index af754d77560cc..f4a9f51789679 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -1507,10 +1507,10 @@ public static MapperService createMapperService() throws IOException { * Exposes a translog associated with the given engine for testing purpose. */ public static Translog getTranslog(Engine engine) { - assert engine instanceof InternalEngine : "only InternalEngines have translogs, got: " + engine.getClass(); - InternalEngine internalEngine = (InternalEngine) engine; - internalEngine.ensureOpen(); - TranslogManager translogManager = internalEngine.translogManager(); + assert engine instanceof InternalEngine || engine instanceof NRTReplicationEngine + : "only InternalEngines or NRTReplicationEngines have translogs, got: " + engine.getClass(); + engine.ensureOpen(); + TranslogManager translogManager = engine.translogManager(); assert translogManager instanceof InternalTranslogManager : "only InternalTranslogManager have translogs, got: " + engine.getClass(); InternalTranslogManager internalTranslogManager = (InternalTranslogManager) translogManager; diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 073dc4b84472e..09eca006d600a 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -139,6 +139,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -1252,10 +1253,10 @@ public final List replicateSegments( List replicaShards ) throws IOException, InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(replicaShards.size()); - Store.MetadataSnapshot primaryMetadata; + Map primaryMetadata; try (final GatedCloseable segmentInfosSnapshot = primaryShard.getSegmentInfosSnapshot()) { final SegmentInfos primarySegmentInfos = segmentInfosSnapshot.get(); - primaryMetadata = primaryShard.store().getMetadata(primarySegmentInfos); + primaryMetadata = primaryShard.store().getSegmentMetadataMap(primarySegmentInfos); } List ids = new ArrayList<>(); for (IndexShard replica : replicaShards) { @@ -1267,12 +1268,11 @@ public final List replicateSegments( public void onReplicationDone(SegmentReplicationState state) { try (final GatedCloseable snapshot = replica.getSegmentInfosSnapshot()) { final SegmentInfos replicaInfos = snapshot.get(); - final Store.MetadataSnapshot replicaMetadata = replica.store().getMetadata(replicaInfos); - final Store.RecoveryDiff recoveryDiff = primaryMetadata.recoveryDiff(replicaMetadata); + final Map replicaMetadata = replica.store().getSegmentMetadataMap(replicaInfos); + final Store.RecoveryDiff recoveryDiff = Store.segmentReplicationDiff(primaryMetadata, replicaMetadata); assertTrue(recoveryDiff.missing.isEmpty()); assertTrue(recoveryDiff.different.isEmpty()); assertEquals(recoveryDiff.identical.size(), primaryMetadata.size()); - assertEquals(primaryMetadata.getCommitUserData(), replicaMetadata.getCommitUserData()); } catch (Exception e) { throw ExceptionsHelper.convertToRuntime(e); } finally {