Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce point in time APIs in x-pack basic #61872

Closed
wants to merge 9 commits into from
116 changes: 116 additions & 0 deletions docs/reference/search/point-in-time.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
[role="xpack"]
[testenv="basic"]
[[point-in-time]]
==== Point in time

A search request by default executes against the most recent visible data of
the target indices, which is called point in time. Elasticsearch pit (point in time)
is a lightweight view into the state of the data as it existed when initiated.
In some cases, it's preferred to perform multiple search requests using
the same point in time. For example, if <<indices-refresh,refreshes>> happen between
search_after requests, then the results of those requests might not be consistent as
changes happening between searches are only visible to the more recent point in time.

A point in time must be opened explicitly before being used in search requests. The
keep_alive parameter tells Elasticsearch how long it should keep a point in time alive,
e.g. `?keep_alive=5m`.

[source,console]
--------------------------------------------------
POST /my-index-000001/_pit?keep_alive=1m
--------------------------------------------------
// TEST[setup:my_index]

The result from the above request includes a `id`, which should
be passed to the `id` of the `pit` parameter of a search request.

[source,console]
--------------------------------------------------
POST /_search <1>
{
"size": 100,
"query": {
"match" : {
"title" : "elasticsearch"
}
},
"pit": {
"id": "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWICBXV1aWQyAAAFdXVpZDEAAQltYXRjaF9hbGw_gAAAAA==", <2>
"keep_alive": "1m" <3>
}
}
--------------------------------------------------
// TEST[catch:missing]

<1> A search request with the `pit` parameter must not specify `index`, `routing`,
and {ref}/search-request-body.html#request-body-search-preference[`preference`]
as these parameters are copied from the point in time.
<2> The `id` parameter tells Elasticsearch to execute the request using contexts
from this point int time.
<3> The `keep_alive` parameter tells Elasticsearch how long it should extend
the time to live of the point in time.

IMPORTANT: The open point in time request and each subsequent search request can
return different `id`; thus always use the most recently received `id` for the
next search request.

[[point-in-time-keep-alive]]
===== Keeping point in time alive
The `keep_alive` parameter, which is passed to a open point in time request and
search request, extends the time to live of the corresponding point in time.
The value (e.g. `1m`, see <<time-units>>) does not need to be long enough to
process all data -- it just needs to be long enough for the next request.

Normally, the background merge process optimizes the index by merging together
smaller segments to create new, bigger segments. Once the smaller segments are
no longer needed they are deleted. However, open point-in-times prevent the
old segments from being deleted since they are still in use.

TIP: Keeping older segments alive means that more disk space and file handles
are needed. Ensure that you have configured your nodes to have ample free file
handles. See <<file-descriptors>>.

Additionally, if a segment contains deleted or updated documents then the
point in time must keep track of whether each document in the segment was live at
the time of the initial search request. Ensure that your nodes have sufficient heap
space if you have many open point-in-times on an index that is subject to ongoing
deletes or updates.

You can check how many point-in-times (i.e, search contexts) are open with the
<<cluster-nodes-stats,nodes stats API>>:

[source,console]
---------------------------------------
GET /_nodes/stats/indices/search
---------------------------------------

===== Close point in time API

Point-in-time is automatically closed when its `keep_alive` has
been elapsed. However keeping point-in-times has a cost, as discussed in the
<<point-in-time-keep-alive,previous section>>. Point-in-times should be closed
as soon as they are no longer used in search requests.

[source,console]
---------------------------------------
DELETE /_pit
{
"id" : "46ToAwMDaWR4BXV1aWQxAgZub2RlXzEAAAAAAAAAAAEBYQNpZHkFdXVpZDIrBm5vZGVfMwAAAAAAAAAAKgFjA2lkeQV1dWlkMioGbm9kZV8yAAAAAAAAAAAMAWIBBXV1aWQyAAA="
}
---------------------------------------
// TEST[catch:missing]

The API returns the following response:

[source,console-result]
--------------------------------------------------
{
"succeeded": true, <1>
"num_freed": 3 <2>
}
--------------------------------------------------
// TESTRESPONSE[s/"succeeded": true/"succeeded": $body.succeeded/]
// TESTRESPONSE[s/"num_freed": 3/"num_freed": $body.num_freed/]

