Skip to content

Commit

Permalink
Introduce point in time APIs in x-pack basic (#61062)
Browse files Browse the repository at this point in the history
This commit introduces a new API that manages point-in-times in x-pack 
basic. Elasticsearch pit (point in time) is a lightweight view into the
state of the data as it existed when initiated. A search request by
default executes against the most recent point in time. In some cases,
it is preferred to perform multiple search requests using the same point
in time. For example, if refreshes happen between search_after requests,
then the results of those requests might not be consistent as changes
happening between searches are only visible to the more recent point in
time.

A point in time must be opened before being used in search requests. The 
`keep_alive` parameter tells Elasticsearch how long it should keep a
point in time around.

```
POST /my_index/_pit?keep_alive=1m
```

The response from the above request includes a `id`, which should be 
passed to the `id` of the `pit` parameter of search requests.

```
POST /_search
{
    "query": {
        "match" : {
            "title" : "elasticsearch"
        }
    },
    "pit": {
            "id":  "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==",
            "keep_alive": "1m"
    }
}
```

Point-in-times are automatically closed when the `keep_alive` is 
elapsed. However, keeping point-in-times has a cost; hence,
point-in-times should be closed as soon as they are no longer used in
search requests.

```
DELETE /_pit
{
    "id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA="
}
```

#### Notable works in this change:

- Move the search state to the coordinating node: #52741
- Allow searches with a specific reader context: #53989
- Add the ability to acquire readers in IndexShard: #54966

Relates #46523
Relates #26472

Co-authored-by: Jim Ferenczi <jimczi@apache.org>
  • Loading branch information
dnhatn and jimczi authored Aug 25, 2020
1 parent fc50e17 commit 879279c
Show file tree
Hide file tree
Showing 121 changed files with 4,366 additions and 2,116 deletions.
116 changes: 116 additions & 0 deletions docs/reference/search/point-in-time.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
[role="xpack"]
[testenv="basic"]
[[point-in-time]]
==== Point in time

A search request by default executes against the most recent visible data of
the target indices, which is called point in time. Elasticsearch pit (point in time)
is a lightweight view into the state of the data as it existed when initiated.
In some cases, it's preferred to perform multiple search requests using
the same point in time. For example, if <<indices-refresh,refreshes>> happen between
search_after requests, then the results of those requests might not be consistent as
changes happening between searches are only visible to the more recent point in time.

A point in time must be opened explicitly before being used in search requests. The
keep_alive parameter tells Elasticsearch how long it should keep a point in time alive,
e.g. `?keep_alive=5m`.

[source,console]
--------------------------------------------------
POST /my-index-000001/_pit?keep_alive=1m
--------------------------------------------------
// TEST[setup:my_index]

The result from the above request includes a `id`, which should
be passed to the `id` of the `pit` parameter of a search request.

[source,console]
--------------------------------------------------
POST /_search <1>
{
"size": 100,
"query": {
"match" : {
"title" : "elasticsearch"
}
},
"pit": {
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
"keep_alive": "1m" <3>
}
}
--------------------------------------------------
// TEST[catch:missing]

<1> A search request with the `pit` parameter must not specify `index`, `routing`,
and {ref}/search-request-body.html#request-body-search-preference[`preference`]
as these parameters are copied from the point in time.
<2> The `id` parameter tells Elasticsearch to execute the request using contexts
from this point int time.
<3> The `keep_alive` parameter tells Elasticsearch how long it should extend
the time to live of the point in time.

IMPORTANT: The open point in time request and each subsequent search request can
return different `id`; thus always use the most recently received `id` for the
next search request.

[[point-in-time-keep-alive]]
===== Keeping point in time alive
The `keep_alive` parameter, which is passed to a open point in time request and
search request, extends the time to live of the corresponding point in time.
The value (e.g. `1m`, see <<time-units>>) does not need to be long enough to
process all data -- it just needs to be long enough for the next request.

Normally, the background merge process optimizes the index by merging together
smaller segments to create new, bigger segments. Once the smaller segments are
no longer needed they are deleted. However, open point-in-times prevent the
old segments from being deleted since they are still in use.

TIP: Keeping older segments alive means that more disk space and file handles
are needed. Ensure that you have configured your nodes to have ample free file
handles. See <<file-descriptors>>.

Additionally, if a segment contains deleted or updated documents then the
point in time must keep track of whether each document in the segment was live at
the time of the initial search request. Ensure that your nodes have sufficient heap
space if you have many open point-in-times on an index that is subject to ongoing
deletes or updates.

You can check how many point-in-times (i.e, search contexts) are open with the
<<cluster-nodes-stats,nodes stats API>>:

[source,console]
---------------------------------------
GET /_nodes/stats/indices/search
---------------------------------------

===== Close point in time API

Point-in-time is automatically closed when its `keep_alive` has
been elapsed. However keeping point-in-times has a cost, as discussed in the
<<point-in-time-keep-alive,previous section>>. Point-in-times should be closed
as soon as they are no longer used in search requests.

[source,console]
---------------------------------------
DELETE /_pit
{
"id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA="
}
---------------------------------------
// TEST[catch:missing]

The API returns the following response:

[source,console-result]
--------------------------------------------------
{
"succeeded": true, <1>
"num_freed": 3 <2>
}
--------------------------------------------------
// TESTRESPONSE[s/"succeeded": true/"succeeded": $body.succeeded/]
// TESTRESPONSE[s/"num_freed": 3/"num_freed": $body.num_freed/]

<1> If true, all search contexts associated with the point-in-time id are successfully closed
<2> The number of search contexts have been successfully closed
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,8 @@ public TopDocsAndMaxScore topDocs(SearchHit hit) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
try {
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
} finally {
clearReleasables(Lifetime.COLLECTION);
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
TopDocs topDocs = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
search.max_keep_alive: "1m"

- do:
catch: /.*Keep alive for scroll.*is too large.*/
catch: /.*Keep alive for.*is too large.*/
search:
rest_total_hits_as_int: true
index: test_scroll
Expand All @@ -61,7 +61,7 @@
- length: {hits.hits: 1 }

- do:
catch: /.*Keep alive for scroll.*is too large.*/
catch: /.*Keep alive for.*is too large.*/
scroll:
rest_total_hits_as_int: true
scroll_id: $scroll_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -602,7 +602,7 @@ public void testInvalidScrollKeepAlive() throws IOException {
IllegalArgumentException illegalArgumentException =
(IllegalArgumentException) ExceptionsHelper.unwrap(exc, IllegalArgumentException.class);
assertNotNull(illegalArgumentException);
assertThat(illegalArgumentException.getMessage(), containsString("Keep alive for scroll (2h) is too large"));
assertThat(illegalArgumentException.getMessage(), containsString("Keep alive for request (2h) is too large"));

SearchResponse searchResponse = client().prepareSearch()
.setQuery(matchAllQuery())
Expand All @@ -619,7 +619,7 @@ public void testInvalidScrollKeepAlive() throws IOException {
illegalArgumentException =
(IllegalArgumentException) ExceptionsHelper.unwrap(exc, IllegalArgumentException.class);
assertNotNull(illegalArgumentException);
assertThat(illegalArgumentException.getMessage(), containsString("Keep alive for scroll (3h) is too large"));
assertThat(illegalArgumentException.getMessage(), containsString("Keep alive for request (3h) is too large"));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.ShardOperationFailedException;
Expand Down Expand Up @@ -163,7 +164,7 @@ public final void start() {
// total hits is null in the response if the tracking of total hits is disabled
boolean withTotalHits = trackTotalHitsUpTo != SearchContext.TRACK_TOTAL_HITS_DISABLED;
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(withTotalHits), null, 0, 0, 0, buildTookInMillis(),
ShardSearchFailure.EMPTY_ARRAY, clusters));
ShardSearchFailure.EMPTY_ARRAY, clusters, null));
return;
}
executePhase(this);
Expand Down Expand Up @@ -514,22 +515,29 @@ public final SearchRequest getRequest() {
return request;
}

protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse,
String scrollId,
ShardSearchFailure[] failures) {
protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, ShardSearchFailure[] failures,
String scrollId, String searchContextId) {
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
skippedOps.get(), buildTookInMillis(), failures, clusters);
skippedOps.get(), buildTookInMillis(), failures, clusters, searchContextId);
}

boolean includeSearchContextInResponse() {
return request.pointInTimeBuilder() != null;
}

@Override
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, AtomicArray<SearchPhaseResult> queryResults) {
ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (allowPartialResults == false && failures.length > 0){
if (request.pointInTimeBuilder() == null && allowPartialResults == false && failures.length > 0) {
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
} else {
listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId, failures));
final Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
final String scrollId = request.scroll() != null ? TransportSearchHelper.buildScrollId(queryResults, minNodeVersion) : null;
final String searchContextId =
includeSearchContextInResponse() ? SearchContextId.encode(queryResults.asList(), aliasFilter, minNodeVersion) : null;
listener.onResponse(buildSearchResponse(internalSearchResponse, failures, scrollId, searchContextId));
}
}

Expand Down Expand Up @@ -598,12 +606,13 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
.toArray(new String[0]);
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings);
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings,
shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
// Note that, we have to disable this shortcut for scroll queries.
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && request.scroll() == null);
// Note that, we have to disable this shortcut for queries that create a context (scroll and search context).
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null);
return shardRequest;
}

