diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 0e915577dc467..efc522a1f9741 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -764,7 +764,7 @@ public void testShardChangesWithDefaultDocType() throws Exception { } IndexShard shard = indexService.getShard(0); try ( - Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true); + Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean()); Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot() ) { List opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true); diff --git a/server/src/main/java/org/opensearch/index/engine/Engine.java b/server/src/main/java/org/opensearch/index/engine/Engine.java index 7cf7b3245c0e5..825d71d6d1024 100644 --- a/server/src/main/java/org/opensearch/index/engine/Engine.java +++ b/server/src/main/java/org/opensearch/index/engine/Engine.java @@ -735,8 +735,22 @@ public enum SearcherScope { * Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive). * This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}. */ - public abstract Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) - throws IOException; + public abstract Translog.Snapshot newChangesSnapshot( + String source, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean accurateCount + ) throws IOException; + + /** + * Counts the number of history operations in the given sequence number range + * @param source source of the request + * @param fromSeqNo from sequence number; included + * @param toSeqNumber to sequence number; included + * @return number of history operations + */ + public abstract int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException; public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo); diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index 438bb0b290b9c..649958ff0dcd9 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2772,7 +2772,13 @@ long getNumDocUpdates() { } @Override - public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { + public Translog.Snapshot newChangesSnapshot( + String source, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean accurateCount + ) throws IOException { ensureOpen(); refreshIfNeeded(source, toSeqNo); Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL); @@ -2782,7 +2788,8 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long LuceneChangesSnapshot.DEFAULT_BATCH_SIZE, fromSeqNo, toSeqNo, - requiredFullRange + requiredFullRange, + accurateCount ); searcher = null; return snapshot; @@ -2798,6 +2805,21 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long } } + public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException { + ensureOpen(); + refreshIfNeeded(source, toSeqNo); + try (Searcher s = acquireSearcher(source, SearcherScope.INTERNAL)) { + return LuceneChangesSnapshot.countNumberOfHistoryOperations(s, fromSeqNo, toSeqNo); + } catch (Exception e) { + try { + maybeFailEngine(source, e); + } catch (Exception innerException) { + e.addSuppressed(innerException); + } + throw e; + } + } + public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) { return getMinRetainedSeqNo() <= startingSeqNo; } diff --git a/server/src/main/java/org/opensearch/index/engine/LuceneChangesSnapshot.java b/server/src/main/java/org/opensearch/index/engine/LuceneChangesSnapshot.java index d640cf1468ec3..ae1dc9e647073 100644 --- a/server/src/main/java/org/opensearch/index/engine/LuceneChangesSnapshot.java +++ b/server/src/main/java/org/opensearch/index/engine/LuceneChangesSnapshot.java @@ -38,16 +38,19 @@ import org.apache.lucene.index.NumericDocValues; import org.apache.lucene.search.BooleanClause; import org.apache.lucene.search.BooleanQuery; -import org.apache.lucene.search.DocValuesFieldExistsQuery; +import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.apache.lucene.search.ScoreDoc; import org.apache.lucene.search.Sort; import org.apache.lucene.search.SortField; import org.apache.lucene.search.TopDocs; +import org.apache.lucene.search.TopFieldCollector; import org.apache.lucene.util.ArrayUtil; +import org.opensearch.Version; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.lucene.Lucene; +import org.opensearch.common.lucene.search.Queries; import org.opensearch.core.internal.io.IOUtils; import org.opensearch.index.fieldvisitor.FieldsVisitor; import org.opensearch.index.mapper.SeqNoFieldMapper; @@ -88,8 +91,14 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { * @param toSeqNo the maximum requesting seq# - inclusive * @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo */ - LuceneChangesSnapshot(Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, long toSeqNo, boolean requiredFullRange) - throws IOException { + LuceneChangesSnapshot( + Engine.Searcher engineSearcher, + int searchBatchSize, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean accurateCount + ) throws IOException { if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) { throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]"); } @@ -111,7 +120,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot { this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader())); this.indexSearcher.setQueryCache(null); this.parallelArray = new ParallelArray(this.searchBatchSize); - final TopDocs topDocs = searchOperations(null); + final TopDocs topDocs = searchOperations(null, accurateCount); this.totalHits = Math.toIntExact(topDocs.totalHits.value); this.scoreDocs = topDocs.scoreDocs; fillParallelArray(scoreDocs, parallelArray); @@ -187,7 +196,7 @@ private int nextDocIndex() throws IOException { // we have processed all docs in the current search - fetch the next batch if (docIndex == scoreDocs.length && docIndex > 0) { final ScoreDoc prev = scoreDocs[scoreDocs.length - 1]; - scoreDocs = searchOperations(prev).scoreDocs; + scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs; fillParallelArray(scoreDocs, parallelArray); docIndex = 0; } @@ -236,16 +245,31 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray } } - private TopDocs searchOperations(ScoreDoc after) throws IOException { - final Query rangeQuery = new BooleanQuery.Builder().add( - LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo), - BooleanClause.Occur.MUST - ) - // exclude non-root nested documents - .add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST) + private static Query operationsRangeQuery(long fromSeqNo, long toSeqNo) { + return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST) + .add(Queries.newNonNestedFilter(Version.CURRENT), BooleanClause.Occur.MUST) // exclude non-root nested docs .build(); + } + + static int countNumberOfHistoryOperations(Engine.Searcher searcher, long fromSeqNo, long toSeqNo) throws IOException { + if (fromSeqNo > toSeqNo || fromSeqNo < 0 || toSeqNo < 0) { + throw new IllegalArgumentException("Invalid sequence range; fromSeqNo [" + fromSeqNo + "] toSeqNo [" + toSeqNo + "]"); + } + IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader())); + return indexSearcher.count(operationsRangeQuery(fromSeqNo, toSeqNo)); + } + + private TopDocs searchOperations(FieldDoc after, boolean accurate) throws IOException { + final Query rangeQuery = operationsRangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo); final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG)); - return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo); + final TopFieldCollector topFieldCollector = TopFieldCollector.create( + sortedBySeqNo, + searchBatchSize, + after, + accurate ? Integer.MAX_VALUE : 0 + ); + indexSearcher.search(rangeQuery, topFieldCollector); + return topFieldCollector.topDocs(); } private Translog.Operation readDocAsOp(int docIndex) throws IOException { diff --git a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java index 32d6b9b98d169..43fe10c217270 100644 --- a/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/ReadOnlyEngine.java @@ -325,10 +325,23 @@ public Closeable acquireHistoryRetentionLock() { } @Override - public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) { + public Translog.Snapshot newChangesSnapshot( + String source, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean accurateCount + ) { return newEmptySnapshot(); } + @Override + public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException { + try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true)) { + return snapshot.totalOperations(); + } + } + public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) { // we can do operation-based recovery if we don't have to replay any operation. return startingSeqNo > seqNoStats.getMaxSeqNo(); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ad370051c53ac..948405f73dbd0 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2230,16 +2230,6 @@ public Closeable acquireHistoryRetentionLock() { return getEngine().acquireHistoryRetentionLock(); } - /** - * - * Creates a new history snapshot for reading operations since - * the provided starting seqno (inclusive) and ending seqno (inclusive) - * The returned snapshot can be retrieved from either Lucene index or translog files. - */ - public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException { - return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true); - } - /** * Checks if we have a completed history of operations since the given starting seqno (inclusive). * This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()} @@ -2257,6 +2247,17 @@ public long getMinRetainedSeqNo() { return getEngine().getMinRetainedSeqNo(); } + /** + * Counts the number of history operations within the provided sequence numbers + * @param source source of the requester (e.g., peer-recovery) + * @param fromSeqNo from sequence number, included + * @param toSeqNo to sequence number, included + * @return number of history operations in the sequence number range + */ + public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException { + return getEngine().countNumberOfHistoryOperations(source, fromSeqNo, toSeqNo); + } + /** * Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive) * and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading. @@ -2268,8 +2269,14 @@ public long getMinRetainedSeqNo() { * if any operation between {@code fromSeqNo} and {@code toSeqNo} is missing. * This parameter should be only enabled when the entire requesting range is below the global checkpoint. */ - public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { - return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange); + public Translog.Snapshot newChangesSnapshot( + String source, + long fromSeqNo, + long toSeqNo, + boolean requiredFullRange, + boolean accurateCount + ) throws IOException { + return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, accurateCount); } public List segments(boolean verbose) { diff --git a/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java b/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java index bbdf948af5c32..726d2925177fa 100644 --- a/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java +++ b/server/src/main/java/org/opensearch/index/shard/PrimaryReplicaSyncer.java @@ -104,7 +104,7 @@ public void resync(final IndexShard indexShard, final ActionListener // Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender. // Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible // Also fail the resync early if the shard is shutting down - snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false); + snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, true); final Translog.Snapshot originalSnapshot = snapshot; final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() { @Override diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java index 7899b11330a34..77596f50a8a5e 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySourceHandler.java @@ -132,7 +132,7 @@ public class RecoverySourceHandler { private final CancellableThreads cancellableThreads = new CancellableThreads(); private final List resources = new CopyOnWriteArrayList<>(); private final ListenableFuture future = new ListenableFuture<>(); - private static final String PEER_RECOVERY_NAME = "peer-recovery"; + public static final String PEER_RECOVERY_NAME = "peer-recovery"; public RecoverySourceHandler( IndexShard shard, @@ -272,7 +272,7 @@ && isTargetSameHistory() logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo); try { - final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo); + final int estimateNumOps = countNumberOfHistoryOperations(startingSeqNo); final Releasable releaseStore = acquireStore(shard.store()); resources.add(releaseStore); sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> { @@ -319,7 +319,7 @@ && isTargetSameHistory() sendFileStep.whenComplete(r -> { assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]"); // For a sequence based recovery, the target can keep its local translog - prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); + prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep); }, onFailure); prepareEngineStep.whenComplete(prepareEngineTime -> { @@ -340,9 +340,15 @@ && isTargetSameHistory() final long endingSeqNo = shard.seqNoStats().getMaxSeqNo(); if (logger.isTraceEnabled()) { - logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo)); + logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo)); } - final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false); + final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot( + PEER_RECOVERY_NAME, + startingSeqNo, + Long.MAX_VALUE, + false, + true + ); resources.add(phase2Snapshot); retentionLock.close(); @@ -403,10 +409,13 @@ private boolean isTargetSameHistory() { return targetHistoryUUID.equals(shard.getHistoryUUID()); } - private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException { - try (Translog.Snapshot snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false)) { - return snapshot.totalOperations(); - } + /** + * Counts the number of history operations from the starting sequence number + * @param startingSeqNo the starting sequence number to count; included + * @return number of history operations + */ + private int countNumberOfHistoryOperations(long startingSeqNo) throws IOException { + return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE); } static void runUnderPrimaryPermit( diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 3ea7cad528e82..394b093059385 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -344,11 +344,11 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe private boolean hasUncommittedOperations() throws IOException { long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)); - try ( - Translog.Snapshot snapshot = indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false) - ) { - return snapshot.totalOperations() > 0; - } + return indexShard.countNumberOfHistoryOperations( + RecoverySourceHandler.PEER_RECOVERY_NAME, + localCheckpointOfCommit + 1, + Long.MAX_VALUE + ) > 0; } @Override diff --git a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java index af9b913b11d56..33f09a3e67db8 100644 --- a/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java +++ b/server/src/test/java/org/opensearch/index/engine/InternalEngineTests.java @@ -6362,8 +6362,12 @@ public void onFailure(Exception e) { @Override protected void doRun() throws Exception { latch.await(); - Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true); - changes.close(); + if (randomBoolean()) { + Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true, randomBoolean()); + changes.close(); + } else { + engine.countNumberOfHistoryOperations("test", min, max); + } } }); snapshotThreads[i].start(); diff --git a/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java b/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java index bd191e235369d..e3117e179e7fa 100644 --- a/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java +++ b/server/src/test/java/org/opensearch/index/engine/LuceneChangesSnapshotTests.java @@ -74,14 +74,14 @@ public void testBasics() throws Exception { long fromSeqNo = randomNonNegativeLong(); long toSeqNo = randomLongBetween(fromSeqNo, Long.MAX_VALUE); // Empty engine - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean())) { IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat( error.getMessage(), containsString("Not all operations between from_seqno [" + fromSeqNo + "] and to_seqno [" + toSeqNo + "] found") ); } - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, false)) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, false, randomBoolean())) { assertThat(snapshot, SnapshotMatchers.size(0)); } int numOps = between(1, 100); @@ -114,7 +114,8 @@ public void testBasics() throws Exception { between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, - false + false, + randomBoolean() ) ) { searcher = null; @@ -130,7 +131,8 @@ public void testBasics() throws Exception { between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, - true + true, + randomBoolean() ) ) { searcher = null; @@ -152,7 +154,8 @@ public void testBasics() throws Exception { between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, - false + false, + randomBoolean() ) ) { searcher = null; @@ -167,7 +170,8 @@ public void testBasics() throws Exception { between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, - true + true, + randomBoolean() ) ) { searcher = null; @@ -187,7 +191,8 @@ public void testBasics() throws Exception { between(1, LuceneChangesSnapshot.DEFAULT_BATCH_SIZE), fromSeqNo, toSeqNo, - true + true, + randomBoolean() ) ) { searcher = null; @@ -199,7 +204,7 @@ public void testBasics() throws Exception { // Get snapshot via engine will auto refresh fromSeqNo = randomLongBetween(0, numOps - 1); toSeqNo = randomLongBetween(fromSeqNo, numOps - 1); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, randomBoolean())) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, randomBoolean(), randomBoolean())) { assertThat(snapshot, SnapshotMatchers.containsSeqNoRange(fromSeqNo, toSeqNo)); } } @@ -230,8 +235,11 @@ public void testSkipNonRootOfNestedDocuments() throws Exception { long maxSeqNo = engine.getLocalCheckpointTracker().getMaxSeqNo(); engine.refresh("test"); Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL); - try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, between(1, 100), 0, maxSeqNo, false)) { - assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size())); + final boolean accurateCount = randomBoolean(); + try (Translog.Snapshot snapshot = new LuceneChangesSnapshot(searcher, between(1, 100), 0, maxSeqNo, false, accurateCount)) { + if (accurateCount == true) { + assertThat(snapshot.totalOperations(), equalTo(seqNoToTerm.size())); + } Translog.Operation op; while ((op = snapshot.next()) != null) { assertThat(op.toString(), op.primaryTerm(), equalTo(seqNoToTerm.get(op.seqNo()))); @@ -306,7 +314,7 @@ void pullOperations(InternalEngine follower) throws IOException { long fromSeqNo = followerCheckpoint + 1; long batchSize = randomLongBetween(0, 100); long toSeqNo = Math.min(fromSeqNo + batchSize, leaderCheckpoint); - try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = leader.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean())) { translogHandler.run(follower, snapshot); } } @@ -352,7 +360,7 @@ private List drainAll(Translog.Snapshot snapshot) throws IOE public void testOverFlow() throws Exception { long fromSeqNo = randomLongBetween(0, 5); long toSeqNo = randomLongBetween(Long.MAX_VALUE - 5, Long.MAX_VALUE); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true)) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", fromSeqNo, toSeqNo, true, randomBoolean())) { IllegalStateException error = expectThrows(IllegalStateException.class, () -> drainAll(snapshot)); assertThat( error.getMessage(), diff --git a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java index a88db8473cae0..d262b5abec0f3 100644 --- a/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/IndexLevelReplicationTests.java @@ -499,7 +499,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { assertThat(snapshot.totalOperations(), equalTo(0)); } } - try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } @@ -517,7 +517,7 @@ protected EngineFactory getEngineFactory(ShardRouting routing) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(Collections.singletonList(noop2))); } } - try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { + try (Translog.Snapshot snapshot = shard.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) { assertThat(snapshot, SnapshotMatchers.containsOperationsInAnyOrder(expectedTranslogOps)); } } @@ -619,7 +619,7 @@ public void testSeqNoCollision() throws Exception { shards.promoteReplicaToPrimary(replica2).get(); logger.info("--> Recover replica3 from replica2"); recoverReplica(replica3, replica2, true); - try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { + try (Translog.Snapshot snapshot = replica3.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, true)) { assertThat(snapshot.totalOperations(), equalTo(initDocs + 1)); final List expectedOps = new ArrayList<>(initOperations); expectedOps.add(op2); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index c714bd0eb85a2..5e09e0f2253df 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -225,7 +225,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { IndexShard newReplica = shards.addReplicaWithExistingPath(orgPrimary.shardPath(), orgPrimary.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); shards.assertAllEqual(3); - try (Translog.Snapshot snapshot = newReplica.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { + try (Translog.Snapshot snapshot = newReplica.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) { assertThat(snapshot, SnapshotMatchers.size(6)); } } diff --git a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java index fe810a87358d0..2bce5a7c81794 100644 --- a/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/engine/EngineTestCase.java @@ -1312,7 +1312,7 @@ public static List getDocIds(Engine engine, boolean refresh */ public static List readAllOperationsInLucene(Engine engine) throws IOException { final List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) { Translog.Operation op; while ((op = snapshot.next()) != null) { operations.add(op); @@ -1326,7 +1326,7 @@ public static List readAllOperationsInLucene(Engine engine) */ public static List readAllOperationsBasedOnSource(Engine engine) throws IOException { final List operations = new ArrayList<>(); - try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false)) { + try (Translog.Snapshot snapshot = engine.newChangesSnapshot("test", 0, Long.MAX_VALUE, false, randomBoolean())) { Translog.Operation op; while ((op = snapshot.next()) != null) { operations.add(op);