Skip to content

Commit

Permalink
Collects more buckets than needed on shards
Browse files Browse the repository at this point in the history
This gives us more options at reduce time in terms of how we do the
final merge of the buckets to produce the final result
  • Loading branch information
colings86 committed Mar 13, 2018
1 parent 678802a commit 993c782
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class AutoDateHistogramAggregator extends DeferableBucketAggregator {
private LongHash bucketOrds;
private int targetBuckets;
private MergingBucketsDeferringCollector deferringCollector;
private int numCollectedValues = 0;

AutoDateHistogramAggregator(String name, AggregatorFactories factories, int numBuckets, Rounding[] roundings,
@Nullable ValuesSource.Numeric valuesSource, DocValueFormat formatter, SearchContext aggregationContext, Aggregator parent,
Expand Down Expand Up @@ -109,6 +110,7 @@ public void collect(int doc, long bucket) throws IOException {
long previousRounded = Long.MIN_VALUE;
for (int i = 0; i < valuesCount; ++i) {
long value = values.nextValue();
numCollectedValues++;
long rounded = roundings[roundingIdx].round(value);
assert rounded >= previousRounded;
if (rounded == previousRounded) {
Expand All @@ -120,7 +122,8 @@ public void collect(int doc, long bucket) throws IOException {
collectExistingBucket(sub, doc, bucketOrd);
} else {
collectBucket(sub, doc, bucketOrd);
while (bucketOrds.size() > targetBuckets) {
double maxBuckets = Math.max(targetBuckets, targetBuckets * Math.log(numCollectedValues));
while (bucketOrds.size() > maxBuckets) {
increaseRounding();
}
}
Expand Down Expand Up @@ -179,14 +182,15 @@ public InternalAggregation buildAggregation(long owningBucketOrdinal) throws IOE
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundings, roundingIdx,
buildEmptySubAggregations());

return new InternalAutoDateHistogram(name, buckets, targetBuckets, emptyBucketInfo, formatter, pipelineAggregators(), metaData());
return new InternalAutoDateHistogram(name, buckets, targetBuckets, numCollectedValues, emptyBucketInfo, formatter,
pipelineAggregators(), metaData());
}

@Override
public InternalAggregation buildEmptyAggregation() {
InternalAutoDateHistogram.BucketInfo emptyBucketInfo = new InternalAutoDateHistogram.BucketInfo(roundings, roundingIdx,
buildEmptySubAggregations());
return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, emptyBucketInfo, formatter,
return new InternalAutoDateHistogram(name, Collections.emptyList(), targetBuckets, numCollectedValues, emptyBucketInfo, formatter,
pipelineAggregators(), metaData());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/**
* Implementation of {@link Histogram}.
Expand Down Expand Up @@ -207,15 +208,17 @@ public int hashCode() {
private final DocValueFormat format;
private final BucketInfo bucketInfo;
private final int targetBuckets;
private final long numValuesCollected;


InternalAutoDateHistogram(String name, List<Bucket> buckets, int targetBuckets, BucketInfo emptyBucketInfo, DocValueFormat formatter,
List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
InternalAutoDateHistogram(String name, List<Bucket> buckets, int targetBuckets, long numValuesCollected, BucketInfo emptyBucketInfo,
DocValueFormat formatter, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
this.buckets = buckets;
this.bucketInfo = emptyBucketInfo;
this.format = formatter;
this.targetBuckets = targetBuckets;
this.numValuesCollected = numValuesCollected;
}

/**
Expand All @@ -227,6 +230,7 @@ public InternalAutoDateHistogram(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
buckets = in.readList(stream -> new Bucket(stream, format));
this.targetBuckets = in.readVInt();
this.numValuesCollected = in.readVLong();
}

@Override
Expand All @@ -235,6 +239,7 @@ protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeList(buckets);
out.writeVInt(targetBuckets);
out.writeVLong(numValuesCollected);
}

@Override
Expand All @@ -255,13 +260,18 @@ public int getTargetBuckets() {
return targetBuckets;
}

/**
 * Returns the number of values this (shard-level) histogram collected; at
 * reduce time the per-shard counts are summed to size the bucket budget.
 */
public long getNumValuesCollected() {
return numValuesCollected;
}

/**
 * Returns the {@link BucketInfo} holding the available roundings and the
 * index of the rounding currently in use.
 */
public BucketInfo getBucketInfo() {
return bucketInfo;
}

@Override
public InternalAutoDateHistogram create(List<Bucket> buckets) {
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators(), metaData);
return new InternalAutoDateHistogram(name, buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators(),
metaData);
}

@Override
Expand Down Expand Up @@ -365,7 +375,8 @@ private BucketReduceResult mergeBucketsIfNeeded(List<Bucket> reducedBuckets, int
return new BucketReduceResult(reducedBuckets, reduceRounding, reduceRoundingIdx);
}

private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding, ReduceContext reduceContext) {
private List<Bucket> mergeBuckets(List<Bucket> reducedBuckets, Rounding reduceRounding,
ReduceContext reduceContext) {
List<Bucket> mergedBuckets = new ArrayList<>();

List<Bucket> sameKeyedBuckets = new ArrayList<>();
Expand Down Expand Up @@ -409,12 +420,13 @@ private static class BucketReduceResult {
}
}

private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, ReduceContext reduceContext) {
private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, long numValuesCollected, ReduceContext reduceContext) {
List<Bucket> list = currentResult.buckets;
if (list.isEmpty()) {
return currentResult;
}
int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, currentResult.roundingIdx,
double maxBuckets = Math.max(targetBuckets, targetBuckets * Math.log(numValuesCollected));
int roundingIdx = getAppropriateRounding(list.get(0).key, list.get(list.size() - 1).key, maxBuckets, currentResult.roundingIdx,
bucketInfo.roundings);
Rounding rounding = bucketInfo.roundings[roundingIdx];
// merge buckets using the new rounding
Expand Down Expand Up @@ -443,7 +455,7 @@ private BucketReduceResult addEmptyBuckets(BucketReduceResult currentResult, Red
return new BucketReduceResult(list, rounding, roundingIdx);
}

private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, Rounding[] roundings) {
private int getAppropriateRounding(long minKey, long maxKey, double maxBuckets, int roundingIdx, Rounding[] roundings) {
if (roundingIdx == roundings.length - 1) {
return roundingIdx;
}
Expand All @@ -458,18 +470,21 @@ private int getAppropriateRounding(long minKey, long maxKey, int roundingIdx, Ro
currentKey = currentRounding.nextRoundingValue(currentKey);
}
currentRoundingIdx++;
} while (requiredBuckets > targetBuckets && currentRoundingIdx < roundings.length);
} while (requiredBuckets > maxBuckets && currentRoundingIdx < roundings.length);
// The loop will increase past the correct rounding index here so we
// need to subtract one to get the rounding index we need
return currentRoundingIdx - 1;
}

@Override
public InternalAggregation doReduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
long numValuesCollected = aggregations.stream()
.collect(Collectors.summingLong(agg -> ((InternalAutoDateHistogram) agg).getNumValuesCollected()));

BucketReduceResult reducedBucketsResult = reduceBuckets(aggregations, reduceContext);

// adding empty buckets if needed
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, reduceContext);
reducedBucketsResult = addEmptyBuckets(reducedBucketsResult, numValuesCollected, reduceContext);

// Adding empty buckets may have tipped us over the target so merge the buckets again if needed
reducedBucketsResult = mergeBucketsIfNeeded(reducedBucketsResult.buckets, reducedBucketsResult.roundingIdx,
Expand All @@ -478,7 +493,7 @@ public InternalAggregation doReduce(List<InternalAggregation> aggregations, Redu
BucketInfo bucketInfo = new BucketInfo(this.bucketInfo.roundings, reducedBucketsResult.roundingIdx,
this.bucketInfo.emptySubAggregations);

return new InternalAutoDateHistogram(getName(), reducedBucketsResult.buckets, targetBuckets, bucketInfo, format,
return new InternalAutoDateHistogram(getName(), reducedBucketsResult.buckets, targetBuckets, numValuesCollected, bucketInfo, format,
pipelineAggregators(), getMetaData());
}

Expand Down Expand Up @@ -512,7 +527,8 @@ public InternalAggregation createAggregation(List<MultiBucketsAggregation.Bucket
buckets2.add((Bucket) b);
}
buckets2 = Collections.unmodifiableList(buckets2);
return new InternalAutoDateHistogram(name, buckets2, targetBuckets, bucketInfo, format, pipelineAggregators(), getMetaData());
return new InternalAutoDateHistogram(name, buckets2, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators(),
getMetaData());
}

@Override
Expand All @@ -524,6 +540,8 @@ public Bucket createBucket(Number key, long docCount, InternalAggregations aggre
/**
 * Equality over this aggregation's own state: buckets, target bucket count,
 * number of collected values, value format and rounding/bucket info.
 *
 * NOTE(review): {@code doHashCode} is not visible in this diff — it must also
 * incorporate {@code targetBuckets} and {@code numValuesCollected} to keep the
 * equals/hashCode contract; confirm it was updated alongside this method.
 */
protected boolean doEquals(Object obj) {
    InternalAutoDateHistogram that = (InternalAutoDateHistogram) obj;
    // Compare the primitive fields directly; Objects.equals(int, int) /
    // Objects.equals(long, long) would autobox both arguments on every call.
    return Objects.equals(buckets, that.buckets)
            && targetBuckets == that.targetBuckets
            && numValuesCollected == that.numValuesCollected
            && Objects.equals(format, that.format)
            && Objects.equals(bucketInfo, that.bucketInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testMatchAllDocs() throws IOException {
Query query = new MatchAllDocsQuery();

testSearchCase(query, dataset,
aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD),
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
histogram -> assertEquals(6, histogram.getBuckets().size())
);
testSearchAndReduceCase(query, dataset,
Expand All @@ -82,7 +82,7 @@ public void testSubAggregations() throws IOException {
Query query = new MatchAllDocsQuery();

testSearchCase(query, dataset,
aggregation -> aggregation.setNumBuckets(6).field(DATE_FIELD)
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD)
.subAggregation(AggregationBuilders.stats("stats").field(DATE_FIELD)),
histogram -> {
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
Expand Down Expand Up @@ -231,7 +231,7 @@ public void testAggregateWrongField() throws IOException {

public void testIntervalYear() throws IOException {
testBothCases(LongPoint.newRangeQuery(INSTANT_FIELD, asLong("2015-01-01"), asLong("2017-12-31")), dataset,
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
histogram -> {
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(3, buckets.size());
Expand All @@ -254,7 +254,7 @@ public void testIntervalYear() throws IOException {
public void testIntervalMonth() throws IOException {
testBothCases(new MatchAllDocsQuery(),
Arrays.asList("2017-01-01", "2017-02-02", "2017-02-03", "2017-03-04", "2017-03-05", "2017-03-06"),
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
histogram -> {
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(3, buckets.size());
Expand Down Expand Up @@ -349,7 +349,7 @@ public void testIntervalHour() throws IOException {
"2017-02-01T16:48:00.000Z",
"2017-02-01T16:59:00.000Z"
),
aggregation -> aggregation.setNumBuckets(8).field(DATE_FIELD),
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
histogram -> {
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(6, buckets.size());
Expand Down Expand Up @@ -441,7 +441,7 @@ public void testIntervalMinute() throws IOException {
"2017-02-01T09:16:04.000Z",
"2017-02-01T09:16:42.000Z"
),
aggregation -> aggregation.setNumBuckets(4).field(DATE_FIELD),
aggregation -> aggregation.setNumBuckets(3).field(DATE_FIELD),
histogram -> {
List<? extends Histogram.Bucket> buckets = histogram.getBuckets();
assertEquals(3, buckets.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ protected InternalAutoDateHistogram createTestInstance(String name,
}
InternalAggregations subAggregations = new InternalAggregations(Collections.emptyList());
BucketInfo bucketInfo = new BucketInfo(roundings, randomIntBetween(0, roundings.length - 1), subAggregations);

return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
long numValuesCollected = randomNonNegativeLong();
return new InternalAutoDateHistogram(name, buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators,
metaData);
}

@Override
Expand Down Expand Up @@ -121,10 +122,11 @@ protected InternalAutoDateHistogram mutateInstance(InternalAutoDateHistogram ins
String name = instance.getName();
List<InternalAutoDateHistogram.Bucket> buckets = instance.getBuckets();
int targetBuckets = instance.getTargetBuckets();
long numValuesCollected = instance.getNumValuesCollected();
BucketInfo bucketInfo = instance.getBucketInfo();
List<PipelineAggregator> pipelineAggregators = instance.pipelineAggregators();
Map<String, Object> metaData = instance.getMetaData();
switch (between(0, 3)) {
switch (between(0, 5)) {
case 0:
name += randomAlphaOfLength(5);
break;
Expand All @@ -145,9 +147,16 @@ protected InternalAutoDateHistogram mutateInstance(InternalAutoDateHistogram ins
}
metaData.put(randomAlphaOfLength(15), randomInt());
break;
case 4:
targetBuckets += between(1, 100);
break;
case 5:
numValuesCollected += between(1, 100);
break;
default:
throw new AssertionError("Illegal randomisation branch");
}
return new InternalAutoDateHistogram(name, buckets, targetBuckets, bucketInfo, format, pipelineAggregators, metaData);
return new InternalAutoDateHistogram(name, buckets, targetBuckets, numValuesCollected, bucketInfo, format, pipelineAggregators,
metaData);
}
}

0 comments on commit 993c782

Please sign in to comment.