-
Notifications
You must be signed in to change notification settings - Fork 116
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Integrates KNN plugin with ConcurrentSearchRequestDecider interface
This allows knn queries to enable concurrency when index.search.concurrent_segment_search.mode or search.concurrent_segment_search.mode in auto mode. Without this the default behavior of auto mode is non-concurrent search Signed-off-by: Tejas Shah <shatejas@amazon.com>
- Loading branch information
Showing
5 changed files
with
289 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
65 changes: 65 additions & 0 deletions
65
src/main/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDecider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.plugin.search; | ||
|
||
import lombok.EqualsAndHashCode; | ||
import org.opensearch.index.IndexSettings; | ||
import org.opensearch.index.query.QueryBuilder; | ||
import org.opensearch.knn.index.KNNSettings; | ||
import org.opensearch.knn.index.query.KNNQueryBuilder; | ||
import org.opensearch.search.deciders.ConcurrentSearchDecision; | ||
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider; | ||
|
||
import java.util.Optional; | ||
|
||
/** | ||
* Decides if the knn query uses concurrent search | ||
* As of 2.17, this is only used when | ||
* - "index.search.concurrent_segment_search.mode": "auto" or | ||
* - "search.concurrent_segment_search.mode": "auto" | ||
* | ||
* Note: the class is not thread-safe and a new instance needs to be created for each request | ||
*/ | ||
@EqualsAndHashCode(callSuper = true) | ||
public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider { | ||
|
||
private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision( | ||
ConcurrentSearchDecision.DecisionStatus.NO_OP, | ||
"Default decision" | ||
); | ||
private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision( | ||
ConcurrentSearchDecision.DecisionStatus.YES, | ||
"Enable concurrent search for knn" | ||
); | ||
|
||
private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION; | ||
|
||
@Override | ||
public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) { | ||
if (queryBuilder instanceof KNNQueryBuilder && indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) { | ||
knnDecision = YES; | ||
} else { | ||
knnDecision = DEFAULT_KNN_DECISION; | ||
} | ||
} | ||
|
||
@Override | ||
public ConcurrentSearchDecision getConcurrentSearchDecision() { | ||
return knnDecision; | ||
} | ||
|
||
/** | ||
* Returns {@link KNNConcurrentSearchRequestDecider} when index.knn is true | ||
*/ | ||
public static class Factory implements ConcurrentSearchRequestDecider.Factory { | ||
public Optional<ConcurrentSearchRequestDecider> create(final IndexSettings indexSettings) { | ||
if (indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) { | ||
return Optional.of(new KNNConcurrentSearchRequestDecider()); | ||
} | ||
return Optional.empty(); | ||
} | ||
} | ||
} |
151 changes: 151 additions & 0 deletions
151
src/test/java/org/opensearch/knn/integ/search/ConcurrentSegmentSearchIT.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.integ.search; | ||
|
||
import com.google.common.primitives.Floats; | ||
import lombok.SneakyThrows; | ||
import org.apache.hc.core5.http.io.entity.EntityUtils; | ||
import org.junit.BeforeClass; | ||
import org.opensearch.client.Response; | ||
import org.opensearch.common.settings.Settings; | ||
import org.opensearch.common.xcontent.XContentFactory; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.knn.KNNRestTestCase; | ||
import org.opensearch.knn.KNNResult; | ||
import org.opensearch.knn.TestUtils; | ||
import org.opensearch.knn.common.KNNConstants; | ||
import org.opensearch.knn.index.SpaceType; | ||
import org.opensearch.knn.index.engine.KNNEngine; | ||
import org.opensearch.knn.index.query.KNNQueryBuilder; | ||
import org.opensearch.knn.plugin.script.KNNScoringUtil; | ||
|
||
import java.io.IOException; | ||
import java.net.URL; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.TreeMap; | ||
|
||
import static org.opensearch.knn.common.KNNConstants.KNN_ENGINE; | ||
import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW; | ||
import static org.opensearch.knn.common.KNNConstants.METHOD_PARAMETER_SPACE_TYPE; | ||
import static org.opensearch.knn.common.KNNConstants.NAME; | ||
import static org.opensearch.knn.common.KNNConstants.PARAMETERS; | ||
|
||
/** | ||
* Note that this is simply a sanity test to make sure that concurrent search code path is hit E2E and scores are intact | ||
* There is no latency verification as it can be better encapsulated in nightly runs. | ||
*/ | ||
public class ConcurrentSegmentSearchIT extends KNNRestTestCase { | ||
|
||
static TestUtils.TestData testData; | ||
|
||
@BeforeClass | ||
public static void setUpClass() throws IOException { | ||
if (ConcurrentSegmentSearchIT.class.getClassLoader() == null) { | ||
throw new IllegalStateException("ClassLoader of FaissIT Class is null"); | ||
} | ||
URL testIndexVectors = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_vectors_1000x128.json"); | ||
URL testQueries = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_queries_100x128.csv"); | ||
assert testIndexVectors != null; | ||
assert testQueries != null; | ||
testData = new TestUtils.TestData(testIndexVectors.getPath(), testQueries.getPath()); | ||
} | ||
|
||
@SneakyThrows | ||
public void testConcurrentSegmentSearch() { | ||
String indexName = "test-concurrent-segment"; | ||
String fieldName = "test-field-1"; | ||
int dimension = testData.indexData.vectors[0].length; | ||
final XContentBuilder indexBuilder = createFaissHnswIndexMapping(fieldName, dimension); | ||
Map<String, Object> mappingMap = xContentBuilderToMap(indexBuilder); | ||
String mapping = indexBuilder.toString(); | ||
createKnnIndex(indexName, mapping); | ||
assertEquals(new TreeMap<>(mappingMap), new TreeMap<>(getIndexMappingAsMap(indexName))); | ||
|
||
// Index the test data | ||
for (int i = 0; i < testData.indexData.docs.length; i++) { | ||
addKnnDoc( | ||
indexName, | ||
Integer.toString(testData.indexData.docs[i]), | ||
fieldName, | ||
Floats.asList(testData.indexData.vectors[i]).toArray() | ||
); | ||
} | ||
refreshAllNonSystemIndices(); | ||
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto")); | ||
|
||
// Test search queries | ||
int k = 10; | ||
verifySearch(indexName, fieldName, k); | ||
|
||
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto")); | ||
verifySearch(indexName, fieldName, k); | ||
} | ||
|
||
/* | ||
{ | ||
"properties": { | ||
"<fieldName>": { | ||
"type": "knn_vector", | ||
"dimension": <dimension>, | ||
"method": { | ||
"name": "hnsw", | ||
"space_type": "l2", | ||
"engine": "faiss", | ||
"parameters": { | ||
"m": 16, | ||
"ef_construction": 128, | ||
"ef_search": 128 | ||
} | ||
} | ||
} | ||
} | ||
*/ | ||
@SneakyThrows | ||
private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) { | ||
// Create an index | ||
return XContentFactory.jsonBuilder() | ||
.startObject() | ||
.startObject("properties") | ||
.startObject(fieldName) | ||
.field("type", "knn_vector") | ||
.field("dimension", dimension) | ||
.startObject(KNNConstants.KNN_METHOD) | ||
.field(NAME, METHOD_HNSW) | ||
.field(METHOD_PARAMETER_SPACE_TYPE, SpaceType.L2.getValue()) | ||
.field(KNN_ENGINE, KNNEngine.FAISS.getName()) | ||
.startObject(PARAMETERS) | ||
.field(KNNConstants.METHOD_PARAMETER_M, 16) | ||
.field(KNNConstants.METHOD_PARAMETER_EF_CONSTRUCTION, 128) | ||
.field(KNNConstants.METHOD_PARAMETER_EF_SEARCH, 128) | ||
.endObject() | ||
.endObject() | ||
.endObject() | ||
.endObject() | ||
.endObject(); | ||
} | ||
|
||
@SneakyThrows | ||
private void verifySearch(String indexName, String fieldName, int k) { | ||
for (int i = 0; i < testData.queries.length; i++) { | ||
final KNNQueryBuilder queryBuilder = KNNQueryBuilder.builder().fieldName(fieldName).vector(testData.queries[i]).k(k).build(); | ||
Response response = searchKNNIndex(indexName, queryBuilder, k); | ||
String responseBody = EntityUtils.toString(response.getEntity()); | ||
List<KNNResult> knnResults = parseSearchResponse(responseBody, fieldName); | ||
assertEquals(k, knnResults.size()); | ||
|
||
List<Float> actualScores = parseSearchResponseScore(responseBody, fieldName); | ||
for (int j = 0; j < k; j++) { | ||
float[] primitiveArray = knnResults.get(j).getVector(); | ||
assertEquals( | ||
KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(testData.queries[i], primitiveArray), SpaceType.L2), | ||
actualScores.get(j), | ||
0.0001 | ||
); | ||
} | ||
} | ||
} | ||
} |
65 changes: 65 additions & 0 deletions
65
src/test/java/org/opensearch/knn/plugin/search/KNNConcurrentSearchRequestDeciderTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,65 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.plugin.search; | ||
|
||
import org.opensearch.index.IndexSettings; | ||
import org.opensearch.index.query.MatchAllQueryBuilder; | ||
import org.opensearch.knn.KNNTestCase; | ||
import org.opensearch.knn.index.KNNSettings; | ||
import org.opensearch.knn.index.query.KNNQueryBuilder; | ||
import org.opensearch.search.deciders.ConcurrentSearchDecision; | ||
|
||
import static org.mockito.Mockito.mock; | ||
import static org.mockito.Mockito.when; | ||
|
||
public class KNNConcurrentSearchRequestDeciderTests extends KNNTestCase { | ||
|
||
public void testDecider() { | ||
ConcurrentSearchDecision noop = new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "Default decision"); | ||
|
||
KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider(); | ||
assertDecision(noop, decider.getConcurrentSearchDecision()); | ||
IndexSettings indexSettingsMock = mock(IndexSettings.class); | ||
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE); | ||
|
||
// Non KNNQueryBuilder | ||
decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock); | ||
assertDecision(noop, decider.getConcurrentSearchDecision()); | ||
decider.evaluateForQuery( | ||
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(), | ||
indexSettingsMock | ||
); | ||
assertDecision(noop, decider.getConcurrentSearchDecision()); | ||
|
||
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE); | ||
decider.evaluateForQuery( | ||
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(), | ||
indexSettingsMock | ||
); | ||
ConcurrentSearchDecision yes = new ConcurrentSearchDecision( | ||
ConcurrentSearchDecision.DecisionStatus.YES, | ||
"Enable concurrent search for knn" | ||
); | ||
assertDecision(yes, decider.getConcurrentSearchDecision()); | ||
|
||
decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock); | ||
assertDecision(noop, decider.getConcurrentSearchDecision()); | ||
} | ||
|
||
public void testDeciderFactory() { | ||
KNNConcurrentSearchRequestDecider.Factory factory = new KNNConcurrentSearchRequestDecider.Factory(); | ||
IndexSettings indexSettingsMock = mock(IndexSettings.class); | ||
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE); | ||
assertNotSame(factory.create(indexSettingsMock).get(), factory.create(indexSettingsMock).get()); | ||
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE); | ||
assertTrue(factory.create(indexSettingsMock).isEmpty()); | ||
} | ||
|
||
private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) { | ||
assertEquals(expected.getDecisionReason(), actual.getDecisionReason()); | ||
assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus()); | ||
} | ||
} |