Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport fix of reduceRandom fix #32508

Merged
merged 1 commit into from
Jul 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.search.aggregations.support.ValuesSourceParserHelper;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.internal.SearchContext;
import org.joda.time.DateTimeZone;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -68,6 +69,29 @@ public class AutoDateHistogramAggregationBuilder
PARSER.declareInt(AutoDateHistogramAggregationBuilder::setNumBuckets, NUM_BUCKETS_FIELD);
}

/**
 * Build roundings, computed dynamically as roundings are time zone dependent.
 * A fresh array is created on every call, so the current implementation
 * probably should not be invoked in a tight loop.
 *
 * @param timeZone time zone passed to {@code createRounding} for every unit
 * @return Array of RoundingInfo, ordered from the finest unit (seconds) to the coarsest (years)
 */
static RoundingInfo[] buildRoundings(DateTimeZone timeZone) {
    // Approximate length of each unit in milliseconds; a month is taken as
    // 30 days and a year as 365 days.
    final long secondMillis = 1000L;
    final long minuteMillis = 60 * secondMillis;
    final long hourMillis = 60 * minuteMillis;
    final long dayMillis = 24 * hourMillis;
    final long monthMillis = 30 * dayMillis;
    final long yearMillis = 365 * dayMillis;

    // Each entry: rounding for the unit, the unit's approximate length in
    // milliseconds, then the inner intervals for that unit.
    return new RoundingInfo[] {
        new RoundingInfo(createRounding(DateTimeUnit.SECOND_OF_MINUTE, timeZone),
            secondMillis, 1, 5, 10, 30),
        new RoundingInfo(createRounding(DateTimeUnit.MINUTES_OF_HOUR, timeZone),
            minuteMillis, 1, 5, 10, 30),
        new RoundingInfo(createRounding(DateTimeUnit.HOUR_OF_DAY, timeZone),
            hourMillis, 1, 3, 12),
        new RoundingInfo(createRounding(DateTimeUnit.DAY_OF_MONTH, timeZone),
            dayMillis, 1, 7),
        new RoundingInfo(createRounding(DateTimeUnit.MONTH_OF_YEAR, timeZone),
            monthMillis, 1, 3),
        new RoundingInfo(createRounding(DateTimeUnit.YEAR_OF_CENTURY, timeZone),
            yearMillis, 1, 5, 10, 20, 50, 100)
    };
}

/**
 * Parses an auto date histogram aggregation definition from the given parser.
 *
 * @param aggregationName name given to the newly created builder
 * @param parser parser positioned on the aggregation's content
 * @return the builder populated by {@code PARSER}
 * @throws IOException if reading from the parser fails
 */
