Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds the ability to acquire readers in IndexShard #54966

Merged
merged 23 commits into from
Apr 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(SHARD_OPEN_READER_NAME, ThreadPool.Names.SAME,
TransportOpenReaderAction.ShardOpenReaderRequest::new,
(request, channel, task) -> {
searchService.openReaderContext(request.searchShardTarget.getShardId(), request.keepAlive,
searchService.openReaderContext(request.getShardId(), request.keepAlive,
ActionListener.map(new ChannelActionListener<>(channel, SHARD_OPEN_READER_NAME, request),
contextId -> new TransportOpenReaderAction.ShardOpenReaderResponse(contextId, request.searchShardTarget)));
contextId -> new TransportOpenReaderAction.ShardOpenReaderResponse(contextId)));
});
TransportActionProxy.registerProxyAction(
transportService, SHARD_OPEN_READER_NAME, TransportOpenReaderAction.ShardOpenReaderResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -34,6 +35,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.AliasFilter;
Expand Down Expand Up @@ -93,42 +95,49 @@ public AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
}

static final class ShardOpenReaderRequest extends TransportRequest implements IndicesRequest {
final SearchShardTarget searchShardTarget;
final ShardId shardId;
final OriginalIndices originalIndices;
final TimeValue keepAlive;

ShardOpenReaderRequest(SearchShardTarget searchShardTarget, TimeValue keepAlive) {
this.searchShardTarget = searchShardTarget;
ShardOpenReaderRequest(ShardId shardId, OriginalIndices originalIndices, TimeValue keepAlive) {
this.shardId = shardId;
this.originalIndices = originalIndices;
this.keepAlive = keepAlive;
}

ShardOpenReaderRequest(StreamInput in) throws IOException {
super(in);
searchShardTarget = new SearchShardTarget(in);
shardId = new ShardId(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
keepAlive = in.readTimeValue();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
searchShardTarget.writeTo(out);
shardId.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
out.writeTimeValue(keepAlive);
}

public ShardId getShardId() {
return shardId;
}

@Override
public String[] indices() {
return searchShardTarget.getOriginalIndices().indices();
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
return searchShardTarget.getOriginalIndices().indicesOptions();
return originalIndices.indicesOptions();
}
}

static final class ShardOpenReaderResponse extends SearchPhaseResult {
ShardOpenReaderResponse(SearchContextId contextId, SearchShardTarget searchShardTarget) {
ShardOpenReaderResponse(SearchContextId contextId) {
this.contextId = contextId;
setSearchShardTarget(searchShardTarget);
}

ShardOpenReaderResponse(StreamInput in) throws IOException {
Expand Down Expand Up @@ -163,7 +172,8 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha
SearchActionListener<SearchPhaseResult> listener) {
final Transport.Connection connection = getConnection(shardIt.getClusterAlias(), shard.currentNodeId());
final SearchShardTarget searchShardTarget = shardIt.newSearchShardTarget(shard.currentNodeId());
final ShardOpenReaderRequest shardRequest = new ShardOpenReaderRequest(searchShardTarget, openReaderRequest.keepAlive());
final ShardOpenReaderRequest shardRequest = new ShardOpenReaderRequest(searchShardTarget.getShardId(),
searchShardTarget.getOriginalIndices(), openReaderRequest.keepAlive());
getSearchTransport().sendShardOpenReader(connection, getTask(), shardRequest, ActionListener.map(listener, r -> r));
}

Expand Down
109 changes: 70 additions & 39 deletions server/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
Expand Down Expand Up @@ -588,31 +589,17 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc

public abstract GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> searcherFactory) throws EngineException;


/**
* Returns a new searcher instance. The consumer of this
* API is responsible for releasing the returned searcher in a
* safe manner, preferably in a try/finally block.
*
* @param source the source API or routing that triggers this searcher acquire
*
* @see Searcher#close()
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public final Searcher acquireSearcher(String source) throws EngineException {
return acquireSearcher(source, SearcherScope.EXTERNAL);
public final SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper) throws EngineException {
return acquireSearcherSupplier(wrapper, SearcherScope.EXTERNAL);
}

/**
* Returns a new searcher instance. The consumer of this
* API is responsible for releasing the returned searcher in a
* safe manner, preferably in a try/finally block.
*
* @param source the source API or routing that triggers this searcher acquire
* @param scope the scope of this searcher ie. if the searcher will be used for get or search purposes
*
* @see Searcher#close()
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
public SearcherSupplier acquireSearcherSupplier(Function<Searcher, Searcher> wrapper, SearcherScope scope) throws EngineException {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
Expand All @@ -621,35 +608,64 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin
}
Releasable releasable = store::decRef;
try {
assert assertSearcherIsWarmedUp(source, scope);
ReferenceManager<ElasticsearchDirectoryReader> referenceManager = getReferenceManager(scope);
final ElasticsearchDirectoryReader acquire = referenceManager.acquire();
ElasticsearchDirectoryReader acquire = referenceManager.acquire();
AtomicBoolean released = new AtomicBoolean(false);
Searcher engineSearcher = new Searcher(source, acquire,
engineConfig.getSimilarity(), engineConfig.getQueryCache(), engineConfig.getQueryCachingPolicy(),
() -> {
if (released.compareAndSet(false, true)) {
try {
referenceManager.release(acquire);
} finally {
store.decRef();
SearcherSupplier reader = new SearcherSupplier(wrapper) {
@Override
public Searcher acquireSearcherInternal(String source) {
assert assertSearcherIsWarmedUp(source, scope);
return new Searcher(source, acquire, engineConfig.getSimilarity(), engineConfig.getQueryCache(),
engineConfig.getQueryCachingPolicy(), () -> {});
}

@Override
public void close() {
if (released.compareAndSet(false, true)) {
try {
referenceManager.release(acquire);
} catch (IOException e) {
throw new UncheckedIOException("failed to close", e);
} catch (AlreadyClosedException e) {
// This means there's a bug somewhere: don't suppress it
throw new AssertionError(e);
} finally {
store.decRef();
}
} else {
/* In general, readers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short
* amount of time, this is why we only log a warning instead of throwing an exception. */
logger.warn("Reader was released twice", new IllegalStateException("Double release"));
}
} else {
/* In general, readers should never be released twice or this would break reference counting. There is one rare case
* when it might happen though: when the request and the Reaper thread would both try to release it in a very short
* amount of time, this is why we only log a warning instead of throwing an exception. */
logger.warn("Searcher was released twice", new IllegalStateException("Double release"));
}
});
};
releasable = null; // success - hand over the reference to the engine reader
return engineSearcher;
return reader;
} catch (AlreadyClosedException ex) {
throw ex;
} catch (Exception ex) {
maybeFailEngine("acquire_searcher", ex);
maybeFailEngine("acquire_reader", ex);
ensureOpen(ex); // throw EngineCloseException here if we are already closed
logger.error(() -> new ParameterizedMessage("failed to acquire searcher, source {}", source), ex);
throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex);
logger.error(() -> new ParameterizedMessage("failed to acquire reader"), ex);
throw new EngineException(shardId, "failed to acquire reader", ex);
} finally {
Releasables.close(releasable);
}
}

