Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle partial search result with point in time #81349

Merged
merged 7 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/reference/search/point-in-time-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ POST /_search <1>
}
}
--------------------------------------------------
// TEST[catch:missing]
// TEST[catch:unavailable]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to change this because `allow_partial_search_results` defaults to true.


<1> A search request with the `pit` parameter must not specify `index`, `routing`,
and {ref}/search-request-body.html#request-body-search-preference[`preference`]
Expand Down Expand Up @@ -88,7 +88,8 @@ Additionally, if a segment contains deleted or updated documents then the
point in time must keep track of whether each document in the segment was live at
the time of the initial search request. Ensure that your nodes have sufficient heap
space if you have many open point-in-times on an index that is subject to ongoing
deletes or updates.
deletes or updates. Note that a point-in-time doesn't prevent its associated indices
from being deleted.

You can check how many point-in-times (i.e., search contexts) are open with the
<<cluster-nodes-stats,nodes stats API>>:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ GET /_search
]
}
----
// TEST[catch:missing]
// TEST[catch:unavailable]

<1> PIT ID for the search.
<2> Sorts hits for the search with an implicit tiebreak on `_shard_doc` ascending.
Expand Down Expand Up @@ -138,7 +138,7 @@ GET /_search
]
}
----
// TEST[catch:missing]
// TEST[catch:unavailable]

<1> PIT ID for the search.
<2> Sorts hits for the search with an explicit tiebreak on `_shard_doc` descending.
Expand Down Expand Up @@ -205,7 +205,7 @@ GET /_search
"track_total_hits": false <3>
}
----
// TEST[catch:missing]
// TEST[catch:unavailable]

<1> PIT ID returned by the previous search.
<2> Sort values from the previous search's last hit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

package org.elasticsearch.action.search;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
Expand Down Expand Up @@ -241,21 +241,38 @@ public void testIndexNotFound() {
}
refresh();
String pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueMinutes(2));
SearchResponse resp1 = client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get();
assertNoFailures(resp1);
assertHitCount(resp1, index1 + index2);
client().admin().indices().prepareDelete("index-1").get();
if (randomBoolean()) {
SearchResponse resp2 = client().prepareSearch("index-*").get();
assertNoFailures(resp2);
assertHitCount(resp2, index2);
try {
SearchResponse resp = client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get();
assertNoFailures(resp);
assertHitCount(resp, index1 + index2);
client().admin().indices().prepareDelete("index-1").get();
if (randomBoolean()) {
resp = client().prepareSearch("index-*").get();
assertNoFailures(resp);
assertHitCount(resp, index2);
}

// Allow partial search result
resp = client().prepareSearch()
.setPreference(null)
.setAllowPartialSearchResults(true)
.setPointInTime(new PointInTimeBuilder(pit))
.get();
assertFailures(resp);
assertHitCount(resp, index2);

// Do not allow partial search result
expectThrows(
ElasticsearchException.class,
() -> client().prepareSearch()
.setPreference(null)
.setAllowPartialSearchResults(false)
.setPointInTime(new PointInTimeBuilder(pit))
.get()
);
} finally {
closePointInTime(pit);
}
expectThrows(
IndexNotFoundException.class,
() -> client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get()
);
closePointInTime(resp1.pointInTimeId());
}

