Commit 795fb94: Support counts+values in exponential_histogram doc

axw committed Dec 20, 2023
1 parent 80732a7 commit 795fb94

Showing 5 changed files with 180 additions and 163 deletions.
ExponentialHistogramFieldMapper.java

@@ -43,10 +43,12 @@
 import org.elasticsearch.xcontent.ParseField;
 import org.elasticsearch.xcontent.XContentParser;
 import org.elasticsearch.xcontent.XContentSubParser;
+import org.elasticsearch.xpack.exponentialhistogram.agg.InternalExponentialHistogram;

 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,6 +70,10 @@ public class ExponentialHistogramFieldMapper extends FieldMapper {
     public static final ParseField NEGATIVE_FIELD = new ParseField("negative");
     public static final ParseField OFFSET_FIELD = new ParseField("offset");
     public static final ParseField COUNTS_FIELD = new ParseField("counts");
+    // Documents may specify either counts and values, or counts with an optional offset.
+    // If values are not specified, then the bucket values are implied by the offset
+    // count indices.
+    public static final ParseField VALUES_FIELD = new ParseField("values");

     private static class ExponentialHistogramFieldType extends MappedFieldType {
         ExponentialHistogramFieldType(String name, Map<String, String> meta) {
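With this change a bucket set can be supplied in two shapes. The documents below are a minimal sketch of the two forms, assuming a field of this type named `histo`; the field name and surrounding document structure are illustrative, not taken from this commit. The pre-existing offset+counts form:

```json
{
  "histo": {
    "scale": 0,
    "positive": { "offset": -1, "counts": [5, 0, 2] }
  }
}
```

Or with explicit per-bucket values, which the mapper now converts to the offset+counts form at index time:

```json
{
  "histo": {
    "scale": 0,
    "positive": { "counts": [5, 2], "values": [1.5, 6.0] }
  }
}
```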
@@ -307,10 +313,10 @@ private Field parseExponentialHistogramField(XContentParser parser) throws Exception {
                 haveScale = true;
             } else if (fieldName.equals(POSITIVE_FIELD.getPreferredName())) {
                 token = subParser.nextToken();
-                positive = ExponentialHistogramBuckets.parse(subParser);
+                positive = ExponentialHistogramBuckets.parse(subParser, false);
             } else if (fieldName.equals(NEGATIVE_FIELD.getPreferredName())) {
                 token = subParser.nextToken();
-                negative = ExponentialHistogramBuckets.parse(subParser);
+                negative = ExponentialHistogramBuckets.parse(subParser, true);
             } else {
                 throw new DocumentParsingException(
                     subParser.getTokenLocation(),
@@ -340,10 +346,16 @@ private Field parseExponentialHistogramField(XContentParser parser) throws Exception {
                     + "] to be specified and non-empty"
             );
         }
+        if (positive != null && positive.values != null) {
+            positive.aggregateValues(scale);
+        }
+        if (negative != null && negative.values != null) {
+            negative.aggregateValues(scale);
+        }
         return encodeBinaryDocValuesField(name(), scale, negative, positive);
     }

-    protected static BinaryDocValuesField encodeBinaryDocValuesField(String name, int scale, ExponentialHistogramBuckets negative, ExponentialHistogramBuckets positive) throws IOException {
+    private static BinaryDocValuesField encodeBinaryDocValuesField(String name, int scale, ExponentialHistogramBuckets negative, ExponentialHistogramBuckets positive) throws IOException {
         final int numPositiveCounts = positive == null ? 0 : positive.counts.size();
         final int numNegativeCounts = negative == null ? 0 : negative.counts.size();
@@ -382,20 +394,54 @@ protected static BinaryDocValuesField encodeBinaryDocValuesField(String name, int scale, ExponentialHistogramBuckets negative, ExponentialHistogramBuckets positive) throws IOException {
     protected static class ExponentialHistogramBuckets {
         int offset;
         List<Long> counts;
+        final List<Double> values; // absolute values

-        ExponentialHistogramBuckets(int offset, List<Long> counts) {
+        ExponentialHistogramBuckets(int offset, List<Long> counts, List<Double> values) {
             this.offset = offset;
             this.counts = counts;
+            this.values = values;
         }

-        static ExponentialHistogramBuckets parse(XContentParser parser) throws Exception {
+        // aggregateValues aggregates values and counts into exponential buckets with
+        // the given scale, updating counts and offset.
+        void aggregateValues(final int scale) {
+            InternalExponentialHistogram histogram = new InternalExponentialHistogram(
+                "name", Integer.MAX_VALUE, scale, DocValueFormat.RAW, Map.of()
+            );
+            for (int i = 0; i < counts.size(); i++) {
+                final long count = counts.get(i);
+                final double value = values.get(i);
+                histogram.add(value, count);
+            }
+            counts.clear();
+
+            List<InternalExponentialHistogram.Bucket> buckets = histogram.getBuckets();
+            InternalExponentialHistogram.Bucket lastBucket = null;
+            for (InternalExponentialHistogram.Bucket bucket : buckets) {
+                if (lastBucket != null) {
+                    // buckets may have holes, we need to fill these with zeroes for the offset+counts representation.
+                    for (int indexDelta = bucket.getIndex() - lastBucket.getIndex(); indexDelta > 1; --indexDelta) {
+                        counts.add(0L);
+                    }
+                }
+                counts.add(bucket.getCount());
+                lastBucket = bucket;
+            }
+            offset = buckets.get(0).getIndex();
+
+            System.out.println("aggregated: offset=" + offset + ", numCounts=" + counts.size());
+        }
+
+        static ExponentialHistogramBuckets parse(final XContentParser parser, final boolean negative) throws Exception {
             XContentParser.Token token = parser.currentToken();
             if (token == XContentParser.Token.VALUE_NULL) {
                 return null;
             }

             int offset = 0;
+            boolean haveOffset = false;
             ArrayList<Long> counts = null;
+            ArrayList<Double> values = null;

             ensureExpectedToken(XContentParser.Token.START_OBJECT, token, parser);
             token = parser.nextToken();
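The conversion in aggregateValues leans on the aggregation's indexer to place each value in a bucket. As background, here is a minimal sketch of the standard base-2 exponential mapping, assuming OpenTelemetry-style upper-inclusive bucket boundaries; this is an illustration, not the indexer used by InternalExponentialHistogram (whose implementation is not shown in this diff):

```java
// Sketch: map a positive value to its bucket index at `scale`, where
// bucket i covers (base^i, base^(i+1)] and base = 2^(2^-scale).
// Exact powers of two are edge cases that a production indexer handles
// more carefully than this float-based version.
static long computeIndex(double value, int scale) {
    final double scaleFactor = Math.scalb(1.0 / Math.log(2.0), scale); // 2^scale / ln(2)
    return (long) Math.ceil(Math.log(value) * scaleFactor) - 1;
}
```

For example, at scale 0 this places 1.5 in bucket 0 (the interval (1, 2]) and 6.0 in bucket 2 (the interval (4, 8]).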
@@ -406,6 +452,7 @@ static ExponentialHistogramBuckets parse(XContentParser parser) throws Exception
                     token = parser.nextToken();
                     ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
                     offset = parser.intValue();
+                    haveOffset = true;
                 } else if (fieldName.equals(COUNTS_FIELD.getPreferredName())) {
                     token = parser.nextToken();
                     ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser);
@@ -423,6 +470,33 @@ static ExponentialHistogramBuckets parse(XContentParser parser) throws Exception
                         counts.add(count);
                         token = parser.nextToken();
                     }
+                } else if (fieldName.equals(VALUES_FIELD.getPreferredName())) {
+                    token = parser.nextToken();
+                    ensureExpectedToken(XContentParser.Token.START_ARRAY, token, parser);
+                    values = new ArrayList<>();
+                    token = parser.nextToken();
+                    while (token != XContentParser.Token.END_ARRAY) {
+                        ensureExpectedToken(XContentParser.Token.VALUE_NUMBER, token, parser);
+                        double value = parser.doubleValue();
+                        if (negative) {
+                            if (value >= 0) {
+                                throw new DocumentParsingException(
+                                    parser.getTokenLocation(),
+                                    "error parsing negative exponential histogram values, value must be < 0 but got " + value
+                                );
+                            }
+                            value = -value;
+                        } else {
+                            if (value <= 0) {
+                                throw new DocumentParsingException(
+                                    parser.getTokenLocation(),
+                                    "error parsing positive exponential histogram values, value must be > 0 but got " + value
+                                );
+                            }
+                        }
+                        values.add(value);
+                        token = parser.nextToken();
+                    }
                 } else {
                     throw new DocumentParsingException(
                         parser.getTokenLocation(),
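Note the sign convention: for the negative range, values must be supplied as the original negative measurements and are stored as absolute values. An illustrative fragment (hypothetical document, same caveats as the earlier examples):

```json
{ "negative": { "counts": [3, 1], "values": [-1.5, -6.0] } }
```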
@@ -438,8 +512,26 @@ static ExponentialHistogramBuckets parse(XContentParser parser) throws Exception
                     "error parsing exponential histogram buckets, expected field called [" + COUNTS_FIELD.getPreferredName() + "]"
                 );
             }
+            if (values != null) {
+                if (haveOffset) {
+                    throw new DocumentParsingException(
+                        parser.getTokenLocation(),
+                        "error parsing exponential histogram buckets, [" +
+                        OFFSET_FIELD.getPreferredName() + "] and [" +
+                        VALUES_FIELD.getPreferredName() + "] are mutually exclusive"
+                    );
+                }
+                if (counts.size() != values.size()) {
+                    throw new DocumentParsingException(
+                        parser.getTokenLocation(),
+                        "error parsing exponential histogram buckets, [" +
+                        COUNTS_FIELD.getPreferredName() + "] and [" +
+                        VALUES_FIELD.getPreferredName() + "] have different sizes"
+                    );
+                }
+            }

-            return new ExponentialHistogramBuckets(offset, counts);
+            return new ExponentialHistogramBuckets(offset, counts, values);
         }
     }
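To make the values-to-buckets conversion concrete: at scale 0, a positive bucket set with counts [5, 2] and values [1.5, 6.0] maps 1.5 to bucket index 0 and 6.0 to bucket index 2, under the upper-inclusive base-2 bucketing sketched earlier. Index 1 is a hole, so aggregateValues produces offset 0 and counts [5, 0, 2], with the zero inserted by the hole-filling loop above.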


This file was deleted.

InternalExponentialHistogram.java

@@ -34,7 +34,7 @@ public class InternalExponentialHistogram extends InternalAggregation {
     private SortedMap<Integer, Long> positive;
     private SortedMap<Integer, Long> negative;

-    InternalExponentialHistogram(
+    public InternalExponentialHistogram(
         String name,
         int maxBuckets,
         int maxScale,
@@ -112,6 +112,9 @@ public void add(final double value, final long count) {
             // TODO(axw)
             throw new RuntimeException("zero_count not implemented");
         }
+        if (count == 0) {
+            return;
+        }
         SortedMap<Integer, Long> counts;
         if (value < 0) {
             if (negative == null) {
@@ -126,11 +129,12 @@
         }

         final int index = indexer.computeIndex(Math.abs(value));
+        System.out.println("index="+ index + ", value=" + value);
         final Long existing = counts.get(index);
         if (existing != null) {
             counts.put(index, existing+count);
         } else {
-            // NOTE(axw) maxBuckets is maintained independently for the positive and negative ranges,
+            // NOTE(axw) maxBuckets is maintained independently for the positive and negative ranges
             // for simplicity. Having maxBuckets that applies to both ranges simultaneously is harder
             // to reason about when downsampling. e.g. consider what it would mean for a histogram with
             // maxBuckets that initially has only positive values, and then a negative value is recorded.
@@ -169,14 +173,18 @@ private SortedMap<Integer, Long> downscaleCounts(final int scaleReduction, final
     private int getScaleReduction(final int index, final SortedMap<Integer, Long> counts) {
         long newStart = Math.min(index, counts.firstKey());
         long newEnd = Math.max(index, counts.lastKey());
+        System.out.println("getScaleReduction: " + index + ", start/end=" + newStart + ", " + newEnd);
         return getScaleReduction(newStart, newEnd);
     }

     private int getScaleReduction(long newStart, long newEnd) {
         int scaleReduction = 0;
-        while (newEnd - newStart + 1 > maxBuckets) {
-            newStart >>= 1;
-            newEnd >>= 1;
+        // TODO(axw) is this correct? The original code shifted both
+        // newStart and newEnd to the right, which ends up shifting
+        // negative values to zero.
+        long delta = newEnd - newStart + 1;
+        while (delta > maxBuckets) {
+            delta >>= 1;
             scaleReduction++;
         }
         return scaleReduction;
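A worked example of the new loop, with a hypothetical maxBuckets of 4: for newStart = -3 and newEnd = 5, delta starts at 9; one right shift gives 4, which fits within maxBuckets, so the method returns a scale reduction of 1, meaning the histogram is downscaled once and each pair of adjacent buckets merges.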
@@ -267,6 +275,10 @@ public static class Bucket {
             this.count = count;
         }

+        public int getIndex() {
+            return index;
+        }
+
         // lowerBound returns the lower bound of the exponential histogram bucket.
         //
         // For buckets in the positive range the lower bound is exclusive, while
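The comment above is cut off by the collapsed diff, but the quantity it documents is standard. A minimal sketch of the bucket lower bound, assuming the base-2 exponential mapping with base = 2^(2^-scale); this is illustrative, not necessarily the exact body of Bucket.lowerBound():

```java
// Sketch: lower bound of the bucket at `index` for a given `scale`,
// i.e. base^index where base = 2^(2^-scale).
static double lowerBound(int index, int scale) {
    // compute index * 2^-scale, then raise 2 to that power
    return Math.pow(2.0, Math.scalb((double) index, -scale));
}
```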