diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java index fa80cf23b8075..2699f18d6f037 100644 --- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java +++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java @@ -137,18 +137,7 @@ void transformRequest(SearchRequest request, ActionListener reque return; } - long[] pipelineStart = new long[1]; - - ActionListener finalListener = ActionListener.wrap(r -> { - long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]); - afterTransformRequest(took); - requestListener.onResponse(new PipelinedRequest(this, r)); - }, e -> { - long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]); - afterTransformRequest(took); - onTransformRequestFailure(); - requestListener.onFailure(new SearchPipelineProcessingException(e)); - }); + ActionListener finalListener = getTerminalSearchRequestActionListener(requestListener); // Chain listeners back-to-front ActionListener currentListener = finalListener; @@ -183,11 +172,25 @@ void transformRequest(SearchRequest request, ActionListener reque }, finalListener::onFailure); } - pipelineStart[0] = relativeTimeSupplier.getAsLong(); beforeTransformRequest(); currentListener.onResponse(request); } + private ActionListener getTerminalSearchRequestActionListener(ActionListener requestListener) { + final long pipelineStart = relativeTimeSupplier.getAsLong(); + + return ActionListener.wrap(r -> { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); + afterTransformRequest(took); + requestListener.onResponse(new PipelinedRequest(this, r)); + }, e -> { + long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart); + afterTransformRequest(took); + onTransformRequestFailure(); + requestListener.onFailure(new SearchPipelineProcessingException(e)); + }); + } + ActionListener transformResponseListener(SearchRequest request, ActionListener responseListener) { if (searchResponseProcessors.isEmpty()) { // No response transformation necessary