Ensure version map is cleared after refresh.
Signed-off-by: Marc Handalian <handalm@amazon.com>
mch2 committed Jul 8, 2023
1 parent db72e96 commit 9ac24b4
Showing 2 changed files with 128 additions and 112 deletions.
NRTReplicationEngine.java
@@ -45,6 +45,8 @@
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;

@@ -88,6 +90,8 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
for (ReferenceManager.RefreshListener listener : engineConfig.getInternalRefreshListener()) {
this.readerManager.addListener(listener);
}
// add versionMap as a listener to our reader,
this.readerManager.addListener(versionMap);
final Map<String, String> userData = store.readLastCommittedSegmentsInfo().getUserData();
final String translogUUID = Objects.requireNonNull(userData.get(Translog.TRANSLOG_UUID_KEY));
translogManagerRef = new WriteOnlyTranslogManager(
@@ -219,41 +223,40 @@ public boolean isThrottled() {
@Override
public IndexResult index(Index index) throws IOException {
ensureOpen();
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
final Translog.Location location = translogManager.add(new Translog.Index(index, indexResult));
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
try (
Releasable ignored = versionMap.acquireLock(index.uid().bytes());
) {
final VersionValue versionFromMap = getVersionFromMap(index.uid().bytes());
IndexResult indexResult = new IndexResult(index.version(), index.primaryTerm(), index.seqNo(), false);
final Translog.Location location = translogManager.add(new Translog.Index(index, indexResult));
indexResult.setTranslogLocation(location);
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();

final VersionValue versionFromMap = versionMap.getUnderLock(index.uid().bytes());
long version = versionFromMap != null ? versionFromMap.version : Versions.NOT_FOUND;
versionMap.enforceSafeAccess();
versionMap.maybePutIndexUnderLock(index.uid().bytes(),
new IndexVersionValue(indexResult.getTranslogLocation(), version, index.seqNo(), index.primaryTerm())
);
localCheckpointTracker.advanceMaxSeqNo(index.seqNo());
return indexResult;
}
localCheckpointTracker.advanceMaxSeqNo(index.seqNo());
return indexResult;
}

@Override
public DeleteResult delete(Delete delete) throws IOException {
ensureOpen();
versionMap.enforceSafeAccess();
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
deleteResult.freeze();
try (
Releasable ignored = versionMap.acquireLock(delete.uid().bytes());
) {
try (Releasable ignored = versionMap.acquireLock(delete.uid().bytes())) {
versionMap.enforceSafeAccess();
DeleteResult deleteResult = new DeleteResult(delete.version(), delete.primaryTerm(), delete.seqNo(), true);
final Translog.Location location = translogManager.add(new Translog.Delete(delete, deleteResult));
deleteResult.setTranslogLocation(location);
deleteResult.setTook(System.nanoTime() - delete.startTime());
versionMap.putDeleteUnderLock(delete.uid().bytes(), new DeleteVersionValue(delete.version(), delete.seqNo(), delete.primaryTerm(), engineConfig.getThreadPool().relativeTimeInMillis()));
deleteResult.freeze();
localCheckpointTracker.advanceMaxSeqNo(delete.seqNo());
return deleteResult;
}
localCheckpointTracker.advanceMaxSeqNo(delete.seqNo());
return deleteResult;
}

@Override
@@ -268,65 +271,60 @@ public NoOpResult noOp(NoOp noOp) throws IOException {
return noOpResult;
}

private VersionValue getVersionFromMap(BytesRef id) {
if (versionMap.isUnsafe()) {
synchronized (versionMap) {
// we are switching from an unsafe map to a safe map. This might happen concurrently
// but we only need to do this once since the last operation per ID is to add to the version
// map so once we pass this point we can safely lookup from the version map.
if (versionMap.isUnsafe()) {
// refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
}
versionMap.enforceSafeAccess();
}
}
return versionMap.getUnderLock(id);
// for testing
final Map<BytesRef, VersionValue> getVersionMap() {
return Stream.concat(versionMap.getAllCurrent().entrySet().stream(), versionMap.getAllTombstones().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException {
assert Objects.equals(get.uid().field(), IdFieldMapper.NAME) : get.uid().field();
try (ReleasableLock ignored = readLock.acquire()) {
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
final VersionValue versionFromMap = getVersionFromMap(get.uid().bytes());
logger.info(versionMap.getAllCurrent());
if (versionFromMap.isDelete()) {
return GetResult.NOT_EXISTS;
}
if (versionFromMap.getLocation() != null) {
try {
final Translog.Operation operation = translogManager.readOperation(versionFromMap.getLocation());
if (operation != null) {
final Translog.Index index = (Translog.Index) operation;
TranslogLeafReader reader = new TranslogLeafReader(index);
return new GetResult(
new Engine.Searcher(
"realtime_get",
reader,
IndexSearcher.getDefaultSimilarity(),
null,
IndexSearcher.getDefaultQueryCachingPolicy(),
reader
),
new VersionsAndSeqNoResolver.DocIdAndVersion(
0,
index.version(),
index.seqNo(),
index.primaryTerm(),
reader,
0
),
true
);
if (get.realtime()) {
try (ReleasableLock ignored = readLock.acquire()) {
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
final VersionValue versionFromMap = versionMap.getUnderLock(get.uid().bytes());
if (versionFromMap != null) {
if (versionFromMap.isDelete()) {
return GetResult.NOT_EXISTS;
}
if (versionFromMap.getLocation() != null) {
try {
final Translog.Operation operation = translogManager.readOperation(versionFromMap.getLocation());
if (operation != null) {
final Translog.Index index = (Translog.Index) operation;
TranslogLeafReader reader = new TranslogLeafReader(index);
return new GetResult(
new Engine.Searcher(
"realtime_get",
reader,
IndexSearcher.getDefaultSimilarity(),
null,
IndexSearcher.getDefaultQueryCachingPolicy(),
reader
),
new VersionsAndSeqNoResolver.DocIdAndVersion(
0,
index.version(),
index.seqNo(),
index.primaryTerm(),
reader,
0
),
true
);
}
} catch (IOException e) {
maybeFailEngine("realtime_get", e);
throw new EngineException(shardId, "failed to read operation from translog", e);
}
}
} catch (IOException e) {
logger.error("WTF", e);
}
}
// if doc is not in translog search the index
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
}
}
// if realtime=false or doc is not in translog search the index
return getFromSearcher(get, searcherFactory, SearcherScope.EXTERNAL);
}

@Override
NRTReplicationEngineTests.java
@@ -11,11 +11,13 @@
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.IOContext;
import org.hamcrest.MatcherAssert;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.bytes.BytesReference;
@@ -37,6 +39,7 @@

import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -87,60 +90,24 @@ public void testGet() throws IOException {
final Store nrtEngineStore = createStore(INDEX_SETTINGS, newDirectory());
final NRTReplicationEngine nrtEngine = buildNrtReplicaEngine(globalCheckpoint, nrtEngineStore)
) {
engine.refresh("warm_up");
Engine.Searcher searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();

nrtEngine.refresh("warm_up");
searchResult = nrtEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();

// create a document
ParseContext.Document document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
ParsedDocument doc = testParsedDocument("1", null, document, B_1, null);
engine.index(indexForDoc(doc));
nrtEngine.index(indexForDoc(doc));

searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(
searchResult,
EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)
);
searchResult.close();

searchResult = nrtEngine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
MatcherAssert.assertThat(
searchResult,
EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(new TermQuery(new Term("value", "test")), 0)
);
searchResult.close();

final BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory = engine::acquireSearcher;
final BiFunction<String, Engine.SearcherScope, Engine.Searcher> nrtSearcherFactory = nrtEngine::acquireSearcher;

final BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory = nrtEngine::acquireSearcher;

// but, not there non realtime
try (Engine.GetResult getResult = engine.get(newGet(false, doc), searcherFactory)) {
// not present if realtime=false
try (Engine.GetResult getResult = nrtEngine.get(newGet(false, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(false));
}

// but, we can still get it (in realtime)
try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
logger.info(getResult.docIdAndVersion().docId);
}

// but, we can still get it (in realtime)
// present with realtime=true
try (Engine.GetResult getResult = nrtEngine.get(newGet(true, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
logger.info(getResult.docIdAndVersion().docBase);
}

// now do an update
@@ -151,19 +118,71 @@ public void testGet() throws IOException {
engine.index(indexForDoc(doc));
nrtEngine.index(indexForDoc(doc));

// but, we can still get it (in realtime)
try (Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory)) {
// update present with realtime=true
try (Engine.GetResult getResult = nrtEngine.get(newGet(true, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.docIdAndVersion(), notNullValue());
}

// refresh primary to flush new segments to disk.
engine.refresh("test");
// copy segments from primary to replica's store.
Lucene.cleanLuceneIndex(nrtEngine.store.directory());
for (String file : engine.store.directory().listAll()) {
// skip segments_n
if (file.startsWith(IndexFileNames.SEGMENTS)) {
continue;
}
nrtEngine.store.directory().copyFrom(engine.store.directory(), file, file, IOContext.DEFAULT);
}
// update nrtEngine with primary's latest infos.
nrtEngine.updateSegments(engine.getLatestSegmentInfos());

// doc is now present non realtime
try (Engine.GetResult getResult = nrtEngine.get(newGet(false, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(true));
}

assertTrue("VersionMap is empty post refresh", engine.getVersionMap().isEmpty());
assertTrue("VersionMap empty post segment update", nrtEngine.getVersionMap().isEmpty());

// now delete
nrtEngine.delete(new Engine.Delete("1", newUid(doc), primaryTerm.get()));

// get should not see it (in realtime)
// doc is no longer present realtime=true
try (Engine.GetResult getResult = nrtEngine.get(newGet(true, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(false));
}

// doc is still present realtime=false
try (Engine.GetResult getResult = nrtEngine.get(newGet(false, doc), searcherFactory)) {
assertThat(getResult.exists(), equalTo(true));
}

// create a document
document = testDocumentWithTextField();
document.add(new Field(SourceFieldMapper.NAME, BytesReference.toBytes(B_1), SourceFieldMapper.Defaults.FIELD_TYPE));
doc = testParsedDocument("1", null, document, B_1, null);
engine.index(indexForDoc(doc));
nrtEngine.index(indexForDoc(doc));

// last received gen should be same and live map should not be wiped
// refresh primary to flush new segments to disk.
engine.refresh("test");
// wipe anything the replica has in its dir and copy all files from primary's directory.
// copy segments from primary to replica's store.
Lucene.cleanLuceneIndex(nrtEngine.store.directory());
for (String file : engine.store.directory().listAll()) {
// skip segments_n
if (file.startsWith(IndexFileNames.SEGMENTS)) {
continue;
}
nrtEngine.store.directory().copyFrom(engine.store.directory(), file, file, IOContext.DEFAULT);
}
// update nrtEngine with primary's latest infos.
nrtEngine.updateSegments(engine.getLatestSegmentInfos());
assertEquals("VersionMap is empty post refresh", Collections.emptyMap(), engine.getVersionMap());
assertEquals("VersionMap is empty post refresh", Collections.emptyMap(), nrtEngine.getVersionMap());
}
}

@@ -196,7 +215,6 @@ public void testEngineWritesOpsToTranslog() throws Exception {

final BiFunction<String, Engine.SearcherScope, Engine.Searcher> searcherFactory = engine::acquireSearcher;
final Engine.GetResult getResult = engine.get(newGet(true, doc), searcherFactory);
logger.info(getResult);

// we don't index into nrtEngine, so get the doc ids from the regular engine.
final List<DocIdSeqNoAndSource> docs = getDocIds(engine, true);
