diff --git a/CHANGELOG.md b/CHANGELOG.md index f70ca5e24..8cba12c34 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD) ### Features ### Enhancements +* Adds concurrent segment search support for mode auto [#2111](https://github.com/opensearch-project/k-NN/pull/2111) ### Bug Fixes * Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946) ### Infrastructure diff --git a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java index 6bfcb7eee..cae8960bb 100644 --- a/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java +++ b/src/main/java/org/opensearch/knn/plugin/KNNPlugin.java @@ -13,6 +13,7 @@ import org.opensearch.index.engine.EngineFactory; import org.opensearch.indices.SystemIndexDescriptor; import org.opensearch.knn.index.KNNCircuitBreaker; +import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider; import org.opensearch.knn.index.util.KNNClusterUtil; import org.opensearch.knn.index.query.KNNQueryBuilder; import org.opensearch.knn.index.KNNSettings; @@ -96,6 +97,7 @@ import org.opensearch.script.ScriptContext; import org.opensearch.script.ScriptEngine; import org.opensearch.script.ScriptService; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; import org.opensearch.threadpool.ThreadPool; @@ -373,4 +375,9 @@ public Settings additionalSettings() { ).collect(Collectors.toList()); return Settings.builder().putList(IndexModule.INDEX_STORE_HYBRID_MMAP_EXTENSIONS.getKey(), combinedSettings).build(); } + + @Override + public Optional getConcurrentSearchRequestDeciderFactory() { + return Optional.of(new KNNConcurrentSearchRequestDecider.Factory()); + } } diff --git a/src/main/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDecider.java b/src/main/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDecider.java new file mode 100644 index 000000000..92b0a3e3c --- /dev/null +++ b/src/main/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDecider.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.plugin.search; + +import lombok.EqualsAndHashCode; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.knn.index.KNNSettings; +import org.opensearch.knn.index.query.KNNQueryBuilder; +import org.opensearch.search.deciders.ConcurrentSearchDecision; +import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; + +import java.util.Optional; + +/** + * Decides if the knn query uses concurrent segment search + * As of 2.17, this is only used when + * - "index.search.concurrent_segment_search.mode": "auto" or + * - "search.concurrent_segment_search.mode": "auto" + * + * Note: the class is not thread-safe and a new instance needs to be created for each request + */ +@EqualsAndHashCode(callSuper = true) +public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider { + + private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision( + ConcurrentSearchDecision.DecisionStatus.NO_OP, + "Default decision" + ); + private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision( + ConcurrentSearchDecision.DecisionStatus.YES, + "Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index" + ); + + private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION; + + @Override + public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) { + if (queryBuilder instanceof KNNQueryBuilder && indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) { + knnDecision = YES; + } else { + knnDecision = DEFAULT_KNN_DECISION; + } + } + + @Override + public ConcurrentSearchDecision getConcurrentSearchDecision() { + return knnDecision; + } + + /** + * Returns {@link KNNConcurrentSearchRequestDecider} when index.knn is true + */ + public static class Factory implements ConcurrentSearchRequestDecider.Factory { + public Optional create(final IndexSettings indexSettings) { + if (indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) { + return Optional.of(new KNNConcurrentSearchRequestDecider()); + } + return Optional.empty(); + } + } +} diff --git a/src/test/java/org/opensearch/knn/integ/search/ConcurrentSegmentSearchIT.java b/src/test/java/org/opensearch/knn/integ/search/ConcurrentSegmentSearchIT.java new file mode 100644 index 000000000..cc64cfe9c --- /dev/null +++ b/src/test/java/org/opensearch/knn/integ/search/ConcurrentSegmentSearchIT.java @@ -0,0 +1,141 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.integ.search; + +import com.google.common.primitives.Floats; +import lombok.SneakyThrows; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.junit.BeforeClass; +import org.opensearch.client.Response; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.knn.KNNJsonIndexMappingsBuilder; +import org.opensearch.knn.KNNRestTestCase; +import org.opensearch.knn.KNNResult; +import org.opensearch.knn.TestUtils; +import org.opensearch.knn.index.SpaceType; +import org.opensearch.knn.index.engine.KNNEngine; +import org.opensearch.knn.index.query.KNNQueryBuilder; +import org.opensearch.knn.plugin.script.KNNScoringUtil; + +import java.io.IOException; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW; + +/** + * Note that this is simply a sanity test to make sure that concurrent search code path is hit E2E and scores are intact + * There is no latency verification as it can be better encapsulated in nightly runs. + */ +public class ConcurrentSegmentSearchIT extends KNNRestTestCase { + + static TestUtils.TestData testData; + + @BeforeClass + public static void setUpClass() throws IOException { + if (ConcurrentSegmentSearchIT.class.getClassLoader() == null) { + throw new IllegalStateException("ClassLoader of ConcurrentSegmentSearchIT Class is null"); + } + URL testIndexVectors = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_vectors_1000x128.json"); + URL testQueries = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_queries_100x128.csv"); + assert testIndexVectors != null; + assert testQueries != null; + testData = new TestUtils.TestData(testIndexVectors.getPath(), testQueries.getPath()); + } + + @SneakyThrows + public void testConcurrentSegmentSearch_thenSucceed() { + String indexName = "test-concurrent-segment"; + String fieldName = "test-field-1"; + int dimension = testData.indexData.vectors[0].length; + final XContentBuilder indexBuilder = createFaissHnswIndexMapping(fieldName, dimension); + Map mappingMap = xContentBuilderToMap(indexBuilder); + String mapping = indexBuilder.toString(); + createKnnIndex(indexName, mapping); + assertEquals(new TreeMap<>(mappingMap), new TreeMap<>(getIndexMappingAsMap(indexName))); + + // Index the test data + for (int i = 0; i < testData.indexData.docs.length; i++) { + addKnnDoc( + indexName, + Integer.toString(testData.indexData.docs[i]), + fieldName, + Floats.asList(testData.indexData.vectors[i]).toArray() + ); + } + refreshAllNonSystemIndices(); + updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto")); + + // Test search queries + int k = 10; + verifySearch(indexName, fieldName, k); + + updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "all")); + verifySearch(indexName, fieldName, k); + + deleteKNNIndex(indexName); + } + + /* + { + "properties": { + "": { + "type": "knn_vector", + "dimension": , + "method": { + "name": "hnsw", + "space_type": "l2", + "engine": "faiss", + "parameters": { + "m": 16, + "ef_construction": 128, + "ef_search": 128 + } + } + } + } + */ + @SneakyThrows + private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) { + return KNNJsonIndexMappingsBuilder.builder() + .fieldName(fieldName) + .dimension(dimension) + .method( + KNNJsonIndexMappingsBuilder.Method.builder() + .engine(KNNEngine.FAISS.getName()) + .methodName(METHOD_HNSW) + .spaceType(SpaceType.L2.getValue()) + .parameters(KNNJsonIndexMappingsBuilder.Method.Parameters.builder().efConstruction(128).efSearch(128).m(16).build()) + .build() + ) + .build() + .getIndexMappingBuilder(); + } + + @SneakyThrows + private void verifySearch(String indexName, String fieldName, int k) { + for (int i = 0; i < testData.queries.length; i++) { + final KNNQueryBuilder queryBuilder = KNNQueryBuilder.builder().fieldName(fieldName).vector(testData.queries[i]).k(k).build(); + Response response = searchKNNIndex(indexName, queryBuilder, k); + String responseBody = EntityUtils.toString(response.getEntity()); + List knnResults = parseSearchResponse(responseBody, fieldName); + assertEquals(k, knnResults.size()); + + List actualScores = parseSearchResponseScore(responseBody, fieldName); + for (int j = 0; j < k; j++) { + float[] primitiveArray = knnResults.get(j).getVector(); + assertEquals( + KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(testData.queries[i], primitiveArray), SpaceType.L2), + actualScores.get(j), + 0.0001 + ); + } + } + } +} diff --git a/src/test/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDeciderTests.java b/src/test/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDeciderTests.java new file mode 100644 index 000000000..53fd520f7 --- /dev/null +++ b/src/test/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDeciderTests.java @@ -0,0 +1,65 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.knn.plugin.search; + +import org.opensearch.index.IndexSettings; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.knn.KNNTestCase; +import org.opensearch.knn.index.KNNSettings; +import org.opensearch.knn.index.query.KNNQueryBuilder; +import org.opensearch.search.deciders.ConcurrentSearchDecision; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class KNNConcurrentSearchRequestDeciderTests extends KNNTestCase { + + public void testDecider_thenSucceed() { + ConcurrentSearchDecision noop = new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "Default decision"); + + KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider(); + assertDecision(noop, decider.getConcurrentSearchDecision()); + IndexSettings indexSettingsMock = mock(IndexSettings.class); + when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE); + + // Non KNNQueryBuilder + decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock); + assertDecision(noop, decider.getConcurrentSearchDecision()); + decider.evaluateForQuery( + KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(), + indexSettingsMock + ); + assertDecision(noop, decider.getConcurrentSearchDecision()); + + when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE); + decider.evaluateForQuery( + KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(), + indexSettingsMock + ); + ConcurrentSearchDecision yes = new ConcurrentSearchDecision( + ConcurrentSearchDecision.DecisionStatus.YES, + "Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index" + ); + assertDecision(yes, decider.getConcurrentSearchDecision()); + + decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock); + assertDecision(noop, decider.getConcurrentSearchDecision()); + } + + public void testDeciderFactory_thenSucceed() { + KNNConcurrentSearchRequestDecider.Factory factory = new KNNConcurrentSearchRequestDecider.Factory(); + IndexSettings indexSettingsMock = mock(IndexSettings.class); + when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE); + assertNotSame(factory.create(indexSettingsMock).get(), factory.create(indexSettingsMock).get()); + when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE); + assertTrue(factory.create(indexSettingsMock).isEmpty()); + } + + private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) { + assertEquals(expected.getDecisionReason(), actual.getDecisionReason()); + assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus()); + } +} diff --git a/src/testFixtures/java/org/opensearch/knn/KNNJsonIndexMappingsBuilder.java b/src/testFixtures/java/org/opensearch/knn/KNNJsonIndexMappingsBuilder.java index 15ac0c211..817684f89 100644 --- a/src/testFixtures/java/org/opensearch/knn/KNNJsonIndexMappingsBuilder.java +++ b/src/testFixtures/java/org/opensearch/knn/KNNJsonIndexMappingsBuilder.java @@ -9,6 +9,7 @@ import lombok.NonNull; import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.knn.common.KNNConstants; import java.io.IOException; @@ -26,7 +27,7 @@ public class KNNJsonIndexMappingsBuilder { private String vectorDataType; private Method method; - public String getIndexMapping() throws IOException { + public XContentBuilder getIndexMappingBuilder() throws IOException { if (nestedFieldName != null) { XContentBuilder xContentBuilder = XContentFactory.jsonBuilder() .startObject() @@ -40,7 +41,7 @@ public String getIndexMapping() throws IOException { addVectorDataType(xContentBuilder); addMethod(xContentBuilder); xContentBuilder.endObject().endObject().endObject().endObject().endObject(); - return xContentBuilder.toString(); + return xContentBuilder; } else { XContentBuilder xContentBuilder = XContentFactory.jsonBuilder() .startObject() @@ -51,10 +52,14 @@ public String getIndexMapping() throws IOException { addVectorDataType(xContentBuilder); addMethod(xContentBuilder); xContentBuilder.endObject().endObject().endObject(); - return xContentBuilder.toString(); + return xContentBuilder; } } + public String getIndexMapping() throws IOException { + return getIndexMappingBuilder().toString(); + } + private void addVectorDataType(final XContentBuilder xContentBuilder) throws IOException { if (vectorDataType == null) { return; @@ -104,6 +109,7 @@ public static class Parameters { private Encoder encoder; private Integer efConstruction; private Integer efSearch; + private Integer m; private void addTo(final XContentBuilder xContentBuilder) throws IOException { xContentBuilder.startObject("parameters"); @@ -113,6 +119,9 @@ private void addTo(final XContentBuilder xContentBuilder) throws IOException { if (efSearch != null) { xContentBuilder.field("ef_search", efSearch); } + if (m != null) { + xContentBuilder.field(KNNConstants.METHOD_PARAMETER_M, m); + } addEncoder(xContentBuilder); xContentBuilder.endObject(); }