<1> If true, all search contexts associated with the point-in-time id are successfully closed
<2> The number of search contexts have been successfully closed
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public String getName() {
public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
// Creates the search request with all required params
SearchRequest searchRequest = new SearchRequest();
RestSearchAction.parseSearchRequest(searchRequest, request, null, size -> searchRequest.source().size(size));
RestSearchAction.parseSearchRequest(
searchRequest, request, null, client.getNamedWriteableRegistry(), size -> searchRequest.source().size(size));

// Creates the search template request
SearchTemplateRequest searchTemplateRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,8 @@ public TopDocsAndMaxScore topDocs(SearchHit hit) throws IOException {
topDocsCollector = TopScoreDocCollector.create(topN, Integer.MAX_VALUE);
maxScoreCollector = new MaxScoreCollector();
}
try {
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
} finally {
clearReleasables(Lifetime.COLLECTION);
for (LeafReaderContext ctx : context.searcher().getIndexReader().leaves()) {
intersect(weight, innerHitQueryWeight, MultiCollector.wrap(topDocsCollector, maxScoreCollector), ctx);
}
TopDocs topDocs = topDocsCollector.topDocs(from(), size());
float maxScore = Float.NaN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
Expand All @@ -49,7 +50,7 @@ protected AbstractBaseReindexRestHandler(A action) {
protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient client,
boolean includeCreated, boolean includeUpdated) throws IOException {
// Build the internal request
Request internal = setCommonOptions(request, buildRequest(request));
Request internal = setCommonOptions(request, buildRequest(request, client.getNamedWriteableRegistry()));

// Executes the request and waits for completion
if (request.paramAsBoolean("wait_for_completion", true)) {
Expand Down Expand Up @@ -77,7 +78,7 @@ protected RestChannelConsumer doPrepareRequest(RestRequest request, NodeClient c
/**
* Build the Request based on the RestRequest.
*/
protected abstract Request buildRequest(RestRequest request) throws IOException;
protected abstract Request buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException;

/**
* Sets common options of {@link AbstractBulkByScrollRequest} requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
Expand All @@ -44,15 +45,16 @@ protected AbstractBulkByQueryRestHandler(A action) {
super(action);
}

protected void parseInternalRequest(Request internal, RestRequest restRequest,
protected void parseInternalRequest(Request internal, RestRequest restRequest, NamedWriteableRegistry namedWriteableRegistry,
Map<String, Consumer<Object>> bodyConsumers) throws IOException {
assert internal != null : "Request should not be null";
assert restRequest != null : "RestRequest should not be null";

SearchRequest searchRequest = internal.getSearchRequest();

try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) {
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> setMaxDocsFromSearchSize(internal, size));
RestSearchAction.parseSearchRequest(
searchRequest, restRequest, parser, namedWriteableRegistry, size -> setMaxDocsFromSearchSize(internal, size));
}

searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.rest.RestRequest;

import java.io.IOException;
Expand Down Expand Up @@ -56,7 +57,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
}

@Override
protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOException {
protected DeleteByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
/*
* Passing the search request through DeleteByQueryRequest first allows
* it to set its own defaults which differ from SearchRequest's
Expand All @@ -68,7 +69,7 @@ protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOExcept
consumers.put("conflicts", o -> internal.setConflicts((String) o));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));

parseInternalRequest(internal, request, consumers);
parseInternalRequest(internal, request, namedWriteableRegistry, consumers);

return internal;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestRequest;

Expand Down Expand Up @@ -56,7 +57,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
}

@Override
protected ReindexRequest buildRequest(RestRequest request) throws IOException {
protected ReindexRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
if (request.hasParam("pipeline")) {
throw new IllegalArgumentException("_reindex doesn't support [pipeline] as a query parameter. "
+ "Specify it in the [dest] object instead.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.reindex;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.script.Script;

Expand Down Expand Up @@ -57,7 +58,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
}

@Override
protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOException {
protected UpdateByQueryRequest buildRequest(RestRequest request, NamedWriteableRegistry namedWriteableRegistry) throws IOException {
/*
* Passing the search request through UpdateByQueryRequest first allows
* it to set its own defaults which differ from SearchRequest's
Expand All @@ -70,7 +71,7 @@ protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOExcept
consumers.put("script", o -> internal.setScript(Script.parse(o)));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));

parseInternalRequest(internal, request, consumers);
parseInternalRequest(internal, request, namedWriteableRegistry, consumers);

internal.setPipeline(request.param("pipeline"));
return internal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testTypeInPath() throws IOException {

// checks the type in the URL is propagated correctly to the request object
// only works after the request is dispatched, so its params are filled from url.
DeleteByQueryRequest dbqRequest = action.buildRequest(request);
DeleteByQueryRequest dbqRequest = action.buildRequest(request, DEFAULT_NAMED_WRITABLE_REGISTRY);
assertArrayEquals(new String[]{"some_type"}, dbqRequest.getDocTypes());

// RestDeleteByQueryAction itself doesn't check for a deprecated type usage
Expand All @@ -57,7 +57,8 @@ public void testTypeInPath() throws IOException {
}

public void testParseEmpty() throws IOException {
DeleteByQueryRequest request = action.buildRequest(new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build());
final FakeRestRequest restRequest = new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build();
DeleteByQueryRequest request = action.buildRequest(restRequest, DEFAULT_NAMED_WRITABLE_REGISTRY);
assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize());
assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent;
Expand All @@ -30,6 +31,7 @@
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.Arrays;

import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -61,7 +63,8 @@ public void testPipelineQueryParameterIsError() throws IOException {
request.withContent(BytesReference.bytes(body), body.contentType());
}
request.withParams(singletonMap("pipeline", "doesn't matter"));
Exception e = expectThrows(IllegalArgumentException.class, () -> action.buildRequest(request.build()));
Exception e = expectThrows(IllegalArgumentException.class, () ->
action.buildRequest(request.build(), new NamedWriteableRegistry(Collections.emptyList())));

assertEquals("_reindex doesn't support [pipeline] as a query parameter. Specify it in the [dest] object instead.", e.getMessage());
}
Expand All @@ -70,14 +73,14 @@ public void testSetScrollTimeout() throws IOException {
{
FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
ReindexRequest request = action.buildRequest(requestBuilder.build());
ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList()));
assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT, request.getScrollTime());
}
{
FakeRestRequest.Builder requestBuilder = new FakeRestRequest.Builder(xContentRegistry());
requestBuilder.withParams(singletonMap("scroll", "10m"));
requestBuilder.withContent(new BytesArray("{}"), XContentType.JSON);
ReindexRequest request = action.buildRequest(requestBuilder.build());
ReindexRequest request = action.buildRequest(requestBuilder.build(), new NamedWriteableRegistry(Collections.emptyList()));
assertEquals("10m", request.getScrollTime().toString());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void testTypeInPath() throws IOException {

// checks the type in the URL is propagated correctly to the request object
// only works after the request is dispatched, so its params are filled from url.
UpdateByQueryRequest ubqRequest = action.buildRequest(request);
UpdateByQueryRequest ubqRequest = action.buildRequest(request, DEFAULT_NAMED_WRITABLE_REGISTRY);
assertArrayEquals(new String[]{"some_type"}, ubqRequest.getDocTypes());

// RestUpdateByQueryAction itself doesn't check for a deprecated type usage
Expand All @@ -58,7 +58,8 @@ public void testTypeInPath() throws IOException {
}

public void testParseEmpty() throws IOException {
UpdateByQueryRequest request = action.buildRequest(new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build());
final FakeRestRequest restRequest = new FakeRestRequest.Builder(new NamedXContentRegistry(emptyList())).build();
UpdateByQueryRequest request = action.buildRequest(restRequest, DEFAULT_NAMED_WRITABLE_REGISTRY);
assertEquals(AbstractBulkByScrollRequest.SIZE_ALL_MATCHES, request.getSize());
assertEquals(AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE, request.getSearchRequest().source().size());
}
Expand Down
Loading