Support sub aggregations on filter rewrite optimization #15253

Closed
@@ -158,3 +158,111 @@ setup:
   - match: { profile.shards.0.aggregations.0.debug.unoptimized_segments: 0 }
   - match: { profile.shards.0.aggregations.0.debug.leaf_visited: 1 }
   - match: { profile.shards.0.aggregations.0.debug.inner_visited: 0 }
+
+---
+"Complex aggregation with range, auto_date_histogram, and metric aggregations":
+  - do:
+      indices.create:
+        index: tmin_metrics
+        body:
+          mappings:
+            properties:
+              "@timestamp":
+                type: date
+              metrics:
+                properties:
+                  size:
+                    type: float
+                  tmin:
+                    type: float
+
+  - do:
+      bulk:
+        refresh: true
+        index: tmin_metrics
+        body:
+          - '{"index":{}}'
+          - '{"@timestamp": "2023-01-01T00:00:00Z", "metrics": {"size": -15, "tmin": -20}}'
+          - '{"index":{}}'
+          - '{"@timestamp": "2023-01-02T00:00:00Z", "metrics": {"size": 5, "tmin": 0}}'
+          - '{"index":{}}'
+          - '{"@timestamp": "2023-01-03T00:00:00Z", "metrics": {"size": 26, "tmin": 10}}'
+          - '{"index":{}}'
+          - '{"@timestamp": "2023-01-04T00:00:00Z", "metrics": {"size": 50, "tmin": 15}}'
+          - '{"index":{}}'
+          - '{"@timestamp": "2023-01-05T00:00:00Z", "metrics": {"size": 77, "tmin": 20}}'
+          - '{"index":{}}'
+          - '{"@timestamp": "2023-02-15T00:00:00Z", "metrics": {"size": 88, "tmin": 33}}'
+          - '{"index":{}}'
+          - '{"@timestamp": "2023-01-06T00:00:00Z", "metrics": {"size": 123, "tmin": 25}}'
+
+  - do:
+      search:
+        index: tmin_metrics
+        body:
+          size: 0
+          aggs:
+            tmax:
+              range:
+                field: metrics.size
+                ranges:
+                  - to: 10
+                  - from: 10
+                    to: 100
+                  - from: 100
+              aggs:
+                date:
+                  auto_date_histogram:
+                    field: "@timestamp"
+                    buckets: 20
+                  aggs:
+                    tmin:
+                      min:
+                        field: metrics.tmin
+                    tavg:
+                      avg:
+                        field: metrics.size
+                    tmax:
+                      max:
+                        field: metrics.size
+
+  - match: { hits.total.value: 7 }
+  - length: { aggregations.tmax.buckets: 3 }
+
+  - match: { aggregations.tmax.buckets.0.key: "*-10.0" }
+  - match: { aggregations.tmax.buckets.0.doc_count: 2 }
+  - length: { aggregations.tmax.buckets.0.date.buckets: 9 }
+
+  - match: { aggregations.tmax.buckets.0.date.buckets.0.doc_count: 1 }
+  - match: { aggregations.tmax.buckets.0.date.buckets.0.tmin.value: -20.0 }
+  - match: { aggregations.tmax.buckets.0.date.buckets.0.tavg.value: -15.0 }
+  - match: { aggregations.tmax.buckets.0.date.buckets.0.tmax.value: -15.0 }
+
+  - match: { aggregations.tmax.buckets.0.date.buckets.8.doc_count: 1 }
+  - match: { aggregations.tmax.buckets.0.date.buckets.8.tmin.value: 0.0 }
+  - match: { aggregations.tmax.buckets.0.date.buckets.8.tavg.value: 5.0 }
+  - match: { aggregations.tmax.buckets.0.date.buckets.8.tmax.value: 5.0 }
+
+  - match: { aggregations.tmax.buckets.1.key: "10.0-100.0" }
+  - match: { aggregations.tmax.buckets.1.doc_count: 4 }
+  - length: { aggregations.tmax.buckets.1.date.buckets: 7 }
+
+  - match: { aggregations.tmax.buckets.1.date.buckets.0.doc_count: 3 }
+  - match: { aggregations.tmax.buckets.1.date.buckets.0.tmin.value: 10.0 }
+  - match: { aggregations.tmax.buckets.1.date.buckets.0.tavg.value: 51.0 }
+  - match: { aggregations.tmax.buckets.1.date.buckets.0.tmax.value: 77.0 }
+
+  - match: { aggregations.tmax.buckets.1.date.buckets.6.doc_count: 1 }
+  - match: { aggregations.tmax.buckets.1.date.buckets.6.tmin.value: 33.0 }
+  - match: { aggregations.tmax.buckets.1.date.buckets.6.tavg.value: 88.0 }
+  - match: { aggregations.tmax.buckets.1.date.buckets.6.tmax.value: 88.0 }
+
+  - match: { aggregations.tmax.buckets.2.key: "100.0-*" }
+  - match: { aggregations.tmax.buckets.2.doc_count: 1 }
+  - length: { aggregations.tmax.buckets.2.date.buckets: 1 }
+
+  - match: { aggregations.tmax.buckets.2.date.buckets.0.doc_count: 1 }
+  - match: { aggregations.tmax.buckets.2.date.buckets.0.tmin.value: 25.0 }
+  - match: { aggregations.tmax.buckets.2.date.buckets.0.tavg.value: 123.0 }
+  - match: { aggregations.tmax.buckets.2.date.buckets.0.tmax.value: 123.0 }
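Sanity-checking the expected values above: the 10.0-100.0 size range matches four docs (sizes 26, 50, and 77 on Jan 3-5, plus 88 on Feb 15). Its first date bucket groups the three January docs, so the assertions expect tmin = min(10, 15, 20) = 10.0, tavg = (26 + 50 + 77) / 3 = 51.0, and tmax = 77.0.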
@@ -32,6 +32,7 @@
 
 package org.opensearch.search.aggregations.bucket.composite;
 
+import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.DocValues;
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.NumericDocValues;
@@ -75,6 +76,7 @@
 import org.opensearch.search.aggregations.bucket.BucketsAggregator;
 import org.opensearch.search.aggregations.bucket.filterrewrite.CompositeAggregatorBridge;
 import org.opensearch.search.aggregations.bucket.filterrewrite.FilterRewriteOptimizationContext;
+import org.opensearch.search.aggregations.bucket.filterrewrite.PackedValueRanges;
 import org.opensearch.search.aggregations.bucket.missing.MissingOrder;
 import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
 import org.opensearch.search.internal.SearchContext;
@@ -89,7 +91,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiConsumer;
-import java.util.function.Function;
 import java.util.function.LongUnaryOperator;
 import java.util.stream.Collectors;
 
@@ -213,16 +214,25 @@ protected long[] processAfterKey(long[] bounds, long interval) {
             }
 
             @Override
-            protected int getSize() {
+            protected int rangeMax() {
                 return size;
             }
 
             @Override
-            protected Function<Long, Long> bucketOrdProducer() {
-                return (key) -> bucketOrds.add(0, getRoundingPrepared().round((long) key));
+            protected long getOrd(int rangeIdx, PackedValueRanges ranges) {
+                assert(ranges != null);
+                long rangeStart = LongPoint.decodeDimension(ranges.getLower(rangeIdx), 0);
+                rangeStart = this.getFieldType().convertNanosToMillis(rangeStart);
+                long ord = bucketOrds.add(0, getRoundingPrepared().round(rangeStart));
+
+                if (ord < 0) { // already seen
+                    ord = -1 - ord;
+                }
+
+                return ord;
             }
         };
-        filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, subAggregators.length, context);
+        filterRewriteOptimizationContext = new FilterRewriteOptimizationContext(bridge, parent, context);
     }
 
     @Override
@@ -557,7 +567,12 @@ private void processLeafFromQuery(LeafReaderContext ctx, Sort indexSortPrefix) t
 
     @Override
     protected LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBucketCollector sub) throws IOException {
-        boolean optimized = filterRewriteOptimizationContext.tryOptimize(ctx, this::incrementBucketDocCount, segmentMatchAll(context, ctx));
+        boolean optimized = filterRewriteOptimizationContext.tryOptimize(
+            ctx,
+            this::incrementBucketDocCount,
+            sub,
+            segmentMatchAll(context, ctx)
+        );
         if (optimized) throw new CollectionTerminatedException();
 
         finishLeaf();
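The getLeafCollector change above is the standard optimize-or-fallback pattern for filter rewrite, now passing sub through so sub aggregations survive the optimized path. A minimal sketch of that control flow; tryOptimizeSegment and newRegularCollector are hypothetical stand-ins, while LeafReaderContext and CollectionTerminatedException are real Lucene types:

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;

import java.io.IOException;

abstract class OptimizeOrFallback<C> {

    // Stand-in for FilterRewriteOptimizationContext.tryOptimize(...): returns
    // true when the segment's buckets were filled straight from the BKD tree.
    abstract boolean tryOptimizeSegment(LeafReaderContext ctx, C sub) throws IOException;

    // Stand-in for the aggregator's ordinary per-document collector.
    abstract C newRegularCollector(LeafReaderContext ctx, C sub) throws IOException;

    C getLeafCollector(LeafReaderContext ctx, C sub) throws IOException {
        if (tryOptimizeSegment(ctx, sub)) {
            // Nothing left to collect for this segment; this exception tells
            // Lucene to stop feeding documents to the collector.
            throw new CollectionTerminatedException();
        }
        return newRegularCollector(ctx, sub); // fall back to doc-by-doc collection
    }
}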
@@ -11,11 +11,14 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.PointValues;
 import org.opensearch.index.mapper.MappedFieldType;
+import org.opensearch.search.aggregations.LeafBucketCollector;
 
 import java.io.IOException;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
+import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.multiRangesTraverse;
+
 /**
  * This interface provides a bridge between an aggregator and the optimization context, allowing
  * the aggregator to provide data and optimize the aggregation process.
@@ -37,9 +40,9 @@ public abstract class AggregatorBridge {
      */
     MappedFieldType fieldType;
 
-    Consumer<Ranges> setRanges;
+    Consumer<PackedValueRanges> setRanges;
 
-    void setRangesConsumer(Consumer<Ranges> setRanges) {
+    void setRangesConsumer(Consumer<PackedValueRanges> setRanges) {
         this.setRanges = setRanges;
     }
 
@@ -67,18 +70,66 @@ void setRangesConsumer(Consumer<Ranges> setRanges) {
      *
      * @param leaf the leaf reader context for the segment
      */
-    abstract Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException;
+    abstract PackedValueRanges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException;
+
+    /**
+     * @return the max range index to stop collecting at.
+     * Utilized by aggs which stop early.
+     */
+    protected int rangeMax() {
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Translates an index of the packed value ranges array to an agg bucket ordinal.
+     */
+    protected long getOrd(int rangeIdx, PackedValueRanges ranges) {
+        return rangeIdx;
+    }
 
     /**
-     * Attempts to build aggregation results for a segment
+     * Attempts to build aggregation results for a segment.
+     * With no sub agg, count docs per range and avoid iterating docIds.
+     * If a sub agg is present, we must iterate through and collect docIds to support it.
      *
-     * @param values the point values (index structure for numeric values) for a segment
-     * @param incrementDocCount a consumer to increment the document count for a range bucket. The First parameter is document count, the second is the key of the bucket
-     * @param ranges
+     * @param values            the point values (index structure for numeric values) for a segment
+     * @param incrementDocCount a consumer to increment the document count for a range bucket. The first parameter is the key of the bucket, the second is the document count
      */
-    abstract FilterRewriteOptimizationContext.DebugInfo tryOptimize(
+    public final FilterRewriteOptimizationContext.DebugInfo tryOptimize(
         PointValues values,
         BiConsumer<Long, Long> incrementDocCount,
-        Ranges ranges
-    ) throws IOException;
+        PackedValueRanges ranges,
+        final LeafBucketCollector sub
+    ) throws IOException {
+        PointTreeTraversal.RangeAwareIntersectVisitor treeVisitor;
+
+        if (sub != null) {
+            treeVisitor = new PointTreeTraversal.DocCollectRangeAwareIntersectVisitor(
+                values.getPointTree(),
+                ranges,
+                rangeMax(),
+                (activeIndex, docID) -> {
+                    long ord = this.getOrd(activeIndex, ranges);
+                    try {
+                        incrementDocCount.accept(ord, (long) 1);
+                        sub.collect(docID, ord);
+                    } catch (IOException ioe) {
+                        throw new RuntimeException(ioe);
+                    }
+                }
+            );
+        } else {
+            treeVisitor = new PointTreeTraversal.DocCountRangeAwareIntersectVisitor(
+                values.getPointTree(),
+                ranges,
+                rangeMax(),
+                (activeIndex, docCount) -> {
+                    long ord = this.getOrd(activeIndex, ranges);
+                    incrementDocCount.accept(ord, (long) docCount);
+                }
+            );
        }
 
        return multiRangesTraverse(treeVisitor);
    }
 }
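The null check on sub above is the crux of the PR: without a sub collector, the BKD traversal can consume per-range doc counts wholesale, while with one it must visit each matching docID so the sub aggregation tree sees it. A rough sketch of the two callback shapes, using simplified stand-ins for the PointTreeTraversal visitors:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.BiConsumer;

// Simplified stand-ins for the two traversal callbacks wired up above.
class VisitorCallbacks {

    // Mimics LeafBucketCollector.collect(docID, ord).
    interface SubCollector {
        void collect(int docID, long ord) throws IOException;
    }

    // No sub agg: the traversal reports (rangeIndex, docCount) pairs, so whole
    // BKD leaves can be tallied without ever enumerating docIDs.
    static BiConsumer<Integer, Integer> countOnly(BiConsumer<Long, Long> incrementDocCount) {
        return (rangeIdx, docCount) -> incrementDocCount.accept((long) rangeIdx, (long) docCount);
    }

    // Sub agg present: the traversal reports (rangeIndex, docID) pairs, one per
    // matching document, so each doc can be forwarded with its bucket ordinal.
    static BiConsumer<Integer, Integer> perDoc(BiConsumer<Long, Long> incrementDocCount, SubCollector sub) {
        return (rangeIdx, docID) -> {
            long ord = rangeIdx; // the real code maps this via getOrd(rangeIdx, ranges)
            try {
                incrementDocCount.accept(ord, 1L);
                sub.collect(docID, ord);
            } catch (IOException e) {
                throw new UncheckedIOException(e); // BiConsumer cannot throw checked IOException
            }
        };
    }
}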
@@ -8,9 +8,7 @@
 
 package org.opensearch.search.aggregations.bucket.filterrewrite;
 
-import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.index.LeafReaderContext;
-import org.apache.lucene.index.PointValues;
 import org.apache.lucene.search.ScoreMode;
 import org.apache.lucene.search.Weight;
 import org.opensearch.common.Rounding;
@@ -22,10 +20,6 @@
 
 import java.io.IOException;
 import java.util.OptionalLong;
-import java.util.function.BiConsumer;
-import java.util.function.Function;
-
-import static org.opensearch.search.aggregations.bucket.filterrewrite.PointTreeTraversal.multiRangesTraverse;
 
 /**
  * For date histogram aggregation
@@ -54,12 +48,12 @@ protected void buildRanges(SearchContext context) throws IOException {
     }
 
     @Override
-    final Ranges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException {
+    final PackedValueRanges tryBuildRangesFromSegment(LeafReaderContext leaf) throws IOException {
         long[] bounds = Helper.getSegmentBounds(leaf, fieldType.name());
         return buildRanges(bounds, maxRewriteFilters);
     }
 
-    private Ranges buildRanges(long[] bounds, int maxRewriteFilters) {
+    private PackedValueRanges buildRanges(long[] bounds, int maxRewriteFilters) {
         bounds = processHardBounds(bounds);
         if (bounds == null) {
             return null;
@@ -116,47 +110,11 @@ protected long[] processHardBounds(long[] bounds, LongBounds hardBounds) {
         return bounds;
     }
 
-    private DateFieldMapper.DateFieldType getFieldType() {
+    public DateFieldMapper.DateFieldType getFieldType() {
         assert fieldType instanceof DateFieldMapper.DateFieldType;
         return (DateFieldMapper.DateFieldType) fieldType;
     }
 
-    protected int getSize() {
-        return Integer.MAX_VALUE;
-    }
-
-    @Override
-    final FilterRewriteOptimizationContext.DebugInfo tryOptimize(
-        PointValues values,
-        BiConsumer<Long, Long> incrementDocCount,
-        Ranges ranges
-    ) throws IOException {
-        int size = getSize();
-
-        DateFieldMapper.DateFieldType fieldType = getFieldType();
-        BiConsumer<Integer, Integer> incrementFunc = (activeIndex, docCount) -> {
-            long rangeStart = LongPoint.decodeDimension(ranges.lowers[activeIndex], 0);
-            rangeStart = fieldType.convertNanosToMillis(rangeStart);
-            long bucketOrd = getBucketOrd(bucketOrdProducer().apply(rangeStart));
-            incrementDocCount.accept(bucketOrd, (long) docCount);
-        };
-
-        return multiRangesTraverse(values.getPointTree(), ranges, incrementFunc, size);
-    }
-
-    private static long getBucketOrd(long bucketOrd) {
-        if (bucketOrd < 0) { // already seen
-            bucketOrd = -1 - bucketOrd;
-        }
-
-        return bucketOrd;
-    }
-
-    /**
-     * Provides a function to produce bucket ordinals from the lower bound of the range
-     */
-    protected abstract Function<Long, Long> bucketOrdProducer();
-
     /**
      * Checks whether the top level query matches all documents on the segment
      *
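One detail shared by the removed getBucketOrd helper here and the new getOrd override in CompositeAggregator: both decode the same encoding, where a bucket-ords add returns a non-negative ordinal for a new key and -1 - ord for a key already seen (the LongKeyedBucketOrds convention, as the "already seen" comments note). A self-contained toy illustrating the round trip; the map-backed add below is a stand-in, not the real class:

import java.util.HashMap;
import java.util.Map;

// Toy model of the "-1 - ord" convention decoded by getOrd / getBucketOrd.
class BucketOrdsToy {
    private final Map<Long, Long> ords = new HashMap<>();
    private long next = 0;

    // Returns a fresh ordinal for a new key, or -1 - existingOrd otherwise.
    long add(long key) {
        Long existing = ords.get(key);
        if (existing != null) {
            return -1 - existing;
        }
        ords.put(key, next);
        return next++;
    }

    public static void main(String[] args) {
        BucketOrdsToy bucketOrds = new BucketOrdsToy();
        System.out.println(bucketOrds.add(42)); // 0: new key, new ordinal
        long ord = bucketOrds.add(42);          // -1: encodes "already seen, ord 0"
        if (ord < 0) {                          // same decode as in the PR
            ord = -1 - ord;
        }
        System.out.println(ord);                // 0
    }
}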