From bd6484a2018ba17bb6ea6c4c542c9eec79da5257 Mon Sep 17 00:00:00 2001 From: Marc Handalian Date: Thu, 2 Feb 2023 11:33:26 -0700 Subject: [PATCH] Update replicationCheckpoint to include accurate seqNo from SegmentInfos. (#6122) (#6156) * Update replicationCheckpoint to include accurate seqNo from provided SegmentInfos. This change updates IndexShard#getlatestSegmentInfos to include the latest max seqNo from the primary's segmentInfos snapshot. It also updates the method to return a Tuple so that both can be fetched. This change also updates replicas to not bump their SegmentInfos version when performing a commit. Signed-off-by: Marc Handalian * Add missing test to StoreTests verifying version and userData on commitSegmentInfos. Signed-off-by: Marc Handalian --------- Signed-off-by: Marc Handalian (cherry picked from commit c5a1bdfbfa2f2baab6f100cccd47db2e709e733a) --- CHANGELOG.md | 1 + .../opensearch/index/shard/IndexShard.java | 49 +++++++++++++---- .../org/opensearch/index/store/Store.java | 2 +- .../indices/replication/common/CopyState.java | 13 ++--- .../SegmentReplicationIndexShardTests.java | 55 +++++++++++++++++++ .../opensearch/index/store/StoreTests.java | 25 +++++++++ .../replication/common/CopyStateTests.java | 23 +++++--- 7 files changed, 138 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5345ee66e58e..564b9c77dde43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Fixed - [Segment Replication] Fix for peer recovery ([#5344](https://github.com/opensearch-project/OpenSearch/pull/5344)) +- [Segment Replication] Fix bug where inaccurate sequence numbers are sent during replication ([#6122](https://github.com/opensearch-project/OpenSearch/pull/6122)) ### Security diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index de2cef415559e..8ee5367830877 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1444,31 +1444,56 @@ public GatedCloseable acquireSafeIndexCommit() throws EngineExcepti } /** - * Returns the latest ReplicationCheckpoint that shard received. + * Compute and return the latest ReplicationCheckpoint for a particular shard. * @return EMPTY checkpoint before the engine is opened and null for non-segrep enabled indices */ public ReplicationCheckpoint getLatestReplicationCheckpoint() { + final Tuple, ReplicationCheckpoint> infosAndCheckpoint = getLatestSegmentInfosAndCheckpoint(); + if (infosAndCheckpoint == null) { + return null; + } + try (final GatedCloseable ignored = infosAndCheckpoint.v1()) { + return infosAndCheckpoint.v2(); + } catch (IOException e) { + throw new OpenSearchException("Error Closing SegmentInfos Snapshot", e); + } + } + + /** + * Compute and return the latest ReplicationCheckpoint for a shard and a GatedCloseable containing the corresponding SegmentInfos. + * The segments referenced by the SegmentInfos will remain on disk until the GatedCloseable is closed. + * + * Primary shards compute the seqNo used in the replication checkpoint from the fetched SegmentInfos. + * Replica shards compute the seqNo from its latest processed checkpoint, which only increases when refreshing on new segments. + * + * @return A {@link Tuple} containing SegmentInfos wrapped in a {@link GatedCloseable} and the {@link ReplicationCheckpoint} computed from the infos. + * + */ + public Tuple, ReplicationCheckpoint> getLatestSegmentInfosAndCheckpoint() { if (indexSettings.isSegRepEnabled() == false) { return null; } if (getEngineOrNull() == null) { - return ReplicationCheckpoint.empty(shardId); + return new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId)); } - try (final GatedCloseable snapshot = getSegmentInfosSnapshot()) { - return Optional.ofNullable(snapshot.get()) - .map( - segmentInfos -> new ReplicationCheckpoint( + // do not close the snapshot - caller will close it. + final GatedCloseable snapshot = getSegmentInfosSnapshot(); + return Optional.ofNullable(snapshot.get()).map(segmentInfos -> { + try { + return new Tuple<>( + snapshot, + new ReplicationCheckpoint( this.shardId, getOperationPrimaryTerm(), segmentInfos.getGeneration(), - getProcessedLocalCheckpoint(), + shardRouting.primary() ? getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos) : getProcessedLocalCheckpoint(), segmentInfos.getVersion() ) - ) - .orElse(ReplicationCheckpoint.empty(shardId)); - } catch (IOException ex) { - throw new OpenSearchException("Error Closing SegmentInfos Snapshot", ex); - } + ); + } catch (IOException e) { + throw new OpenSearchException("Error Fetching SegmentInfos and latest checkpoint", e); + } + }).orElseGet(() -> new Tuple<>(new GatedCloseable<>(null, () -> {}), ReplicationCheckpoint.empty(shardId))); } /** diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index de12d9b75dc12..69de85cd23820 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -899,7 +899,7 @@ public void commitSegmentInfos(SegmentInfos latestSegmentInfos, long maxSeqNo, l final Map userData = new HashMap<>(latestSegmentInfos.getUserData()); userData.put(LOCAL_CHECKPOINT_KEY, String.valueOf(processedCheckpoint)); userData.put(SequenceNumbers.MAX_SEQ_NO, Long.toString(maxSeqNo)); - latestSegmentInfos.setUserData(userData, true); + latestSegmentInfos.setUserData(userData, false); latestSegmentInfos.commit(directory()); directory.sync(latestSegmentInfos.files(true)); directory.syncMetaData(); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java index 1dd0886fd2f36..a6aa39e7cb074 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/CopyState.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.ByteBuffersDataOutput; import org.apache.lucene.store.ByteBuffersIndexOutput; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.index.shard.IndexShard; @@ -44,16 +45,12 @@ public CopyState(ReplicationCheckpoint requestedReplicationCheckpoint, IndexShar super("CopyState-" + shard.shardId()); this.requestedReplicationCheckpoint = requestedReplicationCheckpoint; this.shard = shard; - this.segmentInfosRef = shard.getSegmentInfosSnapshot(); + final Tuple, ReplicationCheckpoint> latestSegmentInfosAndCheckpoint = shard + .getLatestSegmentInfosAndCheckpoint(); + this.segmentInfosRef = latestSegmentInfosAndCheckpoint.v1(); + this.replicationCheckpoint = latestSegmentInfosAndCheckpoint.v2(); SegmentInfos segmentInfos = this.segmentInfosRef.get(); this.metadataMap = shard.store().getSegmentMetadataMap(segmentInfos); - this.replicationCheckpoint = new ReplicationCheckpoint( - shard.shardId(), - shard.getOperationPrimaryTerm(), - segmentInfos.getGeneration(), - shard.getProcessedLocalCheckpoint(), - segmentInfos.getVersion() - ); this.commitRef = shard.acquireLastIndexCommit(false); ByteBuffersDataOutput buffer = new ByteBuffersDataOutput(); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 31cf2f84ddf01..1dcfabda4d92d 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -19,6 +19,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingHelper; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.ClusterSettings; @@ -104,6 +105,60 @@ public void testReplicationCheckpointNotNullForSegRep() throws IOException { closeShards(indexShard); } + public void testSegmentInfosAndReplicationCheckpointTuple() throws Exception { + try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) { + shards.startAll(); + final IndexShard primary = shards.getPrimary(); + final IndexShard replica = shards.getReplicas().get(0); + + // assert before any indexing: + // replica: + Tuple, ReplicationCheckpoint> replicaTuple = replica.getLatestSegmentInfosAndCheckpoint(); + try (final GatedCloseable gatedCloseable = replicaTuple.v1()) { + assertReplicationCheckpoint(replica, gatedCloseable.get(), replicaTuple.v2()); + } + + // primary: + Tuple, ReplicationCheckpoint> primaryTuple = primary.getLatestSegmentInfosAndCheckpoint(); + try (final GatedCloseable gatedCloseable = primaryTuple.v1()) { + assertReplicationCheckpoint(primary, gatedCloseable.get(), primaryTuple.v2()); + } + // We use compareTo here instead of equals because we ignore segments gen with replicas performing their own commits. + // However infos version we expect to be equal. + assertEquals(1, primary.getLatestReplicationCheckpoint().compareTo(replica.getLatestReplicationCheckpoint())); + + // index and copy segments to replica. + int numDocs = randomIntBetween(10, 100); + shards.indexDocs(numDocs); + primary.refresh("test"); + replicateSegments(primary, List.of(replica)); + + replicaTuple = replica.getLatestSegmentInfosAndCheckpoint(); + try (final GatedCloseable gatedCloseable = replicaTuple.v1()) { + assertReplicationCheckpoint(replica, gatedCloseable.get(), replicaTuple.v2()); + } + + primaryTuple = primary.getLatestSegmentInfosAndCheckpoint(); + try (final GatedCloseable gatedCloseable = primaryTuple.v1()) { + assertReplicationCheckpoint(primary, gatedCloseable.get(), primaryTuple.v2()); + } + + replicaTuple = replica.getLatestSegmentInfosAndCheckpoint(); + try (final GatedCloseable gatedCloseable = replicaTuple.v1()) { + assertReplicationCheckpoint(replica, gatedCloseable.get(), replicaTuple.v2()); + } + assertEquals(1, primary.getLatestReplicationCheckpoint().compareTo(replica.getLatestReplicationCheckpoint())); + } + } + + private void assertReplicationCheckpoint(IndexShard shard, SegmentInfos segmentInfos, ReplicationCheckpoint checkpoint) + throws IOException { + assertNotNull(segmentInfos); + assertEquals(checkpoint.getSeqNo(), shard.getEngine().getMaxSeqNoFromSegmentInfos(segmentInfos)); + assertEquals(checkpoint.getSegmentInfosVersion(), segmentInfos.getVersion()); + assertEquals(checkpoint.getSegmentsGen(), segmentInfos.getGeneration()); + } + public void testIsSegmentReplicationAllowed_WrongEngineType() throws IOException { final IndexShard indexShard = newShard(false, settings, new InternalEngineFactory()); assertFalse(indexShard.isSegmentReplicationAllowed()); diff --git a/server/src/test/java/org/opensearch/index/store/StoreTests.java b/server/src/test/java/org/opensearch/index/store/StoreTests.java index 7f5340096ab86..3bcc244bf7fc5 100644 --- a/server/src/test/java/org/opensearch/index/store/StoreTests.java +++ b/server/src/test/java/org/opensearch/index/store/StoreTests.java @@ -83,6 +83,7 @@ import org.opensearch.index.engine.Engine; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; +import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.index.shard.ShardId; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; @@ -120,6 +121,7 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; +import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY; import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION; import static org.opensearch.test.VersionUtils.randomVersion; @@ -1305,6 +1307,29 @@ public void testReadSegmentsFromOldIndicesFailure() throws IOException { store.close(); } + public void testCommitSegmentInfos() throws IOException { + final ShardId shardId = new ShardId("index", "_na_", 1); + Store store = new Store( + shardId, + SEGMENT_REPLICATION_INDEX_SETTINGS, + StoreTests.newDirectory(random()), + new DummyShardLock(shardId) + ); + commitRandomDocs(store); + final SegmentInfos lastCommittedInfos = store.readLastCommittedSegmentsInfo(); + final long expectedLocalCheckpoint = 1; + final long expectedMaxSeqNo = 2; + store.commitSegmentInfos(lastCommittedInfos, expectedMaxSeqNo, expectedLocalCheckpoint); + + final SegmentInfos updatedInfos = store.readLastCommittedSegmentsInfo(); + assertEquals(lastCommittedInfos.getVersion(), updatedInfos.getVersion()); + final Map userData = updatedInfos.getUserData(); + assertEquals(expectedLocalCheckpoint, Long.parseLong(userData.get(LOCAL_CHECKPOINT_KEY))); + assertEquals(expectedMaxSeqNo, Long.parseLong(userData.get(SequenceNumbers.MAX_SEQ_NO))); + deleteContent(store.directory()); + IOUtils.close(store); + } + private void commitRandomDocs(Store store) throws IOException { IndexWriter writer = indexRandomDocs(store); writer.commit(); diff --git a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java index 77a4a6d22039e..6f2be9db6b2dd 100644 --- a/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/common/CopyStateTests.java @@ -13,6 +13,7 @@ import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.util.Version; import org.opensearch.common.collect.Map; +import org.opensearch.common.collect.Tuple; import org.opensearch.common.concurrent.GatedCloseable; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardTestCase; @@ -48,14 +49,7 @@ public class CopyStateTests extends IndexShardTestCase { public void testCopyStateCreation() throws IOException { final IndexShard mockIndexShard = createMockIndexShard(); - ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( - mockIndexShard.shardId(), - mockIndexShard.getOperationPrimaryTerm(), - 0L, - mockIndexShard.getProcessedLocalCheckpoint(), - 0L - ); - CopyState copyState = new CopyState(testCheckpoint, mockIndexShard); + CopyState copyState = new CopyState(ReplicationCheckpoint.empty(mockIndexShard.shardId()), mockIndexShard); ReplicationCheckpoint checkpoint = copyState.getCheckpoint(); assertEquals(TEST_SHARD_ID, checkpoint.getShardId()); // version was never set so this should be zero @@ -73,7 +67,18 @@ public static IndexShard createMockIndexShard() throws IOException { when(mockShard.store()).thenReturn(mockStore); SegmentInfos testSegmentInfos = new SegmentInfos(Version.LATEST.major); - when(mockShard.getSegmentInfosSnapshot()).thenReturn(new GatedCloseable<>(testSegmentInfos, () -> {})); + ReplicationCheckpoint testCheckpoint = new ReplicationCheckpoint( + mockShard.shardId(), + mockShard.getOperationPrimaryTerm(), + 0L, + mockShard.getProcessedLocalCheckpoint(), + 0L + ); + final Tuple, ReplicationCheckpoint> gatedCloseableReplicationCheckpointTuple = new Tuple<>( + new GatedCloseable<>(testSegmentInfos, () -> {}), + testCheckpoint + ); + when(mockShard.getLatestSegmentInfosAndCheckpoint()).thenReturn(gatedCloseableReplicationCheckpointTuple); when(mockStore.getSegmentMetadataMap(testSegmentInfos)).thenReturn(SI_SNAPSHOT.asMap()); IndexCommit mockIndexCommit = mock(IndexCommit.class);