Skip to content

Commit

Permalink
[Refactor] LuceneChangesSnapshot to use accurate ops history
Browse files Browse the repository at this point in the history
Improves the LuceneChangesSnapshot to get an accurate count of recovery
operations using sort by sequence number optimization.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
  • Loading branch information
nknize committed Mar 14, 2022
1 parent bdcaec5 commit 1c67d0b
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ public void testShardChangesWithDefaultDocType() throws Exception {
}
IndexShard shard = indexService.getShard(0);
try (
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true);
Translog.Snapshot luceneSnapshot = shard.newChangesSnapshot("test", 0, numOps - 1, true, randomBoolean());
Translog.Snapshot translogSnapshot = getTranslog(shard).newSnapshot()
) {
List<Translog.Operation> opsFromLucene = TestTranslog.drainSnapshot(luceneSnapshot, true);
Expand Down
18 changes: 16 additions & 2 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -735,8 +735,22 @@ public enum SearcherScope {
* Creates a new history snapshot from Lucene for reading operations whose seqno in the requesting seqno range (both inclusive).
* This feature requires soft-deletes enabled. If soft-deletes are disabled, this method will throw an {@link IllegalStateException}.
*/
public abstract Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException;
public abstract Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException;

/**
* Counts the number of history operations in the given sequence number range
* @param source source of the request
* @param fromSeqNo from sequence number; included
* @param toSeqNumber to sequence number; included
* @return number of history operations
*/
public abstract int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNumber) throws IOException;

public abstract boolean hasCompleteOperationHistory(String reason, long startingSeqNo);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2772,7 +2772,13 @@ long getNumDocUpdates() {
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
ensureOpen();
refreshIfNeeded(source, toSeqNo);
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
Expand All @@ -2782,7 +2788,8 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long
LuceneChangesSnapshot.DEFAULT_BATCH_SIZE,
fromSeqNo,
toSeqNo,
requiredFullRange
requiredFullRange,
accurateCount
);
searcher = null;
return snapshot;
Expand All @@ -2798,6 +2805,21 @@ public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long
}
}

public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
ensureOpen();
refreshIfNeeded(source, toSeqNo);
try (Searcher s = acquireSearcher(source, SearcherScope.INTERNAL)) {
return LuceneChangesSnapshot.countNumberOfHistoryOperations(s, fromSeqNo, toSeqNo);
} catch (Exception e) {
try {
maybeFailEngine(source, e);
} catch (Exception innerException) {
e.addSuppressed(innerException);
}
throw e;
}
}

public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
return getMinRetainedSeqNo() <= startingSeqNo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldCollector;
import org.apache.lucene.util.ArrayUtil;
import org.opensearch.Version;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.fieldvisitor.FieldsVisitor;
import org.opensearch.index.mapper.SeqNoFieldMapper;
Expand Down Expand Up @@ -88,8 +91,14 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
* @param toSeqNo the maximum requesting seq# - inclusive
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
*/
LuceneChangesSnapshot(Engine.Searcher engineSearcher, int searchBatchSize, long fromSeqNo, long toSeqNo, boolean requiredFullRange)
throws IOException {
LuceneChangesSnapshot(
Engine.Searcher engineSearcher,
int searchBatchSize,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
}
Expand All @@ -111,7 +120,7 @@ final class LuceneChangesSnapshot implements Translog.Snapshot {
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
this.indexSearcher.setQueryCache(null);
this.parallelArray = new ParallelArray(this.searchBatchSize);
final TopDocs topDocs = searchOperations(null);
final TopDocs topDocs = searchOperations(null, accurateCount);
this.totalHits = Math.toIntExact(topDocs.totalHits.value);
this.scoreDocs = topDocs.scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
Expand Down Expand Up @@ -187,7 +196,7 @@ private int nextDocIndex() throws IOException {
// we have processed all docs in the current search - fetch the next batch
if (docIndex == scoreDocs.length && docIndex > 0) {
final ScoreDoc prev = scoreDocs[scoreDocs.length - 1];
scoreDocs = searchOperations(prev).scoreDocs;
scoreDocs = searchOperations((FieldDoc) prev, false).scoreDocs;
fillParallelArray(scoreDocs, parallelArray);
docIndex = 0;
}
Expand Down Expand Up @@ -236,16 +245,31 @@ private void fillParallelArray(ScoreDoc[] scoreDocs, ParallelArray parallelArray
}
}

private TopDocs searchOperations(ScoreDoc after) throws IOException {
final Query rangeQuery = new BooleanQuery.Builder().add(
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo),
BooleanClause.Occur.MUST
)
// exclude non-root nested documents
.add(new DocValuesFieldExistsQuery(SeqNoFieldMapper.PRIMARY_TERM_NAME), BooleanClause.Occur.MUST)
private static Query operationsRangeQuery(long fromSeqNo, long toSeqNo) {
return new BooleanQuery.Builder().add(LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo), BooleanClause.Occur.MUST)
.add(Queries.newNonNestedFilter(Version.CURRENT), BooleanClause.Occur.MUST) // exclude non-root nested docs
.build();
}

static int countNumberOfHistoryOperations(Engine.Searcher searcher, long fromSeqNo, long toSeqNo) throws IOException {
if (fromSeqNo > toSeqNo || fromSeqNo < 0 || toSeqNo < 0) {
throw new IllegalArgumentException("Invalid sequence range; fromSeqNo [" + fromSeqNo + "] toSeqNo [" + toSeqNo + "]");
}
IndexSearcher indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(searcher.getDirectoryReader()));
return indexSearcher.count(operationsRangeQuery(fromSeqNo, toSeqNo));
}

