Skip to content

Commit

Permalink
Adds Unit tests for NativeEngines990KnnVectorsWriter (#2097)
Browse files Browse the repository at this point in the history
Had to separate out the common code to make it easy to write tests
Mocking was difficult to do with the functional interfaces and it was
throwing NPE in the test especially with the mock of NativeIndexWriter.

Signed-off-by: Tejas Shah <shatejas@amazon.com>
  • Loading branch information
shatejas committed Sep 13, 2024
1 parent 7aedefd commit 0df0613
Show file tree
Hide file tree
Showing 4 changed files with 585 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.RamUsageEstimator;
import org.opensearch.common.StopWatch;
import org.opensearch.knn.index.quantizationservice.QuantizationService;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.codec.nativeindex.NativeIndexWriter;
import org.opensearch.knn.index.quantizationservice.QuantizationService;
import org.opensearch.knn.index.vectorvalues.KNNVectorValues;
import org.opensearch.knn.index.vectorvalues.KNNVectorValuesFactory;
import org.opensearch.knn.plugin.stats.KNNGraphValue;
import org.opensearch.knn.quantization.models.quantizationParams.QuantizationParams;
import org.opensearch.knn.quantization.models.quantizationState.QuantizationState;
Expand All @@ -39,6 +38,7 @@
import java.util.List;

import static org.opensearch.knn.common.FieldInfoExtractor.extractVectorDataType;
import static org.opensearch.knn.index.vectorvalues.KNNVectorValuesFactory.getVectorValues;

/**
* A KNNVectorsWriter class for writing the vector data strcutures and flat vectors for Native Engines.
Expand All @@ -47,15 +47,11 @@
public class NativeEngines990KnnVectorsWriter extends KnnVectorsWriter {
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(NativeEngines990KnnVectorsWriter.class);

private static final String FLUSH_OPERATION = "flush";
private static final String MERGE_OPERATION = "merge";

private final SegmentWriteState segmentWriteState;
private final FlatVectorsWriter flatVectorsWriter;
private KNN990QuantizationStateWriter quantizationStateWriter;
private final List<NativeEngineFieldVectorsWriter<?>> fields = new ArrayList<>();
private boolean finished;
private final QuantizationService quantizationService = QuantizationService.getInstance();

public NativeEngines990KnnVectorsWriter(SegmentWriteState segmentWriteState, FlatVectorsWriter flatVectorsWriter) {
this.segmentWriteState = segmentWriteState;
Expand Down Expand Up @@ -84,14 +80,27 @@ public void flush(int maxDoc, final Sorter.DocMap sortMap) throws IOException {
flatVectorsWriter.flush(maxDoc, sortMap);

for (final NativeEngineFieldVectorsWriter<?> field : fields) {
trainAndIndex(
field.getFieldInfo(),
(vectorDataType, fieldInfo, fieldVectorsWriter) -> getKNNVectorValues(vectorDataType, fieldVectorsWriter),
NativeIndexWriter::flushIndex,
field,
KNNGraphValue.REFRESH_TOTAL_TIME_IN_MILLIS,
FLUSH_OPERATION
);
final FieldInfo fieldInfo = field.getFieldInfo();
final VectorDataType vectorDataType = extractVectorDataType(fieldInfo);
int totalLiveDocs = getLiveDocs(getVectorValues(vectorDataType, field.getDocsWithField(), field.getVectors()));
if (totalLiveDocs > 0) {
KNNVectorValues<?> knnVectorValues = getVectorValues(vectorDataType, field.getDocsWithField(), field.getVectors());

final QuantizationState quantizationState = train(field.getFieldInfo(), knnVectorValues, totalLiveDocs);
final NativeIndexWriter writer = NativeIndexWriter.getWriter(fieldInfo, segmentWriteState, quantizationState);

knnVectorValues = getVectorValues(vectorDataType, field.getDocsWithField(), field.getVectors());

StopWatch stopWatch = new StopWatch().start();

writer.flushIndex(knnVectorValues, totalLiveDocs);

long time_in_millis = stopWatch.stop().totalTime().millis();
KNNGraphValue.REFRESH_TOTAL_TIME_IN_MILLIS.incrementBy(time_in_millis);
log.debug("Flush took {} ms for vector field [{}]", time_in_millis, fieldInfo.getName());
} else {
log.debug("[Flush] No live docs for field {}", fieldInfo.getName());
}
}
}

Expand All @@ -100,15 +109,26 @@ public void mergeOneField(final FieldInfo fieldInfo, final MergeState mergeState
// This will ensure that we are merging the FlatIndex during force merge.
flatVectorsWriter.mergeOneField(fieldInfo, mergeState);

// For merge, pick values from flat vector and reindex again. This will use the flush operation to create graphs
trainAndIndex(
fieldInfo,
this::getKNNVectorValuesForMerge,
NativeIndexWriter::mergeIndex,
mergeState,
KNNGraphValue.MERGE_TOTAL_TIME_IN_MILLIS,
MERGE_OPERATION
);
final VectorDataType vectorDataType = extractVectorDataType(fieldInfo);
int totalLiveDocs = getLiveDocs(getKNNVectorValuesForMerge(vectorDataType, fieldInfo, mergeState));
if (totalLiveDocs == 0) {
log.debug("[Merge] No live docs for field {}", fieldInfo.getName());
return;
}

KNNVectorValues<?> knnVectorValues = getKNNVectorValuesForMerge(vectorDataType, fieldInfo, mergeState);
final QuantizationState quantizationState = train(fieldInfo, knnVectorValues, totalLiveDocs);
final NativeIndexWriter writer = NativeIndexWriter.getWriter(fieldInfo, segmentWriteState, quantizationState);

knnVectorValues = getKNNVectorValuesForMerge(vectorDataType, fieldInfo, mergeState);

StopWatch stopWatch = new StopWatch().start();

writer.mergeIndex(knnVectorValues, totalLiveDocs);

long time_in_millis = stopWatch.stop().totalTime().millis();
KNNGraphValue.MERGE_TOTAL_TIME_IN_MILLIS.incrementBy(time_in_millis);
log.debug("Merge took {} ms for vector field [{}]", time_in_millis, fieldInfo.getName());
}

/**
Expand Down Expand Up @@ -157,18 +177,6 @@ public long ramBytesUsed() {
.sum();
}

/**
* Retrieves the {@link KNNVectorValues} for a specific field based on the vector data type and field writer.
*
* @param vectorDataType The {@link VectorDataType} representing the type of vectors stored.
* @param field The {@link NativeEngineFieldVectorsWriter} representing the field from which to retrieve vectors.
* @param <T> The type of vectors being processed.
* @return The {@link KNNVectorValues} associated with the field.
*/
private <T> KNNVectorValues<T> getKNNVectorValues(final VectorDataType vectorDataType, final NativeEngineFieldVectorsWriter<?> field) {
return (KNNVectorValues<T>) KNNVectorValuesFactory.getVectorValues(vectorDataType, field.getDocsWithField(), field.getVectors());
}

/**
* Retrieves the {@link KNNVectorValues} for a specific field during a merge operation, based on the vector data type.
*
Expand All @@ -187,84 +195,28 @@ private <T> KNNVectorValues<T> getKNNVectorValuesForMerge(
switch (fieldInfo.getVectorEncoding()) {
case FLOAT32:
FloatVectorValues mergedFloats = MergedVectorValues.mergeFloatVectorValues(fieldInfo, mergeState);
return (KNNVectorValues<T>) KNNVectorValuesFactory.getVectorValues(vectorDataType, mergedFloats);
return getVectorValues(vectorDataType, mergedFloats);
case BYTE:
ByteVectorValues mergedBytes = MergedVectorValues.mergeByteVectorValues(fieldInfo, mergeState);
return (KNNVectorValues<T>) KNNVectorValuesFactory.getVectorValues(vectorDataType, mergedBytes);
return getVectorValues(vectorDataType, mergedBytes);
default:
throw new IllegalStateException("Unsupported vector encoding [" + fieldInfo.getVectorEncoding() + "]");
}
}

/**
* Functional interface representing an operation that indexes the provided {@link KNNVectorValues}.
*
* @param <T> The type of vectors being processed.
*/
@FunctionalInterface
private interface IndexOperation<T> {
void buildAndWrite(NativeIndexWriter writer, KNNVectorValues<T> knnVectorValues, int totalLiveDocs) throws IOException;
}

/**
* Functional interface representing a method that retrieves {@link KNNVectorValues} based on
* the vector data type, field information, and the merge state.
*
* @param <DataType> The type of the data representing the vector (e.g., {@link VectorDataType}).
* @param <FieldInfo> The metadata about the field.
* @param <MergeState> The state of the merge operation.
* @param <Result> The result of the retrieval, typically {@link KNNVectorValues}.
*/
@FunctionalInterface
private interface VectorValuesRetriever<DataType, FieldInfo, MergeState, Result> {
Result apply(DataType vectorDataType, FieldInfo fieldInfo, MergeState mergeState) throws IOException;
}
private QuantizationState train(final FieldInfo fieldInfo, final KNNVectorValues<?> knnVectorValues, final int totalLiveDocs)
throws IOException {

/**
* Unified method for processing a field during either the indexing or merge operation. This method retrieves vector values
* based on the provided vector data type and applies the specified index operation, potentially including quantization if needed.
*
* @param fieldInfo The {@link FieldInfo} object containing metadata about the field.
* @param vectorValuesRetriever A functional interface that retrieves {@link KNNVectorValues} based on the vector data type,
* field information, and additional context (e.g., merge state or field writer).
* @param indexOperation A functional interface that performs the indexing operation using the retrieved
* {@link KNNVectorValues}.
* @param VectorProcessingContext The additional context required for retrieving the vector values (e.g., {@link MergeState} or {@link NativeEngineFieldVectorsWriter}).
* From Flush we need NativeFieldWriter which contains total number of vectors while from Merge we need merge state which contains vector information
* @param <T> The type of vectors being processed.
* @param <C> The type of the context needed for retrieving the vector values.
* @throws IOException If an I/O error occurs during the processing.
*/
private <T, C> void trainAndIndex(
final FieldInfo fieldInfo,
final VectorValuesRetriever<VectorDataType, FieldInfo, C, KNNVectorValues<T>> vectorValuesRetriever,
final IndexOperation<T> indexOperation,
final C VectorProcessingContext,
final KNNGraphValue graphBuildTime,
final String operationName
) throws IOException {
final VectorDataType vectorDataType = extractVectorDataType(fieldInfo);
// Count the docIds
int totalLiveDocs = getLiveDocs(vectorValuesRetriever.apply(vectorDataType, fieldInfo, VectorProcessingContext));
if (totalLiveDocs == 0) {
log.debug("No live docs for field " + fieldInfo.name);
return;
}
final QuantizationService quantizationService = QuantizationService.getInstance();
final QuantizationParams quantizationParams = quantizationService.getQuantizationParams(fieldInfo);
QuantizationState quantizationState = null;
QuantizationParams quantizationParams = quantizationService.getQuantizationParams(fieldInfo);
if (quantizationParams != null) {
if (quantizationParams != null && totalLiveDocs > 0) {
initQuantizationStateWriterIfNecessary();
KNNVectorValues<T> knnVectorValues = vectorValuesRetriever.apply(vectorDataType, fieldInfo, VectorProcessingContext);
quantizationState = quantizationService.train(quantizationParams, knnVectorValues, totalLiveDocs);
quantizationStateWriter.writeState(fieldInfo.getFieldNumber(), quantizationState);
}
NativeIndexWriter writer = NativeIndexWriter.getWriter(fieldInfo, segmentWriteState, quantizationState);
KNNVectorValues<T> knnVectors = vectorValuesRetriever.apply(vectorDataType, fieldInfo, VectorProcessingContext);
StopWatch stopWatch = new StopWatch().start();
indexOperation.buildAndWrite(writer, knnVectors, totalLiveDocs);
long time_in_millis = stopWatch.stop().totalTime().millis();
graphBuildTime.incrementBy(time_in_millis);
log.warn("Graph build took " + time_in_millis + " ms for " + operationName);

return quantizationState;
}

/**
Expand Down
Loading

0 comments on commit 0df0613

Please sign in to comment.