From 51f9542d25b8e0d0fe9cc80e13d18dae3ecf067d Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Tue, 28 Apr 2020 00:11:41 +0200 Subject: [PATCH] Adds the ability to acquire readers in IndexShard (#54966) This change adds the ability to acquire a point in time reader on an engine. This is needed for frozen indices that lazily load the reader on every phase of the search requests. Acquiring a reader on a frozen index ensures that the engine will not be closed until the reader is released, leaving the directory reader unopened until a call to acquire a searcher is made. When the searcher is closed, the underlying directory reader is also closed unless another request on the same frozen shard is in-flight. This ensures that the directory reader of frozen indices is opened only when requests are executed (they consume a thread in the search throttled pool). --- .../action/search/SearchTransportService.java | 4 +- .../search/TransportOpenReaderAction.java | 30 +- .../elasticsearch/index/engine/Engine.java | 109 ++-- .../index/search/stats/ShardSearchStats.java | 7 +- .../elasticsearch/index/shard/IndexShard.java | 16 +- .../index/shard/SearchOperationListener.java | 28 +- .../search/DefaultSearchContext.java | 6 +- .../elasticsearch/search/SearchService.java | 286 +++++----- .../search/internal/LegacyReaderContext.java | 33 +- .../search/internal/ReaderContext.java | 52 +- .../search/internal/ScrollContext.java | 32 -- .../search/internal/SearchContext.java | 4 +- .../search/internal/ShardSearchRequest.java | 2 +- .../shard/SearchOperationListenerTests.java | 24 +- .../search/DefaultSearchContextTests.java | 33 +- .../search/SearchServiceTests.java | 30 +- .../search/internal/ScrollContextTests.java | 36 -- .../test/engine/MockInternalEngine.java | 6 + .../index/engine/FrozenEngine.java | 489 +++--------------- .../xpack/frozen/FrozenIndices.java | 9 - .../index/engine/FrozenEngineTests.java | 204 ++++---- .../index/engine/FrozenIndexTests.java | 49 +- 
.../SecuritySearchOperationListener.java | 42 +- .../DocumentLevelSecurityTests.java | 43 ++ .../integration/FieldLevelSecurityTests.java | 53 +- .../SecuritySearchOperationListenerTests.java | 234 +++++---- 26 files changed, 847 insertions(+), 1014 deletions(-) delete mode 100644 server/src/test/java/org/elasticsearch/search/internal/ScrollContextTests.java diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index be697cc78f79b..38a0288c5e2f0 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -374,9 +374,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler(SHARD_OPEN_READER_NAME, ThreadPool.Names.SAME, TransportOpenReaderAction.ShardOpenReaderRequest::new, (request, channel, task) -> { - searchService.openReaderContext(request.searchShardTarget.getShardId(), request.keepAlive, + searchService.openReaderContext(request.getShardId(), request.keepAlive, ActionListener.map(new ChannelActionListener<>(channel, SHARD_OPEN_READER_NAME, request), - contextId -> new TransportOpenReaderAction.ShardOpenReaderResponse(contextId, request.searchShardTarget))); + contextId -> new TransportOpenReaderAction.ShardOpenReaderResponse(contextId))); }); TransportActionProxy.registerProxyAction( transportService, SHARD_OPEN_READER_NAME, TransportOpenReaderAction.ShardOpenReaderResponse::new); diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportOpenReaderAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportOpenReaderAction.java index b551202279a87..b827610893617 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportOpenReaderAction.java +++ 
b/server/src/main/java/org/elasticsearch/action/search/TransportOpenReaderAction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.IndicesOptions; @@ -34,6 +35,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.internal.AliasFilter; @@ -93,42 +95,49 @@ public AbstractSearchAsyncAction asyncSearchAction( } static final class ShardOpenReaderRequest extends TransportRequest implements IndicesRequest { - final SearchShardTarget searchShardTarget; + final ShardId shardId; + final OriginalIndices originalIndices; final TimeValue keepAlive; - ShardOpenReaderRequest(SearchShardTarget searchShardTarget, TimeValue keepAlive) { - this.searchShardTarget = searchShardTarget; + ShardOpenReaderRequest(ShardId shardId, OriginalIndices originalIndices, TimeValue keepAlive) { + this.shardId = shardId; + this.originalIndices = originalIndices; this.keepAlive = keepAlive; } ShardOpenReaderRequest(StreamInput in) throws IOException { super(in); - searchShardTarget = new SearchShardTarget(in); + shardId = new ShardId(in); + originalIndices = OriginalIndices.readOriginalIndices(in); keepAlive = in.readTimeValue(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - searchShardTarget.writeTo(out); + shardId.writeTo(out); + OriginalIndices.writeOriginalIndices(originalIndices, out); out.writeTimeValue(keepAlive); } + public ShardId 
getShardId() { + return shardId; + } + @Override public String[] indices() { - return searchShardTarget.getOriginalIndices().indices(); + return originalIndices.indices(); } @Override public IndicesOptions indicesOptions() { - return searchShardTarget.getOriginalIndices().indicesOptions(); + return originalIndices.indicesOptions(); } } static final class ShardOpenReaderResponse extends SearchPhaseResult { - ShardOpenReaderResponse(SearchContextId contextId, SearchShardTarget searchShardTarget) { + ShardOpenReaderResponse(SearchContextId contextId) { this.contextId = contextId; - setSearchShardTarget(searchShardTarget); } ShardOpenReaderResponse(StreamInput in) throws IOException { @@ -163,7 +172,8 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting sha SearchActionListener listener) { final Transport.Connection connection = getConnection(shardIt.getClusterAlias(), shard.currentNodeId()); final SearchShardTarget searchShardTarget = shardIt.newSearchShardTarget(shard.currentNodeId()); - final ShardOpenReaderRequest shardRequest = new ShardOpenReaderRequest(searchShardTarget, openReaderRequest.keepAlive()); + final ShardOpenReaderRequest shardRequest = new ShardOpenReaderRequest(searchShardTarget.getShardId(), + searchShardTarget.getOriginalIndices(), openReaderRequest.keepAlive()); getSearchTransport().sendShardOpenReader(connection, getTask(), shardRequest, ActionListener.map(listener, r -> r)); } diff --git a/server/src/main/java/org/elasticsearch/index/engine/Engine.java b/server/src/main/java/org/elasticsearch/index/engine/Engine.java index bd90f73054c94..f795a8cf69b17 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -95,6 +95,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.BiFunction; +import java.util.function.Function; import 
java.util.stream.Stream; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; @@ -588,31 +589,17 @@ protected final GetResult getFromSearcher(Get get, BiFunction searcherFactory) throws EngineException; - /** - * Returns a new searcher instance. The consumer of this - * API is responsible for releasing the returned searcher in a - * safe manner, preferably in a try/finally block. - * - * @param source the source API or routing that triggers this searcher acquire - * - * @see Searcher#close() + * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. */ - public final Searcher acquireSearcher(String source) throws EngineException { - return acquireSearcher(source, SearcherScope.EXTERNAL); + public final SearcherSupplier acquireSearcherSupplier(Function wrapper) throws EngineException { + return acquireSearcherSupplier(wrapper, SearcherScope.EXTERNAL); } /** - * Returns a new searcher instance. The consumer of this - * API is responsible for releasing the returned searcher in a - * safe manner, preferably in a try/finally block. - * - * @param source the source API or routing that triggers this searcher acquire - * @param scope the scope of this searcher ie. if the searcher will be used for get or search purposes - * - * @see Searcher#close() + * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. */ - public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { + public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { /* Acquire order here is store -> manager since we need * to make sure that the store is not closed before * the searcher is acquired. 
*/ @@ -621,35 +608,64 @@ public Searcher acquireSearcher(String source, SearcherScope scope) throws Engin } Releasable releasable = store::decRef; try { - assert assertSearcherIsWarmedUp(source, scope); ReferenceManager referenceManager = getReferenceManager(scope); - final ElasticsearchDirectoryReader acquire = referenceManager.acquire(); + ElasticsearchDirectoryReader acquire = referenceManager.acquire(); AtomicBoolean released = new AtomicBoolean(false); - Searcher engineSearcher = new Searcher(source, acquire, - engineConfig.getSimilarity(), engineConfig.getQueryCache(), engineConfig.getQueryCachingPolicy(), - () -> { - if (released.compareAndSet(false, true)) { - try { - referenceManager.release(acquire); - } finally { - store.decRef(); + SearcherSupplier reader = new SearcherSupplier(wrapper) { + @Override + public Searcher acquireSearcherInternal(String source) { + assert assertSearcherIsWarmedUp(source, scope); + return new Searcher(source, acquire, engineConfig.getSimilarity(), engineConfig.getQueryCache(), + engineConfig.getQueryCachingPolicy(), () -> {}); + } + + @Override + public void close() { + if (released.compareAndSet(false, true)) { + try { + referenceManager.release(acquire); + } catch (IOException e) { + throw new UncheckedIOException("failed to close", e); + } catch (AlreadyClosedException e) { + // This means there's a bug somewhere: don't suppress it + throw new AssertionError(e); + } finally { + store.decRef(); + } + } else { + /* In general, readers should never be released twice or this would break reference counting. There is one rare case + * when it might happen though: when the request and the Reaper thread would both try to release it in a very short + * amount of time, this is why we only log a warning instead of throwing an exception. */ + logger.warn("Reader was released twice", new IllegalStateException("Double release")); } - } else { - /* In general, readers should never be released twice or this would break reference counting. 
There is one rare case - * when it might happen though: when the request and the Reaper thread would both try to release it in a very short - * amount of time, this is why we only log a warning instead of throwing an exception. */ - logger.warn("Searcher was released twice", new IllegalStateException("Double release")); } - }); + }; releasable = null; // success - hand over the reference to the engine reader - return engineSearcher; + return reader; } catch (AlreadyClosedException ex) { throw ex; } catch (Exception ex) { - maybeFailEngine("acquire_searcher", ex); + maybeFailEngine("acquire_reader", ex); ensureOpen(ex); // throw EngineCloseException here if we are already closed - logger.error(() -> new ParameterizedMessage("failed to acquire searcher, source {}", source), ex); - throw new EngineException(shardId, "failed to acquire searcher, source " + source, ex); + logger.error(() -> new ParameterizedMessage("failed to acquire reader"), ex); + throw new EngineException(shardId, "failed to acquire reader", ex); + } finally { + Releasables.close(releasable); + } + } + + public final Searcher acquireSearcher(String source) throws EngineException { + return acquireSearcher(source, SearcherScope.EXTERNAL); + } + + public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { + SearcherSupplier releasable = null; + try { + SearcherSupplier reader = releasable = acquireSearcherSupplier(Function.identity(), scope); + Searcher searcher = reader.acquireSearcher(source); + releasable = null; + return new Searcher(source, searcher.getDirectoryReader(), searcher.getSimilarity(), + searcher.getQueryCache(), searcher.getQueryCachingPolicy(), () -> Releasables.close(searcher, reader)); } finally { Releasables.close(releasable); } @@ -1158,6 +1174,21 @@ default void onFailedEngine(String reason, @Nullable Exception e) { } } + public abstract static class SearcherSupplier implements Releasable { + private final Function wrapper; + + public 
SearcherSupplier(Function wrapper) { + this.wrapper = wrapper; + } + + public final Searcher acquireSearcher(String source) { + final Searcher searcher = acquireSearcherInternal(source); + return "can_match".equals(source) ? searcher : wrapper.apply(searcher); + } + + protected abstract Searcher acquireSearcherInternal(String source); + } + public static final class Searcher extends IndexSearcher implements Releasable { private final String source; private final Closeable onClose; diff --git a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java index 664b631249f68..e72a5182287e1 100644 --- a/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java +++ b/server/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.util.Maps; import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.search.internal.ReaderContext; -import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import java.util.HashMap; @@ -158,15 +157,15 @@ public void onFreeReaderContext(ReaderContext readerContext) { } @Override - public void onNewScrollContext(ScrollContext scrollContext) { + public void onNewScrollContext(ReaderContext readerContext) { totalStats.scrollCurrent.inc(); } @Override - public void onFreeScrollContext(ScrollContext scrollContext) { + public void onFreeScrollContext(ReaderContext readerContext) { totalStats.scrollCurrent.dec(); assert totalStats.scrollCurrent.count() >= 0; - totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - scrollContext.getStartTimeInNano())); + totalStats.scrollMetric.inc(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - readerContext.getStartTimeInNano())); } static final class StatsHolder { diff --git 
a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 984654261db43..b6b9884a15096 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -1197,12 +1197,20 @@ public void failShard(String reason, @Nullable Exception e) { } /** - * Acquire a lightweight searcher which can be used to rewrite shard search requests. + * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. */ - public Engine.Searcher acquireCanMatchSearcher() { + public Engine.SearcherSupplier acquireSearcherSupplier() { + return acquireSearcherSupplier(Engine.SearcherScope.EXTERNAL); + } + + /** + * Acquires a point-in-time reader that can be used to create {@link Engine.Searcher}s on demand. + */ + public Engine.SearcherSupplier acquireSearcherSupplier(Engine.SearcherScope scope) { readAllowed(); markSearcherAccessed(); - return getEngine().acquireSearcher("can_match", Engine.SearcherScope.EXTERNAL); + final Engine engine = getEngine(); + return engine.acquireSearcherSupplier(this::wrapSearcher, scope); } public Engine.Searcher acquireSearcher(String source) { @@ -1218,7 +1226,7 @@ private Engine.Searcher acquireSearcher(String source, Engine.SearcherScope scop markSearcherAccessed(); final Engine engine = getEngine(); final Engine.Searcher searcher = engine.acquireSearcher(source, scope); - return wrapSearcher(searcher); + return "can_match".equals(source) ? 
searcher : wrapSearcher(searcher); } private Engine.Searcher wrapSearcher(Engine.Searcher searcher) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java index 77e83edcdd18f..fff7e88e505cb 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java +++ b/server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java @@ -22,7 +22,6 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.search.internal.ReaderContext; -import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.transport.TransportRequest; @@ -93,28 +92,29 @@ default void onNewReaderContext(ReaderContext readerContext) {} default void onFreeReaderContext(ReaderContext readerContext) {} /** - * Executed when a new scroll search {@link SearchContext} was created - * @param scrollContext the created search context + * Executed when a new scroll search {@link ReaderContext} was created + * @param readerContext the created reader context */ - default void onNewScrollContext(ScrollContext scrollContext) {} + default void onNewScrollContext(ReaderContext readerContext) {} /** * Executed when a scroll search {@link SearchContext} is freed. * This happens either when the scroll search execution finishes, if the * execution failed or if the search context as idle for and needs to be * cleaned up. - * @param scrollContext the freed search context + * @param readerContext the freed search context */ - default void onFreeScrollContext(ScrollContext scrollContext) {} + default void onFreeScrollContext(ReaderContext readerContext) {} /** * Executed prior to using a {@link SearchContext} that has been retrieved * from the active contexts. 
If the context is deemed invalid a runtime * exception can be thrown, which will prevent the context from being used. - * @param context the context retrieved from the active contexts + * @param readerContext The reader context used by this request. + * @param searchContext The newly created {@link SearchContext}. * @param transportRequest the request that is going to use the search context */ - default void validateSearchContext(SearchContext context, TransportRequest transportRequest) {} + default void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest transportRequest) {} /** * Executed when a search context was freed. The implementor can implement @@ -225,10 +225,10 @@ public void onFreeReaderContext(ReaderContext readerContext) { } @Override - public void onNewScrollContext(ScrollContext scrollContext) { + public void onNewScrollContext(ReaderContext readerContext) { for (SearchOperationListener listener : listeners) { try { - listener.onNewScrollContext(scrollContext); + listener.onNewScrollContext(readerContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onNewScrollContext listener [{}] failed", listener), e); } @@ -236,10 +236,10 @@ public void onNewScrollContext(ScrollContext scrollContext) { } @Override - public void onFreeScrollContext(ScrollContext scrollContext) { + public void onFreeScrollContext(ReaderContext readerContext) { for (SearchOperationListener listener : listeners) { try { - listener.onFreeScrollContext(scrollContext); + listener.onFreeScrollContext(readerContext); } catch (Exception e) { logger.warn(() -> new ParameterizedMessage("onFreeScrollContext listener [{}] failed", listener), e); } @@ -247,11 +247,11 @@ public void onFreeScrollContext(ScrollContext scrollContext) { } @Override - public void validateSearchContext(SearchContext context, TransportRequest request) { + public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, 
TransportRequest request) { Exception exception = null; for (SearchOperationListener listener : listeners) { try { - listener.validateSearchContext(context, request); + listener.validateSearchContext(readerContext, searchContext, request); } catch (Exception e) { exception = ExceptionsHelper.useOrSuppress(exception, e); } diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 2bdff5cafc965..c368590929a36 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -83,6 +83,7 @@ final class DefaultSearchContext extends SearchContext { private final ReaderContext readerContext; + private final Engine.Searcher engineSearcher; private final ShardSearchRequest request; private final SearchShardTarget shardTarget; private final LongSupplier relativeTimeSupplier; @@ -171,7 +172,7 @@ final class DefaultSearchContext extends SearchContext { this.indexShard = indexShard; this.indexService = indexService; this.clusterService = clusterService; - final Engine.Searcher engineSearcher = readerContext.engineSearcher(); + this.engineSearcher = readerContext.acquireSearcher("search"); this.searcher = new ContextIndexSearcher(engineSearcher.getIndexReader(), engineSearcher.getSimilarity(), engineSearcher.getQueryCache(), engineSearcher.getQueryCachingPolicy(), lowLevelCancellation); this.relativeTimeSupplier = relativeTimeSupplier; @@ -184,6 +185,7 @@ final class DefaultSearchContext extends SearchContext { @Override public void doClose() { + engineSearcher.close(); readerContext.decRef(); } @@ -304,7 +306,7 @@ public SearchContextId id() { @Override public String source() { - return readerContext.source(); + return "search"; } @Override diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java 
b/server/src/main/java/org/elasticsearch/search/SearchService.java index 5ef4d2130f98e..f2617f13f9c1f 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -286,7 +286,6 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem if (reason == IndexRemovalReason.DELETED || reason == IndexRemovalReason.CLOSED || reason == IndexRemovalReason.REOPENED) { freeAllContextForIndex(index); } - } protected void putReaderContext(ReaderContext context) { @@ -323,7 +322,11 @@ public void executeDfsPhase(ShardSearchRequest request, boolean keepStatesInCont @Override public void onResponse(ShardSearchRequest rewritten) { // fork the execution in the search thread pool - runAsync(shard, () -> executeDfsPhase(request, task, keepStatesInContext), listener); + try { + runAsync(shard, () -> executeDfsPhase(request, task, keepStatesInContext), listener); + } catch (Exception exc) { + listener.onFailure(exc); + } } @Override @@ -369,40 +372,48 @@ public void executeQueryPhase(ShardSearchRequest request, boolean keepStatesInCo rewriteAndFetchShardRequest(shard, request, new ActionListener() { @Override public void onResponse(ShardSearchRequest orig) { - if (orig.canReturnNullResponseIfMatchNoDocs()) { - assert orig.shouldCreatePersistentReader() == false; - // we clone the shard request and perform a quick rewrite using a lightweight - // searcher since we are outside of the search thread pool. - // If the request rewrites to "match none" we can shortcut the query phase - // entirely. Otherwise we fork the execution in the search thread pool. 
- ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig); - try (Engine.Searcher searcher = shard.acquireCanMatchSearcher()) { - QueryShardContext context = indexService.newQueryShardContext(canMatchRequest.shardId().id(), searcher, - canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias()); - Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true); - } catch (Exception exc) { - listener.onFailure(exc); - return; - } - if (canRewriteToMatchNone(canMatchRequest.source()) - && canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) { - if (canMatchRequest.readerId() != null) { - ReaderContext readerContext = getReaderContext(canMatchRequest.readerId()); - if (readerContext == null) { - listener.onFailure(new SearchContextMissingException(canMatchRequest.readerId())); - return; - } - try (Releasable r = readerContext.markAsUsed()) { - listener.onResponse(QuerySearchResult.nullInstance()); + final ReaderContext readerContext = createOrGetReaderContext(orig, keepStatesInContext); + try { + if (orig.canReturnNullResponseIfMatchNoDocs()) { + assert orig.shouldCreatePersistentReader() == false; + // we clone the shard request and perform a quick rewrite using a lightweight + // searcher since we are outside of the search thread pool. + // If the request rewrites to "match none" we can shortcut the query phase + // entirely. Otherwise we fork the execution in the search thread pool. 
+ ShardSearchRequest canMatchRequest = new ShardSearchRequest(orig); + try (Engine.Searcher searcher = readerContext.acquireSearcher("can_match")) { + QueryShardContext context = indexService.newQueryShardContext(canMatchRequest.shardId().id(), searcher, + canMatchRequest::nowInMillis, canMatchRequest.getClusterAlias()); + Rewriteable.rewrite(canMatchRequest.getRewriteable(), context, true); + } + if (canRewriteToMatchNone(canMatchRequest.source()) + && canMatchRequest.source().query() instanceof MatchNoneQueryBuilder) { + if (orig.readerId() != null) { + try (Releasable r = readerContext.markAsUsed()) { + listener.onResponse(QuerySearchResult.nullInstance()); + } + } else { + try { + listener.onResponse(QuerySearchResult.nullInstance()); + } finally { + // close and remove the ephemeral reader context + try (readerContext) { + removeReaderContext(readerContext.id().getId()); + } + } } - } else { - listener.onResponse(QuerySearchResult.nullInstance()); + return; } - return; + } + // fork the execution in the search thread pool + runAsync(shard, () -> executeQueryPhase(orig, task, readerContext), listener); + } catch (Exception exc) { + try (readerContext) { + removeReaderContext(readerContext.id().getId()); + } finally { + listener.onFailure(exc); } } - // fork the execution in the search thread pool - runAsync(shard, () -> executeQueryPhase(orig, task, keepStatesInContext), listener); } @Override @@ -414,20 +425,16 @@ public void onFailure(Exception exc) { private void runAsync(IndexShard shard, CheckedSupplier command, ActionListener listener) { Executor executor = getExecutor(shard); - try { - executor.execute(() -> { - T result; - try { - result = command.get(); - } catch (Exception exc) { - listener.onFailure(exc); - return; - } - listener.onResponse(result); - }); - } catch (Exception exc) { - listener.onFailure(exc); - } + executor.execute(() -> { + T result; + try { + result = command.get(); + } catch (Exception exc) { + listener.onFailure(exc); + return; + 
} + listener.onResponse(result); + }); } private void runAsync(SearchContextId id, CheckedSupplier executable, ActionListener listener) { @@ -436,8 +443,7 @@ private void runAsync(SearchContextId id, CheckedSupplier ex private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchShardTask task, - boolean keepStatesInContext) throws Exception { - ReaderContext reader = createOrGetReaderContext(request, keepStatesInContext); + ReaderContext reader) throws Exception { try (SearchContext context = createContext(reader, request, task, true)) { final long afterQueryTime; try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { @@ -479,26 +485,26 @@ public void executeQueryPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.contextId(), () -> { - final LegacyReaderContext reader = (LegacyReaderContext) findReaderContext(request.contextId()); - try (SearchContext context = createContext(reader, reader.getShardSearchRequest(null), task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId()); + try (SearchContext searchContext = createContext(readerContext, readerContext.getShardSearchRequest(null), task, false); + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { if (request.scroll() != null && request.scroll().keepAlive() != null) { final long keepAlive = request.scroll().keepAlive().millis(); checkKeepAliveLimit(keepAlive); - reader.keepAlive(keepAlive); + readerContext.keepAlive(keepAlive); } - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); - context.searcher().setAggregatedDfs(reader.getAggregatedDfs(null)); - processScroll(request, reader, context); - queryPhase.execute(context); + 
readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, searchContext, request); + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); + processScroll(request, readerContext, searchContext); + queryPhase.execute(searchContext); executor.success(); - final RescoreDocIds rescoreDocIds = context.rescoreDocIds(); - context.queryResult().setRescoreDocIds(rescoreDocIds); - reader.setRescoreDocIds(rescoreDocIds); - return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); + final RescoreDocIds rescoreDocIds = searchContext.rescoreDocIds(); + searchContext.queryResult().setRescoreDocIds(rescoreDocIds); + readerContext.setRescoreDocIds(rescoreDocIds); + return new ScrollQuerySearchResult(searchContext.queryResult(), searchContext.shardTarget()); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(reader, e); + processFailure(readerContext, e); throw e; } }, listener); @@ -506,25 +512,26 @@ public void executeQueryPhase(InternalScrollSearchRequest request, public void executeQueryPhase(QuerySearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.contextId(), () -> { - final ReaderContext reader = findReaderContext(request.contextId()); - reader.setAggregatedDfs(request.dfs()); - try (SearchContext context = createContext(reader, reader.getShardSearchRequest(request.shardSearchRequest()), task, true); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); - context.searcher().setAggregatedDfs(request.dfs()); - queryPhase.execute(context); - if (context.queryResult().hasSearchContext() == false && reader.singleSession()) { + final ReaderContext readerContext = findReaderContext(request.contextId()); + readerContext.setAggregatedDfs(request.dfs()); + try (SearchContext searchContext = + 
createContext(readerContext, readerContext.getShardSearchRequest(request.shardSearchRequest()), task, true); + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { + readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, searchContext, request); + searchContext.searcher().setAggregatedDfs(request.dfs()); + queryPhase.execute(searchContext); + if (searchContext.queryResult().hasSearchContext() == false && readerContext.singleSession()) { // no hits, we can release the context since there will be no fetch phase - freeReaderContext(reader.id()); + freeReaderContext(readerContext.id()); } executor.success(); - final RescoreDocIds rescoreDocIds = context.rescoreDocIds(); - context.queryResult().setRescoreDocIds(rescoreDocIds); - reader.setRescoreDocIds(rescoreDocIds); - return context.queryResult(); + final RescoreDocIds rescoreDocIds = searchContext.rescoreDocIds(); + searchContext.queryResult().setRescoreDocIds(rescoreDocIds); + readerContext.setRescoreDocIds(rescoreDocIds); + return searchContext.queryResult(); } catch (Exception e) { logger.trace("Query phase failed", e); - processFailure(reader, e); + processFailure(readerContext, e); throw e; } }, listener); @@ -542,24 +549,24 @@ private Executor getExecutor(IndexShard indexShard) { public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.contextId(), () -> { - final LegacyReaderContext reader = (LegacyReaderContext) findReaderContext(request.contextId()); - try (SearchContext context = createContext(reader, reader.getShardSearchRequest(null), task, false); - SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) { + final LegacyReaderContext readerContext = (LegacyReaderContext) findReaderContext(request.contextId()); + try (SearchContext searchContext = createContext(readerContext, readerContext.getShardSearchRequest(null), 
task, false); + SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(searchContext)) { if (request.scroll() != null && request.scroll().keepAlive() != null) { checkKeepAliveLimit(request.scroll().keepAlive().millis()); - reader.keepAlive(request.scroll().keepAlive().millis()); + readerContext.keepAlive(request.scroll().keepAlive().millis()); } - context.assignRescoreDocIds(reader.getRescoreDocIds(null)); - context.searcher().setAggregatedDfs(reader.getAggregatedDfs(null)); - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); - processScroll(request, reader, context); - queryPhase.execute(context); + searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(null)); + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(null)); + readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, searchContext, request); + processScroll(request, readerContext, searchContext); + queryPhase.execute(searchContext); final long afterQueryTime = executor.success(); - QueryFetchSearchResult fetchSearchResult = executeFetchPhase(reader, context, afterQueryTime); - return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget()); + QueryFetchSearchResult fetchSearchResult = executeFetchPhase(readerContext, searchContext, afterQueryTime); + return new ScrollQueryFetchSearchResult(fetchSearchResult, searchContext.shardTarget()); } catch (Exception e) { logger.trace("Fetch phase failed", e); - processFailure(reader, e); + processFailure(readerContext, e); throw e; } }, listener); @@ -567,27 +574,28 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchShardTa public void executeFetchPhase(ShardFetchRequest request, SearchShardTask task, ActionListener listener) { runAsync(request.contextId(), () -> { - final ReaderContext reader = findReaderContext(request.contextId()); - final ShardSearchRequest shardSearchRequest = 
reader.getShardSearchRequest(request.getShardSearchRequest()); - try (SearchContext context = createContext(reader, shardSearchRequest, task, false)) { - reader.indexShard().getSearchOperationListener().validateSearchContext(context, request); + final ReaderContext readerContext = findReaderContext(request.contextId()); + final ShardSearchRequest shardSearchRequest = readerContext.getShardSearchRequest(request.getShardSearchRequest()); + try (SearchContext searchContext = createContext(readerContext, shardSearchRequest, task, false)) { + readerContext.indexShard().getSearchOperationListener().validateSearchContext(readerContext, searchContext, request); if (request.lastEmittedDoc() != null) { - context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); + searchContext.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); } - context.assignRescoreDocIds(reader.getRescoreDocIds(request.getRescoreDocIds())); - context.searcher().setAggregatedDfs(reader.getAggregatedDfs(request.getAggregatedDfs())); - context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); - try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) { - fetchPhase.execute(context); - if (reader.singleSession()) { + searchContext.assignRescoreDocIds(readerContext.getRescoreDocIds(request.getRescoreDocIds())); + searchContext.searcher().setAggregatedDfs(readerContext.getAggregatedDfs(request.getAggregatedDfs())); + searchContext.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); + try (SearchOperationListenerExecutor executor = + new SearchOperationListenerExecutor(searchContext, true, System.nanoTime())) { + fetchPhase.execute(searchContext); + if (readerContext.singleSession()) { freeReaderContext(request.contextId()); } executor.success(); } - return context.fetchResult(); + return searchContext.fetchResult(); } catch (Exception e) { logger.trace("Fetch phase failed", e); - processFailure(reader, e); + 
processFailure(readerContext, e); throw e; } }, listener); @@ -624,12 +632,12 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request, boolean } IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); IndexShard shard = indexService.getShard(request.shardId().id()); - Engine.Searcher searcher = shard.acquireSearcher("search"); - return createAndPutReaderContext(request, shard, searcher, keepStatesInContext); + Engine.SearcherSupplier reader = shard.acquireSearcherSupplier(); + return createAndPutReaderContext(request, shard, reader, keepStatesInContext); } final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexShard shard, - Engine.Searcher engineSearcher, boolean keepStatesInContext) { + Engine.SearcherSupplier reader, boolean keepStatesInContext) { assert request.readerId() == null; assert request.keepAlive() == null; ReaderContext readerContext = null; @@ -644,31 +652,29 @@ final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexS + MAX_OPEN_SCROLL_CONTEXT.getKey() + "] setting."); } } + final long keepAlive = getKeepAlive(request); + checkKeepAliveLimit(keepAlive); if (keepStatesInContext || request.scroll() != null) { - long keepAlive = defaultKeepAlive; - if (request.scroll() != null && request.scroll().keepAlive() != null) { - keepAlive = request.scroll().keepAlive().millis(); - checkKeepAliveLimit(keepAlive); - } - readerContext = new LegacyReaderContext(idGenerator.incrementAndGet(), shard, engineSearcher, request, keepAlive); + readerContext = new LegacyReaderContext(idGenerator.incrementAndGet(), shard, reader, request, keepAlive); if (request.scroll() != null) { readerContext.addOnClose(decreaseScrollContexts); decreaseScrollContexts = null; } } else { - readerContext = new ReaderContext(idGenerator.incrementAndGet(), shard, engineSearcher, defaultKeepAlive, true); + readerContext = new ReaderContext(idGenerator.incrementAndGet(), shard, reader, 
keepAlive, + request.keepAlive() == null); } - engineSearcher = null; + reader = null; final ReaderContext finalReaderContext = readerContext; final SearchOperationListener searchOperationListener = shard.getSearchOperationListener(); searchOperationListener.onNewReaderContext(finalReaderContext); if (finalReaderContext.scrollContext() != null) { - searchOperationListener.onNewScrollContext(finalReaderContext.scrollContext()); + searchOperationListener.onNewScrollContext(finalReaderContext); } readerContext.addOnClose(() -> { try { if (finalReaderContext.scrollContext() != null) { - searchOperationListener.onFreeScrollContext(finalReaderContext.scrollContext()); + searchOperationListener.onFreeScrollContext(finalReaderContext); } } finally { searchOperationListener.onFreeReaderContext(finalReaderContext); @@ -678,7 +684,7 @@ final ReaderContext createAndPutReaderContext(ShardSearchRequest request, IndexS readerContext = null; return finalReaderContext; } finally { - Releasables.close(engineSearcher, readerContext, decreaseScrollContexts); + Releasables.close(reader, readerContext, decreaseScrollContexts); } } @@ -691,25 +697,25 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex()); final IndexShard shard = indexService.getShard(shardId.id()); final SearchOperationListener searchOperationListener = shard.getSearchOperationListener(); - shard.awaitShardSearchActive(ignored -> - runAsync(shard, () -> { - Releasable releasable = null; - try { - final Engine.Searcher engineSearcher = shard.acquireSearcher("search"); - releasable = engineSearcher; - final ReaderContext readerContext = new ReaderContext( - idGenerator.incrementAndGet(), shard, engineSearcher, keepAlive.millis(), false); - releasable = readerContext; - searchOperationListener.onNewReaderContext(readerContext); - readerContext.addOnClose(() -> 
searchOperationListener.onFreeReaderContext(readerContext)); - putReaderContext(readerContext); - releasable = null; - return readerContext.id(); - } finally { - Releasables.close(releasable); - } - }, listener) - ); + shard.awaitShardSearchActive(ignored -> { + Releasable releasable = null; + try { + final Engine.SearcherSupplier reader = shard.acquireSearcherSupplier(); + releasable = reader; + final ReaderContext readerContext = new ReaderContext( + idGenerator.incrementAndGet(), shard, reader, keepAlive.millis(), false); + releasable = readerContext; + searchOperationListener.onNewReaderContext(readerContext); + readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(readerContext)); + putReaderContext(readerContext); + releasable = null; + listener.onResponse(readerContext.id()); + } catch (Exception exc) { + listener.onFailure(exc); + } finally { + Releasables.close(releasable); + } + }); } final SearchContext createContext(ReaderContext reader, @@ -749,12 +755,14 @@ final SearchContext createContext(ReaderContext reader, public DefaultSearchContext createSearchContext(ShardSearchRequest request, TimeValue timeout) throws IOException { IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().getId()); - Engine.Searcher engineSearcher = indexShard.acquireSearcher("search"); - try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, engineSearcher, -1L, true)) { - engineSearcher = null; // transfer ownership to readerContext - return createSearchContext(readerContext, request, timeout); + Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier(); + try (ReaderContext readerContext = new ReaderContext(idGenerator.incrementAndGet(), indexShard, reader, -1L, true)) { + reader = null; // transfer ownership to readerContext + DefaultSearchContext searchContext = createSearchContext(readerContext, request, timeout); + 
searchContext.addReleasable(readerContext); + return searchContext; } finally { - Releasables.close(engineSearcher); + Releasables.close(reader); } } @@ -1127,7 +1135,7 @@ public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException IndexShard indexShard = indexService.getShard(request.shardId().getId()); // we don't want to use the reader wrapper since it could run costly operations // and we can afford false positives. - try (Engine.Searcher searcher = indexShard.acquireCanMatchSearcher()) { + try (Engine.Searcher searcher = indexShard.acquireSearcher("can_match")) { final boolean aliasFilterCanMatch = request.getAliasFilter() .getQueryBuilder() instanceof MatchNoneQueryBuilder == false; QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher, diff --git a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java index d219c2c5621f4..5f719836a4a3c 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/LegacyReaderContext.java @@ -19,6 +19,8 @@ package org.elasticsearch.search.internal; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.search.RescoreDocIds; @@ -32,9 +34,12 @@ public class LegacyReaderContext extends ReaderContext { private AggregatedDfs aggregatedDfs; private RescoreDocIds rescoreDocIds; - public LegacyReaderContext(long id, IndexShard indexShard, Engine.Searcher engineSearcher, + private Engine.Searcher searcher; + private Releasable onClose; + + public LegacyReaderContext(long id, IndexShard indexShard, Engine.SearcherSupplier reader, ShardSearchRequest shardSearchRequest, long keepAliveInMillis) { - super(id, 
indexShard, engineSearcher, keepAliveInMillis, false); + super(id, indexShard, reader, keepAliveInMillis, false); assert shardSearchRequest.readerId() == null; assert shardSearchRequest.keepAlive() == null; this.shardSearchRequest = Objects.requireNonNull(shardSearchRequest); @@ -45,6 +50,30 @@ public LegacyReaderContext(long id, IndexShard indexShard, Engine.Searcher engin } } + @Override + public Engine.Searcher acquireSearcher(String source) { + if (scrollContext != null && "search".equals(source)) { + // Search scroll requests are special, they don't hold indices names so we have + // to reuse the searcher created on the request that initialized the scroll. + // This ensures that we wrap the searcher's reader with the user's permissions + // when they are available. + if (searcher == null) { + Engine.Searcher delegate = searcherSupplier.acquireSearcher(source); + onClose = delegate::close; + searcher = new Engine.Searcher(delegate.source(), delegate.getDirectoryReader(), + delegate.getSimilarity(), delegate.getQueryCache(), delegate.getQueryCachingPolicy(), () -> {}); + } + return searcher; + } + return super.acquireSearcher(source); + } + + @Override + protected void closeInternal() { + Releasables.close(onClose); + super.closeInternal(); + } + @Override public ShardSearchRequest getShardSearchRequest(ShardSearchRequest other) { return shardSearchRequest; diff --git a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java index eb94b8f91e615..37593e7c172f3 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java @@ -28,7 +28,9 @@ import org.elasticsearch.search.RescoreDocIds; import org.elasticsearch.search.dfs.AggregatedDfs; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; import 
java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -47,7 +49,7 @@ public class ReaderContext extends AbstractRefCounted implements Releasable { private final SearchContextId id; private final IndexShard indexShard; - private final Engine.Searcher engineSearcher; + protected final Engine.SearcherSupplier searcherSupplier; private final AtomicBoolean closed = new AtomicBoolean(false); private final boolean singleSession; @@ -57,11 +59,19 @@ public class ReaderContext extends AbstractRefCounted implements Releasable { private final List onCloses = new CopyOnWriteArrayList<>(); - public ReaderContext(long id, IndexShard indexShard, Engine.Searcher engineSearcher, long keepAliveInMillis, boolean singleSession) { + private final long startTimeInNano = System.nanoTime(); + + private Map context; + + public ReaderContext(long id, + IndexShard indexShard, + Engine.SearcherSupplier searcherSupplier, + long keepAliveInMillis, + boolean singleSession) { super("reader_context"); this.id = new SearchContextId(UUIDs.base64UUID(), id); this.indexShard = indexShard; - this.engineSearcher = engineSearcher; + this.searcherSupplier = searcherSupplier; this.singleSession = singleSession; this.keepAlive = new AtomicLong(keepAliveInMillis); this.lastAccessTime = new AtomicLong(nowInMillis()); @@ -75,14 +85,12 @@ private long nowInMillis() { public final void close() { if (closed.compareAndSet(false, true)) { decRef(); - } else { - assert false : "ReaderContext was closed already"; } } @Override protected void closeInternal() { - Releasables.close(Releasables.wrap(onCloses), engineSearcher); + Releasables.close(Releasables.wrap(onCloses), searcherSupplier); } public void addOnClose(Releasable releasable) { @@ -93,16 +101,13 @@ public SearchContextId id() { return id; } + public IndexShard indexShard() { return indexShard; } - public Engine.Searcher engineSearcher() { - return engineSearcher; - } - - public String source() { - return 
engineSearcher.source(); + public Engine.Searcher acquireSearcher(String source) { + return searcherSupplier.acquireSearcher(source); } public void keepAlive(long keepAlive) { @@ -163,4 +168,27 @@ public void setRescoreDocIds(RescoreDocIds rescoreDocIds) { public boolean singleSession() { return singleSession; } + + /** + * Returns the object or null if the given key does not have a + * value in the context + */ + @SuppressWarnings("unchecked") // (T)object + public T getFromContext(String key) { + return context != null ? (T) context.get(key) : null; + } + + /** + * Puts the object into the context + */ + public void putInContext(String key, Object value) { + if (context == null) { + context = new HashMap<>(); + } + context.put(key, value); + } + + public long getStartTimeInNano() { + return startTimeInNano; + } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java b/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java index 440408646d127..5b9c632d4e522 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ScrollContext.java @@ -23,42 +23,10 @@ import org.apache.lucene.search.TotalHits; import org.elasticsearch.search.Scroll; -import java.util.HashMap; -import java.util.Map; - /** Wrapper around information that needs to stay around when scrolling. */ public final class ScrollContext { - - private Map context = null; - public TotalHits totalHits = null; public float maxScore = Float.NaN; public ScoreDoc lastEmittedDoc; public Scroll scroll; - - private final long startTimeInNano = System.nanoTime(); - - /** - * Returns the object or null if the given key does not have a - * value in the context - */ - @SuppressWarnings("unchecked") // (T)object - public T getFromContext(String key) { - return context != null ? 
(T) context.get(key) : null; - } - - /** - * Puts the object into the context - */ - public void putInContext(String key, Object value) { - if (context == null) { - context = new HashMap<>(); - } - context.put(key, value); - } - - - public long getStartTimeInNano() { - return startTimeInNano; - } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java index 47827350d2713..b6733c325fa13 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/SearchContext.java @@ -82,9 +82,7 @@ public abstract class SearchContext implements Releasable { private final AtomicBoolean closed = new AtomicBoolean(false); private InnerHitsContext innerHitsContext; - protected SearchContext() { - - } + protected SearchContext() {} public abstract void setTask(SearchShardTask task); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index 254c0ab300e6a..5b16937749516 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -276,7 +276,7 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce out.writeBoolean(canReturnNullResponseIfMatchNoDocs); out.writeOptionalWriteable(bottomSortValues); } - if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + if (out.getVersion().onOrAfter(Version.V_8_0_0) && asKey == false) { out.writeOptionalWriteable(readerId); out.writeOptionalTimeValue(keepAlive); } diff --git a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java index 34c528dfeda6a..10db9a7f80421 100644 --- 
a/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.index.shard; import org.elasticsearch.search.internal.ReaderContext; -import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.TestSearchContext; @@ -105,20 +104,21 @@ public void onFreeReaderContext(ReaderContext readerContext) { } @Override - public void onNewScrollContext(ScrollContext scrollContext) { - assertNotNull(scrollContext); + public void onNewScrollContext(ReaderContext readerContext) { + assertNotNull(readerContext); newScrollContext.incrementAndGet(); } @Override - public void onFreeScrollContext(ScrollContext scrollContext) { - assertNotNull(scrollContext); + public void onFreeScrollContext(ReaderContext readerContext) { + assertNotNull(readerContext); freeScrollContext.incrementAndGet(); } @Override - public void validateSearchContext(SearchContext context, TransportRequest request) { - assertNotNull(context); + public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest request) { + assertNotNull(readerContext); + assertNotNull(searchContext); validateSearchContext.incrementAndGet(); } }; @@ -232,7 +232,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, validateSearchContext.get()); - compositeListener.onNewScrollContext(new ScrollContext()); + compositeListener.onNewScrollContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -258,7 +258,7 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, freeScrollContext.get()); assertEquals(0, 
validateSearchContext.get()); - compositeListener.onFreeScrollContext(new ScrollContext()); + compositeListener.onFreeScrollContext(mock(ReaderContext.class)); assertEquals(2, preFetch.get()); assertEquals(2, preQuery.get()); assertEquals(2, failedFetch.get()); @@ -272,10 +272,10 @@ public void validateSearchContext(SearchContext context, TransportRequest reques assertEquals(0, validateSearchContext.get()); if (throwingListeners == 0) { - compositeListener.validateSearchContext(ctx, Empty.INSTANCE); + compositeListener.validateSearchContext(mock(ReaderContext.class), ctx, Empty.INSTANCE); } else { - RuntimeException expected = - expectThrows(RuntimeException.class, () -> compositeListener.validateSearchContext(ctx, Empty.INSTANCE)); + RuntimeException expected = expectThrows(RuntimeException.class, + () -> compositeListener.validateSearchContext(mock(ReaderContext.class), ctx, Empty.INSTANCE)); assertNull(expected.getMessage()); assertEquals(throwingListeners - 1, expected.getSuppressed().length); if (throwingListeners > 1) { diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 38f790cb50c7a..de96b084f3e6c 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -60,7 +60,9 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import java.io.IOException; import java.util.UUID; +import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Matchers.anyObject; @@ -115,17 +117,28 @@ public void testPreProcess() throws Exception { BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); try (Directory dir = newDirectory(); - RandomIndexWriter w = new RandomIndexWriter(random(), 
dir); - IndexReader reader = w.getReader()) { - - final Engine.Searcher engineSearcher = new Engine.Searcher("test", w.getReader(), - IndexSearcher.getDefaultSimilarity(), IndexSearcher.getDefaultQueryCache(), - IndexSearcher.getDefaultQueryCachingPolicy(), reader); + RandomIndexWriter w = new RandomIndexWriter(random(), dir)) { + + final Engine.SearcherSupplier engineReader = new Engine.SearcherSupplier(Function.identity()) { + @Override + public void close() {} + + @Override + protected Engine.Searcher acquireSearcherInternal(String source) { + try { + IndexReader reader = w.getReader(); + return new Engine.Searcher("test", reader, IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), reader); + } catch (IOException exc) { + throw new AssertionError(exc); + } + } + }; SearchShardTarget target = new SearchShardTarget("node", shardId, null, OriginalIndices.NONE); ReaderContext readerWithoutScroll = new ReaderContext( - randomNonNegativeLong(), indexShard, engineSearcher, randomNonNegativeLong(), false); + randomNonNegativeLong(), indexShard, engineReader, randomNonNegativeLong(), false); DefaultSearchContext contextWithoutScroll = new DefaultSearchContext(readerWithoutScroll, shardSearchRequest, target, null, indexService, indexShard, bigArrays, null, timeout, null, false); contextWithoutScroll.from(300); @@ -141,7 +154,7 @@ public void testPreProcess() throws Exception { // resultWindow greater than maxResultWindow and scrollContext isn't null when(shardSearchRequest.scroll()).thenReturn(new Scroll(TimeValue.timeValueMillis(randomInt(1000)))); ReaderContext readerContext = new LegacyReaderContext( - randomNonNegativeLong(), indexShard, engineSearcher, shardSearchRequest, randomNonNegativeLong()); + randomNonNegativeLong(), indexShard, engineReader, shardSearchRequest, randomNonNegativeLong()); DefaultSearchContext context1 = new DefaultSearchContext(readerContext, shardSearchRequest, target, 
null, indexService, indexShard, bigArrays, null, timeout, null, false); context1.from(300); @@ -173,7 +186,7 @@ public void testPreProcess() throws Exception { + "] index level setting.")); readerContext.close(); - readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, randomNonNegativeLong(), false); + readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineReader, randomNonNegativeLong(), false); // rescore is null but sliceBuilder is not null DefaultSearchContext context2 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService, indexShard, bigArrays, null, timeout, null, false); @@ -203,7 +216,7 @@ public void testPreProcess() throws Exception { when(shardSearchRequest.indexRoutings()).thenReturn(new String[0]); readerContext.close(); - readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineSearcher, randomNonNegativeLong(), false); + readerContext = new ReaderContext(randomNonNegativeLong(), indexShard, engineReader, randomNonNegativeLong(), false); DefaultSearchContext context4 = new DefaultSearchContext(readerContext, shardSearchRequest, target, null, indexService, indexShard, bigArrays, null, timeout, null, false); context4.sliceBuilder(new SliceBuilder(1,2)).parsedQuery(parsedQuery).preProcess(false); diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 11804fdca1ba1..0f9a7f7b55b4b 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -175,15 +175,6 @@ protected Map, Object>> pluginScripts() { @Override public void onIndexModule(IndexModule indexModule) { indexModule.addSearchOperationListener(new SearchOperationListener() { - @Override - public void onNewReaderContext(ReaderContext readerContext) { - if 
("throttled_threadpool_index".equals(readerContext.indexShard().shardId().getIndex().getName())) { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search_throttled]")); - } else { - assertThat(Thread.currentThread().getName(), startsWith("elasticsearch[node_s_0][search]")); - } - } - @Override public void onFetchPhase(SearchContext context, long tookInNanos) { if ("throttled_threadpool_index".equals(context.indexShard().shardId().getIndex().getName())) { @@ -522,7 +513,7 @@ public void testMaxOpenScrollContexts() throws Exception { final ShardScrollRequestTest request = new ShardScrollRequestTest(indexShard.shardId()); ElasticsearchException ex = expectThrows(ElasticsearchException.class, - () -> service.createAndPutReaderContext(request, indexShard, indexShard.acquireSearcher("test"), randomBoolean())); + () -> service.createAndPutReaderContext(request, indexShard, indexShard.acquireSearcherSupplier(), randomBoolean())); assertEquals( "Trying to create too many scroll contexts. Must be less than or equal to: [" + SearchService.MAX_OPEN_SCROLL_CONTEXT.get(Settings.EMPTY) + "]. " + @@ -545,11 +536,11 @@ public void testOpenScrollContextsConcurrently() throws Exception { try { latch.await(); for (; ; ) { - Engine.Searcher searcher = indexShard.acquireSearcher("test"); + Engine.SearcherSupplier reader = indexShard.acquireSearcherSupplier(); try { searchService.createAndPutReaderContext( - new ShardScrollRequestTest(indexShard.shardId()), indexShard, searcher, true); - searcher = null; + new ShardScrollRequestTest(indexShard.shardId()), indexShard, reader, true); + reader = null; } catch (ElasticsearchException e) { assertThat(e.getMessage(), equalTo( "Trying to create too many scroll contexts. 
Must be less than or equal to: " + @@ -557,7 +548,7 @@ public void testOpenScrollContextsConcurrently() throws Exception { "This limit can be set by changing the [search.max_open_scroll_context] setting.")); return; } finally { - IOUtils.close(searcher); + IOUtils.close(reader); } } } catch (Exception e) { @@ -992,10 +983,10 @@ public void testLookUpSearchContext() throws Exception { ShardSearchRequest request = new ShardSearchRequest( OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); - final Engine.Searcher searcher = indexShard.acquireSearcher("test"); - final ReaderContext reader = searchService.createAndPutReaderContext(request, indexShard, searcher, randomBoolean()); - assertThat(reader.id().getId(), equalTo((long) (i + 1))); - contextIds.add(reader.id()); + final ReaderContext context = searchService.createAndPutReaderContext(request, indexShard, + indexShard.acquireSearcherSupplier(), randomBoolean()); + assertThat(context.id().getId(), equalTo((long) (i + 1))); + contextIds.add(context.id()); } assertThat(searchService.getActiveContexts(), equalTo(contextIds.size())); while (contextIds.isEmpty() == false) { @@ -1031,7 +1022,6 @@ public void testOpenReaderContext() { } private ReaderContext createReaderContext(IndexShard shard) { - Engine.Searcher searcher = shard.acquireSearcher("test"); - return new ReaderContext(randomNonNegativeLong(), shard, searcher, randomNonNegativeLong(), false); + return new ReaderContext(randomNonNegativeLong(), shard, shard.acquireSearcherSupplier(), randomNonNegativeLong(), false); } } diff --git a/server/src/test/java/org/elasticsearch/search/internal/ScrollContextTests.java b/server/src/test/java/org/elasticsearch/search/internal/ScrollContextTests.java deleted file mode 100644 index de4863dd92a08..0000000000000 --- a/server/src/test/java/org/elasticsearch/search/internal/ScrollContextTests.java +++ /dev/null @@ 
-1,36 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.search.internal; - -import org.elasticsearch.test.ESTestCase; - -public class ScrollContextTests extends ESTestCase { - - public void testStoringObjectsInScrollContext() { - final ScrollContext scrollContext = new ScrollContext(); - final String key = randomAlphaOfLengthBetween(1, 16); - assertNull(scrollContext.getFromContext(key)); - - final String value = randomAlphaOfLength(6); - scrollContext.putInContext(key, value); - - assertEquals(value, scrollContext.getFromContext(key)); - } -} diff --git a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java index 42b7a4d8b102d..435feda33280a 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java +++ b/test/framework/src/main/java/org/elasticsearch/test/engine/MockInternalEngine.java @@ -25,6 +25,7 @@ import org.elasticsearch.index.engine.InternalEngine; import java.io.IOException; +import java.util.function.Function; final class MockInternalEngine extends InternalEngine { private MockEngineSupport support; @@ -81,4 +82,9 @@ public 
Engine.Searcher acquireSearcher(String source, SearcherScope scope) { final Engine.Searcher engineSearcher = super.acquireSearcher(source, scope); return support().wrapSearcher(engineSearcher); } + + @Override + public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { + return super.acquireSearcherSupplier(wrapper.andThen(s -> support().wrapSearcher(s)), scope); + } } diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java index 4cdf8c9c3af67..e99767b896db5 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/index/engine/FrozenEngine.java @@ -5,66 +5,37 @@ */ package org.elasticsearch.index.engine; -import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.FieldInfos; -import org.apache.lucene.index.Fields; -import org.apache.lucene.index.FilterDirectoryReader; -import org.apache.lucene.index.FilterLeafReader; import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; -import org.apache.lucene.index.LeafMetaData; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; -import org.apache.lucene.index.NumericDocValues; -import org.apache.lucene.index.PointValues; -import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentReader; import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper; -import org.apache.lucene.index.SortedDocValues; -import org.apache.lucene.index.SortedNumericDocValues; -import org.apache.lucene.index.SortedSetDocValues; -import org.apache.lucene.index.StoredFieldVisitor; -import org.apache.lucene.index.Terms; import 
org.apache.lucene.search.ReferenceManager; -import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.Directory; -import org.apache.lucene.util.Bits; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.shard.DocsStats; -import org.elasticsearch.index.shard.SearchOperationListener; -import org.elasticsearch.search.internal.ReaderContext; -import org.elasticsearch.search.internal.SearchContext; -import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.index.store.Store; import java.io.IOException; import java.io.UncheckedIOException; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.function.Function; /** - * This is a stand-alone read-only engine that maintains a lazy loaded index reader that is opened on calls to - * {@link Engine#acquireSearcher(String)}. The index reader opened is maintained until there are no reference to it anymore and then - * releases itself from the engine. The readers returned from this engine are lazy which allows release after and reset before a search - * phase starts. This allows releasing references as soon as possible on the search layer. - * - * Internally this class uses a set of wrapper abstractions to allow a reader that is used inside the {@link Engine.Searcher} returned from - * {@link #acquireSearcher(String, SearcherScope)} to release and reset it's internal resources. This is necessary to for instance release - * all SegmentReaders after a search phase finishes and reopen them before the next search phase starts. This together with a throttled - * threadpool (search_throttled) guarantees that at most N frozen shards have a low level index reader open at the same time. 
- * - * In particular we have LazyDirectoryReader that wraps its LeafReaders (the actual segment readers) inside LazyLeafReaders. Each of the - * LazyLeafReader delegates to segment LeafReader that can be reset (it's reference decremented and nulled out) on a search phase is - * finished. Before the next search phase starts we can reopen the corresponding reader and reset the reference to execute the search phase. - * This allows the SearchContext to hold on to the same LazyDirectoryReader across its lifecycle but under the hood resources (memory) is - * released while the SearchContext phases are not executing. - * + * This is a stand-alone read-only engine that maintains an index reader that is opened lazily on calls to + * {@link SearcherSupplier#acquireSearcher(String)}. The index reader opened is maintained until there are no reference to it anymore + * and then releases itself from the engine. + * This is necessary to for instance release all SegmentReaders after a search phase finishes and reopen them before the next search + * phase starts. + * This together with a throttled threadpool (search_throttled) guarantees that at most N frozen shards have a low level index reader + * open at the same time. * The internal reopen of readers is treated like a refresh and refresh listeners are called up-on reopen. This allows to consume refresh * stats in order to obtain the number of reopens. 
*/ @@ -164,6 +135,11 @@ private synchronized void onReaderClosed(IndexReader.CacheKey key) { } } + @SuppressForbidden(reason = "we manage references explicitly here") + private synchronized void closeReader(IndexReader reader) throws IOException { + reader.decRef(); + } + private synchronized ElasticsearchDirectoryReader getOrOpenReader() throws IOException { ElasticsearchDirectoryReader reader = null; boolean success = false; @@ -177,7 +153,7 @@ private synchronized ElasticsearchDirectoryReader getOrOpenReader() throws IOExc reader = lastOpenedReader = wrapReader(dirReader, Function.identity()); processReader(reader); reader.getReaderCacheHelper().addClosedListener(this::onReaderClosed); - for (ReferenceManager.RefreshListener listeners : config ().getInternalRefreshListener()) { + for (ReferenceManager.RefreshListener listeners : config().getInternalRefreshListener()) { listeners.afterRefresh(true); } } @@ -199,404 +175,63 @@ private synchronized ElasticsearchDirectoryReader getReader() { } @Override - @SuppressWarnings("fallthrough") - @SuppressForbidden( reason = "we manage references explicitly here") - public Searcher acquireSearcher(String source, SearcherScope scope) throws EngineException { + public SearcherSupplier acquireSearcherSupplier(Function wrapper, SearcherScope scope) throws EngineException { + final Store store = this.store; store.incRef(); - boolean releaseRefeference = true; - try { - final boolean maybeOpenReader; - switch (source) { - case "load_seq_no": - case "load_version": - assert false : "this is a read-only engine"; - case "doc_stats": - assert false : "doc_stats are overwritten"; - case "refresh_needed": - assert false : "refresh_needed is always false"; - case "segments": - case "segments_stats": - case "completion_stats": - case "can_match": // special case for can_match phase - we use the cached point values reader - maybeOpenReader = false; - break; - default: - maybeOpenReader = true; - } - // special case we only want to report 
segment stats if we have a reader open. in that case we only get a reader if we still - // have one open at the time and can inc it's reference. - ElasticsearchDirectoryReader reader = maybeOpenReader ? getOrOpenReader() : getReader(); - if (reader == null) { - // we just hand out a searcher on top of an empty reader that we opened for the ReadOnlyEngine in the #open(IndexCommit) - // method. this is the case when we don't have a reader open right now and we get a stats call any other that falls in - // the category that doesn't trigger a reopen - if ("can_match".equals(source)) { - canMatchReader.incRef(); - return new Searcher(source, canMatchReader, - engineConfig.getSimilarity(), engineConfig.getQueryCache(), engineConfig.getQueryCachingPolicy(), - canMatchReader::decRef); - } - return super.acquireSearcher(source, scope); - } else { - try { - LazyDirectoryReader lazyDirectoryReader = new LazyDirectoryReader(reader, this); - Searcher newSearcher = new Searcher(source, lazyDirectoryReader, - engineConfig.getSimilarity(), engineConfig.getQueryCache(), engineConfig.getQueryCachingPolicy(), - () -> IOUtils.close(lazyDirectoryReader, store::decRef)); - releaseRefeference = false; - return newSearcher; - } finally { - if (releaseRefeference) { - reader.decRef(); // don't call close here we manage reference ourselves - } - } - } - } catch (IOException e) { - throw new UncheckedIOException(e); - } finally { - if (releaseRefeference) { - store.decRef(); - } - } - } - - static LazyDirectoryReader unwrapLazyReader(DirectoryReader reader) { - while (reader instanceof FilterDirectoryReader) { - if (reader instanceof LazyDirectoryReader) { - return (LazyDirectoryReader) reader; - } - reader = ((FilterDirectoryReader) reader).getDelegate(); - } - return null; - } - - /* - * We register this listener for a frozen index that will - * 1. reset the reader every time the search context is validated which happens when the context is looked up ie. on a fetch phase - * etc. - * 2. 
register a releasable resource that is cleaned after each phase that releases the reader for this searcher - */ - public static class ReacquireEngineSearcherListener implements SearchOperationListener { - - @Override - public void validateSearchContext(SearchContext context, TransportRequest transportRequest) { - DirectoryReader dirReader = context.searcher().getDirectoryReader(); - LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(dirReader); - if (lazyDirectoryReader != null) { - try { - lazyDirectoryReader.reset(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - } - - @Override - public void onFreeSearchContext(SearchContext context) { - DirectoryReader dirReader = context.searcher().getDirectoryReader(); - LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(dirReader); - if (lazyDirectoryReader != null) { + return new SearcherSupplier(wrapper) { + @Override + @SuppressForbidden(reason = "we manage references explicitly here") + public Searcher acquireSearcherInternal(String source) { try { - lazyDirectoryReader.release(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - } - - @Override - public void onNewReaderContext(ReaderContext readerContext) { - DirectoryReader dirReader = readerContext.engineSearcher().getDirectoryReader(); - LazyDirectoryReader lazyDirectoryReader = unwrapLazyReader(dirReader); - if (lazyDirectoryReader != null) { - readerContext.addOnClose(() -> { - try { - lazyDirectoryReader.release(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }); - } - } - } - - /** - * This class allows us to use the same high level reader across multiple search phases but replace the underpinnings - * on/after each search phase. This is really important otherwise we would hold on to multiple readers across phases. 
- * - * This reader and its leaf reader counterpart overrides FilterDirectory/LeafReader for convenience to be unwrapped but still - * overrides all it's delegate methods. We have tests to ensure we never miss an override but we need to in order to make sure - * the wrapper leaf readers don't register themself as close listeners on the wrapped ones otherwise we fail plugging in new readers - * on the next search phase. - */ - static final class LazyDirectoryReader extends FilterDirectoryReader { - - private final FrozenEngine engine; - private volatile DirectoryReader delegate; // volatile since it might be closed concurrently - - private LazyDirectoryReader(DirectoryReader reader, FrozenEngine engine) throws IOException { - super(reader, new SubReaderWrapper() { - @Override - public LeafReader wrap(LeafReader reader) { - return new LazyLeafReader(reader); - } - }); - this.delegate = reader; - this.engine = engine; - } - - @SuppressForbidden(reason = "we manage references explicitly here") - synchronized void release() throws IOException { - if (delegate != null) { // we are lenient here it's ok to double close - delegate.decRef(); - delegate = null; - if (tryIncRef()) { // only do this if we are not closed already - // we end up in this case when we are not closed but in an intermediate - // state were we want to release all or the real leaf readers ie. in between search phases - // but still want to keep this Lazy reference open. In oder to let the heavy real leaf - // readers to be GCed we need to null our the references. 
- try { - for (LeafReaderContext leaf : leaves()) { - LazyLeafReader reader = (LazyLeafReader) leaf.reader(); - reader.in = null; - } - } finally { - decRef(); - } - } - } - } - - void reset() throws IOException { - boolean success = false; - DirectoryReader reader = engine.getOrOpenReader(); - try { - reset(reader); - success = true; - } finally { - if (success == false) { - IOUtils.close(reader); + return openSearcher(source, scope); + } catch (IOException exc) { + throw new UncheckedIOException(exc); } } - } - - private synchronized void reset(DirectoryReader delegate) { - if (this.delegate != null) { - throw new AssertionError("lazy reader is not released"); - } - assert (delegate instanceof LazyDirectoryReader) == false : "must not be a LazyDirectoryReader"; - List leaves = delegate.leaves(); - int ord = 0; - for (LeafReaderContext leaf : leaves()) { - LazyLeafReader reader = (LazyLeafReader) leaf.reader(); - LeafReader newReader = leaves.get(ord++).reader(); - assert reader.in == null; - reader.in = newReader; - assert reader.info.info.equals(Lucene.segmentReader(newReader).getSegmentInfo().info); - } - this.delegate = delegate; - } - @Override - protected DirectoryReader doWrapDirectoryReader(DirectoryReader in) { - throw new UnsupportedOperationException(); - } - - void ensureOpenOrReset() { - // ensure we fail early and with good exceptions - ensureOpen(); - if (delegate == null) { - throw new AlreadyClosedException("delegate is released"); + @Override + public void close() { + store.decRef(); } - } - - @Override - public long getVersion() { - ensureOpenOrReset(); - return delegate.getVersion(); - } - - @Override - public boolean isCurrent() throws IOException { - ensureOpenOrReset(); - return delegate.isCurrent(); - } - - @Override - public IndexCommit getIndexCommit() throws IOException { - ensureOpenOrReset(); - return delegate.getIndexCommit(); - } - - @Override - protected void doClose() throws IOException { - release(); - } - - @Override - public 
CacheHelper getReaderCacheHelper() { - ensureOpenOrReset(); - return delegate.getReaderCacheHelper(); - } - - @Override - public DirectoryReader getDelegate() { - ensureOpenOrReset(); - return delegate; - } + }; } - /** - * We basically duplicate a FilterLeafReader here since we don't want the - * incoming reader to register with this reader as a parent reader. This would mean we barf if the incoming - * reader is closed and that is what we actually doing on purpose. - */ - static final class LazyLeafReader extends FilterLeafReader { - - private volatile LeafReader in; - private final SegmentCommitInfo info; - private final int numDocs; - private final int maxDocs; - - private LazyLeafReader(LeafReader in) { - super(Lucene.emptyReader(in.maxDoc())); // empty reader here to make FilterLeafReader happy - this.info = Lucene.segmentReader(in).getSegmentInfo(); - this.in = in; - numDocs = in.numDocs(); - maxDocs = in.maxDoc(); - // don't register in reader as a subreader here. - } - - private void ensureOpenOrReleased() { - ensureOpen(); - if (in == null) { - throw new AlreadyClosedException("leaf is already released"); + @SuppressWarnings("fallthrough") + @SuppressForbidden(reason = "we manage references explicitly here") + private Engine.Searcher openSearcher(String source, SearcherScope scope) throws IOException { + boolean maybeOpenReader; + switch (source) { + case "load_seq_no": + case "load_version": + assert false : "this is a read-only engine"; + case "doc_stats": + assert false : "doc_stats are overwritten"; + case "refresh_needed": + assert false : "refresh_needed is always false"; + case "segments": + case "segments_stats": + case "completion_stats": + case "can_match": // special case for can_match phase - we use the cached point values reader + maybeOpenReader = false; + break; + default: + maybeOpenReader = true; + } + ElasticsearchDirectoryReader reader = maybeOpenReader ? 
getOrOpenReader() : getReader(); + if (reader == null) { + if ("can_match".equals(source)) { + canMatchReader.incRef(); + return new Searcher(source, canMatchReader, engineConfig.getSimilarity(), engineConfig.getQueryCache(), + engineConfig.getQueryCachingPolicy(), canMatchReader::decRef); + } else { + ReferenceManager manager = getReferenceManager(scope); + ElasticsearchDirectoryReader acquire = manager.acquire(); + return new Searcher(source, acquire, engineConfig.getSimilarity(), engineConfig.getQueryCache(), + engineConfig.getQueryCachingPolicy(), () -> manager.release(acquire)); } - } - - @Override - public Bits getLiveDocs() { - ensureOpenOrReleased(); - return in.getLiveDocs(); - } - - @Override - public FieldInfos getFieldInfos() { - ensureOpenOrReleased(); - return in.getFieldInfos(); - } - - @Override - public PointValues getPointValues(String field) throws IOException { - ensureOpenOrReleased(); - return in.getPointValues(field); - } - - @Override - public Fields getTermVectors(int docID) - throws IOException { - ensureOpenOrReleased(); - return in.getTermVectors(docID); - } - - @Override - public int numDocs() { - return numDocs; - } - - @Override - public int maxDoc() { - return maxDocs; - } - - @Override - public void document(int docID, StoredFieldVisitor visitor) throws IOException { - ensureOpenOrReleased(); - in.document(docID, visitor); - } - - @Override - protected void doClose() throws IOException { - in.close(); - } - - @Override - public CacheHelper getReaderCacheHelper() { - ensureOpenOrReleased(); - return in.getReaderCacheHelper(); - } - - @Override - public CacheHelper getCoreCacheHelper() { - ensureOpenOrReleased(); - return in.getCoreCacheHelper(); - } - - @Override - public Terms terms(String field) throws IOException { - ensureOpenOrReleased(); - return in.terms(field); - } - - @Override - public String toString() { - final StringBuilder buffer = new StringBuilder("LazyLeafReader("); - buffer.append(in); - buffer.append(')'); - return 
buffer.toString(); - } - - @Override - public NumericDocValues getNumericDocValues(String field) throws IOException { - ensureOpenOrReleased(); - return in.getNumericDocValues(field); - } - - @Override - public BinaryDocValues getBinaryDocValues(String field) throws IOException { - ensureOpenOrReleased(); - return in.getBinaryDocValues(field); - } - - @Override - public SortedDocValues getSortedDocValues(String field) throws IOException { - ensureOpenOrReleased(); - return in.getSortedDocValues(field); - } - - @Override - public SortedNumericDocValues getSortedNumericDocValues(String field) throws IOException { - ensureOpenOrReleased(); - return in.getSortedNumericDocValues(field); - } - - @Override - public SortedSetDocValues getSortedSetDocValues(String field) throws IOException { - ensureOpenOrReleased(); - return in.getSortedSetDocValues(field); - } - - @Override - public NumericDocValues getNormValues(String field) throws IOException { - ensureOpenOrReleased(); - return in.getNormValues(field); - } - - @Override - public LeafMetaData getMetaData() { - ensureOpenOrReleased(); - return in.getMetaData(); - } - - @Override - public void checkIntegrity() throws IOException { - ensureOpenOrReleased(); - in.checkIntegrity(); - } - - @Override - public LeafReader getDelegate() { - return in; + } else { + return new Searcher(source, reader, engineConfig.getSimilarity(), engineConfig.getQueryCache(), + engineConfig.getQueryCachingPolicy(), () -> closeReader(reader)); } } diff --git a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/FrozenIndices.java b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/FrozenIndices.java index c2fb11d294dec..7c5e29b03b6ed 100644 --- a/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/FrozenIndices.java +++ b/x-pack/plugin/frozen-indices/src/main/java/org/elasticsearch/xpack/frozen/FrozenIndices.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.settings.Setting; 
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; -import org.elasticsearch.index.IndexModule; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.FrozenEngine; @@ -55,14 +54,6 @@ public List> getSettings() { return Arrays.asList(FrozenEngine.INDEX_FROZEN); } - @Override - public void onIndexModule(IndexModule indexModule) { - if (FrozenEngine.INDEX_FROZEN.get(indexModule.getSettings())) { - indexModule.addSearchOperationListener(new FrozenEngine.ReacquireEngineSearcherListener()); - } - super.onIndexModule(indexModule); - } - @Override public List> getActions() { List> actions = new ArrayList<>(); diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java index 9048983447781..0d505880575a9 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenEngineTests.java @@ -11,7 +11,6 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; @@ -32,6 +31,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; @@ -50,22 +50,25 @@ public void testAcquireReleaseReset() throws IOException { listener.reset(); try (FrozenEngine frozenEngine = new 
FrozenEngine(engine.engineConfig, true)) { assertFalse(frozenEngine.isReaderOpen()); - Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); - assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher - .getDirectoryReader()).shardId()); - assertTrue(frozenEngine.isReaderOpen()); - TopDocs search = searcher.search(new MatchAllDocsQuery(), numDocs); - assertEquals(search.scoreDocs.length, numDocs); - assertEquals(1, listener.afterRefresh.get()); - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertFalse(frozenEngine.isReaderOpen()); - assertEquals(1, listener.afterRefresh.get()); - expectThrows(AlreadyClosedException.class, () -> searcher.search(new MatchAllDocsQuery(), numDocs)); - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); - assertEquals(2, listener.afterRefresh.get()); - search = searcher.search(new MatchAllDocsQuery(), numDocs); - assertEquals(search.scoreDocs.length, numDocs); - searcher.close(); + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { + assertFalse(frozenEngine.isReaderOpen()); + try (Engine.Searcher searcher = reader.acquireSearcher("frozen")) { + assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher + .getDirectoryReader()).shardId()); + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher.search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertEquals(1, listener.afterRefresh.get()); + } + assertFalse(frozenEngine.isReaderOpen()); + assertEquals(1, listener.afterRefresh.get()); + + try (Engine.Searcher searcher = reader.acquireSearcher("frozen")) { + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher.search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + } + } } } } @@ -84,24 +87,31 @@ public void 
testAcquireReleaseResetTwoSearchers() throws IOException { listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { assertFalse(frozenEngine.isReaderOpen()); - Engine.Searcher searcher1 = frozenEngine.acquireSearcher("test"); - assertTrue(frozenEngine.isReaderOpen()); - TopDocs search = searcher1.search(new MatchAllDocsQuery(), numDocs); - assertEquals(search.scoreDocs.length, numDocs); - assertEquals(1, listener.afterRefresh.get()); - FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).release(); - Engine.Searcher searcher2 = frozenEngine.acquireSearcher("test"); - search = searcher2.search(new MatchAllDocsQuery(), numDocs); - assertEquals(search.scoreDocs.length, numDocs); - assertTrue(frozenEngine.isReaderOpen()); - assertEquals(2, listener.afterRefresh.get()); - expectThrows(AlreadyClosedException.class, () -> searcher1.search(new MatchAllDocsQuery(), numDocs)); - FrozenEngine.unwrapLazyReader(searcher1.getDirectoryReader()).reset(); + Engine.SearcherSupplier reader1 = frozenEngine.acquireSearcherSupplier(Function.identity()); + try (Engine.Searcher searcher1 = reader1.acquireSearcher("test")) { + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher1.search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertEquals(1, listener.afterRefresh.get()); + } + assertFalse(frozenEngine.isReaderOpen()); + Engine.SearcherSupplier reader2 = frozenEngine.acquireSearcherSupplier(Function.identity()); + try (Engine.Searcher searcher2 = reader2.acquireSearcher("test")) { + TopDocs search = searcher2.search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertTrue(frozenEngine.isReaderOpen()); + assertEquals(2, listener.afterRefresh.get()); + } + assertFalse(frozenEngine.isReaderOpen()); assertEquals(2, listener.afterRefresh.get()); - search = searcher1.search(new MatchAllDocsQuery(), numDocs); - 
assertEquals(search.scoreDocs.length, numDocs); - searcher1.close(); - searcher2.close(); + reader2.close(); + try (Engine.Searcher searcher1 = reader1.acquireSearcher("test")) { + TopDocs search = searcher1.search(new MatchAllDocsQuery(), numDocs); + assertEquals(search.scoreDocs.length, numDocs); + assertTrue(frozenEngine.isReaderOpen()); + } + reader1.close(); + assertFalse(frozenEngine.isReaderOpen()); } } } @@ -119,21 +129,24 @@ public void testSegmentStats() throws IOException { engine.flushAndClose(); listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { - Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); - SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false); - assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, listener.afterRefresh.get()); - segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false); - assertEquals(0, segmentsStats.getCount()); - segmentsStats = frozenEngine.segmentsStats(randomBoolean(), true); - assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); - assertEquals(1, listener.afterRefresh.get()); - assertFalse(frozenEngine.isReaderOpen()); - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); - segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false); - assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); - searcher.close(); + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { + SegmentsStats segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false); + try (Engine.Searcher searcher = reader.acquireSearcher("test")) { + segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false); + assertEquals(frozenEngine.segments(randomBoolean()).size(), 
segmentsStats.getCount()); + assertEquals(1, listener.afterRefresh.get()); + } + segmentsStats = frozenEngine.segmentsStats(randomBoolean(), false); + assertEquals(0, segmentsStats.getCount()); + try (Engine.Searcher searcher = reader.acquireSearcher("test")) { + segmentsStats = frozenEngine.segmentsStats(randomBoolean(), true); + assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); + assertEquals(2, listener.afterRefresh.get()); + } + assertFalse(frozenEngine.isReaderOpen()); + segmentsStats = frozenEngine.segmentsStats(randomBoolean(), true); + assertEquals(frozenEngine.segments(randomBoolean()).size(), segmentsStats.getCount()); + } } } } @@ -165,16 +178,18 @@ public void testCircuitBreakerAccounting() throws IOException { assertEquals(0, breaker.getUsed()); listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) { - Engine.Searcher searcher = frozenEngine.acquireSearcher("test"); - assertEquals(expectedUse, breaker.getUsed()); - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); - assertEquals(1, listener.afterRefresh.get()); - assertEquals(0, breaker.getUsed()); - assertFalse(frozenEngine.isReaderOpen()); - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); - assertEquals(expectedUse, breaker.getUsed()); - searcher.close(); - assertEquals(0, breaker.getUsed()); + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { + try (Engine.Searcher searcher = reader.acquireSearcher("test")) { + assertEquals(expectedUse, breaker.getUsed()); + } + assertEquals(1, listener.afterRefresh.get()); + assertEquals(0, breaker.getUsed()); + assertFalse(frozenEngine.isReaderOpen()); + try (Engine.Searcher searcher = reader.acquireSearcher("test")) { + assertEquals(expectedUse, breaker.getUsed()); + } + assertEquals(0, breaker.getUsed()); + } } } } @@ -216,18 +231,17 @@ public void testSearchConcurrently() throws IOException, 
InterruptedException { CountDownLatch latch = new CountDownLatch(numThreads); for (int i = 0; i < numThreads; i++) { threads[i] = new Thread(() -> { - try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) { + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { barrier.await(); - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); for (int j = 0; j < numIters; j++) { - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); - assertTrue(frozenEngine.isReaderOpen()); - TopDocs search = searcher.search(new MatchAllDocsQuery(), Math.min(10, numDocsAdded)); - assertEquals(search.scoreDocs.length, Math.min(10, numDocsAdded)); - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).release(); + try (Engine.Searcher searcher = reader.acquireSearcher("test")) { + assertTrue(frozenEngine.isReaderOpen()); + TopDocs search = searcher.search(new MatchAllDocsQuery(), Math.min(10, numDocsAdded)); + assertEquals(search.scoreDocs.length, Math.min(10, numDocsAdded)); + } } if (randomBoolean()) { - FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader()).reset(); + reader.acquireSearcher("test").close(); } } catch (Exception e) { throw new AssertionError(e); @@ -267,12 +281,6 @@ private static void checkOverrideMethods(Class clazz) throws NoSuchMethodExce } } - // here we make sure we catch any change to their super classes FilterLeafReader / FilterDirectoryReader - public void testOverrideMethods() throws Exception { - checkOverrideMethods(FrozenEngine.LazyDirectoryReader.class); - checkOverrideMethods(FrozenEngine.LazyLeafReader.class); - } - private class CountingRefreshListener implements ReferenceManager.RefreshListener { final AtomicInteger afterRefresh = new AtomicInteger(0); @@ -306,25 +314,27 @@ public void testCanMatch() throws IOException { engine.flushAndClose(); listener.reset(); try (FrozenEngine frozenEngine = new FrozenEngine(engine.engineConfig, true)) { - 
DirectoryReader reader; - try (Engine.Searcher searcher = frozenEngine.acquireSearcher("can_match")) { - assertNotNull(ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher.getDirectoryReader())); - assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher - .getDirectoryReader()).shardId()); - reader = searcher.getDirectoryReader(); - assertNotEquals(reader, Matchers.instanceOf(FrozenEngine.LazyDirectoryReader.class)); - assertEquals(0, listener.afterRefresh.get()); - DirectoryReader unwrap = FilterDirectoryReader.unwrap(searcher.getDirectoryReader()); - assertThat(unwrap, Matchers.instanceOf(RewriteCachingDirectoryReader.class)); - assertNotNull(ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher.getDirectoryReader())); + DirectoryReader dirReader; + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { + try (Engine.Searcher searcher = reader.acquireSearcher("can_match")) { + dirReader = searcher.getDirectoryReader(); + assertNotNull(ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher.getDirectoryReader())); + assertEquals(config.getShardId(), ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher + .getDirectoryReader()).shardId()); + assertEquals(0, listener.afterRefresh.get()); + DirectoryReader unwrap = FilterDirectoryReader.unwrap(searcher.getDirectoryReader()); + assertThat(unwrap, Matchers.instanceOf(RewriteCachingDirectoryReader.class)); + assertNotNull(ElasticsearchDirectoryReader.getElasticsearchDirectoryReader(searcher.getDirectoryReader())); + } } - try (Engine.Searcher searcher = frozenEngine.acquireSearcher("can_match")) { - assertSame(reader, searcher.getDirectoryReader()); - assertNotEquals(reader, Matchers.instanceOf(FrozenEngine.LazyDirectoryReader.class)); - assertEquals(0, listener.afterRefresh.get()); - DirectoryReader unwrap = 
FilterDirectoryReader.unwrap(searcher.getDirectoryReader()); - assertThat(unwrap, Matchers.instanceOf(RewriteCachingDirectoryReader.class)); + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { + try (Engine.Searcher searcher = reader.acquireSearcher("can_match")) { + assertSame(dirReader, searcher.getDirectoryReader()); + assertEquals(0, listener.afterRefresh.get()); + DirectoryReader unwrap = FilterDirectoryReader.unwrap(searcher.getDirectoryReader()); + assertThat(unwrap, Matchers.instanceOf(RewriteCachingDirectoryReader.class)); + } } } } @@ -346,14 +356,18 @@ public void testSearchers() throws Exception { // See TransportVerifyShardBeforeCloseAction#executeShardOperation engine.flush(true, true); engine.refresh("test"); - try (Engine.Searcher searcher = engine.acquireSearcher("test")) { - totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length; + try (Engine.SearcherSupplier reader = engine.acquireSearcherSupplier(Function.identity())) { + try (Engine.Searcher searcher = reader.acquireSearcher("test")) { + totalDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE).scoreDocs.length; + } } } try (FrozenEngine frozenEngine = new FrozenEngine(config, true)) { - try (Engine.Searcher searcher = frozenEngine.acquireSearcher("test")) { - TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); - assertThat(topDocs.scoreDocs.length, equalTo(totalDocs)); + try (Engine.SearcherSupplier reader = frozenEngine.acquireSearcherSupplier(Function.identity())) { + try (Engine.Searcher searcher = reader.acquireSearcher("test")) { + TopDocs topDocs = searcher.search(new MatchAllDocsQuery(), Integer.MAX_VALUE); + assertThat(topDocs.scoreDocs.length, equalTo(totalDocs)); + } } } } diff --git a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java 
b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 27cd14576f8af..dd6d22342990c 100644 --- a/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/test/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -11,9 +11,14 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.ClearReaderAction; +import org.elasticsearch.action.search.ClearReaderRequest; +import org.elasticsearch.action.search.OpenReaderRequest; +import org.elasticsearch.action.search.OpenReaderResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; +import org.elasticsearch.action.search.TransportOpenReaderAction; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -65,7 +70,15 @@ protected Collection> getPlugins() { return pluginList(FrozenIndices.class); } - public void testCloseFreezeAndOpen() { + String openReaders(TimeValue keepAlive, String... 
indices) { + OpenReaderRequest request = new OpenReaderRequest(indices, IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED, + keepAlive, null, null); + final OpenReaderResponse response = client() + .execute(TransportOpenReaderAction.INSTANCE, request).actionGet(); + return response.getReaderId(); + } + + public void testCloseFreezeAndOpen() throws Exception { createIndex("index", Settings.builder().put("index.number_of_shards", 2).build()); client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); @@ -86,9 +99,7 @@ public void testCloseFreezeAndOpen() { assertEquals(useDFS ? 3 : 2, shard.refreshStats().getTotal()); assertFalse(((FrozenEngine)engine).isReaderOpen()); assertTrue(indexService.getIndexSettings().isSearchThrottled()); - try (Engine.Searcher searcher = shard.acquireSearcher("test")) { - assertNotNull(FrozenEngine.unwrapLazyReader(searcher.getDirectoryReader())); - } + // now scroll SearchResponse searchResponse = client().prepareSearch().setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED) .setScroll(TimeValue.timeValueMinutes(1)).setSize(1).get(); @@ -100,13 +111,39 @@ public void testCloseFreezeAndOpen() { for (int i = 0; i < 2; i++) { shard = indexService.getShard(i); engine = IndexShardTestCase.getEngine(shard); - assertFalse(((FrozenEngine) engine).isReaderOpen()); + // scrolls keep the reader open + assertTrue(((FrozenEngine) engine).isReaderOpen()); } searchResponse = client().prepareSearchScroll(searchResponse.getScrollId()).setScroll(TimeValue.timeValueMinutes(1)).get(); } while (searchResponse.getHits().getHits().length > 0); + client().prepareClearScroll().addScrollId(searchResponse.getScrollId()).get(); + + String readerId = openReaders( TimeValue.timeValueMinutes(1), "index"); + try { + // now readerId + for (int from = 0; from < 3; from++) { + searchResponse = 
client().prepareSearch() + .setIndicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED) + .setReader(readerId, TimeValue.timeValueMinutes(1)) + .setSize(1) + .setFrom(from) + .get(); + assertHitCount(searchResponse, 3); + assertEquals(1, searchResponse.getHits().getHits().length); + SearchService searchService = getInstanceFromNode(SearchService.class); + assertThat(searchService.getActiveContexts(), Matchers.greaterThanOrEqualTo(1)); + for (int i = 0; i < 2; i++) { + shard = indexService.getShard(i); + engine = IndexShardTestCase.getEngine(shard); + assertFalse(((FrozenEngine) engine).isReaderOpen()); + } + } + } finally { + client().execute(ClearReaderAction.INSTANCE, new ClearReaderRequest(searchResponse.getReaderId())).get(); + } } - public void testSearchAndGetAPIsAreThrottled() throws InterruptedException, IOException { + public void testSearchAndGetAPIsAreThrottled() throws IOException { XContentBuilder mapping = XContentFactory.jsonBuilder().startObject().startObject("_doc") .startObject("properties").startObject("field").field("type", "text").field("term_vector", "with_positions_offsets_payloads") .endObject().endObject() diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java index 1cf5098b8bdd8..5c591bdeb4166 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListener.java @@ -9,7 +9,7 @@ import org.elasticsearch.index.shard.SearchOperationListener; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.search.SearchContextMissingException; -import org.elasticsearch.search.internal.ScrollContext; +import org.elasticsearch.search.internal.ReaderContext; import 
org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContextId; import org.elasticsearch.transport.TransportRequest; @@ -24,13 +24,12 @@ import static org.elasticsearch.xpack.security.authz.AuthorizationService.ORIGINATING_ACTION_KEY; /** - * A {@link SearchOperationListener} that is used to provide authorization for scroll requests. + * A {@link SearchOperationListener} that is used to provide authorization for search requests. * - * In order to identify the user associated with a scroll request, we replace the {@link ScrollContext} - * on creation with a custom implementation that holds the {@link Authentication} object. When - * this context is accessed again in {@link SearchOperationListener#onPreQueryPhase(SearchContext)} - * the ScrollContext is inspected for the authentication, which is compared to the currently - * authentication. + * In order to identify the user associated with a search request, we put the {@link Authentication} + * object in the {@link ReaderContext} on creation. When this context is accessed again in + * {@link SearchOperationListener#validateSearchContext} the ReaderContext is inspected for + * the authentication, which is compared to the current authentication. 
*/ public final class SecuritySearchOperationListener implements SearchOperationListener { @@ -44,36 +43,31 @@ public SecuritySearchOperationListener(SecurityContext securityContext, XPackLic this.auditTrailService = auditTrail; } - /** - * Adds the {@link Authentication} to the {@link ScrollContext} - */ @Override - public void onNewScrollContext(ScrollContext scrollContext) { + public void onNewReaderContext(ReaderContext readerContext) { if (licenseState.isSecurityEnabled()) { - scrollContext.putInContext(AuthenticationField.AUTHENTICATION_KEY, securityContext.getAuthentication()); + readerContext.putInContext(AuthenticationField.AUTHENTICATION_KEY, securityContext.getAuthentication()); } } /** - * Checks for the {@link ScrollContext} if it exists and compares the {@link Authentication} - * object from the scroll context with the current authentication context + * compares the {@link Authentication} object from the reader context with the current + * authentication context */ @Override - public void validateSearchContext(SearchContext searchContext, TransportRequest request) { + public void validateSearchContext(ReaderContext readerContext, SearchContext searchContext, TransportRequest request) { if (licenseState.isSecurityEnabled()) { - if (searchContext.scrollContext() != null) { - final Authentication originalAuth = searchContext.scrollContext().getFromContext(AuthenticationField.AUTHENTICATION_KEY); - final Authentication current = securityContext.getAuthentication(); - final ThreadContext threadContext = securityContext.getThreadContext(); - final String action = threadContext.getTransient(ORIGINATING_ACTION_KEY); - ensureAuthenticatedUserIsSame(originalAuth, current, auditTrailService, searchContext.id(), action, request, - AuditUtil.extractRequestId(threadContext), threadContext.getTransient(AUTHORIZATION_INFO_KEY)); - } + final Authentication originalAuth = readerContext.getFromContext(AuthenticationField.AUTHENTICATION_KEY); + final Authentication 
current = securityContext.getAuthentication(); + final ThreadContext threadContext = securityContext.getThreadContext(); + final String action = threadContext.getTransient(ORIGINATING_ACTION_KEY); + ensureAuthenticatedUserIsSame(originalAuth, current, auditTrailService, searchContext.id(), action, request, + AuditUtil.extractRequestId(threadContext), threadContext.getTransient(AUTHORIZATION_INFO_KEY)); } } /** - * Compares the {@link Authentication} that was stored in the {@link ScrollContext} with the + * Compares the {@link Authentication} that was stored in the {@link ReaderContext} with the * current authentication. We cannot guarantee that all of the details of the authentication will * be the same. Some things that could differ include the roles, the name of the authenticating * (or lookup) realm. To work around this we compare the username and the originating realm type. diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/DocumentLevelSecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/DocumentLevelSecurityTests.java index 72d09aec8a140..87b25c5cf3f50 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/DocumentLevelSecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/DocumentLevelSecurityTests.java @@ -12,6 +12,8 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.search.ClearReaderAction; +import org.elasticsearch.action.search.ClearReaderRequest; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.termvectors.MultiTermVectorsResponse; @@ -63,6 +65,7 @@ import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; 
import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import static org.elasticsearch.integration.FieldLevelSecurityTests.openReaders; import static org.elasticsearch.join.query.JoinQueryBuilders.hasChildQuery; import static org.elasticsearch.join.query.JoinQueryBuilders.hasParentQuery; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -759,6 +762,46 @@ public void testScroll() throws Exception { } } + public void testReaderId() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)) + .setMapping("field1", "type=text", "field2", "type=text", "field3", "type=text") + ); + final int numVisible = scaledRandomIntBetween(2, 10); + final int numInvisible = scaledRandomIntBetween(2, 10); + int id = 1; + for (int i = 0; i < numVisible; i++) { + client().prepareIndex("test").setId(String.valueOf(id++)).setSource("field1", "value1").get(); + } + + for (int i = 0; i < numInvisible; i++) { + client().prepareIndex("test").setId(String.valueOf(id++)).setSource("field2", "value2").get(); + client().prepareIndex("test").setId(String.valueOf(id++)).setSource("field3", "value3").get(); + } + refresh(); + + String readerId = openReaders("user1", TimeValue.timeValueMinutes(1), "test"); + SearchResponse response = null; + try { + for (int from = 0; from < numVisible; from++) { + response = client() + .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue("user1", USERS_PASSWD))) + .prepareSearch() + .setSize(1) + .setFrom(from) + .setReader(readerId, TimeValue.timeValueMinutes(1)) + .setQuery(termQuery("field1", "value1")) + .get(); + assertNoFailures(response); + assertThat(response.getHits().getTotalHits().value, is((long) numVisible)); + assertThat(response.getHits().getAt(0).getSourceAsMap().size(), is(1)); + 
assertThat(response.getHits().getAt(0).getSourceAsMap().get("field1"), is("value1")); + } + } finally { + client().execute(ClearReaderAction.INSTANCE, new ClearReaderRequest(response.getReaderId())).actionGet(); + } + } + public void testRequestCache() throws Exception { assertAcked(client().admin().indices().prepareCreate("test") .setSettings(Settings.builder().put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/FieldLevelSecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/FieldLevelSecurityTests.java index a4fe2e9a2cee0..f7b1e227cdd3b 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/FieldLevelSecurityTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/integration/FieldLevelSecurityTests.java @@ -11,8 +11,13 @@ import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.search.ClearReaderAction; +import org.elasticsearch.action.search.ClearReaderRequest; import org.elasticsearch.action.search.MultiSearchResponse; +import org.elasticsearch.action.search.OpenReaderRequest; +import org.elasticsearch.action.search.OpenReaderResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.TransportOpenReaderAction; import org.elasticsearch.action.termvectors.MultiTermVectorsResponse; import org.elasticsearch.action.termvectors.TermVectorsRequest; import org.elasticsearch.action.termvectors.TermVectorsResponse; @@ -672,7 +677,8 @@ public void testMSearchApi() throws Exception { public void testScroll() throws Exception { assertAcked(client().admin().indices().prepareCreate("test") - .setSettings(Settings.builder().put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)) + .setSettings(Settings.builder() + 
.put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)) .setMapping("field1", "type=text", "field2", "type=text", "field3", "type=text") ); @@ -722,6 +728,51 @@ public void testScroll() throws Exception { } } + static String openReaders(String userName, TimeValue keepAlive, String... indices) { + OpenReaderRequest request = new OpenReaderRequest(indices, OpenReaderRequest.DEFAULT_INDICES_OPTIONS, keepAlive, null, null); + final OpenReaderResponse response = client() + .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue(userName, USERS_PASSWD))) + .execute(TransportOpenReaderAction.INSTANCE, request).actionGet(); + return response.getReaderId(); + } + + public void testReaderId() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)) + .setMapping("field1", "type=text", "field2", "type=text", "field3", "type=text") + ); + + final int numDocs = scaledRandomIntBetween(2, 10); + for (int i = 0; i < numDocs; i++) { + client().prepareIndex("test").setId(String.valueOf(i)) + .setSource("field1", "value1", "field2", "value2", "field3", "value3") + .get(); + } + refresh("test"); + + String readerId = openReaders("user1", TimeValue.timeValueMinutes(1), "test"); + SearchResponse response = null; + try { + for (int from = 0; from < numDocs; from++) { + response = client() + .filterWithHeader(Collections.singletonMap(BASIC_AUTH_HEADER, basicAuthHeaderValue("user1", USERS_PASSWD))) + .prepareSearch() + .setReader(readerId, TimeValue.timeValueMinutes(1L)) + .setSize(1) + .setFrom(from) + .setQuery(constantScoreQuery(termQuery("field1", "value1"))) + .setFetchSource(true) + .get(); + assertThat(response.getHits().getTotalHits().value, is((long) numDocs)); + assertThat(response.getHits().getHits().length, is(1)); + assertThat(response.getHits().getAt(0).getSourceAsMap().size(), is(1)); + 
assertThat(response.getHits().getAt(0).getSourceAsMap().get("field1"), is("value1")); + } + } finally { + client().execute(ClearReaderAction.INSTANCE, new ClearReaderRequest(readerId)).actionGet(); + } + } + public void testQueryCache() throws Exception { assertAcked(client().admin().indices().prepareCreate("test") .setSettings(Settings.builder().put(IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.getKey(), true)) diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java index 1e4cc478a0421..b390ccbb85f81 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/SecuritySearchOperationListenerTests.java @@ -10,15 +10,17 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext.StoredContext; +import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.license.XPackLicenseState.Feature; import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchContextMissingException; import org.elasticsearch.search.internal.InternalScrollSearchRequest; +import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContextId; -import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.ESSingleNodeTestCase; import org.elasticsearch.test.TestSearchContext; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest.Empty; @@ -30,6 
+32,7 @@ import org.elasticsearch.xpack.core.security.user.User; import org.elasticsearch.xpack.security.audit.AuditTrail; import org.elasticsearch.xpack.security.audit.AuditTrailService; +import org.junit.Before; import org.mockito.Mockito; import java.util.Collections; @@ -47,134 +50,145 @@ import static org.mockito.Mockito.verifyZeroInteractions; import static org.mockito.Mockito.when; -public class SecuritySearchOperationListenerTests extends ESTestCase { +public class SecuritySearchOperationListenerTests extends ESSingleNodeTestCase { + private IndexShard shard; + + @Before + private void setupShard() { + shard = createIndex("index").getShard(0); + } public void testUnlicensed() { - XPackLicenseState licenseState = mock(XPackLicenseState.class); - when(licenseState.isSecurityEnabled()).thenReturn(false); - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); - AuditTrailService auditTrailService = mock(AuditTrailService.class); - SearchContext searchContext = mock(SearchContext.class); - ScrollContext scrollContext = new ScrollContext(); - when(searchContext.scrollContext()).thenReturn(scrollContext); - - SecuritySearchOperationListener listener = new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); - listener.onNewScrollContext(scrollContext); - listener.validateSearchContext(searchContext, Empty.INSTANCE); - verify(licenseState, times(2)).isSecurityEnabled(); - verifyZeroInteractions(auditTrailService, searchContext); + try (ReaderContext readerContext = new ReaderContext(0L, shard, shard.acquireSearcherSupplier(), Long.MAX_VALUE, false)) { + XPackLicenseState licenseState = mock(XPackLicenseState.class); + when(licenseState.isSecurityEnabled()).thenReturn(false); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, 
threadContext); + AuditTrailService auditTrailService = mock(AuditTrailService.class); + SearchContext searchContext = mock(SearchContext.class); + + SecuritySearchOperationListener listener = + new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); + listener.onNewReaderContext(readerContext); + listener.validateSearchContext(readerContext, searchContext, Empty.INSTANCE); + verify(licenseState, times(2)).isSecurityEnabled(); + verifyZeroInteractions(auditTrailService, searchContext); + } } public void testOnNewContextSetsAuthentication() throws Exception { - ScrollContext scrollContext = new ScrollContext(); - TestSearchContext testSearchContext = new TestSearchContext(null, null, null, scrollContext); - final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(2L)); - testSearchContext.scrollContext().scroll = scroll; - XPackLicenseState licenseState = mock(XPackLicenseState.class); - when(licenseState.isSecurityEnabled()).thenReturn(true); - ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); - AuditTrailService auditTrailService = mock(AuditTrailService.class); - Authentication authentication = new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null); - authentication.writeToContext(threadContext); + try (ReaderContext readerContext = new ReaderContext(0L, shard, shard.acquireSearcherSupplier(), Long.MAX_VALUE, false)) { + XPackLicenseState licenseState = mock(XPackLicenseState.class); + when(licenseState.isSecurityEnabled()).thenReturn(true); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); + AuditTrailService auditTrailService = mock(AuditTrailService.class); + Authentication authentication = new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), 
null); + authentication.writeToContext(threadContext); - SecuritySearchOperationListener listener = new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); - listener.onNewScrollContext(scrollContext); + SecuritySearchOperationListener listener = + new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); + listener.onNewReaderContext(readerContext); - Authentication contextAuth = testSearchContext.scrollContext().getFromContext(AuthenticationField.AUTHENTICATION_KEY); - assertEquals(authentication, contextAuth); - assertEquals(scroll, testSearchContext.scrollContext().scroll); + Authentication contextAuth = readerContext.getFromContext(AuthenticationField.AUTHENTICATION_KEY); + assertEquals(authentication, contextAuth); - verify(licenseState).isSecurityEnabled(); - verifyZeroInteractions(auditTrailService); + verify(licenseState).isSecurityEnabled(); + verifyZeroInteractions(auditTrailService); + } } public void testValidateSearchContext() throws Exception { - ScrollContext scrollContext = new ScrollContext(); - TestSearchContext testSearchContext = new TestSearchContext(null, null, null, scrollContext); - testSearchContext.scrollContext().putInContext(AuthenticationField.AUTHENTICATION_KEY, + try (ReaderContext readerContext = new ReaderContext(0L, shard, shard.acquireSearcherSupplier(), Long.MAX_VALUE, false)) { + ScrollContext scrollContext = new ScrollContext(); + TestSearchContext testSearchContext = new TestSearchContext(null, null, null, scrollContext); + readerContext.putInContext(AuthenticationField.AUTHENTICATION_KEY, new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null)); - testSearchContext.scrollContext().scroll = new Scroll(TimeValue.timeValueSeconds(2L)); - XPackLicenseState licenseState = mock(XPackLicenseState.class); - when(licenseState.isSecurityEnabled()).thenReturn(true); - when(licenseState.isAllowed(Feature.SECURITY_AUDITING)).thenReturn(true); - 
ThreadContext threadContext = new ThreadContext(Settings.EMPTY); - final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); - AuditTrail auditTrail = mock(AuditTrail.class); - AuditTrailService auditTrailService = new AuditTrailService(Collections.singletonList(auditTrail), licenseState); + testSearchContext.scrollContext().scroll = new Scroll(TimeValue.timeValueSeconds(2L)); + XPackLicenseState licenseState = mock(XPackLicenseState.class); + when(licenseState.isSecurityEnabled()).thenReturn(true); + when(licenseState.isAllowed(Feature.SECURITY_AUDITING)).thenReturn(true); + ThreadContext threadContext = new ThreadContext(Settings.EMPTY); + final SecurityContext securityContext = new SecurityContext(Settings.EMPTY, threadContext); + AuditTrail auditTrail = mock(AuditTrail.class); + AuditTrailService auditTrailService = + new AuditTrailService(Collections.singletonList(auditTrail), licenseState); - SecuritySearchOperationListener listener = new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); - try (StoredContext ignore = threadContext.newStoredContext(false)) { - Authentication authentication = new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null); - authentication.writeToContext(threadContext); - listener.validateSearchContext(testSearchContext, Empty.INSTANCE); - verify(licenseState).isSecurityEnabled(); - verifyZeroInteractions(auditTrail); - } + SecuritySearchOperationListener listener = + new SecuritySearchOperationListener(securityContext, licenseState, auditTrailService); + try (StoredContext ignore = threadContext.newStoredContext(false)) { + Authentication authentication = new Authentication(new User("test", "role"), new RealmRef("realm", "file", "node"), null); + authentication.writeToContext(threadContext); + listener.validateSearchContext(readerContext, testSearchContext, Empty.INSTANCE); + verify(licenseState).isSecurityEnabled(); + 
verifyZeroInteractions(auditTrail); + } - try (StoredContext ignore = threadContext.newStoredContext(false)) { - final String nodeName = randomAlphaOfLengthBetween(1, 8); - final String realmName = randomAlphaOfLengthBetween(1, 16); - Authentication authentication = new Authentication(new User("test", "role"), new RealmRef(realmName, "file", nodeName), null); - authentication.writeToContext(threadContext); - listener.validateSearchContext(testSearchContext, Empty.INSTANCE); - verify(licenseState, times(2)).isSecurityEnabled(); - verifyZeroInteractions(auditTrail); - } + try (StoredContext ignore = threadContext.newStoredContext(false)) { + final String nodeName = randomAlphaOfLengthBetween(1, 8); + final String realmName = randomAlphaOfLengthBetween(1, 16); + Authentication authentication = + new Authentication(new User("test", "role"), new RealmRef(realmName, "file", nodeName), null); + authentication.writeToContext(threadContext); + listener.validateSearchContext(readerContext, testSearchContext, Empty.INSTANCE); + verify(licenseState, times(2)).isSecurityEnabled(); + verifyZeroInteractions(auditTrail); + } - try (StoredContext ignore = threadContext.newStoredContext(false)) { - final String nodeName = randomBoolean() ? "node" : randomAlphaOfLengthBetween(1, 8); - final String realmName = randomBoolean() ? 
"realm" : randomAlphaOfLengthBetween(1, 16); - final String type = randomAlphaOfLengthBetween(5, 16); - Authentication authentication = new Authentication(new User("test", "role"), new RealmRef(realmName, type, nodeName), null); - authentication.writeToContext(threadContext); - threadContext.putTransient(ORIGINATING_ACTION_KEY, "action"); - threadContext.putTransient(AUTHORIZATION_INFO_KEY, - (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); - final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); - SearchContextMissingException expected = - expectThrows(SearchContextMissingException.class, () -> listener.validateSearchContext(testSearchContext, request)); - assertEquals(testSearchContext.id(), expected.contextId()); - verify(licenseState, Mockito.atLeast(3)).isSecurityEnabled(); - verify(auditTrail).accessDenied(eq(null), eq(authentication), eq("action"), eq(request), - authzInfoRoles(authentication.getUser().roles())); - } + try (StoredContext ignore = threadContext.newStoredContext(false)) { + final String nodeName = randomBoolean() ? "node" : randomAlphaOfLengthBetween(1, 8); + final String realmName = randomBoolean() ? 
"realm" : randomAlphaOfLengthBetween(1, 16); + final String type = randomAlphaOfLengthBetween(5, 16); + Authentication authentication = + new Authentication(new User("test", "role"), new RealmRef(realmName, type, nodeName), null); + authentication.writeToContext(threadContext); + threadContext.putTransient(ORIGINATING_ACTION_KEY, "action"); + threadContext.putTransient(AUTHORIZATION_INFO_KEY, + (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); + final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); + SearchContextMissingException expected = expectThrows(SearchContextMissingException.class, + () -> listener.validateSearchContext(readerContext, testSearchContext, request)); + assertEquals(testSearchContext.id(), expected.contextId()); + verify(licenseState, Mockito.atLeast(3)).isSecurityEnabled(); + verify(auditTrail).accessDenied(eq(null), eq(authentication), eq("action"), eq(request), + authzInfoRoles(authentication.getUser().roles())); + } - // another user running as the original user - try (StoredContext ignore = threadContext.newStoredContext(false)) { - final String nodeName = randomBoolean() ? "node" : randomAlphaOfLengthBetween(1, 8); - final String realmName = randomBoolean() ? "realm" : randomAlphaOfLengthBetween(1, 16); - final String type = randomAlphaOfLengthBetween(5, 16); - User user = new User(new User("test", "role"), new User("authenticated", "runas")); - Authentication authentication = new Authentication(user, new RealmRef(realmName, type, nodeName), + // another user running as the original user + try (StoredContext ignore = threadContext.newStoredContext(false)) { + final String nodeName = randomBoolean() ? "node" : randomAlphaOfLengthBetween(1, 8); + final String realmName = randomBoolean() ? 
"realm" : randomAlphaOfLengthBetween(1, 16); + final String type = randomAlphaOfLengthBetween(5, 16); + User user = new User(new User("test", "role"), new User("authenticated", "runas")); + Authentication authentication = new Authentication(user, new RealmRef(realmName, type, nodeName), new RealmRef(randomAlphaOfLengthBetween(1, 16), "file", nodeName)); - authentication.writeToContext(threadContext); - threadContext.putTransient(ORIGINATING_ACTION_KEY, "action"); - final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); - listener.validateSearchContext(testSearchContext, request); - verify(licenseState, Mockito.atLeast(4)).isSecurityEnabled(); - verifyNoMoreInteractions(auditTrail); - } + authentication.writeToContext(threadContext); + threadContext.putTransient(ORIGINATING_ACTION_KEY, "action"); + final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); + listener.validateSearchContext(readerContext, testSearchContext, request); + verify(licenseState, Mockito.atLeast(4)).isSecurityEnabled(); + verifyNoMoreInteractions(auditTrail); + } - // the user that authenticated for the run as request - try (StoredContext ignore = threadContext.newStoredContext(false)) { - final String nodeName = randomBoolean() ? "node" : randomAlphaOfLengthBetween(1, 8); - final String realmName = randomBoolean() ? "realm" : randomAlphaOfLengthBetween(1, 16); - final String type = randomAlphaOfLengthBetween(5, 16); - Authentication authentication = + // the user that authenticated for the run as request + try (StoredContext ignore = threadContext.newStoredContext(false)) { + final String nodeName = randomBoolean() ? "node" : randomAlphaOfLengthBetween(1, 8); + final String realmName = randomBoolean() ? 
"realm" : randomAlphaOfLengthBetween(1, 16); + final String type = randomAlphaOfLengthBetween(5, 16); + Authentication authentication = new Authentication(new User("authenticated", "runas"), new RealmRef(realmName, type, nodeName), null); - authentication.writeToContext(threadContext); - threadContext.putTransient(ORIGINATING_ACTION_KEY, "action"); - threadContext.putTransient(AUTHORIZATION_INFO_KEY, - (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); - final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); - SearchContextMissingException expected = - expectThrows(SearchContextMissingException.class, () -> listener.validateSearchContext(testSearchContext, request)); - assertEquals(testSearchContext.id(), expected.contextId()); - verify(licenseState, Mockito.atLeast(5)).isSecurityEnabled(); - verify(auditTrail).accessDenied(eq(null), eq(authentication), eq("action"), eq(request), - authzInfoRoles(authentication.getUser().roles())); + authentication.writeToContext(threadContext); + threadContext.putTransient(ORIGINATING_ACTION_KEY, "action"); + threadContext.putTransient(AUTHORIZATION_INFO_KEY, + (AuthorizationInfo) () -> Collections.singletonMap(PRINCIPAL_ROLES_FIELD_NAME, authentication.getUser().roles())); + final InternalScrollSearchRequest request = new InternalScrollSearchRequest(); + SearchContextMissingException expected = expectThrows(SearchContextMissingException.class, + () -> listener.validateSearchContext(readerContext, testSearchContext, request)); + assertEquals(testSearchContext.id(), expected.contextId()); + verify(licenseState, Mockito.atLeast(5)).isSecurityEnabled(); + verify(auditTrail).accessDenied(eq(null), eq(authentication), eq("action"), eq(request), + authzInfoRoles(authentication.getUser().roles())); + } } }