Skip to content

Commit

Permalink
star tree file formats refactoring and fixing offset bug
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
  • Loading branch information
bharath-techie committed Sep 18, 2024
1 parent 7c427d9 commit f6d3ca9
Show file tree
Hide file tree
Showing 22 changed files with 532 additions and 462 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ public class CompositeIndexConstants {
public static final long COMPOSITE_FIELD_MARKER = 0xC0950513F1E1DL; // Composite Field

/**
* Represents the key to fetch number of non-star aggregated segment documents.
* Represents the key to fetch number of non-star aggregated records/entries
*/
public static final String SEGMENT_DOCS_COUNT = "segmentDocsCount";
public static final String SEGMENT_ENTRIES_COUNT = "segmentEntriesCount";

/**
* Represents the key to fetch number of total star tree documents in a segment.
* Represents the key to fetch number of total star tree records/entries in a segment.
*/
public static final String STAR_TREE_DOCS_COUNT = "starTreeDocsCount";
public static final String STAR_TREE_ENTRIES_COUNT = "starTreeEntriesCount";

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.node.InMemoryTreeNode;
import org.opensearch.index.compositeindex.datacube.startree.node.StarTreeNodeType;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.SortedNumericStarTreeValuesIterator;
import org.opensearch.index.mapper.DocCountFieldMapper;
import org.opensearch.index.mapper.FieldMapper;
import org.opensearch.index.mapper.FieldValueConverter;
Expand Down Expand Up @@ -177,13 +178,13 @@ public List<MetricAggregatorInfo> generateMetricAggregatorInfos(MapperService ma
*
* @return list of MetricAggregatorInfo
*/
public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState state, Map<String, DocValuesProducer> fieldProducerMap)
public List<SequentialValuesIterator> getMetricReaders(SegmentWriteState state, Map<String, DocValuesProducer> fieldProducerMap)
throws IOException {

List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
List<SequentialValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
SequentialDocValuesIterator metricReader;
SequentialValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricStat.equals(MetricStat.DOC_COUNT)) {
// _doc_count is numeric field , so we convert to sortedNumericDocValues and get iterator
Expand All @@ -192,8 +193,10 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField(), DocValuesType.SORTED_NUMERIC);
}
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
metricReader = new SequentialValuesIterator(
new SortedNumericStarTreeValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
)
);
}
metricReaders.add(metricReader);
Expand All @@ -218,17 +221,17 @@ public void build(
long startTime = System.currentTimeMillis();
logger.debug("Star-tree build is a go with star tree field {}", starTreeField.getName());

List<SequentialDocValuesIterator> metricReaders = getMetricReaders(writeState, fieldProducerMap);
List<SequentialValuesIterator> metricReaders = getMetricReaders(writeState, fieldProducerMap);
List<Dimension> dimensionsSplitOrder = starTreeField.getDimensionsOrder();
SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[dimensionsSplitOrder.size()];
SequentialValuesIterator[] dimensionReaders = new SequentialValuesIterator[dimensionsSplitOrder.size()];
for (int i = 0; i < numDimensions; i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
FieldInfo dimensionFieldInfo = writeState.fieldInfos.fieldInfo(dimension);
if (dimensionFieldInfo == null) {
dimensionFieldInfo = getFieldInfo(dimension, DocValuesType.SORTED_NUMERIC);
}
dimensionReaders[i] = new SequentialDocValuesIterator(
fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo)
dimensionReaders[i] = new SequentialValuesIterator(
new SortedNumericStarTreeValuesIterator(fieldProducerMap.get(dimensionFieldInfo.name).getSortedNumeric(dimensionFieldInfo))
);
}
Iterator<StarTreeDocument> starTreeDocumentIterator = sortAndAggregateSegmentDocuments(dimensionReaders, metricReaders);
Expand Down Expand Up @@ -287,7 +290,7 @@ void appendDocumentsToStarTree(Iterator<StarTreeDocument> starTreeDocumentIterat
}
}

