diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a8d808f4..e8e5e6d7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD) ### Features ### Enhancements +* Adds concurrent segment search support for mode auto [#2111](https://github.com/opensearch-project/k-NN/pull/2111) ### Bug Fixes * Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946) ### Infrastructure diff --git a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java index efb4bdf93..ff079031f 100644 --- a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java +++ b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java @@ -13,6 +13,7 @@ import org.opensearch.index.engine.EngineFactory; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.knn.index.KNNCircuitBreaker; +import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider; import org.opensearch.knn.index.util.KNNClusterUtil; import org.opensearch.knn.index.query.KNNQueryBuilder; import org.opensearch.knn.index.KNNSettings; @@ -95,6 +96,7 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; @@ -349,4 +351,9 @@ public List getNamedXContent() { public Collection getSystemIndexDescriptors(Settings settings) { return ImmutableList.of(new SystemIndexDescriptor(MODEL_INDEX_NAME, "Index for storing models used for k-NN indices")); } + + @Override + public Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.of(new KNNConcurrentSearchRequestDecider.Factory()); + } } diff --git a/src/main/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDecider.java b/src/main/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDecider.java new file mode 100644 index 000000000..524291a31 --- /dev/null +++ b/src/main/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDecider.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.plugin.search; + +import lombok.EqualsAndHashCode; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.knn.index.query.KNNQueryBuilder; +import org.opensearch.search.deciders.ConcurrentSearchDecision; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; + +import java.util.Optional; + +/** + * Decides if the knn query uses concurrent search + * As of 2.17, this is only used when + * - "index.search.concurrent_segment_search.mode": "auto" or + * - "search.concurrent_segment_search.mode": "auto" + * + * Note: the class is not thread-safe and a new instance needs to be created for each request + */ +@EqualsAndHashCode(callSuper = true) +public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider { + + private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision( + ConcurrentSearchDecision.DecisionStatus.NO_OP, + "Default decision" + ); + private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision( + ConcurrentSearchDecision.DecisionStatus.YES, + "Enable concurrent search for knn" + ); + + private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION; + + @Override + public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) { + if (queryBuilder instanceof KNNQueryBuilder) { + knnDecision = YES; + } else { + knnDecision = DEFAULT_KNN_DECISION; + } + } + + @Override + public ConcurrentSearchDecision getConcurrentSearchDecision() { + return knnDecision; + } + + public static class Factory implements ConcurrentSearchRequestDecider.Factory { + public Optional create(IndexSettings indexSettings) { + return Optional.of(new KNNConcurrentSearchRequestDecider()); + } + } +} diff --git a/src/test/java/org/opensearch/knn/integ/search/ConcurrentSegmentSearchIT.java b/src/test/java/org/opensearch/knn/integ/search/ConcurrentSegmentSearchIT.java new file mode 100644 index 000000000..a3e581a3a --- /dev/null +++ b/src/test/java/org/opensearch/knn/integ/search/ConcurrentSegmentSearchIT.java @@ -0,0 +1,151 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.integ.search; + +import com.google.common.primitives.Floats; +import lombok.SneakyThrows; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.junit.BeforeClass; +import org.opensearch.client.Response; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.xcontent.XContentFactory; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.knn.KNNRestTestCase; +import org.opensearch.knn.KNNResult; +import org.opensearch.knn.TestUtils; +import org.opensearch.knn.common.KNNConstants; +import org.opensearch.knn.index.SpaceType; +import org.opensearch.knn.index.engine.KNNEngine; +import org.opensearch.knn.index.query.KNNQueryBuilder; +import org.opensearch.knn.plugin.script.KNNScoringUtil; + +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.opensearch.knn.common.KNNConstants.KNN_ENGINE; +import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW; +import static org.opensearch.knn.common.KNNConstants.METHOD_PARAMETER_SPACE_TYPE; +import static org.opensearch.knn.common.KNNConstants.NAME; +import static org.opensearch.knn.common.KNNConstants.PARAMETERS; + +/** + * Note that this is simply a sanity test to make sure that concurrent search code path is hit E2E and scores are intact + * There is no latency verification as it can be better encapsulated in nightly runs. + */ +public class ConcurrentSegmentSearchIT extends KNNRestTestCase { + + static TestUtils.TestData testData; + + @BeforeClass + public static void setUpClass() throws IOException { + if (ConcurrentSegmentSearchIT.class.getClassLoader() == null) { + throw new IllegalStateException("ClassLoader of FaissIT Class is null"); + } + URL testIndexVectors = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_vectors_1000x128.json"); + URL testQueries = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_queries_100x128.csv"); + assert testIndexVectors != null; + assert testQueries != null; + testData = new TestUtils.TestData(testIndexVectors.getPath(), testQueries.getPath()); + } + + @SneakyThrows + public void testConcurrentSegmentSearch() { + String indexName = "test-concurrent-segment"; + String fieldName = "test-field-1"; + int dimension = testData.indexData.vectors[0].length; + final XContentBuilder indexBuilder = createFaissHnswIndexMapping(fieldName, dimension); + Map mappingMap = xContentBuilderToMap(indexBuilder); + String mapping = indexBuilder.toString(); + createKnnIndex(indexName, mapping); + assertEquals(new TreeMap<>(mappingMap), new TreeMap<>(getIndexMappingAsMap(indexName))); + + // Index the test data + for (int i = 0; i < testData.indexData.docs.length; i++) { + addKnnDoc( + indexName, + Integer.toString(testData.indexData.docs[i]), + fieldName, + Floats.asList(testData.indexData.vectors[i]).toArray() + ); + } + refreshAllNonSystemIndices(); + updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto")); + + // Test search queries + int k = 10; + verifySearch(indexName, fieldName, k); + + updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto")); + verifySearch(indexName, fieldName, k); + } + + /* + { + "properties": { + "": { + "type": "knn_vector", + "dimension": , + "method": { + "name": "hnsw", + "space_type": "l2", + "engine": "faiss", + "parameters": { + "m": 16, + "ef_construction": 128, + "ef_search": 128 + } + } + } + } + */ + @SneakyThrows + private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) { + // Create an index + return XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + .startObject(fieldName) + .field("type", "knn_vector") + .field("dimension", dimension) + .startObject(KNNConstants.KNN_METHOD) + .field(NAME, METHOD_HNSW) + .field(METHOD_PARAMETER_SPACE_TYPE, SpaceType.L2.getValue()) + .field(KNN_ENGINE, KNNEngine.FAISS.getName()) + .startObject(PARAMETERS) + .field(KNNConstants.METHOD_PARAMETER_M, 16) + .field(KNNConstants.METHOD_PARAMETER_EF_CONSTRUCTION, 128) + .field(KNNConstants.METHOD_PARAMETER_EF_SEARCH, 128) + .endObject() + .endObject() + .endObject() + .endObject() + .endObject(); + } + + @SneakyThrows + private void verifySearch(String indexName, String fieldName, int k) { + for (int i = 0; i < testData.queries.length; i++) { + final KNNQueryBuilder queryBuilder = KNNQueryBuilder.builder().fieldName(fieldName).vector(testData.queries[i]).k(k).build(); + Response response = searchKNNIndex(indexName, queryBuilder, k); + String responseBody = EntityUtils.toString(response.getEntity()); + List knnResults = parseSearchResponse(responseBody, fieldName); + assertEquals(k, knnResults.size()); + + List actualScores = parseSearchResponseScore(responseBody, fieldName); + for (int j = 0; j < k; j++) { + float[] primitiveArray = knnResults.get(j).getVector(); + assertEquals( + KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(testData.queries[i], primitiveArray), SpaceType.L2), + actualScores.get(j), + 0.0001 + ); + } + } + } +} diff --git a/src/test/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDeciderTests.java b/src/test/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDeciderTests.java new file mode 100644 index 000000000..c5cdeb23d --- /dev/null +++ b/src/test/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDeciderTests.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.plugin.search; + +import org.opensearch.index.IndexSettings; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.knn.index.query.KNNQueryBuilder; +import org.opensearch.search.deciders.ConcurrentSearchDecision; +import org.opensearch.test.OpenSearchTestCase; + +import static org.mockito.Mockito.mock; + +public class KNNConcurrentSearchRequestDeciderTests extends OpenSearchTestCase { + + public void testDecider() { + ConcurrentSearchDecision noop = new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "Default decision"); + + KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider(); + assertDecision(noop, decider.getConcurrentSearchDecision()); + + // Non KNNQueryBuilder + decider.evaluateForQuery(new MatchAllQueryBuilder(), mock(IndexSettings.class)); + assertDecision(noop, decider.getConcurrentSearchDecision()); + + decider.evaluateForQuery( + KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(), + mock(IndexSettings.class) + ); + ConcurrentSearchDecision yes = new ConcurrentSearchDecision( + ConcurrentSearchDecision.DecisionStatus.YES, + "Enable concurrent search for knn" + ); + assertDecision(yes, decider.getConcurrentSearchDecision()); + + decider.evaluateForQuery(new MatchAllQueryBuilder(), mock(IndexSettings.class)); + assertDecision(noop, decider.getConcurrentSearchDecision()); + } + + public void testDeciderFactory() { + KNNConcurrentSearchRequestDecider.Factory factory = new KNNConcurrentSearchRequestDecider.Factory(); + IndexSettings indexSettings = mock(IndexSettings.class); + assertNotSame(factory.create(indexSettings).get(), factory.create(indexSettings).get()); + } + + private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) { + assertEquals(expected.getDecisionReason(), actual.getDecisionReason()); + assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus()); + } + +}