Skip to content

Commit

Permalink
Compute pipelineStart before building request callback chain
Browse files Browse the repository at this point in the history
Also, IntelliJ suggested refactoring creation of the terminal request
callback into a separate method since the existing method was really
big. I liked that suggestion.

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Oct 26, 2023
1 parent 22b7c37 commit 5bc661b
Showing 1 changed file with 16 additions and 13 deletions.
29 changes: 16 additions & 13 deletions server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,7 @@ void transformRequest(SearchRequest request, ActionListener<SearchRequest> reque
return;

Check warning on line 137 in server/src/main/java/org/opensearch/search/pipeline/Pipeline.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/search/pipeline/Pipeline.java#L135-L137

Added lines #L135 - L137 were not covered by tests
}

long[] pipelineStart = new long[1];

ActionListener<SearchRequest> finalListener = ActionListener.wrap(r -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformRequest(took);
requestListener.onResponse(new PipelinedRequest(this, r));
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
afterTransformRequest(took);
onTransformRequestFailure();
requestListener.onFailure(new SearchPipelineProcessingException(e));
});
ActionListener<SearchRequest> finalListener = getTerminalSearchRequestActionListener(requestListener);

// Chain listeners back-to-front
ActionListener<SearchRequest> currentListener = finalListener;
Expand Down Expand Up @@ -183,11 +172,25 @@ void transformRequest(SearchRequest request, ActionListener<SearchRequest> reque
}, finalListener::onFailure);
}

pipelineStart[0] = relativeTimeSupplier.getAsLong();
beforeTransformRequest();
currentListener.onResponse(request);
}

private ActionListener<SearchRequest> getTerminalSearchRequestActionListener(ActionListener<SearchRequest> requestListener) {
final long pipelineStart = relativeTimeSupplier.getAsLong();

return ActionListener.wrap(r -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformRequest(took);
requestListener.onResponse(new PipelinedRequest(this, r));
}, e -> {
long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart);
afterTransformRequest(took);
onTransformRequestFailure();
requestListener.onFailure(new SearchPipelineProcessingException(e));
});
}

ActionListener<SearchResponse> transformResponseListener(SearchRequest request, ActionListener<SearchResponse> responseListener) {
if (searchResponseProcessors.isEmpty()) {
// No response transformation necessary
Expand Down

0 comments on commit 5bc661b

Please sign in to comment.