private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDocs) throws IOException {
private void serializeStarTree(int numSegmentStarTreeEntries, int numStarTreeEntries) throws IOException {
// serialize the star tree data
long dataFilePointer = dataOut.getFilePointer();
StarTreeWriter starTreeWriter = new StarTreeWriter();
Expand All @@ -299,8 +302,8 @@ private void serializeStarTree(int numSegmentStarTreeDocument, int numStarTreeDo
starTreeField,
metricAggregatorInfos,
numStarTreeNodes,
numSegmentStarTreeDocument,
numStarTreeDocs,
numSegmentStarTreeEntries,
numStarTreeEntries,
dataFilePointer,
totalStarTreeDataLength
);
Expand Down Expand Up @@ -395,27 +398,25 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) {
*/
protected StarTreeDocument getStarTreeDocument(
int currentDocId,
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders
SequentialValuesIterator[] dimensionReaders,
List<SequentialValuesIterator> metricReaders
) throws IOException {
Long[] dims = new Long[numDimensions];
int i = 0;
for (SequentialDocValuesIterator dimensionDocValueIterator : dimensionReaders) {
dimensionDocValueIterator.nextDoc(currentDocId);
Long val = dimensionDocValueIterator.value(currentDocId);
for (SequentialValuesIterator dimensionValueIterator : dimensionReaders) {
dimensionValueIterator.nextEntry(currentDocId);
Long val = dimensionValueIterator.value(currentDocId);
dims[i] = val;
i++;
}
i = 0;
Object[] metrics = new Object[metricReaders.size()];
for (SequentialDocValuesIterator metricDocValuesIterator : metricReaders) {
metricDocValuesIterator.nextDoc(currentDocId);
for (SequentialValuesIterator metricValuesIterator : metricReaders) {
metricValuesIterator.nextEntry(currentDocId);
// As part of merge, we traverse the star tree doc values
// The type of data stored in metric fields is different from the
// actual indexing field they're based on
metrics[i] = metricAggregatorInfos.get(i)
.getValueAggregators()
.toAggregatedValueType(metricDocValuesIterator.value(currentDocId));
metrics[i] = metricAggregatorInfos.get(i).getValueAggregators().toAggregatedValueType(metricValuesIterator.value(currentDocId));
i++;
}
return new StarTreeDocument(dims, metrics);
Expand Down Expand Up @@ -463,8 +464,8 @@ protected StarTreeDocument getStarTreeDocument(
* @return Iterator for the aggregated star-tree document
*/
public abstract Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders
SequentialValuesIterator[] dimensionReaders,
List<SequentialValuesIterator> metricReaders
) throws IOException;

/**
Expand All @@ -483,8 +484,8 @@ public abstract Iterator<StarTreeDocument> generateStarTreeDocumentsForStarNode(
*/
protected StarTreeDocument getSegmentStarTreeDocument(
int currentDocId,
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders
SequentialValuesIterator[] dimensionReaders,
List<SequentialValuesIterator> metricReaders
) throws IOException {
Long[] dimensions = getStarTreeDimensionsFromSegment(currentDocId, dimensionReaders);
Object[] metrics = getStarTreeMetricsFromSegment(currentDocId, metricReaders);
Expand All @@ -497,12 +498,12 @@ protected StarTreeDocument getSegmentStarTreeDocument(
* @return dimension values for each of the star-tree dimension
* @throws IOException when we are unable to iterate to the next doc for the given dimension readers
*/
Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIterator[] dimensionReaders) throws IOException {
Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialValuesIterator[] dimensionReaders) throws IOException {
Long[] dimensions = new Long[numDimensions];
for (int i = 0; i < numDimensions; i++) {
if (dimensionReaders[i] != null) {
try {
dimensionReaders[i].nextDoc(currentDocId);
dimensionReaders[i].nextEntry(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
Expand All @@ -524,13 +525,13 @@ Long[] getStarTreeDimensionsFromSegment(int currentDocId, SequentialDocValuesIte
* @return metric values for each of the star-tree metric
* @throws IOException when we are unable to iterate to the next doc for the given metric readers
*/
private Object[] getStarTreeMetricsFromSegment(int currentDocId, List<SequentialDocValuesIterator> metricsReaders) throws IOException {
private Object[] getStarTreeMetricsFromSegment(int currentDocId, List<SequentialValuesIterator> metricsReaders) throws IOException {
Object[] metrics = new Object[numMetrics];
for (int i = 0; i < numMetrics; i++) {
SequentialDocValuesIterator metricStatReader = metricsReaders.get(i);
SequentialValuesIterator metricStatReader = metricsReaders.get(i);
if (metricStatReader != null) {
try {
metricStatReader.nextDoc(currentDocId);
metricStatReader.nextEntry(currentDocId);
} catch (IOException e) {
logger.error("unable to iterate to next doc", e);
throw new RuntimeException("unable to iterate to next doc", e);
Expand Down Expand Up @@ -661,18 +662,18 @@ public StarTreeDocument reduceStarTreeDocuments(StarTreeDocument aggregatedDocum
/**
* Converts numericDocValues to sortedNumericDocValues and returns SequentialDocValuesIterator
*/
private SequentialDocValuesIterator getIteratorForNumericField(
private SequentialValuesIterator getIteratorForNumericField(
Map<String, DocValuesProducer> fieldProducerMap,
FieldInfo fieldInfo,
String name
) throws IOException {
if (fieldInfo == null) {
fieldInfo = getFieldInfo(name, DocValuesType.NUMERIC);
}
SequentialDocValuesIterator sequentialDocValuesIterator;
SequentialValuesIterator sequentialDocValuesIterator;
assert fieldProducerMap.containsKey(fieldInfo.name);
sequentialDocValuesIterator = new SequentialDocValuesIterator(
DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo))
sequentialDocValuesIterator = new SequentialValuesIterator(
new SortedNumericStarTreeValuesIterator(DocValues.singleton(fieldProducerMap.get(fieldInfo.name).getNumeric(fieldInfo)))
);
return sequentialDocValuesIterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.DocValuesConsumer;
import org.apache.lucene.index.SegmentWriteState;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.util.io.IOUtils;
Expand All @@ -22,8 +21,9 @@
import org.opensearch.index.compositeindex.datacube.startree.StarTreeDocument;
import org.opensearch.index.compositeindex.datacube.startree.StarTreeField;
import org.opensearch.index.compositeindex.datacube.startree.index.StarTreeValues;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialDocValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.SequentialValuesIterator;
import org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeDocumentsSorter;
import org.opensearch.index.compositeindex.datacube.startree.utils.iterator.StarTreeValuesIterator;
import org.opensearch.index.mapper.MapperService;

import java.io.IOException;
Expand All @@ -36,7 +36,7 @@
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.index.compositeindex.CompositeIndexConstants.SEGMENT_DOCS_COUNT;
import static org.opensearch.index.compositeindex.CompositeIndexConstants.SEGMENT_ENTRIES_COUNT;
import static org.opensearch.index.compositeindex.datacube.startree.utils.StarTreeUtils.fullyQualifiedFieldNameForStarTreeMetricsDocValues;

/**
Expand Down Expand Up @@ -113,8 +113,8 @@ public void build(
*/
@Override
public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
SequentialDocValuesIterator[] dimensionReaders,
List<SequentialDocValuesIterator> metricReaders
SequentialValuesIterator[] dimensionReaders,
List<SequentialValuesIterator> metricReaders
) throws IOException {
// Write all dimensions for segment documents into the buffer,
// and sort all documents using an int array
Expand Down Expand Up @@ -143,19 +143,19 @@ public Iterator<StarTreeDocument> sortAndAggregateSegmentDocuments(
* @return iterator of star tree documents
*/
Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSubs) throws IOException {
int numDocs = 0;
int[] docIds;
int numEntries = 0;
int[] entryIds;
try {
for (StarTreeValues starTreeValues : starTreeValuesSubs) {
List<Dimension> dimensionsSplitOrder = starTreeValues.getStarTreeField().getDimensionsOrder();
SequentialDocValuesIterator[] dimensionReaders = new SequentialDocValuesIterator[starTreeValues.getStarTreeField()
SequentialValuesIterator[] dimensionReaders = new SequentialValuesIterator[starTreeValues.getStarTreeField()
.getDimensionsOrder()
.size()];
for (int i = 0; i < dimensionsSplitOrder.size(); i++) {
String dimension = dimensionsSplitOrder.get(i).getField();
dimensionReaders[i] = new SequentialDocValuesIterator(starTreeValues.getDimensionDocIdSetIterator(dimension));
dimensionReaders[i] = new SequentialValuesIterator(starTreeValues.getDimensionValuesIterator(dimension));
}
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
List<SequentialValuesIterator> metricReaders = new ArrayList<>();
// get doc id set iterators for metrics
for (Metric metric : starTreeValues.getStarTreeField().getMetrics()) {
for (MetricStat metricStat : metric.getBaseMetrics()) {
Expand All @@ -164,34 +164,35 @@ Iterator<StarTreeDocument> mergeStarTrees(List<StarTreeValues> starTreeValuesSub
metric.getField(),
metricStat.getTypeName()
);
metricReaders.add(new SequentialDocValuesIterator(starTreeValues.getMetricDocIdSetIterator(metricFullName)));
metricReaders.add(new SequentialValuesIterator(starTreeValues.getMetricValuesIterator(metricFullName)));
}
}
int currentDocId = 0;
int numSegmentDocs = Integer.parseInt(
starTreeValues.getAttributes().getOrDefault(SEGMENT_DOCS_COUNT, String.valueOf(DocIdSetIterator.NO_MORE_DOCS))
int currentEntryId = 0;
int numSegmentEntries = Integer.parseInt(
starTreeValues.getAttributes()
.getOrDefault(SEGMENT_ENTRIES_COUNT, String.valueOf(StarTreeValuesIterator.NO_MORE_ENTRIES))
);
while (currentDocId < numSegmentDocs) {
StarTreeDocument starTreeDocument = getStarTreeDocument(currentDocId, dimensionReaders, metricReaders);
while (currentEntryId < numSegmentEntries) {
StarTreeDocument starTreeDocument = getStarTreeDocument(currentEntryId, dimensionReaders, metricReaders);
segmentDocumentFileManager.writeStarTreeDocument(starTreeDocument, true);
numDocs++;
currentDocId++;
numEntries++;
currentEntryId++;
}
}
docIds = new int[numDocs];
for (int i = 0; i < numDocs; i++) {
docIds[i] = i;
entryIds = new int[numEntries];
for (int i = 0; i < numEntries; i++) {
entryIds[i] = i;
}
} catch (IOException ex) {
segmentDocumentFileManager.close();
throw ex;
}

if (numDocs == 0) {
if (numEntries == 0) {
return Collections.emptyIterator();
}

return sortAndReduceDocuments(docIds, numDocs, true);
return sortAndReduceDocuments(entryIds, numEntries, true);
}

/**
Expand Down
Loading

0 comments on commit f6d3ca9

Please sign in to comment.