Expand Down Expand Up @@ -673,8 +682,4 @@ private synchronized Runnable tryQueue(Runnable runnable) {
return toExecute;
}
}

protected ClusterState clusterState() {
return clusterState;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,28 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportResponse;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;

final class ClearScrollController implements Runnable {
public final class ClearScrollController implements Runnable {
private final DiscoveryNodes nodes;
private final SearchTransportService searchTransportService;
private final CountDown expectedOps;
Expand All @@ -56,19 +64,18 @@ final class ClearScrollController implements Runnable {
expectedOps = nodes.getSize();
runner = this::cleanAllScrolls;
} else {
List<ScrollIdForNode> parsedScrollIds = new ArrayList<>();
for (String parsedScrollId : request.getScrollIds()) {
ScrollIdForNode[] context = parseScrollId(parsedScrollId).getContext();
for (ScrollIdForNode id : context) {
parsedScrollIds.add(id);
}
// TODO: replace this with #closeContexts
List<SearchContextIdForNode> contexts = new ArrayList<>();
for (String scrollId : request.getScrollIds()) {
SearchContextIdForNode[] context = parseScrollId(scrollId).getContext();
Collections.addAll(contexts, context);
}
if (parsedScrollIds.isEmpty()) {
if (contexts.isEmpty()) {
expectedOps = 0;
runner = () -> listener.onResponse(new ClearScrollResponse(true, 0));
} else {
expectedOps = parsedScrollIds.size();
runner = () -> cleanScrollIds(parsedScrollIds);
expectedOps = contexts.size();
runner = () -> cleanScrollIds(contexts);
}
}
this.expectedOps = new CountDown(expectedOps);
Expand Down Expand Up @@ -101,17 +108,17 @@ public void onFailure(Exception e) {
}
}

void cleanScrollIds(List<ScrollIdForNode> parsedScrollIds) {
SearchScrollAsyncAction.collectNodesAndRun(parsedScrollIds, nodes, searchTransportService, ActionListener.wrap(
void cleanScrollIds(List<SearchContextIdForNode> contextIds) {
SearchScrollAsyncAction.collectNodesAndRun(contextIds, nodes, searchTransportService, ActionListener.wrap(
lookup -> {
for (ScrollIdForNode target : parsedScrollIds) {
for (SearchContextIdForNode target : contextIds) {
final DiscoveryNode node = lookup.apply(target.getClusterAlias(), target.getNode());
if (node == null) {
onFreedContext(false);
} else {
try {
Transport.Connection connection = searchTransportService.getConnection(target.getClusterAlias(), node);
searchTransportService.sendFreeContext(connection, target.getContextId(),
searchTransportService.sendFreeContext(connection, target.getSearchContextId(),
ActionListener.wrap(freed -> onFreedContext(freed.isFreed()), e -> onFailedFreedContext(e, node)));
} catch (Exception e) {
onFailedFreedContext(e, node);
Expand Down Expand Up @@ -142,4 +149,45 @@ private void onFailedFreedContext(Throwable e, DiscoveryNode node) {
listener.onResponse(new ClearScrollResponse(false, freedSearchContexts.get()));
}
}

/**
* Closes the given context id and reports the number of freed contexts via the listener
*/
public static void closeContexts(DiscoveryNodes nodes, SearchTransportService searchTransportService,
Collection<SearchContextIdForNode> contextIds,
ActionListener<Integer> listener) {
if (contextIds.isEmpty()) {
listener.onResponse(0);
return;
}
final Set<String> clusters = contextIds.stream()
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias).collect(Collectors.toSet());
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();
if (clusters.isEmpty() == false) {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
} else {
lookupListener.onResponse((cluster, nodeId) -> nodes.get(nodeId));
}
lookupListener.whenComplete(nodeLookup -> {
final GroupedActionListener<Boolean> groupedListener = new GroupedActionListener<>(
ActionListener.delegateFailure(listener, (l, rs) -> l.onResponse(Math.toIntExact(rs.stream().filter(r -> r).count()))),
contextIds.size()
);
for (SearchContextIdForNode contextId : contextIds) {
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
if (node == null) {
groupedListener.onResponse(false);
} else {
try {
final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node);
searchTransportService.sendFreeContext(connection, contextId.getSearchContextId(),
ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false)));
} catch (Exception e) {
groupedListener.onResponse(false);
}
}
}
}, listener::onFailure);
}
}
Loading

0 comments on commit 879279c

Please sign in to comment.