Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Latency improvements to Multi Term Aggregations #14993

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.bucket.BucketsAggregator;
import org.opensearch.search.aggregations.bucket.DeferableBucketAggregator;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.support.AggregationPath;
Expand Down Expand Up @@ -215,19 +216,11 @@ public InternalAggregation buildEmptyAggregation() {

@Override
protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx);
MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx, bucketOrds, this, sub);
return new LeafBucketCollector() {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
for (BytesRef compositeKey : collector.apply(doc)) {
long bucketOrd = bucketOrds.add(owningBucketOrd, compositeKey);
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
collectExistingBucket(sub, doc, bucketOrd);
} else {
collectBucket(sub, doc, bucketOrd);
}
}
collector.apply(doc, owningBucketOrd);
}
};
}
Expand Down Expand Up @@ -268,12 +261,10 @@ private void collectZeroDocEntriesIfNeeded(long owningBucketOrd) throws IOExcept
}
// we need to fill-in the blanks
for (LeafReaderContext ctx : context.searcher().getTopReaderContext().leaves()) {
MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx);
// brute force
MultiTermsValuesSourceCollector collector = multiTermsValue.getValues(ctx, bucketOrds, null, null);
for (int docId = 0; docId < ctx.reader().maxDoc(); ++docId) {
for (BytesRef compositeKey : collector.apply(docId)) {
bucketOrds.add(owningBucketOrd, compositeKey);
}
collector.apply(docId, owningBucketOrd);
}
}
}
Expand All @@ -287,7 +278,8 @@ interface MultiTermsValuesSourceCollector {
* Collect a list values of multi_terms on each doc.
* Each terms could have multi_values, so the result is the cartesian product of each term's values.
*/
List<BytesRef> apply(int doc) throws IOException;
void apply(int doc, long owningBucketOrd) throws IOException;

expani marked this conversation as resolved.
Show resolved Hide resolved
}

@FunctionalInterface
Expand Down Expand Up @@ -361,47 +353,72 @@ public MultiTermsValuesSource(List<InternalValuesSource> valuesSources) {
this.valuesSources = valuesSources;
}

public MultiTermsValuesSourceCollector getValues(LeafReaderContext ctx) throws IOException {
public MultiTermsValuesSourceCollector getValues(
LeafReaderContext ctx,
BytesKeyedBucketOrds bucketOrds,
BucketsAggregator aggregator,
LeafBucketCollector sub
) throws IOException {
List<InternalValuesSourceCollector> collectors = new ArrayList<>();
for (InternalValuesSource valuesSource : valuesSources) {
collectors.add(valuesSource.apply(ctx));
}
boolean collectBucketOrds = aggregator != null && sub != null;
return new MultiTermsValuesSourceCollector() {

/**
* This method does the following : <br>
* <li>Fetches the values of every field present in the doc List<List<TermValue<?>>> via @{@link InternalValuesSourceCollector}</li>
* <li>Generates Composite keys from the fetched values for all fields present in the aggregation.</li>
* <li>Adds every composite key to the @{@link BytesKeyedBucketOrds} and Optionally collects them via @{@link BucketsAggregator#collectBucket(LeafBucketCollector, int, long)}</li>
*/
@Override
public List<BytesRef> apply(int doc) throws IOException {
public void apply(int doc, long owningBucketOrd) throws IOException {
// TODO A new list creation can be avoided for every doc.
List<List<TermValue<?>>> collectedValues = new ArrayList<>();
for (InternalValuesSourceCollector collector : collectors) {
collectedValues.add(collector.apply(doc));
}
List<BytesRef> result = new ArrayList<>();
scratch.seek(0);
scratch.writeVInt(collectors.size()); // number of fields per composite key
cartesianProduct(result, scratch, collectedValues, 0);
return result;
generateAndCollectCompositeKeys(collectedValues, 0, owningBucketOrd, doc);
}

/**
* Cartesian product using depth first search.
*
* <p>
* Composite keys are encoded to a {@link BytesRef} in a format compatible with {@link StreamOutput::writeGenericValue},
* but reuses the encoding of the shared prefixes from the previous levels to avoid wasteful work.
* This generates and collects all Composite keys in their buckets by performing a cartesian product <br>
* of all the values in all the fields ( used in agg ) for the given doc recursively.
* @param collectedValues : Values of all fields present in the aggregation for the @doc
* @param index : Points to the field being added to generate the composite key
*/
private void cartesianProduct(
List<BytesRef> compositeKeys,
BytesStreamOutput scratch,
private void generateAndCollectCompositeKeys(
List<List<TermValue<?>>> collectedValues,
int index
int index,
long owningBucketOrd,
int doc
) throws IOException {
if (collectedValues.size() == index) {
compositeKeys.add(BytesRef.deepCopyOf(scratch.bytes().toBytesRef()));
// Avoid performing a deep copy of the composite key by inlining.
long bucketOrd = bucketOrds.add(owningBucketOrd, scratch.bytes().toBytesRef());
if (collectBucketOrds) {
if (bucketOrd < 0) {
bucketOrd = -1 - bucketOrd;
aggregator.collectExistingBucket(sub, doc, bucketOrd);
} else {
aggregator.collectBucket(sub, doc, bucketOrd);
}
}
return;
}

long position = scratch.position();
for (TermValue<?> value : collectedValues.get(index)) {
List<TermValue<?>> values = collectedValues.get(index);
int numIterations = values.size();
// For each loop is not done to reduce the allocations done for Iterator objects
// once for every field in every doc.
for (int i = 0; i < numIterations; i++) {
TermValue<?> value = values.get(i);
value.writeTo(scratch); // encode the value
cartesianProduct(compositeKeys, scratch, collectedValues, index + 1); // dfs
generateAndCollectCompositeKeys(collectedValues, index + 1, owningBucketOrd, doc); // dfs
scratch.seek(position); // backtrack
}
}
Expand Down Expand Up @@ -441,9 +458,14 @@ static InternalValuesSource bytesValuesSource(ValuesSource valuesSource, Include
if (i > 0 && bytes.equals(previous)) {
continue;
}
BytesRef copy = BytesRef.deepCopyOf(bytes);
termValues.add(TermValue.of(copy));
previous = copy;
// Performing a deep copy is not required for field containing only one value.
expani marked this conversation as resolved.
Show resolved Hide resolved
if (valuesCount > 1) {
expani marked this conversation as resolved.
Show resolved Hide resolved
BytesRef copy = BytesRef.deepCopyOf(bytes);
termValues.add(TermValue.of(copy));
previous = copy;
} else {
termValues.add(TermValue.of(bytes));
}
}
return termValues;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,10 @@ public void testDatesFieldFormat() throws IOException {
}

public void testIpAndKeyword() throws IOException {
testAggregation(new MatchAllDocsQuery(), fieldConfigs(asList(KEYWORD_FIELD, IP_FIELD)), NONE_DECORATOR, iw -> {
testAggregation(new MatchAllDocsQuery(), fieldConfigs(asList(KEYWORD_FIELD, IP_FIELD)), multiTermsAggregationBuilder -> {
multiTermsAggregationBuilder.minDocCount(0);
multiTermsAggregationBuilder.size(100);
}, iw -> {
iw.addDocument(
asList(
new SortedDocValuesField(KEYWORD_FIELD, new BytesRef("a")),
Expand Down
Loading