Skip to content

Commit

Permalink
Do not warm up searcher in engine constructor (#48605)
Browse files Browse the repository at this point in the history
With this change, we won't warm up searchers until we externally refresh
an engine. We explicitly refresh before allowing reading from a shard
(i.e., move to post_recovery state) and during resetting. These
guarantees that we have warmed up the engine before exposing the
external searcher.

Another prerequisite for #47186.
  • Loading branch information
dnhatn committed Oct 30, 2019
1 parent c9ead80 commit f8ef402
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
}
Releasable releasable = store::decRef;
try {
assert assertSearcherIsWarmedUp(source, scope);
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
final ElasticsearchDirectoryReader acquire = referenceManager.acquire();
AtomicBoolean released = new AtomicBoolean(false);
Expand Down Expand Up @@ -705,6 +706,10 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin

protected abstract ReferenceManager<ElasticsearchDirectoryReader> getReferenceManager(SearcherScope scope);

boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
return true;
}

public enum SearcherScope {
EXTERNAL, INTERNAL
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,18 +322,13 @@ private SoftDeletesPolicy newSoftDeletesPolicy() throws IOException {
private static final class ExternalReaderManager extends ReferenceManager<ElasticsearchDirectoryReader> {
private final BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener;
private final ElasticsearchReaderManager internalReaderManager;
private boolean isWarmedUp; //guarded by refreshLock

ExternalReaderManager(ElasticsearchReaderManager internalReaderManager,
BiConsumer<ElasticsearchDirectoryReader, ElasticsearchDirectoryReader> refreshListener) throws IOException {
this.refreshListener = refreshListener;
this.internalReaderManager = internalReaderManager;
ElasticsearchDirectoryReader acquire = internalReaderManager.acquire();
try {
incrementAndNotify(acquire, null);
current = acquire;
} finally {
internalReaderManager.release(acquire);
}
this.current = internalReaderManager.acquire(); // steal the reference without warming up
}

@Override
Expand All @@ -342,26 +337,25 @@ protected ElasticsearchDirectoryReader refreshIfNeeded(ElasticsearchDirectoryRea
// it's a save operation since we acquire the reader which incs it's reference but then down the road
// steal it by calling incRef on the "stolen" reader
internalReaderManager.maybeRefreshBlocking();
ElasticsearchDirectoryReader acquire = internalReaderManager.acquire();
try {
if (acquire == referenceToRefresh) {
// nothing has changed - both ref managers share the same instance so we can use reference equality
return null;
} else {
incrementAndNotify(acquire, referenceToRefresh);
return acquire;
final ElasticsearchDirectoryReader newReader = internalReaderManager.acquire();
if (isWarmedUp == false || newReader != referenceToRefresh) {
boolean success = false;
try {
refreshListener.accept(newReader, isWarmedUp ? referenceToRefresh : null);
isWarmedUp = true;
success = true;
} finally {
if (success == false) {
internalReaderManager.release(newReader);
}
}
} finally {
internalReaderManager.release(acquire);
}
}

private void incrementAndNotify(ElasticsearchDirectoryReader reader,
ElasticsearchDirectoryReader previousReader) throws IOException {
reader.incRef(); // steal the reference
try (Closeable c = reader::decRef) {
refreshListener.accept(reader, previousReader);
reader.incRef(); // double inc-ref if we were successful
// nothing has changed - both ref managers share the same instance so we can use reference equality
if (referenceToRefresh == newReader) {
internalReaderManager.release(newReader);
return null;
} else {
return newReader; // steal the reference
}
}

Expand All @@ -376,7 +370,24 @@ protected int getRefCount(ElasticsearchDirectoryReader reference) {
}

@Override
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException { reference.decRef(); }
protected void decRef(ElasticsearchDirectoryReader reference) throws IOException {
reference.decRef();
}
}

@Override
final boolean assertSearcherIsWarmedUp(String source, SearcherScope scope) {
if (scope == SearcherScope.EXTERNAL) {
switch (source) {
// we can access segment_stats while a shard is still in the recovering state.
case "segments":
case "segments_stats":
break;
default:
assert externalReaderManager.isWarmedUp : "searcher was not warmed up yet for source[" + source + "]";
}
}
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2702,6 +2702,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) {
final Sort indexSort = indexSortSupplier.get();
final Engine.Warmer warmer = reader -> {
assert Thread.holdsLock(mutex) == false : "warming engine under mutex";
assert reader != null;
if (this.warmer != null) {
this.warmer.warm(reader);
}
Expand Down Expand Up @@ -3413,6 +3414,7 @@ public void close() throws IOException {
// TODO: add a dedicate recovery stats for the reset translog
});
newEngineReference.get().recoverFromTranslog(translogRunner, globalCheckpoint);
newEngineReference.get().refresh("reset_engine");
synchronized (mutex) {
verifyNotClosed();
IOUtils.close(currentEngineReference.getAndSet(newEngineReference.get()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.isIn;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
Expand All @@ -215,6 +216,7 @@
public class InternalEngineTests extends EngineTestCase {

public void testVersionMapAfterAutoIDDocument() throws IOException {
engine.refresh("warm_up");
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField("test"),
new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
Engine.Index operation = randomBoolean() ?
Expand Down Expand Up @@ -926,6 +928,7 @@ public void testConcurrentGetAndFlush() throws Exception {
}

public void testSimpleOperations() throws Exception {
engine.refresh("warm_up");
Engine.Searcher searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();
Expand Down Expand Up @@ -1103,6 +1106,7 @@ public void testSimpleOperations() throws Exception {
}

public void testSearchResultRelease() throws Exception {
engine.refresh("warm_up");
Engine.Searcher searchResult = engine.acquireSearcher("test");
MatcherAssert.assertThat(searchResult, EngineSearcherTotalHitsMatcher.engineSearcherTotalHits(0));
searchResult.close();
Expand Down Expand Up @@ -2175,7 +2179,7 @@ public void testVersioningPromotedReplica() throws IOException {
final int opsOnPrimary = assertOpsOnPrimary(primaryOps, finalReplicaVersion, deletedOnReplica, replicaEngine);
final long currentSeqNo = getSequenceID(replicaEngine,
new Engine.Get(false, false, "type", lastReplicaOp.uid().text(), lastReplicaOp.uid())).v1();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
final TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.search(new MatchAllDocsQuery(), collector);
if (collector.getTotalHits() > 0) {
Expand Down Expand Up @@ -2740,7 +2744,7 @@ public void testEnableGcDeletes() throws Exception {
}

public void testExtractShardId() {
try (Engine.Searcher test = this.engine.acquireSearcher("test")) {
try (Engine.Searcher test = this.engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader());
assertNotNull(shardId);
assertEquals(shardId, engine.config().getShardId());
Expand Down Expand Up @@ -3015,7 +3019,7 @@ public void testSkipTranslogReplay() throws IOException {
engine.close();
try (InternalEngine engine = new InternalEngine(config)) {
engine.skipTranslogRecovery();
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
try (Engine.Searcher searcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), randomIntBetween(numDocs, numDocs + 10));
assertThat(topDocs.totalHits.value, equalTo(0L));
}
Expand Down Expand Up @@ -3058,6 +3062,7 @@ public void testTranslogReplay() throws IOException {
// we need to reuse the engine config unless the parser.mappingModified won't work
engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier));
engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
engine.refresh("warm_up");

assertVisibleCount(engine, numDocs, false);
assertEquals(numDocs, translogHandler.appliedOperations());
Expand All @@ -3071,6 +3076,7 @@ public void testTranslogReplay() throws IOException {
engine.close();
translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
engine.refresh("warm_up");
assertVisibleCount(engine, numDocs, false);
assertEquals(0, translogHandler.appliedOperations());

Expand Down Expand Up @@ -3100,6 +3106,7 @@ public void testTranslogReplay() throws IOException {
engine.close();
translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
engine.refresh("warm_up");
try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), numDocs + 1);
assertThat(topDocs.totalHits.value, equalTo(numDocs + 1L));
Expand Down Expand Up @@ -4491,7 +4498,7 @@ private void index(final InternalEngine engine, final int id) throws IOException
* second is the primary term.
*/
private Tuple<Long, Long> getSequenceID(Engine engine, Engine.Get get) throws EngineException {
try (Engine.Searcher searcher = engine.acquireSearcher("get")) {
try (Engine.Searcher searcher = engine.acquireSearcher("get", Engine.SearcherScope.INTERNAL)) {
final long primaryTerm;
final long seqNo;
DocIdAndSeqNo docIdAndSeqNo = VersionsAndSeqNoResolver.loadDocIdAndSeqNo(searcher.getIndexReader(), get.uid());
Expand Down Expand Up @@ -4673,7 +4680,7 @@ public void testRefreshScopedSearcher() throws IOException {
InternalEngine engine =
// disable merges to make sure that the reader doesn't change unexpectedly during the test
createEngine(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) {

engine.refresh("warm_up");
try (Engine.Searcher getSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL);
Engine.Searcher searchSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
assertSameReader(getSearcher, searchSearcher);
Expand Down Expand Up @@ -5536,7 +5543,7 @@ protected void doRun() throws Exception {

public void testAcquireSearcherOnClosingEngine() throws Exception {
engine.close();
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test"));
expectThrows(AlreadyClosedException.class, () -> engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL));
}

public void testNoOpOnClosingEngine() throws Exception {
Expand Down Expand Up @@ -6195,4 +6202,59 @@ public void afterRefresh(boolean didRefresh) {
}
}
}

public void testNotWarmUpSearcherInEngineCtor() throws Exception {
try (Store store = createStore()) {
List<ElasticsearchDirectoryReader> warmedUpReaders = new ArrayList<>();
Engine.Warmer warmer = reader -> {
assertNotNull(reader);
assertThat(reader, not(in(warmedUpReaders)));
warmedUpReaders.add(reader);
};
EngineConfig config = engine.config();
final TranslogConfig translogConfig = new TranslogConfig(config.getTranslogConfig().getShardId(),
createTempDir(), config.getTranslogConfig().getIndexSettings(), config.getTranslogConfig().getBigArrays());
EngineConfig configWithWarmer = new EngineConfig(config.getShardId(), config.getAllocationId(), config.getThreadPool(),
config.getIndexSettings(), warmer, store, config.getMergePolicy(), config.getAnalyzer(),
config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getQueryCache(),
config.getQueryCachingPolicy(), translogConfig, config.getFlushMergesAfter(),
config.getExternalRefreshListener(), config.getInternalRefreshListener(), config.getIndexSort(),
config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.retentionLeasesSupplier(),
config.getPrimaryTermSupplier(), config.getTombstoneDocSupplier());
try (InternalEngine engine = createEngine(configWithWarmer)) {
assertThat(warmedUpReaders, empty());
assertThat(expectThrows(Throwable.class, () -> engine.acquireSearcher("test")).getMessage(),
equalTo("searcher was not warmed up yet for source[test]"));
int times = randomIntBetween(1, 10);
for (int i = 0; i < times; i++) {
engine.refresh("test");
}
assertThat(warmedUpReaders, hasSize(1));
try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
assertSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader());
assertSame(warmedUpReaders.get(0), externalSearcher.getDirectoryReader());
}
}
index(engine, randomInt());
if (randomBoolean()) {
engine.refresh("test", Engine.SearcherScope.INTERNAL, true);
assertThat(warmedUpReaders, hasSize(1));
try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
assertNotSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader());
}
}
}
engine.refresh("test");
assertThat(warmedUpReaders, hasSize(2));
try (Engine.Searcher internalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.INTERNAL)) {
try (Engine.Searcher externalSearcher = engine.acquireSearcher("test", Engine.SearcherScope.EXTERNAL)) {
assertSame(internalSearcher.getDirectoryReader(), externalSearcher.getDirectoryReader());
assertSame(warmedUpReaders.get(1), externalSearcher.getDirectoryReader());
}
}
}
}
}
}

0 comments on commit f8ef402

Please sign in to comment.