Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize composite aggregation based on index sorting. #48130

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,18 @@ public void collect(int doc, long bucket) throws IOException {
}

@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query) {
if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
fieldType instanceof StringFieldType == false ||
(query != null && query.getClass() != MatchAllDocsQuery.class)) {
fieldType instanceof StringFieldType == false ||
(query != null && query.getClass() != MatchAllDocsQuery.class)) {
return false;
}
return true;
}

@Override
SortedDocsProducer createSortedDocsProducerOrNull(Query query) {
if (fieldType == null) {
return null;
}
return new TermsSortedDocsProducer(fieldType.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,27 @@

package org.elasticsearch.search.aggregations.bucket.composite;


import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
Expand Down Expand Up @@ -72,6 +80,8 @@ final class CompositeAggregator extends BucketsAggregator {
private LeafReaderContext currentLeaf;
private RoaringDocIdSet.Builder docIdSetBuilder;
private BucketCollector deferredCollectors;
private SortField leadingSortField;
private int startDocId = 0;

CompositeAggregator(String name, AggregatorFactories factories, SearchContext context, Aggregator parent,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData,
Expand All @@ -93,7 +103,7 @@ final class CompositeAggregator extends BucketsAggregator {
this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), sourceConfigs[i], size);
}
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.searcher().getIndexReader(), context.query());
this.sortedDocsProducer = sources[0].createSortedDocsProducerOrNull(context.query());
}

