Skip to content

Commit

Permalink
[Backport 2.x] Add method to Engine to fetch max seq no of last refre…
Browse files Browse the repository at this point in the history
…sh (#6038)

* Add method to Engine to fetch max seq no of last refresh

Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
sachinpkale authored Feb 2, 2023
1 parent 651803d commit e6a4ddc
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 0 deletions.
44 changes: 44 additions & 0 deletions server/src/main/java/org/opensearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.StandardDirectoryReader;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
Expand All @@ -63,6 +67,7 @@
import org.opensearch.common.logging.Loggers;
import org.opensearch.common.lucene.Lucene;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.lucene.uid.Versions;
import org.opensearch.common.lucene.uid.VersionsAndSeqNoResolver;
import org.opensearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
Expand All @@ -71,9 +76,11 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.index.VersionType;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.Mapping;
import org.opensearch.index.mapper.ParseContext.Document;
import org.opensearch.index.mapper.ParsedDocument;
import org.opensearch.index.mapper.SeqNoFieldMapper;
import org.opensearch.index.merge.MergeStats;
import org.opensearch.index.seqno.SeqNoStats;
import org.opensearch.index.seqno.SequenceNumbers;
Expand Down Expand Up @@ -276,6 +283,42 @@ public void verifyEngineBeforeIndexClosing() throws IllegalStateException {
}
}

/**
* Get max sequence number from segments that are referenced by given SegmentInfos
*/
public long getMaxSeqNoFromSegmentInfos(SegmentInfos segmentInfos) throws IOException {
try (DirectoryReader innerReader = StandardDirectoryReader.open(store.directory(), segmentInfos, null, null)) {
final IndexSearcher searcher = new IndexSearcher(innerReader);
return getMaxSeqNoFromSearcher(searcher);
}
}

/**
* Get max sequence number that is part of given searcher. Sequence number is part of each document that is indexed.
* This method fetches the _id of last indexed document that was part of the given searcher and
* retrieves the _seq_no of the retrieved document.
*/
protected long getMaxSeqNoFromSearcher(IndexSearcher searcher) throws IOException {
searcher.setQueryCache(null);
ScoreDoc[] docs = searcher.search(
Queries.newMatchAllQuery(),
1,
new Sort(new SortField(SeqNoFieldMapper.NAME, SortField.Type.DOC, true))
).scoreDocs;
if (docs.length == 0) {
return SequenceNumbers.NO_OPS_PERFORMED;
}
org.apache.lucene.document.Document document = searcher.doc(docs[0].doc);
Term uidTerm = new Term(IdFieldMapper.NAME, document.getField(IdFieldMapper.NAME).binaryValue());
VersionsAndSeqNoResolver.DocIdAndVersion docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(
searcher.getIndexReader(),
uidTerm,
true
);
assert docIdAndVersion != null;
return docIdAndVersion.seqNo;
}

/**
* A throttling class that can be activated, causing the
* {@code acquireThrottle} method to block on a lock when throttling
Expand Down Expand Up @@ -2110,4 +2153,5 @@ public interface TranslogRecoveryRunner {
* to advance this marker to at least the given sequence number.
*/
public abstract void advanceMaxSeqNoOfUpdatesOrDeletes(long maxSeqNoOfUpdatesOnPrimary);

}
Original file line number Diff line number Diff line change
Expand Up @@ -7579,4 +7579,106 @@ public void testGetProcessedLocalCheckpoint() throws IOException {
store.close();
engine.close();
}

public void testGetMaxSeqNoRefreshedWithoutRefresh() throws IOException {
IOUtils.close(store, engine);

final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings()).put("index.refresh_interval", "300s");
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Store store = createStore();
InternalEngine engine = createEngine(indexSettings, store, createTempDir(), newMergePolicy());

int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
}

try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(NO_OPS_PERFORMED, engine.getMaxSeqNoFromSearcher(searcher));
}

store.close();
engine.close();
}

public void testGetMaxSeqNoRefreshed() throws IOException {
IOUtils.close(store, engine);

final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings()).put("index.refresh_interval", "300s");
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Store store = createStore();
InternalEngine engine = createEngine(indexSettings, store, createTempDir(), newMergePolicy());

int totalNumberOfDocsRefreshed = 0;
for (int j = 0; j < randomIntBetween(1, 10); j++) {
int numDocs = randomIntBetween(10, 100);
for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + numDocs); i++) {
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
}
// this is just to make sure that refresh post flush has the same impact.
if (randomBoolean()) {
engine.refresh("test");
} else {
engine.flush();
}
totalNumberOfDocsRefreshed += numDocs;
}
// Optionally, index more docs without refreshing. These should not be part of getMaxSeqNoRefreshed
if (randomBoolean()) {
for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + randomIntBetween(10, 100)); i++) {
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
}
}

try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
assertEquals(totalNumberOfDocsRefreshed - 1, engine.getMaxSeqNoFromSearcher(searcher));
}

store.close();
engine.close();
}

public void testGetMaxSeqNoFromSegmentInfos() throws IOException {
IOUtils.close(store, engine);

final Settings.Builder settings = Settings.builder().put(defaultSettings.getSettings()).put("index.refresh_interval", "300s");
final IndexMetadata indexMetadata = IndexMetadata.builder(defaultSettings.getIndexMetadata()).settings(settings).build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(indexMetadata);

Store store = createStore();
InternalEngine engine = createEngine(indexSettings, store, createTempDir(), newMergePolicy());

int totalNumberOfDocsRefreshed = 0;
for (int j = 0; j < randomIntBetween(1, 10); j++) {
int numDocs = randomIntBetween(10, 100);
for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + numDocs); i++) {
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
}
// this is just to make sure that refresh post flush has the same impact.
if (randomBoolean()) {
engine.refresh("test");
} else {
engine.flush();
}
totalNumberOfDocsRefreshed += numDocs;
}
// Optionally, index more docs without refreshing. These should not be part of getMaxSeqNoRefreshed
if (randomBoolean()) {
for (int i = totalNumberOfDocsRefreshed; i < (totalNumberOfDocsRefreshed + randomIntBetween(10, 100)); i++) {
engine.index(indexForDoc(createParsedDoc(Integer.toString(i), null)));
}
}

try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = engine.getSegmentInfosSnapshot()) {
assertEquals(totalNumberOfDocsRefreshed - 1, engine.getMaxSeqNoFromSegmentInfos(segmentInfosGatedCloseable.get()));
}

store.close();
engine.close();
}

}

0 comments on commit e6a4ddc

Please sign in to comment.