public final Searcher acquireSearcher(String source) throws EngineException {
return acquireSearcher(source, SearcherScope.EXTERNAL);
}

public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException {
SearcherSupplier releasable = null;
try {
SearcherSupplier reader = releasable = acquireSearcherSupplier(Function.identity(), scope);
Searcher searcher = reader.acquireSearcher(source);
releasable = null;
return new Searcher(source, searcher.getDirectoryReader(), searcher.getSimilarity(),
searcher.getQueryCache(), searcher.getQueryCachingPolicy(), () -> Releasables.close(searcher, reader));
} finally {
Releasables.close(releasable);
}
Expand Down Expand Up @@ -1158,6 +1174,21 @@ default void onFailedEngine(String reason, @Nullable Exception e) {
}
}

public abstract static class SearcherSupplier implements Releasable {
private final Function<Searcher, Searcher> wrapper;

public SearcherSupplier(Function<Searcher, Searcher> wrapper) {
this.wrapper = wrapper;
}

public final Searcher acquireSearcher(String source) {
final Searcher searcher = acquireSearcherInternal(source);
return "can_match".equals(source) ? searcher : wrapper.apply(searcher);
}

protected abstract Searcher acquireSearcherInternal(String source);
}

public static final class Searcher extends IndexSearcher implements Releasable {
private final String source;
private final Closeable onClose;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;

import java.util.HashMap;
Expand Down Expand Up @@ -158,15 +157,15 @@ public void onFreeReaderContext(ReaderContext readerContext) {
}

@Override
public void onNewScrollContext(ScrollContext scrollContext) {
public void onNewScrollContext(ReaderContext readerContext) {
totalStats.scrollCurrent.inc();
}

@Override
public void onFreeScrollContext(ScrollContext scrollContext) {
public void onFreeScrollContext(ReaderContext readerContext) {
totalStats.scrollCurrent.dec();
assert totalStats.scrollCurrent.count() >= 0;
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano()));
totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano()));
}