@Override
Expand Down Expand Up @@ -154,13 +164,16 @@ private void finishLeaf() {
currentLeaf = null;
docIdSetBuilder = null;
}
queue.setEarlyTerminate(false);
startDocId = 0;
}

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
finishLeaf();
boolean fillDocIdSet = deferredCollectors != NO_OP_COLLECTOR;
if (sortedDocsProducer != null) {
if (sortedDocsProducer != null &&
sources[0].canBeOptimizedBySortedDocs(context.searcher().getIndexReader(), context.query())) {
/*
The producer will visit documents sorted by the leading source of the composite definition
and terminates when the leading source value is guaranteed to be greater than the lowest
Expand All @@ -182,12 +195,38 @@ protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucket
currentLeaf = ctx;
docIdSetBuilder = new RoaringDocIdSet.Builder(ctx.reader().maxDoc());
}

QueryShardContext shardContext = context.getQueryShardContext();
IndexSortConfig indexSortConfig = shardContext.getIndexSettings().getIndexSortConfig();
if (indexSortConfig.hasIndexSort()) {
Sort sort = indexSortConfig.buildIndexSort(
shardContext::fieldMapper, shardContext::getForField);
this.leadingSortField = sort.getSort()[0];
}

if (leadingSortField != null && isSingleValued(ctx.reader(), leadingSortField)
&& leadingSortField.getField().equals(sourceNames.get(0))) {
queue.setLeadingSort(true);
SingleDimensionValuesSource<?> leadingSource = sources[0];
int leadingFieldReverse = leadingSortField.getReverse() == false ? 1 : -1;
// if source and leading field have the same order, get start doc id for filtering
if (sortedDocsProducer != null && leadingSource.reverseMul * leadingFieldReverse > 0) {
startDocId = sortedDocsProducer.getStartDocId(queue, ctx);
}
}

final LeafBucketCollector inner = queue.getLeafCollector(ctx, getFirstPassCollector(docIdSetBuilder));
return new LeafBucketCollector() {
@Override
public void collect(int doc, long zeroBucket) throws IOException {
assert zeroBucket == 0L;
inner.collect(doc);
if (queue.isEarlyTerminate()) {
throw new CollectionTerminatedException();
}

if (doc >= startDocId) {
inner.collect(doc);
}
}
};
}
Expand Down Expand Up @@ -272,6 +311,14 @@ public void collect(int doc, long zeroBucket) throws IOException {
};
}

/**
 * Returns the queue holding the current top composite buckets.
 * NOTE(review): presumably exposed for testing/inspection — confirm callers.
 */
public CompositeValuesCollectorQueue getQueue() {
return queue;
}

/**
 * Returns the first doc id of the current leaf that can still produce a
 * competitive bucket; documents before it are skipped by the index-sort
 * optimization (see the {@code doc >= startDocId} check in the leaf collector).
 * A value of 0 means no documents are skipped.
 */
public int getStartDocId() {
return startDocId;
}

private SingleDimensionValuesSource<?> createValuesSource(BigArrays bigArrays, IndexReader reader,
CompositeValuesSourceConfig config, int size) {

Expand Down Expand Up @@ -357,5 +404,23 @@ private static class Entry {
this.docIdSet = docIdSet;
}
}

/**
 * Checks whether {@code field} is effectively single-valued in every leaf of
 * {@code reader}. For STRING sort fields it inspects the sorted-set doc values,
 * otherwise the sorted-numeric doc values; a leaf whose values cannot be
 * unwrapped to a singleton makes the whole field multi-valued.
 *
 * NOTE(review): {@code values.cost() > 0} is used here as an emptiness guard so
 * leaves with no values for the field are ignored — cost() is an approximation,
 * confirm that is the intent.
 *
 * @param reader the index reader whose leaves are examined
 * @param field  the index sort field to check
 * @return true if the field is single-valued in all leaves, false otherwise
 * @throws IOException if reading doc values fails
 */
private boolean isSingleValued(IndexReader reader, SortField field) throws IOException {
SortField.Type type = IndexSortConfig.getSortFieldType(field);
for (LeafReaderContext context : reader.leaves()) {
if (type == SortField.Type.STRING) {
// String sorts are backed by sorted-set doc values.
final SortedSetDocValues values = DocValues.getSortedSet(context.reader(), field.getField());
if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) {
return false;
}
} else {
// Numeric (and other non-string) sorts are backed by sorted-numeric doc values.
final SortedNumericDocValues values = DocValues.getSortedNumeric(context.reader(), field.getField());
if (values.cost() > 0 && DocValues.unwrapSingleton(values) == null) {
return false;
}
}
}
return true;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public int hashCode() {
private final SingleDimensionValuesSource<?>[] arrays;
private IntArray docCounts;
private boolean afterKeyIsSet = false;
private boolean isLeadingSort = false;
private boolean isEarlyTerminate = false;

/**
* Constructs a composite queue with the specified size and sources.
Expand Down Expand Up @@ -242,6 +244,18 @@ LeafBucketCollector getLeafCollector(Comparable forceLeadSourceValue,
return collector;
}

/**
 * Marks whether the leading composite source matches the index's leading sort
 * field; when true, {@code addIfCompetitive} may set the early-terminate flag.
 */
public void setLeadingSort(boolean isLeadingSort) {
this.isLeadingSort = isLeadingSort;
}

/**
 * Returns true when collection of the current leaf can stop because no later
 * document can produce a competitive bucket.
 */
public boolean isEarlyTerminate() {
return isEarlyTerminate;
}

/**
 * Sets the early-termination flag; the aggregator resets it to false when it
 * finishes each leaf.
 */
public void setEarlyTerminate(boolean earlyTerminate) {
this.isEarlyTerminate = earlyTerminate;
}

/**
* Check if the current candidate should be added in the queue.
* @return The target slot of the candidate or -1 is the candidate is not competitive.
Expand All @@ -256,12 +270,24 @@ int addIfCompetitive() {
}
if (afterKeyIsSet && compareCurrentWithAfter() <= 0) {
// this key is greater than the top value collected in the previous round, skip it
if (isLeadingSort && size() >= maxSize) {
if (arrays[0].compareCurrentWithAfter() < 0) {
// the leading source field value is greater than after value, early terminate collection
isEarlyTerminate = true;
}
}
return -1;
}
if (size() >= maxSize
// the tree map is full, check if the candidate key should be kept
&& compare(CANDIDATE_SLOT, top()) > 0) {
// the tree map is full, check if the candidate key should be kept
&& compare(CANDIDATE_SLOT, top()) > 0) {
// the candidate key is not competitive, skip it
if (isLeadingSort) {
if (arrays[0].compareCurrent(top()) > 0) {
// the leading source field value is greater than top value of queue, early terminate collection
isEarlyTerminate = true;
}
}
return -1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,15 @@ public void collect(int doc, long bucket) throws IOException {
}

@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
SortedDocsProducer createSortedDocsProducerOrNull(Query query) {
return null;
}

@Override
boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query) {
// This values source never qualifies for the sorted-docs (index sort) optimization.
return false;
}

@Override
public void close() {
Releasables.close(values, bits);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,23 @@ public void collect(int doc, long bucket) throws IOException {
}

@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
fieldType instanceof StringFieldType == false ||
(query != null && query.getClass() != MatchAllDocsQuery.class)) {
/**
 * Returns a {@link TermsSortedDocsProducer} for this source's field, or null
 * when the field is unmapped ({@code fieldType == null}).
 */
SortedDocsProducer createSortedDocsProducerOrNull(Query query) {
if (fieldType == null) {
return null;
}
return new TermsSortedDocsProducer(fieldType.name());
}

@Override
boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query) {
// The optimization requires all of: sorted docs applicable for this field,
// a string field type, and (when a query is present) a match-all query.
if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
fieldType instanceof StringFieldType == false ||
(query != null && query.getClass() != MatchAllDocsQuery.class)) {
return false;
}
return true;
}

@Override
public void close() {
Releasables.close(values);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,17 @@ private static boolean checkMatchAllOrRangeQuery(Query query, String fieldName)
}

@Override
SortedDocsProducer createSortedDocsProducerOrNull(IndexReader reader, Query query) {
boolean canBeOptimizedBySortedDocs(IndexReader reader, Query query) {
query = extractQuery(query);
if (checkIfSortedDocsIsApplicable(reader, fieldType) == false ||
checkMatchAllOrRangeQuery(query, fieldType.name()) == false) {
return null;
checkMatchAllOrRangeQuery(query, fieldType.name()) == false) {
return false;
}
return true;
}

@Override
SortedDocsProducer createSortedDocsProducerOrNull(Query query) {
final byte[] lowerPoint;
final byte[] upperPoint;
if (query instanceof PointRangeQuery) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.index.PointValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.DocIdSetBuilder;
import org.apache.lucene.util.FutureArrays;
Expand Down Expand Up @@ -53,6 +54,36 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue,
// no value for the field
return DocIdSet.EMPTY;
}

DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null;
Visitor visitor = getIntersectVisitor(values, queue, context, builder, false);
try {
values.intersect(visitor);
visitor.flush();
} catch (CollectionTerminatedException exc) {}
return fillDocIdSet ? builder.build() : DocIdSet.EMPTY;
}

/**
 * Finds the first doc id in this leaf that can compete with the current queue:
 * runs the BKD intersect visitor with {@code terminateAfterStartDoc = true} so
 * it records the first matching doc into {@code builder} and then terminates
 * via {@link CollectionTerminatedException}. Returns 0 when the field has no
 * point values in this leaf or when no document matched.
 *
 * NOTE(review): assumes {@code builder.build().iterator()} is non-null even
 * when no doc was added — confirm against DocIdSetBuilder's contract.
 */
@Override
int getStartDocId(CompositeValuesCollectorQueue queue, LeafReaderContext context) throws IOException {
final PointValues values = context.reader().getPointValues(field);
if (values == null) {
return 0;
}
DocIdSetBuilder builder = new DocIdSetBuilder(context.reader().maxDoc(), values, field);
Visitor visitor = getIntersectVisitor(values, queue, context, builder, true);
try {
values.intersect(visitor);
} catch (CollectionTerminatedException exc) {}
int docId = builder.build().iterator().nextDoc();
if (docId != DocIdSetIterator.NO_MORE_DOCS) {
return docId;
}
return 0;
}

Visitor getIntersectVisitor(PointValues values, CompositeValuesCollectorQueue queue, LeafReaderContext context,
DocIdSetBuilder builder, boolean terminateAfterStartDoc) throws IOException {
long lowerBucket = Long.MIN_VALUE;
Comparable lowerValue = queue.getLowerValueLeadSource();
if (lowerValue != null) {
Expand All @@ -70,23 +101,19 @@ DocIdSet processLeaf(Query query, CompositeValuesCollectorQueue queue,
}
upperBucket = (Long) upperValue;
}
DocIdSetBuilder builder = fillDocIdSet ? new DocIdSetBuilder(context.reader().maxDoc(), values, field) : null;
Visitor visitor = new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket);
try {
values.intersect(visitor);
visitor.flush();
} catch (CollectionTerminatedException exc) {}
return fillDocIdSet ? builder.build() : DocIdSet.EMPTY;

return new Visitor(context, queue, builder, values.getBytesPerDimension(), lowerBucket, upperBucket, terminateAfterStartDoc);
}

