Adds the ability to acquire readers in IndexShard (#54966)
This change adds the ability to acquire a point-in-time reader on an engine.
This is needed for frozen indices, which lazily load the reader on every
phase of a search request. Acquiring a reader on a frozen index ensures
that the engine will not be closed until the reader is released, while leaving
the directory reader unopened until a searcher is acquired.
When the searcher is closed, the underlying directory reader is also closed,
unless another request on the same frozen shard is in flight. This ensures
that the directory reader of a frozen index is opened only while requests are
executing (they consume a thread in the search throttled pool).
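To illustrate the intended usage, here is a minimal caller-side sketch; it is not code from this commit and assumes an IndexShard instance named indexShard. The new API splits acquisition into a long-lived supplier (the point-in-time reader) and short-lived searchers acquired from it per phase:

// Sketch: hold one point-in-time reader for the whole request, acquire searchers per phase.
try (Engine.SearcherSupplier supplier = indexShard.acquireSearcherSupplier()) {
    // For a frozen shard, the directory reader is only opened here, when a searcher is acquired.
    try (Engine.Searcher searcher = supplier.acquireSearcher("search")) {
        // ... run the query phase against this searcher ...
    }
    // A later phase can acquire another searcher against the same point-in-time view.
    try (Engine.Searcher searcher = supplier.acquireSearcher("fetch")) {
        // ... run the fetch phase ...
    }
} // closing the supplier releases the underlying reader and the engine/store reference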
jimczi authored Apr 27, 2020
1 parent 15f18b1 commit 51f9542
Showing 26 changed files with 847 additions and 1,014 deletions.
@@ -374,9 +374,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(SHARD_OPEN_READER_NAME, ThreadPool.Names.SAME,
TransportOpenReaderAction.ShardOpenReaderRequest::new,
(request, channel, task) -> {
searchService.openReaderContext(request.searchShardTarget.getShardId(), request.keepAlive,
searchService.openReaderContext(request.getShardId(), request.keepAlive,
ActionListener.map(new ChannelActionListener<>(channel, SHARD_OPEN_READER_NAME, request),
contextId -> new TransportOpenReaderAction.ShardOpenReaderResponse(contextId, request.searchShardTarget)));
contextId -> new TransportOpenReaderAction.ShardOpenReaderResponse(contextId)));
});
TransportActionProxy.registerProxyAction(
transportService, SHARD_OPEN_READER_NAME, TransportOpenReaderAction.ShardOpenReaderResponse::new);
@@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
@@ -34,6 +35,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
@@ -93,42 +95,49 @@ public AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
}

static final class ShardOpenReaderRequest extends TransportRequest implements IndicesRequest {
final SearchShardTarget searchShardTarget;
final ShardId shardId;
final OriginalIndices originalIndices;
final TimeValue keepAlive;

ShardOpenReaderRequest(SearchShardTarget searchShardTarget, TimeValue keepAlive) {
this.searchShardTarget = searchShardTarget;
ShardOpenReaderRequest(ShardId shardId, OriginalIndices originalIndices, TimeValue keepAlive) {
this.shardId = shardId;
this.originalIndices = originalIndices;
this.keepAlive = keepAlive;
}

ShardOpenReaderRequest(StreamInput in) throws IOException {
super(in);
searchShardTarget = new SearchShardTarget(in);
shardId = new ShardId(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
keepAlive = in.readTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
searchShardTarget.writeTo(out);
shardId.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
out.writeTimeValue(keepAlive);
}

public ShardId getShardId() {
return shardId;
}

@Override
public String[] indices() {
return searchShardTarget.getOriginalIndices().indices();
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
return searchShardTarget.getOriginalIndices().indicesOptions();
return originalIndices.indicesOptions();
}
}

static final class ShardOpenReaderResponse extends SearchPhaseResult {
ShardOpenReaderResponse(SearchContextId contextId, SearchShardTarget searchShardTarget) {
ShardOpenReaderResponse(SearchContextId contextId) {
this.contextId = contextId;
setSearchShardTarget(searchShardTarget);
}

ShardOpenReaderResponse(StreamInput in) throws IOException {
@@ -163,7 +172,8 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha
SearchActionListener<SearchPhaseResult> listener) {
final Transport.Connection connection = getConnection(shardIt.getClusterAlias(), shard.currentNodeId());
final SearchShardTarget searchShardTarget = shardIt.newSearchShardTarget(shard.currentNodeId());
final ShardOpenReaderRequest shardRequest = new ShardOpenReaderRequest(searchShardTarget, openReaderRequest.keepAlive());
final ShardOpenReaderRequest shardRequest = new ShardOpenReaderRequest(searchShardTarget.getShardId(),
searchShardTarget.getOriginalIndices(), openReaderRequest.keepAlive());
getSearchTransport().sendShardOpenReader(connection, getTask(), shardRequest, ActionListener.map(listener, r -> r));
}

109 changes: 70 additions & 39 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -95,6 +95,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
@@ -588,31 +589,17 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc

public abstract GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException;


/**
* Returns a new searcher instance. The consumer of this
* API is responsible for releasing the returned searcher in a
* safe manner, preferably in a try/finally block.
*
* @param source the source API or routing that triggers this searcher acquire
*
* @see Searcher#close()
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public final Searcher acquireSearcher(String source) throws EngineException {
return acquireSearcher(source, SearcherScope.EXTERNAL);
public final SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper) throws EngineException {
return acquireSearcherSupplier(wrapper, SearcherScope.EXTERNAL);
}

/**
* Returns a new searcher instance. The consumer of this
* API is responsible for releasing the returned searcher in a
* safe manner, preferably in a try/finally block.
*
* @param source the source API or routing that triggers this searcher acquire
* @param scope the scope of this searcher ie. if the searcher will be used for get or search purposes
*
* @see Searcher#close()
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
@@ -621,35 +608,64 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
}
Releasable releasable = store::decRef;
try {
assert assertSearcherIsWarmedUp(source, scope);
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
final ElasticsearchDirectoryReader acquire = referenceManager.acquire();
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
AtomicBoolean released = new AtomicBoolean(false);
Searcher engineSearcher = new Searcher(source, acquire,
engineConfig.getSimilarity(), engineConfig.getQueryCache(), engineConfig.getQueryCachingPolicy(),
() -> {
if (released.compareAndSet(false, true)) {
try {
referenceManager.release(acquire);
} finally {
store.decRef();
SearcherSupplier reader = new SearcherSupplier(wrapper) {
@Override
public Searcher acquireSearcherInternal(String source) {
assert assertSearcherIsWarmedUp(source, scope);
return new Searcher(source, acquire, engineConfig.getSimilarity(), engineConfig.getQueryCache(),
engineConfig.getQueryCachingPolicy(), () -> {});
}

@Override
public void close() {
if (released.compareAndSet(false, true)) {
try {
referenceManager.release(acquire);
} catch (IOException e) {
throw new UncheckedIOException("failed to close", e);
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
} finally {
store.decRef();
}
} else {
/* In general, readers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short
* amount of time, this is why we only log a warning instead of throwing an exception. */
logger.warn("Reader was released twice", new IllegalStateException("Double release"));
}
} else {
/* In general, readers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short
* amount of time, this is why we only log a warning instead of throwing an exception. */
logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
}
});
};
releasable = null; // success - hand over the reference to the engine reader
return engineSearcher;
return reader;
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
maybeFailEngine("acquire_searcher", ex);
maybeFailEngine("acquire_reader", ex);
ensureOpen(ex); // throw EngineCloseException here if we are already closed
logger.error(() -> new ParameterizedMessage("failed to acquire searcher, source {}", source), ex);
throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
logger.error(() -> new ParameterizedMessage("failed to acquire reader"), ex);
throw new EngineException(shardId, "failed to acquire reader", ex);
} finally {
Releasables.close(releasable);
}
}

public final Searcher acquireSearcher(String source) throws EngineException {
return acquireSearcher(source, SearcherScope.EXTERNAL);
}

public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
SearcherSupplier releasable = null;
try {
SearcherSupplier reader = releasable = acquireSearcherSupplier(Function.identity(), scope);
Searcher searcher = reader.acquireSearcher(source);
releasable = null;
return new Searcher(source, searcher.getDirectoryReader(), searcher.getSimilarity(),
searcher.getQueryCache(), searcher.getQueryCachingPolicy(), () -> Releasables.close(searcher, reader));
} finally {
Releasables.close(releasable);
}
@@ -1158,6 +1174,21 @@ default void onFailedEngine(String reason, @Nullable Exception e) {
}
}

public abstract static class SearcherSupplier implements Releasable {
private final Function<Searcher, Searcher> wrapper;

public SearcherSupplier(Function<Searcher, Searcher> wrapper) {
this.wrapper = wrapper;
}

public final Searcher acquireSearcher(String source) {
final Searcher searcher = acquireSearcherInternal(source);
return "can_match".equals(source) ? searcher : wrapper.apply(searcher);
}

protected abstract Searcher acquireSearcherInternal(String source);
}

public static final class Searcher extends IndexSearcher implements Releasable {
private final String source;
private final Closeable onClose;
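The wrapper function passed to acquireSearcherSupplier lets callers defer searcher wrapping until a searcher is actually acquired, and the supplier deliberately bypasses the wrapper for the lightweight "can_match" source. A hypothetical engine-level sketch (assuming an Engine instance named engine and java.util.function.Function imported), not taken from the commit:

// Sketch: one supplier, two searchers against the same point-in-time reader.
try (Engine.SearcherSupplier supplier =
         engine.acquireSearcherSupplier(Function.identity(), Engine.SearcherScope.EXTERNAL)) {
    try (Engine.Searcher canMatch = supplier.acquireSearcher("can_match")) {
        // "can_match" skips the wrapper, so this is the raw engine searcher.
    }
    try (Engine.Searcher searcher = supplier.acquireSearcher("search")) {
        // Any other source is passed through the wrapper function supplied above.
    }
}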
@@ -25,7 +25,6 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;

import java.util.HashMap;
@@ -158,15 +157,15 @@ public void onFreeReaderContext(ReaderContext readerContext) {
}

@Override
public void onNewScrollContext(ScrollContext scrollContext) {
public void onNewScrollContext(ReaderContext readerContext) {
totalStats.scrollCurrent.inc();
}

@Override
public void onFreeScrollContext(ScrollContext scrollContext) {
public void onFreeScrollContext(ReaderContext readerContext) {
totalStats.scrollCurrent.dec();
assert totalStats.scrollCurrent.count() >= 0;
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano()));
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

static final class StatsHolder {
16 changes: 12 additions & 4 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -1197,12 +1197,20 @@ public void failShard(String reason, @Nullable Exception e) {
}

/**
* Acquire a lightweight searcher which can be used to rewrite shard search requests.
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.Searcher acquireCanMatchSearcher() {
public Engine.SearcherSupplier acquireSearcherSupplier() {
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL);
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
readAllowed();
markSearcherAccessed();
return getEngine().acquireSearcher("can_match", Engine.SearcherScope.EXTERNAL);
final Engine engine = getEngine();
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
}

public Engine.Searcher acquireSearcher(String source) {
@@ -1218,7 +1226,7 @@ private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scop
markSearcherAccessed();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
return wrapSearcher(searcher);
return "can_match".equals(source) ? searcher : wrapSearcher(searcher);
}

private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
@@ -22,7 +22,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;

@@ -93,28 +92,29 @@ default void onNewReaderContext(ReaderContext readerContext) {}
default void onFreeReaderContext(ReaderContext readerContext) {}

/**
* Executed when a new scroll search {@link SearchContext} was created
* @param scrollContext the created search context
* Executed when a new scroll search {@link ReaderContext} was created
* @param readerContext the created reader context
*/
default void onNewScrollContext(ScrollContext scrollContext) {}
default void onNewScrollContext(ReaderContext readerContext) {}

/**
* Executed when a scroll search {@link SearchContext} is freed.
* This happens either when the scroll search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param scrollContext the freed search context
* @param readerContext the freed search context
*/
default void onFreeScrollContext(ScrollContext scrollContext) {}
default void onFreeScrollContext(ReaderContext readerContext) {}

/**
* Executed prior to using a {@link SearchContext} that has been retrieved
* from the active contexts. If the context is deemed invalid a runtime
* exception can be thrown, which will prevent the context from being used.
* @param context the context retrieved from the active contexts
* @param readerContext The reader context used by this request.
* @param searchContext The newly created {@link SearchContext}.
* @param transportRequest the request that is going to use the search context
*/
default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {}
default void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest transportRequest) {}

/**
* Executed when a search context was freed. The implementor can implement
@@ -225,33 +225,33 @@ public void onFreeReaderContext(ReaderContext readerContext) {
}

@Override
public void onNewScrollContext(ScrollContext scrollContext) {
public void onNewScrollContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewScrollContext(scrollContext);
listener.onNewScrollContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onFreeScrollContext(ScrollContext scrollContext) {
public void onFreeScrollContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeScrollContext(scrollContext);
listener.onFreeScrollContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e);
}
}
}

@Override
public void validateSearchContext(SearchContext context, TransportRequest request) {
public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest request) {
Exception exception = null;
for (SearchOperationListener listener : listeners) {
try {
listener.validateSearchContext(context, request);
listener.validateSearchContext(readerContext, searchContext, request);
} catch (Exception e) {
exception = ExceptionsHelper.useOrSuppress(exception, e);
}
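For implementers of SearchOperationListener, the scroll and validation callbacks now receive the ReaderContext (and, for validation, the SearchContext built from it). A hypothetical listener against the new signatures, where the class name and the System.out logging are illustrative only, could look like this:

import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;

// Hypothetical listener tracking scroll lifetimes via the new ReaderContext-based callbacks.
// All other SearchOperationListener methods keep their default implementations.
public class ScrollTrackingListener implements SearchOperationListener {

    @Override
    public void onNewScrollContext(ReaderContext readerContext) {
        // The reader context carries the creation time of the scroll.
        System.out.println("scroll opened at nanos " + readerContext.getStartTimeInNano());
    }

    @Override
    public void onFreeScrollContext(ReaderContext readerContext) {
        long lifetimeNanos = System.nanoTime() - readerContext.getStartTimeInNano();
        System.out.println("scroll lived for nanos " + lifetimeNanos);
    }

    @Override
    public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest request) {
        // Throwing a runtime exception here still prevents the context from being used.
    }
}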