Skip to content

Commit

Permalink
Handle partial search result with point in time (#81349) (#81534)
Browse files Browse the repository at this point in the history
Today, a search request with PIT would fail immediately if any 
associated indices or nodes are gone, which is inconsistent when
allow_partial_search_results is true.

Relates #81256
  • Loading branch information
dnhatn authored Dec 8, 2021
1 parent ed377fd commit 3d6093b
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 34 deletions.
5 changes: 3 additions & 2 deletions docs/reference/search/point-in-time-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ POST /_search <1>
}
}
--------------------------------------------------
// TEST[catch:missing]
// TEST[catch:unavailable]

<1> A search request with the `pit` parameter must not specify `index`, `routing`,
and {ref}/search-request-body.html#request-body-search-preference[`preference`]
Expand Down Expand Up @@ -88,7 +88,8 @@ Additionally, if a segment contains deleted or updated documents then the
point in time must keep track of whether each document in the segment was live at
the time of the initial search request. Ensure that your nodes have sufficient heap
space if you have many open point-in-times on an index that is subject to ongoing
deletes or updates.
deletes or updates. Note that a point-in-time doesn't prevent its associated indices
from being deleted.

You can check how many point-in-times (i.e, search contexts) are open with the
<<cluster-nodes-stats,nodes stats API>>:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ GET /_search
]
}
----
// TEST[catch:missing]
// TEST[catch:unavailable]

<1> PIT ID for the search.
<2> Sorts hits for the search with an implicit tiebreak on `_shard_doc` ascending.
Expand Down Expand Up @@ -138,7 +138,7 @@ GET /_search
]
}
----
// TEST[catch:missing]
// TEST[catch:unavailable]

<1> PIT ID for the search.
<2> Sorts hits for the search with an explicit tiebreak on `_shard_doc` descending.
Expand Down Expand Up @@ -205,7 +205,7 @@ GET /_search
"track_total_hits": false <3>
}
----
// TEST[catch:missing]
// TEST[catch:unavailable]

<1> PIT ID returned by the previous search.
<2> Sort values from the previous search's last hit.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

package org.elasticsearch.action.search;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
Expand Down Expand Up @@ -241,21 +241,38 @@ public void testIndexNotFound() {
}
refresh();
String pit = openPointInTime(new String[] { "index-*" }, TimeValue.timeValueMinutes(2));
SearchResponse resp1 = client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get();
assertNoFailures(resp1);
assertHitCount(resp1, index1 + index2);
client().admin().indices().prepareDelete("index-1").get();
if (randomBoolean()) {
SearchResponse resp2 = client().prepareSearch("index-*").get();
assertNoFailures(resp2);
assertHitCount(resp2, index2);
try {
SearchResponse resp = client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get();
assertNoFailures(resp);
assertHitCount(resp, index1 + index2);
client().admin().indices().prepareDelete("index-1").get();
if (randomBoolean()) {
resp = client().prepareSearch("index-*").get();
assertNoFailures(resp);
assertHitCount(resp, index2);
}

// Allow partial search result
resp = client().prepareSearch()
.setPreference(null)
.setAllowPartialSearchResults(true)
.setPointInTime(new PointInTimeBuilder(pit))
.get();
assertFailures(resp);
assertHitCount(resp, index2);

// Do not allow partial search result
expectThrows(
ElasticsearchException.class,
() -> client().prepareSearch()
.setPreference(null)
.setAllowPartialSearchResults(false)
.setPointInTime(new PointInTimeBuilder(pit))
.get()
);
} finally {
closePointInTime(pit);
}
expectThrows(
IndexNotFoundException.class,
() -> client().prepareSearch().setPreference(null).setPointInTime(new PointInTimeBuilder(pit)).get()
);
closePointInTime(resp1.pointInTimeId());
}

public void testCanMatch() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.search.SearchPhaseResult;
Expand Down Expand Up @@ -914,6 +916,10 @@ private void executeSearch(
) {

clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ);
if (searchRequest.allowPartialSearchResults() == null) {
// No user preference defined in search request - apply cluster service default
searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
}

// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
Expand All @@ -931,7 +937,8 @@ private void executeSearch(
localIndices,
searchRequest.getLocalClusterAlias(),
searchContext,
searchRequest.pointInTimeBuilder().getKeepAlive()
searchRequest.pointInTimeBuilder().getKeepAlive(),
searchRequest.allowPartialSearchResults()
);
} else {
final Index[] indices = resolveLocalIndices(localIndices, clusterState, timeProvider);
Expand Down Expand Up @@ -988,10 +995,6 @@ private void executeSearch(
// if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
searchRequest.searchType(QUERY_THEN_FETCH);
}
if (searchRequest.allowPartialSearchResults() == null) {
// No user preference defined in search request - apply cluster service default
searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
}
if (searchRequest.isSuggestOnly()) {
// disable request cache if we have only suggest
searchRequest.requestCache(false);
Expand Down Expand Up @@ -1413,22 +1416,35 @@ static List<SearchShardIterator> getLocalLocalShardsIteratorFromPointInTime(
OriginalIndices originalIndices,
String localClusterAlias,
SearchContextId searchContext,
TimeValue keepAlive
TimeValue keepAlive,
boolean allowPartialSearchResults
) {
final List<SearchShardIterator> iterators = new ArrayList<>(searchContext.shards().size());
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
final SearchContextIdForNode perNode = entry.getValue();
if (Strings.isEmpty(perNode.getClusterAlias())) {
final ShardId shardId = entry.getKey();
final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
final List<String> targetNodes = new ArrayList<>(shards.size());
targetNodes.add(perNode.getNode());
if (perNode.getSearchContextId().getSearcherId() != null) {
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
final List<String> targetNodes = new ArrayList<>(2);
// Prefer executing shard requests on nodes that are part of PIT first.
if (clusterState.nodes().nodeExists(perNode.getNode())) {
targetNodes.add(perNode.getNode());
}
try {
final ShardIterator shards = OperationRouting.getShards(clusterState, shardId);
if (perNode.getSearchContextId().getSearcherId() != null) {
for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
}
}
}
} catch (IndexNotFoundException | ShardNotFoundException e) {
// We can hit these exceptions if the index was deleted after creating PIT or the cluster state on
// this coordinating node is outdated. It's fine to ignore these extra "retry-able" target shards
// when allowPartialSearchResults is false
if (allowPartialSearchResults == false) {
throw e;
}
}
OriginalIndices finalIndices = new OriginalIndices(
new String[] { shardId.getIndexName() },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -81,6 +82,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -1273,7 +1275,7 @@ public void testLocalShardIteratorFromPointInTime() {
} else {
// relocated or no longer assigned
relocatedContexts.add(new ShardId(indexMetadata.getIndex(), shardId));
targetNode = UUIDs.randomBase64UUID();
targetNode = randomFrom(clusterState.nodes().getAllNodes()).getId();
}
contexts.put(
new ShardId(indexMetadata.getIndex(), shardId),
Expand All @@ -1292,7 +1294,8 @@ public void testLocalShardIteratorFromPointInTime() {
OriginalIndices.NONE,
null,
new SearchContextId(contexts, aliasFilterMap),
keepAlive
keepAlive,
randomBoolean()
);
shardIterators.sort(Comparator.comparing(SearchShardIterator::shardId));
assertThat(shardIterators, hasSize(numberOfShards));
Expand All @@ -1319,5 +1322,38 @@ public void testLocalShardIteratorFromPointInTime() {
assertThat(shardIterator.getSearchContextId(), equalTo(context.getSearchContextId()));
assertThat(shardIterator.getSearchContextKeepAlive(), equalTo(keepAlive));
}

// Fails when some indices don't exist and `allowPartialSearchResults` is false.
ShardId anotherShardId = new ShardId(new Index("another-index", IndexMetadata.INDEX_UUID_NA_VALUE), randomIntBetween(0, 10));
contexts.put(
anotherShardId,
new SearchContextIdForNode(
null,
randomFrom(clusterState.nodes().getAllNodes()).getId(),
new ShardSearchContextId(UUIDs.randomBase64UUID(), randomNonNegativeLong(), null)
)
);
IndexNotFoundException error = expectThrows(IndexNotFoundException.class, () -> {
TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime(
clusterState,
OriginalIndices.NONE,
null,
new SearchContextId(contexts, aliasFilterMap),
keepAlive,
false
);
});
assertThat(error.getIndex().getName(), equalTo("another-index"));
// Ok when some indices don't exist and `allowPartialSearchResults` is true.
Optional<SearchShardIterator> anotherShardIterator = TransportSearchAction.getLocalLocalShardsIteratorFromPointInTime(
clusterState,
OriginalIndices.NONE,
null,
new SearchContextId(contexts, aliasFilterMap),
keepAlive,
true
).stream().filter(si -> si.shardId().equals(anotherShardId)).findFirst();
assertTrue(anotherShardIterator.isPresent());
assertThat(anotherShardIterator.get().getTargetNodeIds(), hasSize(1));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package org.elasticsearch.index.engine.frozen;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClosePointInTimeAction;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
Expand Down Expand Up @@ -46,6 +47,7 @@

import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_EXCLUDE_GROUP_SETTING;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand Down Expand Up @@ -272,4 +274,49 @@ public void testRetryPointInTime() throws Exception {
client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
}
}

public void testPointInTimeWithDeletedIndices() {
createIndex("index-1");
createIndex("index-2");

int index1 = randomIntBetween(10, 50);
for (int i = 0; i < index1; i++) {
String id = Integer.toString(i);
client().prepareIndex("index-1").setId(id).setSource("value", i).get();
}

int index2 = randomIntBetween(10, 50);
for (int i = 0; i < index2; i++) {
String id = Integer.toString(i);
client().prepareIndex("index-2").setId(id).setSource("value", i).get();
}

assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index-1", "index-2")).actionGet());
final OpenPointInTimeRequest openPointInTimeRequest = new OpenPointInTimeRequest("index-*").indicesOptions(
IndicesOptions.STRICT_EXPAND_OPEN_FORBID_CLOSED
).keepAlive(TimeValue.timeValueMinutes(2));

final String pitId = client().execute(OpenPointInTimeAction.INSTANCE, openPointInTimeRequest).actionGet().getPointInTimeId();
try {
client().admin().indices().prepareDelete("index-1").get();
// Return partial results if allow partial search result is allowed
SearchResponse resp = client().prepareSearch()
.setPreference(null)
.setAllowPartialSearchResults(true)
.setPointInTime(new PointInTimeBuilder(pitId))
.get();
assertFailures(resp);
assertHitCount(resp, index2);
// Fails if allow partial search result is not allowed
expectThrows(
ElasticsearchException.class,
client().prepareSearch()
.setPreference(null)
.setAllowPartialSearchResults(false)
.setPointInTime(new PointInTimeBuilder(pitId))::get
);
} finally {
client().execute(ClosePointInTimeAction.INSTANCE, new ClosePointInTimeRequest(pitId)).actionGet();
}
}
}

0 comments on commit 3d6093b

Please sign in to comment.