public void testCanMatch() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.SearchPhaseResult;
Expand Down Expand Up @@ -931,7 +933,8 @@ private void executeSearch(
localIndices,
searchRequest.getLocalClusterAlias(),
searchContext,
searchRequest.pointInTimeBuilder().getKeepAlive()
searchRequest.pointInTimeBuilder().getKeepAlive(),
searchRequest.allowPartialSearchResults()
);
} else {
final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider);
Expand Down Expand Up @@ -1413,22 +1416,35 @@ static List<SearchShardIterator> getLocalLocalShardsIteratorFromPointInTime(
OriginalIndices originalIndices,
String localClusterAlias,
SearchContextId searchContext,
TimeValue keepAlive
TimeValue keepAlive,
boolean allowPartialSearchResults
) {
final List<SearchShardIterator> iterators = new ArrayList<>(searchContext.shards().size());
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
final SearchContextIdForNode perNode = entry.getValue();
if (Strings.isEmpty(perNode.getClusterAlias())) {
final ShardId shardId = entry.getKey();
final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
final List<String> targetNodes = new ArrayList<>(shards.size());
targetNodes.add(perNode.getNode());
if (perNode.getSearchContextId().getSearcherId() != null) {
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
final List<String> targetNodes = new ArrayList<>(2);
// Prefer executing shard requests on nodes that are part of PIT first.
if (clusterState.nodes().nodeExists(perNode.getNode())) {
targetNodes.add(perNode.getNode());
}
try {
final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
if (perNode.getSearchContextId().getSearcherId() != null) {
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
}
}
}
} catch (IndexNotFoundException | ShardNotFoundException e) {
// We can hit these exceptions if the index was deleted after creating PIT or the cluster state on
// this coordinating node is outdated. It's fine to ignore these extra "retry-able" target shards
// when allowPartialSearchResults is true; when it is false the exception is rethrown below.
if (allowPartialSearchResults == false) {
throw e;
}
}
OriginalIndices finalIndices = new OriginalIndices(
new String[] { shardId.getIndexName() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -81,6 +82,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -1273,7 +1275,7 @@ public void testLocalShardIteratorFromPointInTime() {
} else {
// relocated or no longer assigned
relocatedContexts.add(new ShardId(indexMetadata.getIndex(), shardId));
targetNode = UUIDs.randomBase64UUID();
targetNode = randomFrom(clusterState.nodes().getAllNodes()).getId();
}
contexts.put(
new ShardId(indexMetadata.getIndex(), shardId),
Expand All @@ -1292,7 +1294,8 @@ public void testLocalShardIteratorFromPointInTime() {
OriginalIndices.NONE,
null,
new SearchContextId(contexts, aliasFilterMap),
keepAlive
keepAlive,
randomBoolean()
);
shardIterators.sort(Comparator.comparing(SearchShardIterator::shardId));
assertThat(shardIterators, hasSize(numberOfShards));
Expand All @@ -1319,5 +1322,38 @@ public void testLocalShardIteratorFromPointInTime() {
assertThat(shardIterator.getSearchContextId(), equalTo(context.getSearchContextId()));
assertThat(shardIterator.getSearchContextKeepAlive(), equalTo(keepAlive));
}

// Fails when some indices don't exist and `allowPartialSearchResults` is false.
ShardId anotherShardId = new ShardId(new Index("another-index", IndexMetadata.INDEX_UUID_NA_VALUE), randomIntBetween(0, 10));
contexts.put(
anotherShardId,
new SearchContextIdForNode(
null,
randomFrom(clusterState.nodes().getAllNodes()).getId(),
new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong(), null)
)
);
IndexNotFoundException error = expectThrows(IndexNotFoundException.class, () -> {
TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime(
clusterState,
OriginalIndices.NONE,
null,
new SearchContextId(contexts, aliasFilterMap),
keepAlive,
false
);
});
assertThat(error.getIndex().getName(), equalTo("another-index"));
// Ok when some indices don't exist and `allowPartialSearchResults` is true.
Optional<SearchShardIterator> anotherShardIterator = TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime(
clusterState,
OriginalIndices.NONE,
null,
new SearchContextId(contexts, aliasFilterMap),
keepAlive,
true
).stream().filter(si -> si.shardId().equals(anotherShardId)).findFirst();
assertTrue(anotherShardIterator.isPresent());
assertThat(anotherShardIterator.get().getTargetNodeIds(), hasSize(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.index.engine.frozen;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClosePointInTimeAction;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
Expand Down Expand Up @@ -46,6 +47,7 @@

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -272,4 +274,49 @@ public void testRetryPointInTime() throws Exception {
client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
}
}

/**
 * Checks how a point-in-time search behaves after one of the frozen indices it was opened against
 * has been deleted.
 * <p>
 * Two indices are created, filled with a random number of documents (10 to 50 each) and frozen. A
 * point-in-time is opened over {@code index-*}, then {@code index-1} is deleted. A search that allows
 * partial results is expected to report failures while still returning every document of
 * {@code index-2}; a search that disallows partial results is expected to throw an
 * {@link ElasticsearchException}. The point-in-time is closed in all cases.
 */
public void testPointInTimeWithDeletedIndices() {
    createIndex("index-1");
    createIndex("index-2");

    // Populate the index that will be deleted later on.
    final int docsInIndex1 = randomIntBetween(10, 50);
    for (int doc = 0; doc < docsInIndex1; doc++) {
        client().prepareIndex("index-1").setId(Integer.toString(doc)).setSource("value", doc).get();
    }

    // Populate the index that survives; its document count is the expected hit count below.
    final int docsInIndex2 = randomIntBetween(10, 50);
    for (int doc = 0; doc < docsInIndex2; doc++) {
        client().prepareIndex("index-2").setId(Integer.toString(doc)).setSource("value", doc).get();
    }

    assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index-1", "index-2")).actionGet());

    final OpenPointInTimeRequest openRequest = new OpenPointInTimeRequest("index-*")
        .indicesOptions(IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED)
        .keepAlive(TimeValue.timeValueMinutes(2));
    final String pointInTimeId = client().execute(OpenPointInTimeAction.INSTANCE, openRequest).actionGet().getPointInTimeId();

    try {
        client().admin().indices().prepareDelete("index-1").get();

        // Partial results allowed: the response carries failures but still returns the hits of index-2.
        final SearchResponse partialResponse = client().prepareSearch()
            .setPreference(null)
            .setAllowPartialSearchResults(true)
            .setPointInTime(new PointInTimeBuilder(pointInTimeId))
            .get();
        assertFailures(partialResponse);
        assertHitCount(partialResponse, docsInIndex2);

        // Partial results not allowed: the search is expected to throw.
        expectThrows(
            ElasticsearchException.class,
            client().prepareSearch()
                .setPreference(null)
                .setAllowPartialSearchResults(false)
                .setPointInTime(new PointInTimeBuilder(pointInTimeId))::get
        );
    } finally {
        // Always release the point-in-time, even if an assertion above failed.
        client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pointInTimeId)).actionGet();
    }
}
}