diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java index 268b87250c9a0..bba468f44f12c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java @@ -61,6 +61,7 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator { private LongHash bucketOrds; private int targetBuckets; private MergingBucketsDeferringCollector deferringCollector; + private int numCollectedValues = 0; AutoDateHistogramAggregator(String name, AggregatorFactories factories, int numBuckets, Rounding[] roundings, @Nullable ValuesSource.Numeric valuesSource, DocValueFormat formatter, SearchContext aggregationContext, Aggregator parent, @@ -109,6 +110,7 @@ public void collect(int doc, long bucket) throws IOException { long previousRounded = Long.MIN_VALUE; for (int i = 0; i < valuesCount; ++i) { long value = values.nextValue(); + numCollectedValues++; long rounded = roundings[roundingIdx].round(value); assert rounded >= previousRounded; if (rounded == previousRounded) { @@ -120,7 +122,8 @@ public void collect(int doc, long bucket) throws IOException { collectExistingBucket(sub, doc, bucketOrd); } else { collectBucket(sub, doc, bucketOrd); - while (bucketOrds.size() > targetBuckets) { + double maxBuckets = Math.max(targetBuckets, targetBuckets * Math.log(numCollectedValues)); + while (bucketOrds.size() > maxBuckets) { increaseRounding(); } } @@ -179,14 +182,15 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundings, roundingIdx, buildEmptySubAggregations()); - return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, pipelineAggregators(), metaData()); + return new InternalAutoDateHistogram(name, buckets, targetBuckets, numCollectedValues, emptyBucketInfo, formatter, + pipelineAggregators(), metaData()); } @Override public InternalAggregation buildEmptyAggregation() { InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundings, roundingIdx, buildEmptySubAggregations()); - return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, emptyBucketInfo, formatter, + return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, numCollectedValues, emptyBucketInfo, formatter, pipelineAggregators(), metaData()); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java index fcfbd2c66d12b..30b23d2166008 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -43,6 +43,7 @@ import java.util.ListIterator; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; /** * Implementation of {@link Histogram}. @@ -207,15 +208,17 @@ public int hashCode() { private final DocValueFormat format; private final BucketInfo bucketInfo; private final int targetBuckets; + private final long numValuesCollected; - InternalAutoDateHistogram(String name, List buckets, int targetBuckets, BucketInfo emptyBucketInfo, DocValueFormat formatter, - List pipelineAggregators, Map metaData) { + InternalAutoDateHistogram(String name, List buckets, int targetBuckets, long numValuesCollected, BucketInfo emptyBucketInfo, + DocValueFormat formatter, List pipelineAggregators, Map metaData) { super(name, pipelineAggregators, metaData); this.buckets = buckets; this.bucketInfo = emptyBucketInfo; this.format = formatter; this.targetBuckets = targetBuckets; + this.numValuesCollected = numValuesCollected; } /** @@ -227,6 +230,7 @@ public InternalAutoDateHistogram(StreamInput in) throws IOException { format = in.readNamedWriteable(DocValueFormat.class); buckets = in.readList(stream -> new Bucket(stream, format)); this.targetBuckets = in.readVInt(); + this.numValuesCollected = in.readVLong(); } @Override @@ -235,6 +239,7 @@ protected void doWriteTo(StreamOutput out) throws IOException { out.writeNamedWriteable(format); out.writeList(buckets); out.writeVInt(targetBuckets); + out.writeVLong(numValuesCollected); } @Override @@ -255,13 +260,18 @@ public int getTargetBuckets() { return targetBuckets; } + public long getNumValuesCollected() { + return numValuesCollected; + } + public BucketInfo getBucketInfo() { return bucketInfo; } @Override public InternalAutoDateHistogram create(List buckets) { - return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators(), metaData); + return new InternalAutoDateHistogram(name, buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators(), + metaData); } @Override @@ -365,7 +375,8 @@ private BucketReduceResult mergeBucketsIfNeeded(List reducedBuckets, int return new BucketReduceResult(reducedBuckets, reduceRounding, reduceRoundingIdx); } - private List mergeBuckets(List reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) { + private List mergeBuckets(List reducedBuckets, Rounding reduceRounding, + ReduceContext reduceContext) { List mergedBuckets = new ArrayList<>(); List sameKeyedBuckets = new ArrayList<>(); @@ -409,12 +420,13 @@ private static class BucketReduceResult { } } - private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) { + private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, long numValuesCollected, ReduceContext reduceContext) { List list = currentResult.buckets; if (list.isEmpty()) { return currentResult; } - int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx, + double maxBuckets = Math.max(targetBuckets, targetBuckets * Math.log(numValuesCollected)); + int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, maxBuckets, currentResult.roundingIdx, bucketInfo.roundings); Rounding rounding = bucketInfo.roundings[roundingIdx]; // merge buckets using the new rounding @@ -443,7 +455,7 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red return new BucketReduceResult(list, rounding, roundingIdx); } - private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, Rounding[] roundings) { + private int getAppropriateRounding(long minKey, long maxKey, double maxBuckets, int roundingIdx, Rounding[] roundings) { if (roundingIdx == roundings.length - 1) { return roundingIdx; } @@ -458,7 +470,7 @@ private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, Ro currentKey = currentRounding.nextRoundingValue(currentKey); } currentRoundingIdx++; - } while (requiredBuckets > targetBuckets && currentRoundingIdx < roundings.length); + } while (requiredBuckets > maxBuckets && currentRoundingIdx < roundings.length); // The loop will increase past the correct rounding index here so we // need to subtract one to get the rounding index we need return currentRoundingIdx - 1; @@ -466,10 +478,13 @@ private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, Ro @Override public InternalAggregation doReduce(List aggregations, ReduceContext reduceContext) { + long numValuesCollected = aggregations.stream() + .collect(Collectors.summingLong(agg -> ((InternalAutoDateHistogram) agg).getNumValuesCollected())); + BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext); // adding empty buckets if needed - reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext); + reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, numValuesCollected, reduceContext); // Adding empty buckets may have tipped us over the target so merge the buckets again if needed reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx, @@ -478,7 +493,7 @@ public InternalAggregation doReduce(List aggregations, Redu BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundings, reducedBucketsResult.roundingIdx, this.bucketInfo.emptySubAggregations); - return new InternalAutoDateHistogram(getName(), reducedBucketsResult.buckets, targetBuckets, bucketInfo, format, + return new InternalAutoDateHistogram(getName(), reducedBucketsResult.buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators(), getMetaData()); } @@ -512,7 +527,8 @@ public InternalAggregation createAggregation(List aggregation.setNumBuckets(6).field(DATE_FIELD), + aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD), histogram -> assertEquals(6, histogram.getBuckets().size()) ); testSearchAndReduceCase(query, dataset, @@ -82,7 +82,7 @@ public void testSubAggregations() throws IOException { Query query = new MatchAllDocsQuery(); testSearchCase(query, dataset, - aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD) + aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD) .subAggregation(AggregationBuilders.stats("stats").field(DATE_FIELD)), histogram -> { List buckets = histogram.getBuckets(); @@ -231,7 +231,7 @@ public void testAggregateWrongField() throws IOException { public void testIntervalYear() throws IOException { testBothCases(LongPoint.newRangeQuery(INSTANT_FIELD, asLong("2015-01-01"), asLong("2017-12-31")), dataset, - aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), + aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD), histogram -> { List buckets = histogram.getBuckets(); assertEquals(3, buckets.size()); @@ -254,7 +254,7 @@ public void testIntervalYear() throws IOException { public void testIntervalMonth() throws IOException { testBothCases(new MatchAllDocsQuery(), Arrays.asList("2017-01-01", "2017-02-02", "2017-02-03", "2017-03-04", "2017-03-05", "2017-03-06"), - aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), + aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD), histogram -> { List buckets = histogram.getBuckets(); assertEquals(3, buckets.size()); @@ -349,7 +349,7 @@ public void testIntervalHour() throws IOException { "2017-02-01T16:48:00.000Z", "2017-02-01T16:59:00.000Z" ), - aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD), + aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD), histogram -> { List buckets = histogram.getBuckets(); assertEquals(6, buckets.size()); @@ -441,7 +441,7 @@ public void testIntervalMinute() throws IOException { "2017-02-01T09:16:04.000Z", "2017-02-01T09:16:42.000Z" ), - aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD), + aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD), histogram -> { List buckets = histogram.getBuckets(); assertEquals(3, buckets.size()); diff --git a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java index 5c7eff6bec061..5aa7a7cb1b1d6 100644 --- a/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java +++ b/server/src/test/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalAutoDateHistogramTests.java @@ -79,8 +79,9 @@ protected InternalAutoDateHistogram createTestInstance(String name, } InternalAggregations subAggregations = new InternalAggregations(Collections.emptyList()); BucketInfo bucketInfo = new BucketInfo(roundings, randomIntBetween(0, roundings.length - 1), subAggregations); - - return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData); + long numValuesCollected = randomNonNegativeLong(); + return new InternalAutoDateHistogram(name, buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators, + metaData); } @Override @@ -121,10 +122,11 @@ protected InternalAutoDateHistogram mutateInstance(InternalAutoDateHistogram ins String name = instance.getName(); List buckets = instance.getBuckets(); int targetBuckets = instance.getTargetBuckets(); + long numValuesCollected = instance.getNumValuesCollected(); BucketInfo bucketInfo = instance.getBucketInfo(); List pipelineAggregators = instance.pipelineAggregators(); Map metaData = instance.getMetaData(); - switch (between(0, 3)) { + switch (between(0, 5)) { case 0: name += randomAlphaOfLength(5); break; @@ -145,9 +147,16 @@ protected InternalAutoDateHistogram mutateInstance(InternalAutoDateHistogram ins } metaData.put(randomAlphaOfLength(15), randomInt()); break; + case 4: + targetBuckets += between(1, 100); + break; + case 5: + numValuesCollected += between(1, 100); + break; default: throw new AssertionError("Illegal randomisation branch"); } - return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData); + return new InternalAutoDateHistogram(name, buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators, + metaData); } }