private class Visitor implements PointValues.IntersectVisitor {
protected class Visitor implements PointValues.IntersectVisitor {
final LeafReaderContext context;
final CompositeValuesCollectorQueue queue;
final DocIdSetBuilder builder;
final int maxDoc;
final int bytesPerDim;
final long lowerBucket;
final long upperBucket;
final boolean terminateAfterStartDoc;

DocIdSetBuilder bucketDocsBuilder;
DocIdSetBuilder.BulkAdder adder;
Expand All @@ -95,7 +122,7 @@ private class Visitor implements PointValues.IntersectVisitor {
boolean first = true;

Visitor(LeafReaderContext context, CompositeValuesCollectorQueue queue, DocIdSetBuilder builder,
int bytesPerDim, long lowerBucket, long upperBucket) {
int bytesPerDim, long lowerBucket, long upperBucket, boolean terminateAfterStartDoc) {
this.context = context;
this.maxDoc = context.reader().maxDoc();
this.queue = queue;
Expand All @@ -104,6 +131,7 @@ private class Visitor implements PointValues.IntersectVisitor {
this.upperBucket = upperBucket;
this.bucketDocsBuilder = new DocIdSetBuilder(maxDoc);
this.bytesPerDim = bytesPerDim;
this.terminateAfterStartDoc = terminateAfterStartDoc;
}

@Override
Expand Down Expand Up @@ -143,6 +171,11 @@ public void visit(int docID, byte[] packedValue) throws IOException {
first = false;
adder.add(docID);
remaining --;

if (terminateAfterStartDoc) {
builder.grow(1).add(docID);
throw new CollectionTerminatedException();
}
}

@Override
Expand Down
Loading