Skip to content

Commit

Permalink
Make search pipelines asynchronous
Browse files Browse the repository at this point in the history
If a search processor needs to make a call out to another
service, we should not risk blocking on the transport thread. We should
support async execution.

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Oct 26, 2023
1 parent 5ae9333 commit 22b7c37
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 124 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Mute the query profile IT with concurrent execution ([#9840](https://github.com/opensearch-project/OpenSearch/pull/9840))
- Force merge with `only_expunge_deletes` honors max segment size ([#10036](https://github.com/opensearch-project/OpenSearch/pull/10036))
- Add the means to extract the contextual properties from HttpChannel, TcpCChannel and TrasportChannel without excessive typecasting ([#10562](https://github.com/opensearch-project/OpenSearch/pull/10562))
- Search pipelines now support asynchronous request and response processors to avoid blocking on a transport thread ([#10598](https://github.com/opensearch-project/OpenSearch/pull/10598))
- [Remote Store] Add Remote Store backpressure rejection stats to `_nodes/stats` ([#10524](https://github.com/opensearch-project/OpenSearch/pull/10524))
- [BUG] Fix java.lang.SecurityException in repository-gcs plugin ([#10642](https://github.com/opensearch-project/OpenSearch/pull/10642))
- Add telemetry tracer/metric enable flag and integ test. ([#10395](https://github.com/opensearch-project/OpenSearch/pull/10395))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,24 +506,51 @@ private void executeRequest(
ActionListener<SearchResponse> listener;
try {
searchRequest = searchPipelineService.resolvePipeline(originalSearchRequest);
listener = ActionListener.wrap(
r -> originalListener.onResponse(searchRequest.transformResponse(r)),
originalListener::onFailure
);
listener = searchRequest.transformResponseListener(originalListener);
} catch (Exception e) {
originalListener.onFailure(e);
return;
}

if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.categorize(searchRequest.source());
} catch (Exception e) {
logger.error("Error while trying to categorize the query.", e);
ActionListener<SearchRequest> requestTransformListener = ActionListener.wrap(sr -> {
if (searchQueryMetricsEnabled) {
try {
searchQueryCategorizer.categorize(sr.source());
} catch (Exception e) {
logger.error("Error while trying to categorize the query.", e);
}
}
}

ActionListener<SearchSourceBuilder> rewriteListener = ActionListener.wrap(source -> {
ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(
sr,
task,
timeProvider,
searchAsyncActionProvider,
listener,
searchRequestOperationsListener
);
if (sr.source() == null) {
rewriteListener.onResponse(sr.source());
} else {
Rewriteable.rewriteAndFetch(
sr.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
);
}
}, listener::onFailure);
searchRequest.transformRequest(requestTransformListener);
}

private ActionListener<SearchSourceBuilder> buildRewriteListener(
SearchRequest searchRequest,
Task task,
SearchTimeProvider timeProvider,
SearchAsyncActionProvider searchAsyncActionProvider,
ActionListener<SearchResponse> listener,
SearchRequestOperationsListener searchRequestOperationsListener
) {
return ActionListener.wrap(source -> {
if (source != searchRequest.source()) {
// only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
// situations when source is rewritten to null due to a bug
Expand Down Expand Up @@ -634,15 +661,6 @@ private void executeRequest(
}
}
}, listener::onFailure);
if (searchRequest.source() == null) {
rewriteListener.onResponse(searchRequest.source());
} else {
Rewriteable.rewriteAndFetch(
searchRequest.source(),
searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
rewriteListener
);
}
}

static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
Expand Down
203 changes: 124 additions & 79 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.search.SearchPhaseResult;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -117,92 +119,135 @@ protected void afterResponseProcessor(Processor processor, long timeInNanos) {}

protected void onResponseProcessorFailed(Processor processor) {}

SearchRequest transformRequest(SearchRequest request) throws SearchPipelineProcessingException {
if (searchRequestProcessors.isEmpty() == false) {
long pipelineStart = relativeTimeSupplier.getAsLong();
beforeTransformRequest();
try {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
request.writeTo(bytesStreamOutput);
try (StreamInput in = bytesStreamOutput.bytes().streamInput()) {
try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) {
request = new SearchRequest(input);
}
}
}
for (SearchRequestProcessor processor : searchRequestProcessors) {
beforeRequestProcessor(processor);
long start = relativeTimeSupplier.getAsLong();
try {
request = processor.processRequest(request);
} catch (Exception e) {
onRequestProcessorFailed(processor);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from request processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
} else {
throw e;
}
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
}
void transformRequest(SearchRequest request, ActionListener<SearchRequest> requestListener) throws SearchPipelineProcessingException {
if (searchRequestProcessors.isEmpty()) {
requestListener.onResponse(request);
return;
}

try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
request.writeTo(bytesStreamOutput);
try (StreamInput in = bytesStreamOutput.bytes().streamInput()) {
try (StreamInput input = new NamedWriteableAwareStreamInput(in, namedWriteableRegistry)) {
request = new SearchRequest(input);
}
} catch (Exception e) {
onTransformRequestFailure();
throw new SearchPipelineProcessingException(e);
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformRequest(took);
}
} catch (IOException e) {
requestListener.onFailure(new SearchPipelineProcessingException(e));
return;
}

long[] pipelineStart = new long[1];

ActionListener<SearchRequest> finalListener = ActionListener.wrap(r -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformRequest(took);
requestListener.onResponse(new PipelinedRequest(this, r));
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformRequest(took);
onTransformRequestFailure();
requestListener.onFailure(new SearchPipelineProcessingException(e));
});

// Chain listeners back-to-front
ActionListener<SearchRequest> currentListener = finalListener;
for (int i = searchRequestProcessors.size() - 1; i >= 0; i--) {
final ActionListener<SearchRequest> nextListener = currentListener;
SearchRequestProcessor processor = searchRequestProcessors.get(i);
currentListener = ActionListener.wrap(r -> {
long start = relativeTimeSupplier.getAsLong();
beforeRequestProcessor(processor);
processor.asyncProcessRequest(r, ActionListener.wrap(rr -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
nextListener.onResponse(rr);
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterRequestProcessor(processor, took);
onRequestProcessorFailed(processor);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from request processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
nextListener.onResponse(r);
} else {
nextListener.onFailure(new SearchPipelineProcessingException(e));
}
}));
}, finalListener::onFailure);
}
return request;

pipelineStart[0] = relativeTimeSupplier.getAsLong();
beforeTransformRequest();
currentListener.onResponse(request);
}

SearchResponse transformResponse(SearchRequest request, SearchResponse response) throws SearchPipelineProcessingException {
if (searchResponseProcessors.isEmpty() == false) {
long pipelineStart = relativeTimeSupplier.getAsLong();
beforeTransformResponse();
try {
for (SearchResponseProcessor processor : searchResponseProcessors) {
beforeResponseProcessor(processor);
long start = relativeTimeSupplier.getAsLong();
try {
response = processor.processResponse(request, response);
} catch (Exception e) {
onResponseProcessorFailed(processor);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from response processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
} else {
throw e;
}
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
ActionListener<SearchResponse> transformResponseListener(SearchRequest request, ActionListener<SearchResponse> responseListener) {
if (searchResponseProcessors.isEmpty()) {
// No response transformation necessary
return responseListener;
}

long[] pipelineStart = new long[1];

final ActionListener<SearchResponse> originalListener = responseListener;
responseListener = ActionListener.wrap(r -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformResponse(took);
originalListener.onResponse(r);
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformResponse(took);
onTransformResponseFailure();
originalListener.onFailure(e);
});
ActionListener<SearchResponse> finalListener = responseListener; // Jump directly to this one on exception.

for (int i = searchResponseProcessors.size() - 1; i >= 0; i--) {
final ActionListener<SearchResponse> currentFinalListener = responseListener;
final SearchResponseProcessor processor = searchResponseProcessors.get(i);

responseListener = ActionListener.wrap(r -> {
beforeResponseProcessor(processor);
final long start = relativeTimeSupplier.getAsLong();
processor.asyncProcessResponse(request, r, ActionListener.wrap(rr -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
currentFinalListener.onResponse(rr);
}, e -> {
onResponseProcessorFailed(processor);
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
afterResponseProcessor(processor, took);
if (processor.isIgnoreFailure()) {
logger.warn(
"The exception from response processor ["
+ processor.getType()
+ "] in the search pipeline ["
+ id
+ "] was ignored",
e
);
// Pass the previous response through to the next processor in the chain
currentFinalListener.onResponse(r);
} else {
currentFinalListener.onFailure(new SearchPipelineProcessingException(e));
}
}
} catch (Exception e) {
onTransformResponseFailure();
throw new SearchPipelineProcessingException(e);
} finally {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformResponse(took);
}
}));
}, finalListener::onFailure);
}
return response;
final ActionListener<SearchResponse> chainListener = responseListener;
return ActionListener.wrap(r -> {
beforeTransformResponse();
pipelineStart[0] = relativeTimeSupplier.getAsLong();
chainListener.onResponse(r);
}, originalListener::onFailure);

}

<Result extends SearchPhaseResult> void runSearchPhaseResultsTransformer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.action.search.SearchPhaseResults;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.core.action.ActionListener;
import org.opensearch.search.SearchPhaseResult;

/**
Expand All @@ -27,8 +28,12 @@ public final class PipelinedRequest extends SearchRequest {
this.pipeline = pipeline;
}

public SearchResponse transformResponse(SearchResponse response) {
return pipeline.transformResponse(this, response);
public void transformRequest(ActionListener<SearchRequest> requestListener) {
pipeline.transformRequest(this, requestListener);
}

public ActionListener<SearchResponse> transformResponseListener(ActionListener<SearchResponse> responseListener) {
return pipeline.transformResponseListener(this, responseListener);
}

public <Result extends SearchPhaseResult> void transformSearchPhaseResults(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,8 +408,7 @@ public PipelinedRequest resolvePipeline(SearchRequest searchRequest) {
pipeline = pipelineHolder.pipeline;
}
}
SearchRequest transformedRequest = pipeline.transformRequest(searchRequest);
return new PipelinedRequest(pipeline, transformedRequest);
return new PipelinedRequest(pipeline, searchRequest);
}

Map<String, Processor.Factory<SearchRequestProcessor>> getRequestProcessorFactories() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,37 @@
package org.opensearch.search.pipeline;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.core.action.ActionListener;

/**
* Interface for a search pipeline processor that modifies a search request.
*/
public interface SearchRequestProcessor extends Processor {

/**
* Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
* executes.
* <p>
* Implement this method if the processor makes no asynchronous calls.
* @param request the executed {@link SearchRequest}
* @return a new {@link SearchRequest} (or the input {@link SearchRequest} if no changes)
* @throws Exception if an error occurs during processing
*/
SearchRequest processRequest(SearchRequest request) throws Exception;

/**
* Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
* executes.
* <p>
* Expert method: Implement this if the processor needs to make asynchronous calls. Otherwise, implement processRequest.
* @param request the executed {@link SearchRequest}
* @param requestListener callback to be invoked on successful processing or on failure
*/
default void asyncProcessRequest(SearchRequest request, ActionListener<SearchRequest> requestListener) {
try {
requestListener.onResponse(processRequest(request));
} catch (Exception e) {
requestListener.onFailure(e);
}
}
}
Loading

0 comments on commit 22b7c37

Please sign in to comment.