private TopDocs searchOperations(FieldDoc after, boolean accurate) throws IOException {
final Query rangeQuery = operationsRangeQuery(Math.max(fromSeqNo, lastSeenSeqNo), toSeqNo);
final Sort sortedBySeqNo = new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.LONG));
return indexSearcher.searchAfter(after, rangeQuery, searchBatchSize, sortedBySeqNo);
final TopFieldCollector topFieldCollector = TopFieldCollector.create(
sortedBySeqNo,
searchBatchSize,
after,
accurate ? Integer.MAX_VALUE : 0
);
indexSearcher.search(rangeQuery, topFieldCollector);
return topFieldCollector.topDocs();
}

private Translog.Operation readDocAsOp(int docIndex) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,23 @@ public Closeable acquireHistoryRetentionLock() {
}

@Override
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) {
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) {
return newEmptySnapshot();
}

@Override
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
try (Translog.Snapshot snapshot = newChangesSnapshot(source, fromSeqNo, toSeqNo, false, true)) {
return snapshot.totalOperations();
}
}

public boolean hasCompleteOperationHistory(String reason, long startingSeqNo) {
// we can do operation-based recovery if we don't have to replay any operation.
return startingSeqNo > seqNoStats.getMaxSeqNo();
Expand Down
31 changes: 19 additions & 12 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -2230,16 +2230,6 @@ public Closeable acquireHistoryRetentionLock() {
return getEngine().acquireHistoryRetentionLock();
}

/**
*
* Creates a new history snapshot for reading operations since
* the provided starting seqno (inclusive) and ending seqno (inclusive)
* The returned snapshot can be retrieved from either Lucene index or translog files.
*/
public Translog.Snapshot getHistoryOperations(String reason, long startingSeqNo, long endSeqNo) throws IOException {
return getEngine().newChangesSnapshot(reason, startingSeqNo, endSeqNo, true);
}

/**
* Checks if we have a completed history of operations since the given starting seqno (inclusive).
* This method should be called after acquiring the retention lock; See {@link #acquireHistoryRetentionLock()}
Expand All @@ -2257,6 +2247,17 @@ public long getMinRetainedSeqNo() {
return getEngine().getMinRetainedSeqNo();
}

/**
* Counts the number of history operations within the provided sequence numbers
* @param source source of the requester (e.g., peer-recovery)
* @param fromSeqNo from sequence number, included
* @param toSeqNo to sequence number, included
* @return number of history operations in the sequence number range
*/
public int countNumberOfHistoryOperations(String source, long fromSeqNo, long toSeqNo) throws IOException {
return getEngine().countNumberOfHistoryOperations(source, fromSeqNo, toSeqNo);
}

/**
* Creates a new changes snapshot for reading operations whose seq_no are between {@code fromSeqNo}(inclusive)
* and {@code toSeqNo}(inclusive). The caller has to close the returned snapshot after finishing the reading.
Expand All @@ -2268,8 +2269,14 @@ public long getMinRetainedSeqNo() {
* if any operation between {@code fromSeqNo} and {@code toSeqNo} is missing.
* This parameter should be only enabled when the entire requesting range is below the global checkpoint.
*/
public Translog.Snapshot newChangesSnapshot(String source, long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange);
public Translog.Snapshot newChangesSnapshot(
String source,
long fromSeqNo,
long toSeqNo,
boolean requiredFullRange,
boolean accurateCount
) throws IOException {
return getEngine().newChangesSnapshot(source, fromSeqNo, toSeqNo, requiredFullRange, accurateCount);
}

public List<Segment> segments(boolean verbose) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void resync(final IndexShard indexShard, final ActionListener<ResyncTask>
// Wrap translog snapshot to make it synchronized as it is accessed by different threads through SnapshotSender.
// Even though those calls are not concurrent, snapshot.next() uses non-synchronized state and is not multi-thread-compatible
// Also fail the resync early if the shard is shutting down
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false);
snapshot = indexShard.newChangesSnapshot("resync", startingSeqNo, Long.MAX_VALUE, false, true);
final Translog.Snapshot originalSnapshot = snapshot;
final Translog.Snapshot wrappedSnapshot = new Translog.Snapshot() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public class RecoverySourceHandler {
private final CancellableThreads cancellableThreads = new CancellableThreads();
private final List<Closeable> resources = new CopyOnWriteArrayList<>();
private final ListenableFuture<RecoveryResponse> future = new ListenableFuture<>();
private static final String PEER_RECOVERY_NAME = "peer-recovery";
public static final String PEER_RECOVERY_NAME = "peer-recovery";

public RecoverySourceHandler(
IndexShard shard,
Expand Down Expand Up @@ -272,7 +272,7 @@ && isTargetSameHistory()
logger.trace("performing file-based recovery followed by history replay starting at [{}]", startingSeqNo);

try {
final int estimateNumOps = estimateNumberOfHistoryOperations(startingSeqNo);
final int estimateNumOps = countNumberOfHistoryOperations(startingSeqNo);
final Releasable releaseStore = acquireStore(shard.store());
resources.add(releaseStore);
sendFileStep.whenComplete(r -> IOUtils.close(wrappedSafeCommit, releaseStore), e -> {
Expand Down Expand Up @@ -319,7 +319,7 @@ && isTargetSameHistory()
sendFileStep.whenComplete(r -> {
assert Transports.assertNotTransportThread(RecoverySourceHandler.this + "[prepareTargetForTranslog]");
// For a sequence based recovery, the target can keep its local translog
prepareTargetForTranslog(estimateNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
prepareTargetForTranslog(countNumberOfHistoryOperations(startingSeqNo), prepareEngineStep);
}, onFailure);

prepareEngineStep.whenComplete(prepareEngineTime -> {
Expand All @@ -340,9 +340,15 @@ && isTargetSameHistory()

final long endingSeqNo = shard.seqNoStats().getMaxSeqNo();
if (logger.isTraceEnabled()) {
logger.trace("snapshot translog for recovery; current size is [{}]", estimateNumberOfHistoryOperations(startingSeqNo));
logger.trace("snapshot translog for recovery; current size is [{}]", countNumberOfHistoryOperations(startingSeqNo));
}
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false);
final Translog.Snapshot phase2Snapshot = shard.newChangesSnapshot(
PEER_RECOVERY_NAME,
startingSeqNo,
Long.MAX_VALUE,
false,
true
);
resources.add(phase2Snapshot);
retentionLock.close();

Expand Down Expand Up @@ -403,10 +409,13 @@ private boolean isTargetSameHistory() {
return targetHistoryUUID.equals(shard.getHistoryUUID());
}

private int estimateNumberOfHistoryOperations(long startingSeqNo) throws IOException {
try (Translog.Snapshot snapshot = shard.newChangesSnapshot(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE, false)) {
return snapshot.totalOperations();
}
/**
* Counts the number of history operations from the starting sequence number
* @param startingSeqNo the starting sequence number to count; included
* @return number of history operations
*/
private int countNumberOfHistoryOperations(long startingSeqNo) throws IOException {
return shard.countNumberOfHistoryOperations(PEER_RECOVERY_NAME, startingSeqNo, Long.MAX_VALUE);
}

static void runUnderPrimaryPermit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,11 @@ public void finalizeRecovery(final long globalCheckpoint, final long trimAboveSe

private boolean hasUncommittedOperations() throws IOException {
long localCheckpointOfCommit = Long.parseLong(indexShard.commitStats().getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
try (
Translog.Snapshot snapshot = indexShard.newChangesSnapshot("peer-recovery", localCheckpointOfCommit + 1, Long.MAX_VALUE, false)
) {
return snapshot.totalOperations() > 0;
}
return indexShard.countNumberOfHistoryOperations(
RecoverySourceHandler.PEER_RECOVERY_NAME,
localCheckpointOfCommit + 1,
Long.MAX_VALUE
) > 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6362,8 +6362,12 @@ public void onFailure(Exception e) {
@Override
protected void doRun() throws Exception {
latch.await();
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true);
changes.close();
if (randomBoolean()) {
Translog.Snapshot changes = engine.newChangesSnapshot("test", min, max, true, randomBoolean());
changes.close();
} else {
engine.countNumberOfHistoryOperations("test", min, max);
}
}
});
snapshotThreads[i].start();
Expand Down
Loading

0 comments on commit 1c67d0b

Please sign in to comment.