static final class StatsHolder {
Expand Down
16 changes: 12 additions & 4 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1197,12 +1197,20 @@ public void failShard(String reason, @Nullable Exception e) {
}

/**
* Acquire a lightweight searcher which can be used to rewrite shard search requests.
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.Searcher acquireCanMatchSearcher() {
public Engine.SearcherSupplier acquireSearcherSupplier() {
return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL);
}

/**
* Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand.
*/
public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) {
readAllowed();
markSearcherAccessed();
return getEngine().acquireSearcher("can_match", Engine.SearcherScope.EXTERNAL);
final Engine engine = getEngine();
return engine.acquireSearcherSupplier(this::wrapSearcher, scope);
}

public Engine.Searcher acquireSearcher(String source) {
Expand All @@ -1218,7 +1226,7 @@ private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scop
markSearcherAccessed();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source, scope);
return wrapSearcher(searcher);
return "can_match".equals(source) ? searcher : wrapSearcher(searcher);
}

private Engine.Searcher wrapSearcher(Engine.Searcher searcher) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ScrollContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.transport.TransportRequest;

Expand Down Expand Up @@ -93,28 +92,29 @@ default void onNewReaderContext(ReaderContext readerContext) {}
default void onFreeReaderContext(ReaderContext readerContext) {}

/**
* Executed when a new scroll search {@link SearchContext} was created
* @param scrollContext the created search context
* Executed when a new scroll search {@link ReaderContext} was created
* @param readerContext the created reader context
*/
default void onNewScrollContext(ScrollContext scrollContext) {}
default void onNewScrollContext(ReaderContext readerContext) {}

/**
* Executed when a scroll search {@link SearchContext} is freed.
* This happens either when the scroll search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param scrollContext the freed search context
* @param readerContext the freed search context
*/
default void onFreeScrollContext(ScrollContext scrollContext) {}
default void onFreeScrollContext(ReaderContext readerContext) {}

/**
* Executed prior to using a {@link SearchContext} that has been retrieved
* from the active contexts. If the context is deemed invalid a runtime
* exception can be thrown, which will prevent the context from being used.
* @param context the context retrieved from the active contexts
* @param readerContext The reader context used by this request.
* @param searchContext The newly created {@link SearchContext}.
* @param transportRequest the request that is going to use the search context
*/
default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {}
default void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest transportRequest) {}

/**
* Executed when a search context was freed. The implementor can implement
Expand Down Expand Up @@ -225,33 +225,33 @@ public void onFreeReaderContext(ReaderContext readerContext) {
}

@Override
public void onNewScrollContext(ScrollContext scrollContext) {
public void onNewScrollContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewScrollContext(scrollContext);
listener.onNewScrollContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e);
}
}
}

@Override
public void onFreeScrollContext(ScrollContext scrollContext) {
public void onFreeScrollContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeScrollContext(scrollContext);
listener.onFreeScrollContext(readerContext);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e);
}
}
}

@Override
public void validateSearchContext(SearchContext context, TransportRequest request) {
public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest request) {
Exception exception = null;
for (SearchOperationListener listener : listeners) {
try {
listener.validateSearchContext(context, request);
listener.validateSearchContext(readerContext, searchContext, request);
} catch (Exception e) {
exception = ExceptionsHelper.useOrSuppress(exception, e);
}
Expand Down
Loading