public static AutoDateHistogramAggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException {
    AutoDateHistogramAggregationBuilder builder = new AutoDateHistogramAggregationBuilder(aggregationName);
    return PARSER.parse(parser, builder, null);
}
Expand Down Expand Up @@ -121,14 +145,7 @@ public int getNumBuckets() {
@Override
protected ValuesSourceAggregatorFactory<Numeric, ?> innerBuild(SearchContext context, ValuesSourceConfig<Numeric> config,
AggregatorFactory<?> parent, Builder subFactoriesBuilder) throws IOException {
RoundingInfo[] roundings = new RoundingInfo[6];
roundings[0] = new RoundingInfo(createRounding(DateTimeUnit.SECOND_OF_MINUTE), 1000L, 1, 5, 10, 30);
roundings[1] = new RoundingInfo(createRounding(DateTimeUnit.MINUTES_OF_HOUR), 60 * 1000L, 1, 5, 10, 30);
roundings[2] = new RoundingInfo(createRounding(DateTimeUnit.HOUR_OF_DAY), 60 * 60 * 1000L, 1, 3, 12);
roundings[3] = new RoundingInfo(createRounding(DateTimeUnit.DAY_OF_MONTH), 24 * 60 * 60 * 1000L, 1, 7);
roundings[4] = new RoundingInfo(createRounding(DateTimeUnit.MONTH_OF_YEAR), 30 * 24 * 60 * 60 * 1000L, 1, 3);
roundings[5] = new RoundingInfo(createRounding(DateTimeUnit.YEAR_OF_CENTURY), 365 * 24 * 60 * 60 * 1000L, 1, 5, 10, 20, 50, 100);

RoundingInfo[] roundings = buildRoundings(timeZone());
int maxRoundingInterval = Arrays.stream(roundings,0, roundings.length-1)
.map(rounding -> rounding.innerIntervals)
.flatMapToInt(Arrays::stream)
Expand All @@ -153,10 +170,10 @@ public int getNumBuckets() {
return new AutoDateHistogramAggregatorFactory(name, config, numBuckets, roundings, context, parent, subFactoriesBuilder, metaData);
}

private Rounding createRounding(DateTimeUnit interval) {
private static Rounding createRounding(DateTimeUnit interval, DateTimeZone timeZone) {
Rounding.Builder tzRoundingBuilder = Rounding.builder(interval);
if (timeZone() != null) {
tzRoundingBuilder.timeZone(timeZone());
if (timeZone != null) {
tzRoundingBuilder.timeZone(timeZone);
}
Rounding rounding = tzRoundingBuilder.build();
return rounding;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,8 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
return new BucketReduceResult(list, roundingInfo, roundingIdx);
}

private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, RoundingInfo[] roundings) {
private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx,
RoundingInfo[] roundings) {
if (roundingIdx == roundings.length - 1) {
return roundingIdx;
}
Expand Down Expand Up @@ -509,7 +510,8 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
pipelineAggregators(), getMetaData());
}

private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult, ReduceContext reduceContext) {
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult reducedBucketsResult,
ReduceContext reduceContext) {
List<Bucket> buckets = reducedBucketsResult.buckets;
RoundingInfo roundingInfo = reducedBucketsResult.roundingInfo;
int roundingIdx = reducedBucketsResult.roundingIdx;
Expand Down Expand Up @@ -539,7 +541,7 @@ private BucketReduceResult mergeConsecutiveBuckets(List<Bucket> reducedBuckets,
key = roundingInfo.rounding.round(bucket.key);
}
reduceContext.consumeBucketsAndMaybeBreak(-countInnerBucket(bucket) - 1);
sameKeyedBuckets.add(createBucket(key, bucket.docCount, bucket.aggregations));
sameKeyedBuckets.add(new Bucket(Math.round(key), bucket.docCount, format, bucket.aggregations));
}
if (sameKeyedBuckets.isEmpty() == false) {
reduceContext.consumeBucketsAndMaybeBreak(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
package org.elasticsearch.search.aggregations.bucket.histogram;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.rounding.DateTimeUnit;
import org.elasticsearch.common.rounding.Rounding;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
Expand Down Expand Up @@ -51,21 +49,15 @@ public class InternalAutoDateHistogramTests extends InternalMultiBucketAggregati
public void setUp() throws Exception {
super.setUp();
format = randomNumericDocValueFormat();

roundingInfos = new RoundingInfo[6];
roundingInfos[0] = new RoundingInfo(Rounding.builder(DateTimeUnit.SECOND_OF_MINUTE).build(), 1, 5, 10, 30);
roundingInfos[1] = new RoundingInfo(Rounding.builder(DateTimeUnit.MINUTES_OF_HOUR).build(), 1, 5, 10, 30);
roundingInfos[2] = new RoundingInfo(Rounding.builder(DateTimeUnit.HOUR_OF_DAY).build(), 1, 3, 12);
roundingInfos[3] = new RoundingInfo(Rounding.builder(DateTimeUnit.DAY_OF_MONTH).build(), 1, 7);
roundingInfos[4] = new RoundingInfo(Rounding.builder(DateTimeUnit.MONTH_OF_YEAR).build(), 1, 3);
roundingInfos[5] = new RoundingInfo(Rounding.builder(DateTimeUnit.YEAR_OF_CENTURY).build(), 1, 10, 20, 50, 100);
}

@Override
protected InternalAutoDateHistogram createTestInstance(String name,
List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData,
InternalAggregations aggregations) {

roundingInfos = AutoDateHistogramAggregationBuilder.buildRoundings(null);
int nbBuckets = randomNumberOfBuckets();
int targetBuckets = randomIntBetween(1, nbBuckets * 2 + 1);
List<InternalAutoDateHistogram.Bucket> buckets = new ArrayList<>(nbBuckets);
Expand All @@ -81,6 +73,7 @@ protected InternalAutoDateHistogram createTestInstance(String name,
InternalAggregations subAggregations = new InternalAggregations(Collections.emptyList());
BucketInfo bucketInfo = new BucketInfo(roundingInfos, randomIntBetween(0, roundingInfos.length - 1), subAggregations);


return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
}

Expand All @@ -92,13 +85,50 @@ protected void assertReduced(InternalAutoDateHistogram reduced, List<InternalAut
roundingIdx = histogram.getBucketInfo().roundingIdx;
}
}
Map<Long, Long> expectedCounts = new TreeMap<>();
for (Histogram histogram : inputs) {
RoundingInfo roundingInfo = roundingInfos[roundingIdx];

long lowest = Long.MAX_VALUE;
long highest = 0;
for (InternalAutoDateHistogram histogram : inputs) {
for (Histogram.Bucket bucket : histogram.getBuckets()) {
expectedCounts.compute(roundingInfos[roundingIdx].rounding.round(((DateTime) bucket.getKey()).getMillis()),
(key, oldValue) -> (oldValue == null ? 0 : oldValue) + bucket.getDocCount());
long bucketKey = ((DateTime) bucket.getKey()).getMillis();
if (bucketKey < lowest) {
lowest = bucketKey;
}
if (bucketKey > highest) {
highest = bucketKey;
}
}
}
long normalizedDuration = (highest - lowest) / roundingInfo.getRoughEstimateDurationMillis();
long innerIntervalToUse = 0;
for (int interval : roundingInfo.innerIntervals) {
if (normalizedDuration / interval < maxNumberOfBuckets()) {
innerIntervalToUse = interval;
}
}
Map<Long, Long> expectedCounts = new TreeMap<>();
long intervalInMillis = innerIntervalToUse*roundingInfo.getRoughEstimateDurationMillis();
for (long keyForBucket = roundingInfo.rounding.round(lowest);
keyForBucket <= highest;
keyForBucket = keyForBucket + intervalInMillis) {
expectedCounts.put(keyForBucket, 0L);

for (InternalAutoDateHistogram histogram : inputs) {
for (Histogram.Bucket bucket : histogram.getBuckets()) {
long bucketKey = ((DateTime) bucket.getKey()).getMillis();
long roundedBucketKey = roundingInfo.rounding.round(bucketKey);
if (roundedBucketKey >= keyForBucket
&& roundedBucketKey < keyForBucket + intervalInMillis) {
long count = bucket.getDocCount();
expectedCounts.compute(keyForBucket,
(key, oldValue) -> (oldValue == null ? 0 : oldValue) + count);
}
}
}
}


Map<Long, Long> actualCounts = new TreeMap<>();
for (Histogram.Bucket bucket : reduced.getBuckets()) {
actualCounts.compute(((DateTime) bucket.getKey()).getMillis(),
Expand Down