Skip to content

Commit

Permalink
Merge branch 'fb' of github.com:bharath-techie/OpenSearch into fb
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Jun 27, 2022
2 parents 8de0fac + 6016689 commit 9b12d89
Show file tree
Hide file tree
Showing 10 changed files with 120 additions and 465 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -131,6 +132,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -1303,6 +1305,27 @@ public void testClearScroll() throws IOException {
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testCreatePit() throws IOException {
String[] indices = randomIndicesNames(0, 5);
Map<String, String> expectedParams = new HashMap<>();
expectedParams.put("keep_alive", "1d");
expectedParams.put("allow_partial_pit_creation", "true");
CreatePitRequest createPitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, indices);
setRandomIndicesOptions(createPitRequest::indicesOptions, createPitRequest::indicesOptions, expectedParams);
Request request = RequestConverters.createPit(createPitRequest);
StringJoiner endpoint = new StringJoiner("/", "/", "");
String index = String.join(",", indices);
if (Strings.hasLength(index)) {
endpoint.add(index);
}
endpoint.add("_search/point_in_time");
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(endpoint.toString(), request.getEndpoint());
assertEquals(expectedParams, request.getParameters());
assertToXContentBody(createPitRequest, request.getEntity());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testSearchTemplate() throws Exception {
// Create a random request.
String[] indices = randomIndicesNames(0, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.common.util.concurrent.CountDown;
import org.opensearch.index.Index;
import org.opensearch.index.query.Rewriteable;
Expand Down Expand Up @@ -297,6 +298,81 @@ void executeOnShardTarget(
);
}

public void executeRequest(
Task task,
SearchRequest searchRequest,
String actionName,
boolean includeSearchContext,
SinglePhaseSearchAction phaseSearchAction,
ActionListener<SearchResponse> listener
) {
executeRequest(task, searchRequest, new SearchAsyncActionProvider() {
@Override
public AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
SearchTask task,
SearchRequest searchRequest,
Executor executor,
GroupShardsIterator<SearchShardIterator> shardsIts,
SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
ClusterState clusterState,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,
ActionListener<SearchResponse> listener,
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters
) {
return new AbstractSearchAsyncAction<SearchPhaseResult>(
actionName,
logger,
searchTransportService,
connectionLookup,
aliasFilter,
concreteIndexBoosts,
indexRoutings,
executor,
searchRequest,
listener,
shardsIts,
timeProvider,
clusterState,
task,
new ArraySearchPhaseResults<>(shardsIts.size()),
searchRequest.getMaxConcurrentShardRequests(),
clusters
) {
@Override
protected void executePhaseOnShard(
SearchShardIterator shardIt,
SearchShardTarget shard,
SearchActionListener<SearchPhaseResult> listener
) {
final Transport.Connection connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
phaseSearchAction.executeOnShardTarget(task, shard, connection, listener);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
return new SearchPhase(getName()) {
@Override
public void run() {
final AtomicArray<SearchPhaseResult> atomicArray = results.getAtomicArray();
sendSearchResponse(InternalSearchResponse.empty(), atomicArray);
}
};
}

@Override
boolean buildPointInTimeFromSearchResults() {
return includeSearchContext;
}
};
}
}, listener);
}

private void executeRequest(
Task task,
SearchRequest searchRequest,
Expand Down

This file was deleted.

This file was deleted.

Loading

0 comments on commit 9b12d89

Please sign in to comment.