diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
index ac5731883595d..ed9fb35c2a45a 100644
--- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
+++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java
@@ -430,7 +430,13 @@ private void executeRequest(
         }
         ActionListener<SearchRequest> requestTransformListener = ActionListener.wrap(sr -> {
-            ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(sr, task,timeProvider, searchAsyncActionProvider, listener);
+            ActionListener<SearchSourceBuilder> rewriteListener = buildRewriteListener(
+                sr,
+                task,
+                timeProvider,
+                searchAsyncActionProvider,
+                listener
+            );
             if (sr.source() == null) {
                 rewriteListener.onResponse(sr.source());
             } else {
@@ -440,123 +446,125 @@ private void executeRequest(
                     rewriteListener
                 );
             }
-            }, listener::onFailure);
+        }, listener::onFailure);
         searchRequest.transformRequest(requestTransformListener);
     }
 
-    private ActionListener<SearchSourceBuilder> buildRewriteListener(SearchRequest searchRequest,
-                                                                     Task task,
-                                                                     SearchTimeProvider timeProvider,
-                                                                     SearchAsyncActionProvider searchAsyncActionProvider,
-                                                                     ActionListener<SearchResponse> listener) {
-        return ActionListener.wrap(source -> {
-            if (source != searchRequest.source()) {
-                // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
-                // situations when source is rewritten to null due to a bug
-                searchRequest.source(source);
-            }
-            final ClusterState clusterState = clusterService.state();
-            final SearchContextId searchContext;
-            final Map<String, OriginalIndices> remoteClusterIndices;
-            if (searchRequest.pointInTimeBuilder() != null) {
-                searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
-                remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
-            } else {
-                searchContext = null;
-                remoteClusterIndices = remoteClusterService.groupIndices(
-                    searchRequest.indicesOptions(),
-                    searchRequest.indices(),
-                    idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState)
-                );
-            }
-            OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
-            if (remoteClusterIndices.isEmpty()) {
-                executeLocalSearch(
-                    task,
-                    timeProvider,
-                    searchRequest,
-                    localIndices,
-                    clusterState,
-                    listener,
-                    searchContext,
-                    searchAsyncActionProvider
-                );
-            } else {
-                if (shouldMinimizeRoundtrips(searchRequest)) {
-                    ccsRemoteReduce(
-                        searchRequest,
-                        localIndices,
-                        remoteClusterIndices,
-                        timeProvider,
-                        searchService.aggReduceContextBuilder(searchRequest.source()),
-                        remoteClusterService,
-                        threadPool,
-                        listener,
-                        (r, l) -> executeLocalSearch(
-                            task,
-                            timeProvider,
-                            r,
-                            localIndices,
-                            clusterState,
-                            l,
-                            searchContext,
-                            searchAsyncActionProvider
-                        )
-                    );
-                } else {
-                    AtomicInteger skippedClusters = new AtomicInteger(0);
-                    collectSearchShards(
-                        searchRequest.indicesOptions(),
-                        searchRequest.preference(),
-                        searchRequest.routing(),
-                        skippedClusters,
-                        remoteClusterIndices,
-                        remoteClusterService,
-                        threadPool,
-                        ActionListener.wrap(searchShardsResponses -> {
-                            final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
-                                searchShardsResponses
-                            );
-                            final Map<String, AliasFilter> remoteAliasFilters;
-                            final List<SearchShardIterator> remoteShardIterators;
-                            if (searchContext != null) {
-                                remoteAliasFilters = searchContext.aliasFilter();
-                                remoteShardIterators = getRemoteShardsIteratorFromPointInTime(
-                                    searchShardsResponses,
-                                    searchContext,
-                                    searchRequest.pointInTimeBuilder().getKeepAlive(),
-                                    remoteClusterIndices
-                                );
-                            } else {
-                                remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
-                                remoteShardIterators = getRemoteShardsIterator(
-                                    searchShardsResponses,
-                                    remoteClusterIndices,
-                                    remoteAliasFilters
-                                );
-                            }
-                            int localClusters = localIndices == null ? 0 : 1;
-                            int totalClusters = remoteClusterIndices.size() + localClusters;
-                            int successfulClusters = searchShardsResponses.size() + localClusters;
-                            executeSearch(
-                                (SearchTask) task,
-                                timeProvider,
-                                searchRequest,
-                                localIndices,
-                                remoteShardIterators,
-                                clusterNodeLookup,
-                                clusterState,
-                                remoteAliasFilters,
-                                listener,
-                                new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
-                                searchContext,
-                                searchAsyncActionProvider
-                            );
-                        }, listener::onFailure)
-                    );
-                }
-            }
-        }, listener::onFailure);
+    private ActionListener<SearchSourceBuilder> buildRewriteListener(
+        SearchRequest searchRequest,
+        Task task,
+        SearchTimeProvider timeProvider,
+        SearchAsyncActionProvider searchAsyncActionProvider,
+        ActionListener<SearchResponse> listener
+    ) {
+        return ActionListener.wrap(source -> {
+            if (source != searchRequest.source()) {
+                // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
+                // situations when source is rewritten to null due to a bug
+                searchRequest.source(source);
+            }
+            final ClusterState clusterState = clusterService.state();
+            final SearchContextId searchContext;
+            final Map<String, OriginalIndices> remoteClusterIndices;
+            if (searchRequest.pointInTimeBuilder() != null) {
+                searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId());
+                remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions());
+            } else {
+                searchContext = null;
+                remoteClusterIndices = remoteClusterService.groupIndices(
+                    searchRequest.indicesOptions(),
+                    searchRequest.indices(),
+                    idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState)
+                );
+            }
+            OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
+            if (remoteClusterIndices.isEmpty()) {
+                executeLocalSearch(
+                    task,
+                    timeProvider,
+                    searchRequest,
+                    localIndices,
+                    clusterState,
+                    listener,
+                    searchContext,
+                    searchAsyncActionProvider
+                );
+            } else {
+                if (shouldMinimizeRoundtrips(searchRequest)) {
+                    ccsRemoteReduce(
+                        searchRequest,
+                        localIndices,
+                        remoteClusterIndices,
+                        timeProvider,
+                        searchService.aggReduceContextBuilder(searchRequest.source()),
+                        remoteClusterService,
+                        threadPool,
+                        listener,
+                        (r, l) -> executeLocalSearch(
+                            task,
+                            timeProvider,
+                            r,
+                            localIndices,
+                            clusterState,
+                            l,
+                            searchContext,
+                            searchAsyncActionProvider
+                        )
+                    );
+                } else {
+                    AtomicInteger skippedClusters = new AtomicInteger(0);
+                    collectSearchShards(
+                        searchRequest.indicesOptions(),
+                        searchRequest.preference(),
+                        searchRequest.routing(),
+                        skippedClusters,
+                        remoteClusterIndices,
+                        remoteClusterService,
+                        threadPool,
+                        ActionListener.wrap(searchShardsResponses -> {
+                            final BiFunction<String, String, DiscoveryNode> clusterNodeLookup = getRemoteClusterNodeLookup(
+                                searchShardsResponses
+                            );
+                            final Map<String, AliasFilter> remoteAliasFilters;
+                            final List<SearchShardIterator> remoteShardIterators;
+                            if (searchContext != null) {
+                                remoteAliasFilters = searchContext.aliasFilter();
+                                remoteShardIterators = getRemoteShardsIteratorFromPointInTime(
+                                    searchShardsResponses,
+                                    searchContext,
+                                    searchRequest.pointInTimeBuilder().getKeepAlive(),
+                                    remoteClusterIndices
+                                );
+                            } else {
+                                remoteAliasFilters = getRemoteAliasFilters(searchShardsResponses);
+                                remoteShardIterators = getRemoteShardsIterator(
+                                    searchShardsResponses,
+                                    remoteClusterIndices,
+                                    remoteAliasFilters
+                                );
+                            }
+                            int localClusters = localIndices == null ? 0 : 1;
+                            int totalClusters = remoteClusterIndices.size() + localClusters;
+                            int successfulClusters = searchShardsResponses.size() + localClusters;
+                            executeSearch(
+                                (SearchTask) task,
+                                timeProvider,
+                                searchRequest,
+                                localIndices,
+                                remoteShardIterators,
+                                clusterNodeLookup,
+                                clusterState,
+                                remoteAliasFilters,
+                                listener,
+                                new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()),
+                                searchContext,
+                                searchAsyncActionProvider
+                            );
+                        }, listener::onFailure)
+                    );
+                }
+            }
+        }, listener::onFailure);
     }
 
     static boolean shouldMinimizeRoundtrips(SearchRequest searchRequest) {
diff --git a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
index a32ac470b4e5e..fa80cf23b8075 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/Pipeline.java
@@ -95,41 +95,29 @@ List<SearchPhaseResultsProcessor> getSearchPhaseResultsProcessors() {
         return searchPhaseResultsProcessors;
     }
 
-    protected void beforeTransformRequest() {
-    }
+    protected void beforeTransformRequest() {}
 
-    protected void afterTransformRequest(long timeInNanos) {
-    }
+    protected void afterTransformRequest(long timeInNanos) {}
 
-    protected void onTransformRequestFailure() {
-    }
+    protected void onTransformRequestFailure() {}
 
-    protected void beforeRequestProcessor(Processor processor) {
-    }
+    protected void beforeRequestProcessor(Processor processor) {}
 
-    protected void afterRequestProcessor(Processor processor, long timeInNanos) {
-    }
+    protected void afterRequestProcessor(Processor processor, long timeInNanos) {}
 
-    protected void onRequestProcessorFailed(Processor processor) {
-    }
+    protected void onRequestProcessorFailed(Processor processor) {}
 
-    protected void beforeTransformResponse() {
-    }
+    protected void beforeTransformResponse() {}
 
-    protected void afterTransformResponse(long timeInNanos) {
-    }
+    protected void afterTransformResponse(long timeInNanos) {}
 
-    protected void onTransformResponseFailure() {
-    }
+    protected void onTransformResponseFailure() {}
 
-    protected void beforeResponseProcessor(Processor processor) {
-    }
+    protected void beforeResponseProcessor(Processor processor) {}
 
-    protected void afterResponseProcessor(Processor processor, long timeInNanos) {
-    }
+    protected void afterResponseProcessor(Processor processor, long timeInNanos) {}
 
-    protected void onResponseProcessorFailed(Processor processor) {
-    }
+    protected void onResponseProcessorFailed(Processor processor) {}
 
     void transformRequest(SearchRequest request, ActionListener<SearchRequest> requestListener) throws SearchPipelineProcessingException {
         if (searchRequestProcessors.isEmpty()) {
@@ -162,41 +150,37 @@ void transformRequest(SearchRequest request, ActionListener<SearchRequest> reque
             requestListener.onFailure(new SearchPipelineProcessingException(e));
         });
 
-        // Chain listeners back-to-front
         ActionListener<SearchRequest> currentListener = finalListener;
         for (int i = searchRequestProcessors.size() - 1; i >= 0; i--) {
             final ActionListener<SearchRequest> nextListener = currentListener;
             SearchRequestProcessor processor = searchRequestProcessors.get(i);
-            currentListener = ActionListener.wrap(
-                r -> {
-                    long start = relativeTimeSupplier.getAsLong();
-                    beforeRequestProcessor(processor);
-                    processor.asyncProcessRequest(r, ActionListener.wrap(rr -> {
-                        long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
-                        afterRequestProcessor(processor, took);
-                        nextListener.onResponse(rr);
-                    }, e -> {
-                        long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
-                        afterRequestProcessor(processor, took);
-                        onRequestProcessorFailed(processor);
-                        if (processor.isIgnoreFailure()) {
-                            logger.warn(
-                                "The exception from request processor ["
-                                    + processor.getType()
-                                    + "] in the search pipeline ["
-                                    + id
-                                    + "] was ignored",
-                                e
-                            );
-                            nextListener.onResponse(r);
-                        } else {
-                            nextListener.onFailure(new SearchPipelineProcessingException(e));
-                        }
-                    }));
-                },
-                finalListener::onFailure
-            );
+            currentListener = ActionListener.wrap(r -> {
+                long start = relativeTimeSupplier.getAsLong();
+                beforeRequestProcessor(processor);
+                processor.asyncProcessRequest(r, ActionListener.wrap(rr -> {
+                    long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
+                    afterRequestProcessor(processor, took);
+                    nextListener.onResponse(rr);
+                }, e -> {
+                    long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
+                    afterRequestProcessor(processor, took);
+                    onRequestProcessorFailed(processor);
+                    if (processor.isIgnoreFailure()) {
+                        logger.warn(
+                            "The exception from request processor ["
+                                + processor.getType()
+                                + "] in the search pipeline ["
+                                + id
+                                + "] was ignored",
+                            e
+                        );
+                        nextListener.onResponse(r);
+                    } else {
+                        nextListener.onFailure(new SearchPipelineProcessingException(e));
+                    }
+                }));
+            }, finalListener::onFailure);
         }
 
         pipelineStart[0] = relativeTimeSupplier.getAsLong();
@@ -213,63 +197,56 @@ ActionListener<SearchResponse> transformResponseListener(SearchRequest request,
         long[] pipelineStart = new long[1];
 
         final ActionListener<SearchResponse> originalListener = responseListener;
-        responseListener = ActionListener.wrap(
-            r -> {
-                long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
-                afterTransformResponse(took);
-                originalListener.onResponse(r);
-            },
-            e -> {
-                long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
-                afterTransformResponse(took);
-                onTransformResponseFailure();
-                originalListener.onFailure(e);
-            }
-        );
+        responseListener = ActionListener.wrap(r -> {
+            long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
+            afterTransformResponse(took);
+            originalListener.onResponse(r);
+        }, e -> {
+            long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - pipelineStart[0]);
+            afterTransformResponse(took);
+            onTransformResponseFailure();
+            originalListener.onFailure(e);
+        });
         ActionListener<SearchResponse> finalListener = responseListener; // Jump directly to this one on exception.
 
         for (int i = searchResponseProcessors.size() - 1; i >= 0; i--) {
             final ActionListener<SearchResponse> currentFinalListener = responseListener;
             final SearchResponseProcessor processor = searchResponseProcessors.get(i);
 
-            responseListener = ActionListener.wrap(
-                r -> {
-                    beforeResponseProcessor(processor);
-                    final long start = relativeTimeSupplier.getAsLong();
-                    processor.asyncProcessResponse(request, r, ActionListener.wrap(rr -> {
-                        long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
-                        afterResponseProcessor(processor, took);
-                        currentFinalListener.onResponse(rr);
-                    }, e -> {
-                        onResponseProcessorFailed(processor);
-                        long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
-                        afterResponseProcessor(processor, took);
-                        if (processor.isIgnoreFailure()) {
-                            logger.warn(
-                                "The exception from response processor ["
-                                    + processor.getType()
-                                    + "] in the search pipeline ["
-                                    + id
-                                    + "] was ignored",
-                                e
-                            );
-                            // Pass the previous response through to the next processor in the chain
-                            currentFinalListener.onResponse(r);
-                        } else {
-                            currentFinalListener.onFailure(new SearchPipelineProcessingException(e));
-                        }
-                    }));
-                },
-                finalListener::onFailure
-            );
+            responseListener = ActionListener.wrap(r -> {
+                beforeResponseProcessor(processor);
+                final long start = relativeTimeSupplier.getAsLong();
+                processor.asyncProcessResponse(request, r, ActionListener.wrap(rr -> {
+                    long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
+                    afterResponseProcessor(processor, took);
+                    currentFinalListener.onResponse(rr);
+                }, e -> {
+                    onResponseProcessorFailed(processor);
+                    long took = TimeUnit.NANOSECONDS.toMillis(relativeTimeSupplier.getAsLong() - start);
+                    afterResponseProcessor(processor, took);
+                    if (processor.isIgnoreFailure()) {
+                        logger.warn(
+                            "The exception from response processor ["
+                                + processor.getType()
+                                + "] in the search pipeline ["
+                                + id
+                                + "] was ignored",
+                            e
+                        );
+                        // Pass the previous response through to the next processor in the chain
+                        currentFinalListener.onResponse(r);
+                    } else {
+                        currentFinalListener.onFailure(new SearchPipelineProcessingException(e));
+                    }
+                }));
+            }, finalListener::onFailure);
         }
         final ActionListener<SearchResponse> chainListener = responseListener;
         return ActionListener.wrap(r -> {
-                beforeTransformResponse();
-                pipelineStart[0] = relativeTimeSupplier.getAsLong();
-                chainListener.onResponse(r);
-            },
-            originalListener::onFailure);
+            beforeTransformResponse();
+            pipelineStart[0] = relativeTimeSupplier.getAsLong();
+            chainListener.onResponse(r);
+        }, originalListener::onFailure);
     }
diff --git a/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java b/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
index ec90e4201627f..0501d1ee45403 100644
--- a/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
+++ b/server/src/main/java/org/opensearch/search/pipeline/SearchRequestProcessor.java
@@ -16,7 +16,6 @@
  */
 public interface SearchRequestProcessor extends Processor {
-
     /**
     * Transform a {@link SearchRequest}. Executed on the coordinator node before any {@link org.opensearch.action.search.SearchPhase}
     * executes.
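Note: the Pipeline.transformRequest and Pipeline.transformResponseListener hunks above reformat, without behavior change, a back-to-front listener chain: the listener built for processor i hands its result to the listener built for processor i + 1, and any failure can jump straight to the final listener. The sketch below distills that pattern; Listener, Step, chain, and ListenerChainSketch are illustrative stand-ins for ActionListener, SearchRequestProcessor, and the loop in transformRequest, not OpenSearch API.

    import java.util.List;

    public final class ListenerChainSketch {

        /** Simplified stand-in for OpenSearch's ActionListener. */
        interface Listener<T> {
            void onResponse(T value);

            void onFailure(Exception e);
        }

        /** Hypothetical async step, analogous to SearchRequestProcessor#asyncProcessRequest. */
        interface Step {
            void apply(String input, Listener<String> next);
        }

        /**
         * Builds the chain back-to-front, as Pipeline.transformRequest does: the listener
         * wrapping step i delivers its result to the listener built for step i + 1, and
         * the last listener constructed becomes the head of the chain.
         */
        static Listener<String> chain(List<Step> steps, Listener<String> finalListener) {
            Listener<String> current = finalListener;
            for (int i = steps.size() - 1; i >= 0; i--) {
                final Step step = steps.get(i);
                final Listener<String> next = current;
                current = new Listener<String>() {
                    @Override
                    public void onResponse(String value) {
                        try {
                            step.apply(value, next); // may complete now or on another thread
                        } catch (Exception e) {
                            next.onFailure(e);
                        }
                    }

                    @Override
                    public void onFailure(Exception e) {
                        finalListener.onFailure(e); // jump straight to the end on failure
                    }
                };
            }
            return current;
        }

        public static void main(String[] args) {
            List<Step> steps = List.of(
                (in, next) -> next.onResponse(in + " -> a"),
                (in, next) -> next.onResponse(in + " -> b")
            );
            chain(steps, new Listener<String>() {
                @Override
                public void onResponse(String value) {
                    System.out.println(value); // prints "start -> a -> b"
                }

                @Override
                public void onFailure(Exception e) {
                    e.printStackTrace();
                }
            }).onResponse("start");
        }
    }

Because the chain is assembled in reverse, calling onResponse on the returned head runs the processors in their declared order, which is exactly why the diff iterates from searchRequestProcessors.size() - 1 down to 0.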
diff --git a/server/src/test/java/org/opensearch/cli/ParseLuceneChanges.java b/server/src/test/java/org/opensearch/cli/ParseLuceneChanges.java
deleted file mode 100644
index 756ede1b02517..0000000000000
--- a/server/src/test/java/org/opensearch/cli/ParseLuceneChanges.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * SPDX-License-Identifier: Apache-2.0
- *
- * The OpenSearch Contributors require contributions made to
- * this file be licensed under the Apache-2.0 license or a
- * compatible open source license.
- */
-
-package org.opensearch.cli;
-
-import org.opensearch.core.common.Strings;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.LongAdder;
-
-public class ParseLuceneChanges {
-    private static class ChangelogEntry {
-        private final String issueUrl;
-        private final String description;
-        private final List<String> authors;
-        private final List<String> companies;
-
-        public ChangelogEntry(String issueUrl, String description, List<String> authors, List<String> companies) {
-            this.issueUrl = issueUrl;
-            this.description = description;
-            this.authors = authors;
-            this.companies = companies;
-        }
-    }
-
-    private static final Map<String, String> COMMITTER_COMPANY_MAPPING = new HashMap<>();
-
-    public static void main(String[] args) throws IOException {
-        List<String> allLines = Files.readAllLines(Path.of(args[0]));
-        Map<String, List<String>> linesByRelease = splitByRelease(allLines, args[1]);
-        Map<String, LongAdder> contributionsByCompany = new HashMap<>();
-        Path outputFile = Path.of(args[2]);
-        // Assume committer company mapping is next to the output file
-        initializeCompanyMapping(outputFile.getParent());
-        try (PrintWriter outputWriter = new PrintWriter(Files.newBufferedWriter(outputFile))) {
-            for (Map.Entry<String, List<String>> releaseEntry : linesByRelease.entrySet()) {
-                for (Map.Entry<String, List<String>> sctionEntry : splitBySection(releaseEntry.getValue()).entrySet()) {
-                    for (ChangelogEntry changelogEntry : convertToChangelog(sctionEntry.getValue())) {
-                        Set<String> creditedCompanies = new HashSet<>();
-                        for (String company : changelogEntry.companies) {
-                            if (!creditedCompanies.contains(company)) {
-                                contributionsByCompany.computeIfAbsent(company, k -> new LongAdder()).increment();
-                                creditedCompanies.add(company);
-                            }
-                        }
-                        String entryBuilder = releaseEntry.getKey() +
-                            '\t' +
-                            sctionEntry.getKey() +
-                            '\t' +
-                            changelogEntry.description +
-                            '\t' +
-                            changelogEntry.issueUrl +
-                            '\t' +
-                            Strings.collectionToCommaDelimitedString(changelogEntry.authors);
-                        outputWriter.println(entryBuilder);
-                    }
-                }
-            }
-        }
-        for (Map.Entry<String, LongAdder> contributionByCompany : contributionsByCompany.entrySet()) {
-            System.out.println(contributionByCompany.getKey() + " : " + contributionByCompany.getValue().intValue());
-        }
-    }
-
-    private static List<ChangelogEntry> convertToChangelog(List<String> sectionLines) {
-        StringBuilder curEntry = new StringBuilder();
-        List<ChangelogEntry> changelogEntries = new ArrayList<>();
-        for (String line : sectionLines) {
-            if (line.startsWith("* ")) {
-                if (curEntry.length() > 0) {
-                    changelogEntries.add(parseEntry(curEntry.toString()));
-                    curEntry.setLength(0);
-                }
-                curEntry.append(line.substring(2).trim());
-            } else if (!line.trim().isEmpty()) {
-                curEntry.append(" ").append(line.trim());
-            }
-        }
-        if (curEntry.length() > 0) {
-            ChangelogEntry entry = parseEntry(curEntry.toString());
-            if (entry != null) {
-                changelogEntries.add(entry);
-            }
-        }
-        return changelogEntries;
-    }
-
-    private static ChangelogEntry parseEntry(String singleLineEntry) {
-        if ("(No changes)".equalsIgnoreCase(singleLineEntry.trim())) {
-            return null;
-        }
-        int colonPos = singleLineEntry.indexOf(':');
-        if (colonPos < 0 || singleLineEntry.substring(0, colonPos).contains(" ")) {
-            // Some changelog entries don't have a colon after the issue ID. It's annoying.
-            if (singleLineEntry.startsWith("LUCENE-") || singleLineEntry.startsWith("GITHUB#")) {
-                colonPos = singleLineEntry.indexOf(' ');
-            } else {
-                // There is no issue ID
-                colonPos = 0;
-            }
-            if (colonPos < 0) {
-                throw new IllegalArgumentException();
-            }
-        }
-        String issueId = singleLineEntry.substring(0, colonPos).trim();
-        List<String> authorList = new ArrayList<>();
-        int lastCloseParenPos = singleLineEntry.lastIndexOf(')');
-        int endOfDesc = singleLineEntry.length();
-        if (lastCloseParenPos > 0) {
-            // Some issues (e.g. GITHUB#12296) don't have authors in the changelog.
-            int authorStart = singleLineEntry.substring(0, lastCloseParenPos).lastIndexOf('(');
-            endOfDesc = authorStart;
-            String[] authors = Strings.commaDelimitedListToStringArray(singleLineEntry.substring(authorStart + 1, lastCloseParenPos));
-            for (String author : authors) {
-                if (author.contains(" via ")) {
-                    String author1 = author.substring(0, author.indexOf(" via ")).trim();
-                    String author2 = author.substring(author.indexOf(" via ") + 5).trim();
-                    authorList.add(author1.trim());
-                    authorList.add(author2.trim());
-                } else if (author.contains(" and ")) {
-                    String author1 = author.substring(0, author.indexOf(" and ")).trim();
-                    String author2 = author.substring(author.indexOf(" and ") + 5).trim();
-                    authorList.add(author1.trim());
-                    authorList.add(author2.trim());
-                } else {
-                    authorList.add(author.trim());
-                }
-            }
-        }
-        if (colonPos > endOfDesc) {
-            throw new IllegalArgumentException();
-        }
-        String description = singleLineEntry.substring(colonPos + 1, endOfDesc).trim();
-        List<String> companyAffiliations = new ArrayList<>();
-        for (String author : authorList) {
-            String normalizedAuthor = author.toLowerCase(Locale.ROOT);
-            if (COMMITTER_COMPANY_MAPPING.containsKey(normalizedAuthor)) {
-                companyAffiliations.add(COMMITTER_COMPANY_MAPPING.get(normalizedAuthor));
-            }
-        }
-
-        return new ChangelogEntry(convertIssueIdToUrl(issueId), description, authorList, companyAffiliations);
-    }
-
-    private static void initializeCompanyMapping(Path parent) throws IOException {
-        Path mappingFile = parent.resolve("committer_companies.csv");
-        if (Files.exists(mappingFile)) {
-            List<String> lines = Files.readAllLines(mappingFile);
-            for (String line : lines) {
-                int commaPos = line.indexOf(',');
-                if (commaPos > 0) {
-                    COMMITTER_COMPANY_MAPPING.put(line.substring(0, commaPos), line.substring(commaPos + 1));
-                }
-            }
-        }
-    }
-
-    private static String convertIssueIdToUrl(String issueId) {
-        if (issueId.isEmpty()) {
-            return issueId;
-        }
-        // Just in case
-        issueId = issueId.toUpperCase(Locale.ROOT);
-        if (issueId.startsWith("LUCENE-") || issueId.startsWith("SOLR-")) {
-            // JIRA issue
-            return "https://issues.apache.org/jira/browse/" + issueId;
-        } else if (issueId.startsWith("GITHUB#")) {
-            int octothorpPos = issueId.indexOf('#');
-            return "https://github.com/apache/lucene/issues/" + issueId.substring(octothorpPos + 1);
-        }
-        throw new IllegalArgumentException("Unknown issue ID: " + issueId);
-    }
-
-    private static Map<String, List<String>> splitBySection(List<String> releaseLines) {
-        int start = -1;
-        Map<String, List<String>> linesBySection = new LinkedHashMap<>();
-        String sectionName = null;
-        for (int i = 0; i < releaseLines.size(); i++) {
-            String curLine = releaseLines.get(i);
-            if (curLine.startsWith("-----------")) {
-                // The previous line has the section title
-                if (start > 0) {
-                    linesBySection.put(sectionName, releaseLines.subList(start, i - 2));
-                }
-                start = i + 1;
-                sectionName = releaseLines.get(i - 1).trim();
-            }
-        }
-        if (start > 0) {
-            linesBySection.put(sectionName, releaseLines.subList(start, releaseLines.size()));
-        }
-        return linesBySection;
-    }
-
-    private static Map<String, List<String>> splitByRelease(List<String> allLines, String earliestRelease) {
-        // There is some preamble. Skip that.
-        boolean started = false;
-        boolean done = false;
-        Map<String, List<String>> linesByRelease = new LinkedHashMap<>();
-        String currentRelease = null;
-        List<String> releaseLines = new ArrayList<>();
-        for (String line : allLines) {
-            if (line.startsWith("=================")) {
-                // Found a new release
-                if (started) {
-                    linesByRelease.put(currentRelease, releaseLines);
-                    if (done) {
-                        return linesByRelease;
-                    }
-                    releaseLines = new ArrayList<>();
-                }
-                started = true;
-                currentRelease = line.replaceAll("=", "").trim();
-                if (currentRelease.contains(earliestRelease)) {
-                    done = true;
-                }
-            } else if (started) {
-                releaseLines.add(line);
-            }
-        }
-        // Probably shouldn't get here, especially since we're not properly parsing the old changelogs.
-        linesByRelease.put(currentRelease, releaseLines);
-        return linesByRelease;
-    }
-}
diff --git a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
index 2ca11b7e5a9d0..98d2a7e84d672 100644
--- a/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
+++ b/server/src/test/java/org/opensearch/search/pipeline/SearchPipelineServiceTests.java
@@ -67,10 +67,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 
 import static org.mockito.ArgumentMatchers.anyString;
@@ -666,9 +663,7 @@ private static SearchResponse syncTransformResponse(PipelinedRequest pipelinedRe
         Exception[] exceptionBox = new Exception[1];
         ActionListener<SearchResponse> responseListener = pipelinedRequest.transformResponseListener(ActionListener.wrap(r -> {
             responseBox[0] = r;
-        }, e -> {
-            exceptionBox[0] = e;
-        }));
+        }, e -> { exceptionBox[0] = e; }));
         responseListener.onResponse(searchResponse);
 
         if (exceptionBox[0] != null) {
@@ -684,10 +679,7 @@ private static PipelinedRequest syncTransformRequest(PipelinedRequest request) t
         PipelinedRequest[] requestBox = new PipelinedRequest[1];
         Exception[] exceptionBox = new Exception[1];
 
-        request.transformRequest(ActionListener.wrap(
-            r -> requestBox[0] = (PipelinedRequest) r,
-            e -> exceptionBox[0] = e
-        ));
+        request.transformRequest(ActionListener.wrap(r -> requestBox[0] = (PipelinedRequest) r, e -> exceptionBox[0] = e));
         if (exceptionBox[0] != null) {
             throw exceptionBox[0];
         }
@@ -993,7 +985,10 @@ public void testExceptionOnRequestProcessing() {
         SearchRequest searchRequest = new SearchRequest().source(sourceBuilder);
 
         // Exception thrown when processing the request
-        expectThrows(SearchPipelineProcessingException.class, () -> syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest)));
+        expectThrows(
+            SearchPipelineProcessingException.class,
+            () -> syncTransformRequest(searchPipelineService.resolvePipeline(searchRequest))
+        );
     }
 
     public void testExceptionOnResponseProcessing() throws Exception {
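Note: the reformatted syncTransformRequest and syncTransformResponse helpers above adapt the asynchronous listener API to synchronous test calls by capturing the result or exception in one-element arrays and rethrowing once the call returns, which is sound only because the processors in these unit tests complete on the calling thread. Below is a minimal sketch of that capture-and-rethrow pattern; callSync, SyncAdapterSketch, and this Listener interface are hypothetical illustrations, not the test API.

    import java.util.Locale;
    import java.util.function.BiConsumer;

    public final class SyncAdapterSketch {

        /** Simplified stand-in for OpenSearch's ActionListener. */
        interface Listener<T> {
            void onResponse(T value);

            void onFailure(Exception e);
        }

        /** Runs an async-style operation and returns its result as if it were a plain call. */
        static <T> T callSync(BiConsumer<T, Listener<T>> asyncOp, T input) throws Exception {
            // One-element arrays let the anonymous listener write back into this stack frame.
            Object[] resultBox = new Object[1];
            Exception[] exceptionBox = new Exception[1];
            asyncOp.accept(input, new Listener<T>() {
                @Override
                public void onResponse(T value) {
                    resultBox[0] = value;
                }

                @Override
                public void onFailure(Exception e) {
                    exceptionBox[0] = e;
                }
            });
            if (exceptionBox[0] != null) {
                throw exceptionBox[0]; // surface the failure as if the call were synchronous
            }
            @SuppressWarnings("unchecked")
            T result = (T) resultBox[0];
            return result;
        }

        public static void main(String[] args) throws Exception {
            // A trivially synchronous "async" op: uppercase the input.
            String out = callSync((in, l) -> l.onResponse(in.toUpperCase(Locale.ROOT)), "ok");
            System.out.println(out); // prints "OK"
        }
    }

This is also why expectThrows in testExceptionOnRequestProcessing works: the rethrow inside the helper turns the listener's onFailure into an ordinary exception on the test thread.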