diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a044b0366a..222be0b3623 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Support Go 1.19. Include compatibility testing and document support. (#3077)
 - Upgrade go.opentelemetry.io/proto/otlp from v0.18.0 to v0.19.0 (#3107)
+- OpenTelemetry Exponential histogram data structure for public use. (#3022)
 
 ### Changed
 
diff --git a/sdk/metric/aggregator/exponential/README.md b/sdk/metric/aggregator/exponential/README.md
index 490e1147557..c170c1aa298 100644
--- a/sdk/metric/aggregator/exponential/README.md
+++ b/sdk/metric/aggregator/exponential/README.md
@@ -2,15 +2,48 @@
 
 ## Design
 
-This document is a placeholder for future Aggregator, once seen in [PR
-2393](https://github.com/open-telemetry/opentelemetry-go/pull/2393).
+This is a fixed-size data structure for aggregating the OpenTelemetry
+base-2 exponential histogram introduced in [OTEP
+149](https://github.com/open-telemetry/oteps/blob/main/text/0149-exponential-histogram.md)
+and [described in the metrics data
+model](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#exponentialhistogram).
+The exponential histogram data point is characterized by a `scale`
+factor that determines resolution. Positive scales correspond with
+more resolution, and negative scales correspond with less resolution.
 
-Only the mapping functions have been made available at this time. The
-equations tested here are specified in the [data model for Exponential
-Histogram data points](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#exponentialhistogram).
+Given a maximum size, in terms of the number of buckets, the
+implementation determines the best scale possible for the set of
+measurements received. The size of the histogram is configured using
+the `WithMaxSize()` option, which defaults to 160.
+
+The implementation here maintains the best resolution possible. Since
+the scale parameter is shared by the positive and negative ranges, the
+best value of the scale parameter is determined by the range with the
+greater difference between minimum and maximum bucket index:
+
+```golang
+func bucketsNeeded(minValue, maxValue float64, scale int32) int32 {
+	return bucketIndex(maxValue, scale) - bucketIndex(minValue, scale) + 1
+}
+
+// bucketIndex is shown with an explicit floor-and-convert step so
+// that this sketch compiles; the mapping sub-package implements the
+// exact rounding behavior.
+func bucketIndex(value float64, scale int32) int32 {
+	return int32(math.Floor(math.Log(value) * math.Ldexp(math.Log2E, int(scale))))
+}
+```
+
+The best scale is uniquely determined when `maxSize/2 <
+bucketsNeeded(minValue, maxValue, scale) <= maxSize`. This
+implementation maintains the best scale by rescaling as needed to stay
+within the maximum size.
+
+## Layout
 
 ### Mapping function
 
+The `mapping` sub-package contains the equations specified in the [data
+model for Exponential Histogram data
+points](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/data-model.md#exponentialhistogram).
+
 There are two mapping functions used, depending on the sign of the
 scale. Negative and zero scales use the `mapping/exponent` mapping
 function, which computes the bucket index directly from the bits of
@@ -25,3 +58,158 @@ function is used with `0 < scale <= 20`.
 The maximum scale is selected because at scale 21 it simply becomes
 difficult to test correctness: at this point `math.MaxFloat64` maps
 to index `math.MaxInt32` and the `math/big` logic used in testing
 breaks down.
+
+### Data structure
+
+The `structure` sub-package contains a Histogram aggregator for use by
+the OpenTelemetry-Go Metrics SDK as well as OpenTelemetry Collector
+receivers, processors, and exporters.
+
+## Implementation
+
+The implementation maintains a slice of buckets and grows the array in
+size only as necessary given the actual range of values, up to the
+maximum size. The structure of a single range of buckets is:
+
+```golang
+type buckets struct {
+	backing    bucketsVarwidth[T] // for T = uint8 | uint16 | uint32 | uint64
+	indexBase  int32
+	indexStart int32
+	indexEnd   int32
+}
+```
+
+The `backing` field is a generic slice of `[]uint8`, `[]uint16`,
+`[]uint32`, or `[]uint64`.
+
+The positive and negative backing arrays are independent, so the
+maximum space used for `buckets` by one `Aggregator` is twice the
+configured maximum size.
+
+### Backing array
+
+The backing array is circular. The first observation is counted in
+the 0th index of the backing array and the initial bucket number is
+stored in `indexBase`. After the initial observation, the backing
+array grows in either direction (i.e., toward larger or smaller
+bucket numbers), until rescaling is necessary. This mechanism allows
+the histogram to maintain the ideal scale without shifting values
+inside the array.
+
+The `indexStart` and `indexEnd` fields store the current minimum and
+maximum bucket number. The initial condition is `indexBase ==
+indexStart == indexEnd`, representing a single bucket.
+
+Following the first observation, new observations may fall into a
+bucket up to `size-1` positions away in either direction. Growth is
+possible by adjusting either `indexEnd` or `indexStart` as long as the
+constraint `indexEnd-indexStart < size` remains true.
+
+Bucket numbers in the range `[indexBase, indexEnd]` are stored in the
+interval `[0, indexEnd-indexBase]` of the backing array. Buckets in
+the range `[indexStart, indexBase-1]` are stored in the interval
+`[size+indexStart-indexBase, size-1]` of the backing array.
+
+Considering the `aggregation.Buckets` interface, `Offset()` returns
+`indexStart`, `Len()` returns `indexEnd-indexStart+1`, and `At()`
+locates the correct bucket in the circular array.
+
+### Determining change of scale
+
+The algorithm used to determine the (best) change of scale when a new
+value arrives is:
+
+```golang
+func newScale(minIndex, maxIndex, scale, maxSize int32) int32 {
+	return scale - changeScale(minIndex, maxIndex, scale, maxSize)
+}
+
+func changeScale(minIndex, maxIndex, scale, maxSize int32) int32 {
+	var change int32
+	for maxIndex-minIndex >= maxSize {
+		maxIndex >>= 1
+		minIndex >>= 1
+		change++
+	}
+	return change
+}
+```
+
+The `changeScale` function is also used to determine how many bits to
+shift during `Merge`.
+
+### Downscale function
+
+The downscale function rotates the circular backing array so that
+`indexStart == indexBase`, using the "3 reversals" method, before
+combining the buckets in place.
+
+### Merge function
+
+`Merge` first calculates the correct final scale by comparing the
+combined positive and negative ranges. The destination aggregator is
+then downscaled, if necessary, and the `UpdateByIncr` code path is
+used to add the source buckets to the destination buckets.
+
+### Scale function
+
+The `Scale` function returns the current scale of the histogram.
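+
+The scale relates to bucket boundaries through `base == 2**(2**-scale)`:
+the lower boundary of the bucket with a given index is `base**index`,
+as in the following sketch (for illustration only; the `mapping`
+sub-package implements the exact, overflow-safe calculation):
+
+```golang
+// lowerBoundary computes the lower boundary of the bucket with the
+// given index at the given scale: 2**(index * 2**-scale).
+func lowerBoundary(index, scale int32) float64 {
+	return math.Exp2(math.Ldexp(float64(index), -int(scale)))
+}
+```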
+
+If the scale is variable and there are no non-zero values in the
+histogram, the scale is zero by definition; when there is only a
+single value in this case, its scale is MaxScale (20) by definition.
+
+If the scale is fixed because of range limits, the fixed scale is
+returned regardless of the size of the histogram.
+
+### Handling subnormal values
+
+Subnormal values are those in the range [0x1p-1074, 0x1p-1022), these
+being numbers that "gradually underflow" and use less than 52 bits of
+precision in the significand at the smallest representable exponent
+(i.e., -1022). Subnormal numbers present special challenges for both
+the exponent- and logarithm-based mapping functions, and to avoid
+additional complexity induced by corner cases, subnormal numbers are
+rounded up to 0x1p-1022 in this implementation.
+
+Handling subnormal numbers is difficult for the logarithm mapping
+function because Golang's `math.Log()` function rounds subnormal
+numbers up to 0x1p-1022. Handling subnormal numbers is difficult for
+the exponent mapping function because Golang's `math.Frexp()`, the
+natural API for extracting a value's base-2 exponent, also rounds
+subnormal numbers up to 0x1p-1022.
+
+While the additional complexity needed to correctly map subnormal
+numbers is small in both cases, there are few real benefits in doing
+so because of the inherent loss of precision. As secondary
+motivation, clamping values to the range [0x1p-1022, math.MaxFloat64]
+increases symmetry. This limit means that the minimum bucket index
+and the maximum bucket index have similar magnitude, which helps
+support greater maximum scale. Supporting numbers smaller than
+0x1p-1022 would mean changing the valid scale interval to [-11,19]
+compared with [-10,20].
+
+### UpdateByIncr interface
+
+The OpenTelemetry metrics SDK `Aggregator` type supports an `Update()`
+interface which implies updating the histogram by a count of 1. This
+implementation also supports `UpdateByIncr()`, which makes it possible
+to count multiple observations in a single API call. This extension
+is useful in applying `Histogram` aggregation to _sampled_ metric
+events (e.g. in the [OpenTelemetry statsd
+receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/statsdreceiver)).
+A usage sketch appears in the example at the end of this document.
+
+Another use for `UpdateByIncr` is in a Span-to-metrics pipeline
+following [probability sampling in OpenTelemetry
+tracing](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/tracestate-probability-sampling.md)
+(e.g., for computing an exponential histogram of probability-sampled
+span durations).
+
+## Acknowledgements
+
+This implementation is based on work by [Yuke
+Zhuge](https://github.com/yzhuge) and [Otmar
+Ertl](https://github.com/oertl). See the
+[NrSketch](https://github.com/newrelic-experimental/newrelic-sketch-java/blob/1ce245713603d61ba3a4510f6df930a5479cd3f6/src/main/java/com/newrelic/nrsketch/indexer/LogIndexer.java)
+and
+[DynaHist](https://github.com/dynatrace-oss/dynahist/blob/9a6003fd0f661a9ef9dfcced0b428a01e303805e/src/main/java/com/dynatrace/dynahist/layout/OpenTelemetryExponentialBucketsLayout.java)
+repositories for more detail.
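+
+## Example
+
+The following sketch shows direct use of this data structure, using
+the `NewFloat64` test helper included in this PR (SDK users would
+normally reach this type through an `Aggregator` rather than
+constructing it directly):
+
+```golang
+hist := structure.NewFloat64(structure.NewConfig(structure.WithMaxSize(160)))
+
+// Individual observations use Update.
+hist.Update(0.001)
+hist.Update(0.002)
+
+// A sampled event representing 1000 observations of 3ms uses
+// UpdateByIncr.
+hist.UpdateByIncr(0.003, 1000)
+
+// hist.Count() == 1002; hist.Scale() has adjusted automatically.
+fmt.Println(hist.Count(), hist.Sum(), hist.Scale())
+```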
diff --git a/sdk/metric/aggregator/exponential/structure/config.go b/sdk/metric/aggregator/exponential/structure/config.go
new file mode 100644
index 00000000000..dbe63203f3c
--- /dev/null
+++ b/sdk/metric/aggregator/exponential/structure/config.go
@@ -0,0 +1,98 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package structure // import "go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/structure"
+
+import "fmt"
+
+// DefaultMaxSize is the default maximum number of buckets per
+// positive or negative number range. The value 160 is specified by
+// OpenTelemetry; it yields a maximum relative error of less than 5%
+// for data with contrast 10**5 (e.g., latencies in the range 1ms to
+// 100s). See the derivation here:
+// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exponential-bucket-histogram-aggregation
+const DefaultMaxSize int32 = 160
+
+// MinSize is the smallest reasonable configuration, which is small
+// enough to contain the entire normal floating point range at
+// MinScale.
+const MinSize = 2
+
+// MaximumMaxSize is an arbitrary limit meant to prevent accidental
+// use of giant histograms.
+const MaximumMaxSize = 16384
+
+// Config contains configuration for exponential histogram creation.
+type Config struct {
+	maxSize int32
+}
+
+// Option is the interface that applies a configuration option.
+type Option interface {
+	// apply sets the Option value of a config.
+	apply(Config) Config
+}
+
+// WithMaxSize sets the maximum size of each range (positive and/or
+// negative) in the histogram.
+func WithMaxSize(size int32) Option {
+	return maxSize(size)
+}
+
+// maxSize is an option to set the maximum histogram size.
+type maxSize int32
+
+// apply implements Option.
+func (ms maxSize) apply(cfg Config) Config {
+	cfg.maxSize = int32(ms)
+	return cfg
+}
+
+// NewConfig returns an exponential histogram configuration with
+// defaults and limits applied.
+func NewConfig(opts ...Option) Config {
+	var cfg Config
+	for _, opt := range opts {
+		cfg = opt.apply(cfg)
+	}
+	return cfg
+}
+
+// Valid returns true for valid configurations.
+func (c Config) Valid() bool {
+	_, err := c.Validate()
+	return err == nil
+}
+
+// Validate returns the nearest valid Config to the input and an
+// error indicating whether the input was a valid configuration.
+func (c Config) Validate() (Config, error) {
+	if c.maxSize >= MinSize && c.maxSize <= MaximumMaxSize {
+		return c, nil
+	}
+	if c.maxSize == 0 {
+		c.maxSize = DefaultMaxSize
+		return c, nil
+	}
+	err := fmt.Errorf("invalid histogram size: %d", c.maxSize)
+	if c.maxSize < 0 {
+		c.maxSize = DefaultMaxSize
+	} else if c.maxSize < MinSize {
+		c.maxSize = MinSize
+	} else if c.maxSize > MaximumMaxSize {
+		c.maxSize = MaximumMaxSize
+	}
+	return c, err
+}
diff --git a/sdk/metric/aggregator/exponential/structure/config_test.go b/sdk/metric/aggregator/exponential/structure/config_test.go
new file mode 100644
index 00000000000..f412ccb1ba9
--- /dev/null
+++ b/sdk/metric/aggregator/exponential/structure/config_test.go
@@ -0,0 +1,33 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package structure // import "go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/structure"
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestConfigValid(t *testing.T) {
+	require.True(t, Config{}.Valid())
+	require.True(t, NewConfig().Valid())
+	require.True(t, NewConfig(WithMaxSize(MinSize)).Valid())
+	require.True(t, NewConfig(WithMaxSize(MaximumMaxSize)).Valid())
+	require.True(t, NewConfig(WithMaxSize((MinSize+MaximumMaxSize)/2)).Valid())
+
+	require.False(t, NewConfig(WithMaxSize(-1)).Valid())
+	require.False(t, NewConfig(WithMaxSize(1<<20)).Valid())
+	require.False(t, NewConfig(WithMaxSize(1)).Valid())
+}
diff --git a/sdk/metric/aggregator/exponential/structure/exponential.go b/sdk/metric/aggregator/exponential/structure/exponential.go
new file mode 100644
index 00000000000..6040c520c40
--- /dev/null
+++ b/sdk/metric/aggregator/exponential/structure/exponential.go
@@ -0,0 +1,699 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package structure // import "go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/structure"
+
+import (
+	"fmt"
+
+	"go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/mapping"
+	"go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/mapping/exponent"
+	"go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/mapping/logarithm"
+)
+
+type (
+	// Histogram counts observations in exponentially-spaced
+	// buckets. It is configured with a maximum size, while the
+	// scale factor, which determines resolution, is automatically
+	// adjusted to accommodate the range of input data.
+	//
+	// Note that the generic type `N` determines the type of the
+	// Sum, Min, and Max fields. Bucket boundaries are handled in
+	// floating point regardless of the type of N.
+	Histogram[N ValueType] struct {
+		// maxSize is the maximum capacity of the positive and
+		// negative ranges. It is set by Init() and preserved
+		// by CopyInto() and Swap().
+		maxSize int32
+
+		// sum is the sum of all Updates reflected in the
+		// aggregator. It has the same number type as the
+		// corresponding sdkinstrument.Descriptor.
+		sum N
+		// count is incremented by 1 per Update.
+		count uint64
+		// zeroCount is incremented by 1 when the measured
+		// value is exactly 0.
+		zeroCount uint64
+		// min is set when count > 0.
+		min N
+		// max is set when count > 0.
+		max N
+		// positive holds the positive values.
+		positive Buckets
+		// negative holds the negative values in these buckets
+		// by their absolute value.
+		negative Buckets
+		// mapping corresponds to the current scale and is
+		// shared by both the positive and negative ranges.
+		mapping mapping.Mapping
+	}
+
+	// Buckets stores counts for measurement values in the range
+	// (0, +Inf).
+	Buckets struct {
+		// backing is a slice of nil, []uint8, []uint16, []uint32, or []uint64.
+		backing bucketsBacking
+
+		// The term "index" refers to the number of the
+		// histogram bucket used to determine its boundaries.
+		// The lower boundary of a bucket is determined by the
+		// formula base**index and the upper boundary of a
+		// bucket is base**(index+1). Index values are signed
+		// to account for values less than or equal to 1.
+		//
+		// Note that the width of this field is determined by
+		// its declaration as int32 in the OTLP protocol. The
+		// meaning of this field could be extended to wider
+		// types, but that would imply an extremely
+		// high-resolution histogram.
+
+		// indexBase is the index of the 0th position in the
+		// backing array, i.e., backing[0] is the count
+		// in the bucket with index `indexBase`.
+		indexBase int32
+
+		// indexStart is the smallest index value represented
+		// in the backing array.
+		indexStart int32
+
+		// indexEnd is the largest index value represented in
+		// the backing array.
+		indexEnd int32
+	}
+
+	// ValueType is an interface constraint for the numeric type
+	// aggregated by this histogram.
+	ValueType interface {
+		int64 | float64
+	}
+
+	// bucketsCount lists the possible backing array widths.
+	bucketsCount interface {
+		uint8 | uint16 | uint32 | uint64
+	}
+
+	// bucketsVarwidth is a variable-width slice of unsigned int counters.
+	bucketsVarwidth[N bucketsCount] struct {
+		counts []N
+	}
+
+	// bucketsBacking is implemented by bucketsVarwidth[N].
+	bucketsBacking interface {
+		// size returns the physical size of the backing
+		// array, which is >= buckets.Len(), the number in use.
+		//
+		// Note this is logically an unsigned quantity;
+		// declaring it as int32 creates fewer type conversions
+		// in the code, because (a) it is not allowed to grow
+		// beyond the range of a signed int32 and (b) it is
+		// frequently involved in arithmetic with signed index
+		// values.
+		size() int32
+		// growTo grows a backing array and copies old entries
+		// into their correct new positions.
+		growTo(newSize, oldPositiveLimit, newPositiveLimit int32)
+		// reverse reverses the items in a backing array in the
+		// range [from, limit).
+		reverse(from, limit int32)
+		// emptyBucket empties the count from a bucket, for
+		// moving into another.
+		emptyBucket(src int32) uint64
+		// tryIncrement increments a bucket by `incr` and
+		// returns false if the result would overflow the
+		// current backing width.
+		tryIncrement(bucketIndex int32, incr uint64) bool
+		// countAt returns the count in a specific bucket.
+		countAt(pos uint32) uint64
+		// reset resets all buckets to zero count.
+		reset()
+	}
+
+	// highLow is used to determine the maximum range of bucket
+	// indices needed, in order to establish the best value of the
+	// scale parameter.
+	highLow struct {
+		low  int32
+		high int32
+	}
+
+	// Int64 is an integer-valued histogram.
+	Int64 = Histogram[int64]
+
+	// Float64 is a float64-valued histogram.
+	Float64 = Histogram[float64]
+)
+
+// Init initializes a new histogram.
+func (h *Histogram[N]) Init(cfg Config) {
+	cfg, _ = cfg.Validate()
+
+	h.maxSize = cfg.maxSize
+
+	m, _ := newMapping(logarithm.MaxScale)
+	h.mapping = m
+}
+
+// Sum implements aggregation.Histogram.
+func (h *Histogram[N]) Sum() N {
+	return h.sum
+}
+
+// Min implements aggregation.Histogram.
+func (h *Histogram[N]) Min() N {
+	return h.min
+}
+
+// Max implements aggregation.Histogram.
+func (h *Histogram[N]) Max() N {
+	return h.max
+}
+
+// Count implements aggregation.Histogram.
+func (h *Histogram[N]) Count() uint64 {
+	return h.count
+}
+
+// Scale implements aggregation.Histogram.
+func (h *Histogram[N]) Scale() int32 {
+	if h.count == h.zeroCount {
+		// all zeros! scale doesn't matter, use zero.
+		return 0
+	}
+	return h.mapping.Scale()
+}
+
+// ZeroCount implements aggregation.Histogram.
+func (h *Histogram[N]) ZeroCount() uint64 {
+	return h.zeroCount
+}
+
+// Positive implements aggregation.Histogram.
+func (h *Histogram[N]) Positive() *Buckets {
+	return &h.positive
+}
+
+// Negative implements aggregation.Histogram.
+func (h *Histogram[N]) Negative() *Buckets {
+	return &h.negative
+}
+
+// Offset implements aggregation.Bucket.
+func (b *Buckets) Offset() int32 {
+	return b.indexStart
+}
+
+// Len implements aggregation.Bucket.
+func (b *Buckets) Len() uint32 {
+	if b.backing == nil {
+		return 0
+	}
+	if b.indexEnd == b.indexStart && b.At(0) == 0 {
+		return 0
+	}
+	return uint32(b.indexEnd - b.indexStart + 1)
+}
+
+// At returns the count of the bucket at a position in the logical
+// array of counts.
+func (b *Buckets) At(pos0 uint32) uint64 {
+	pos := pos0
+	bias := uint32(b.indexBase - b.indexStart)
+
+	if pos < bias {
+		pos += uint32(b.backing.size())
+	}
+	pos -= bias
+
+	return b.backing.countAt(pos)
+}
+
+// Clear resets a histogram to the empty state without changing the
+// backing array.
+func (h *Histogram[N]) Clear() {
+	h.positive.clear()
+	h.negative.clear()
+	h.sum = 0
+	h.count = 0
+	h.zeroCount = 0
+	h.min = 0
+	h.max = 0
+	h.mapping, _ = newMapping(logarithm.MaxScale)
+}
+
+// clear zeros the backing array.
+func (b *Buckets) clear() {
+	b.indexStart = 0
+	b.indexEnd = 0
+	b.indexBase = 0
+	if b.backing != nil {
+		b.backing.reset()
+	}
+}
+
+func newMapping(scale int32) (mapping.Mapping, error) {
+	if scale <= 0 {
+		return exponent.NewMapping(scale)
+	}
+	return logarithm.NewMapping(scale)
+}
+
+// Swap exchanges the contents of `h` and `dest`.
+func (h *Histogram[N]) Swap(dest *Histogram[N]) {
+	*dest, *h = *h, *dest
+}
+
+// CopyInto copies `h` into `dest`.
+func (h *Histogram[N]) CopyInto(dest *Histogram[N]) {
+	dest.Clear()
+	dest.MergeFrom(h)
+}
+
+// Update supports updating a histogram with a single count.
+func (h *Histogram[N]) Update(number N) {
+	h.UpdateByIncr(number, 1)
+}
+
+// UpdateByIncr supports updating a histogram with a non-negative
+// increment.
+func (h *Histogram[N]) UpdateByIncr(number N, incr uint64) {
+	value := float64(number)
+
+	// Maintain min and max.
+	if h.count == 0 {
+		h.min = number
+		h.max = number
+	} else {
+		if number < h.min {
+			h.min = number
+		}
+		if number > h.max {
+			h.max = number
+		}
+	}
+
+	// Note: Not checking for overflow here. TODO.
+	h.count += incr
+
+	if value == 0 {
+		h.zeroCount += incr
+		return
+	}
+
+	// The sum is maintained in the original number type; the
+	// bucketing logic below uses the floating point value.
+	h.sum += number * N(incr)
+
+	var b *Buckets
+	if value > 0 {
+		b = &h.positive
+	} else {
+		value = -value
+		b = &h.negative
+	}
+
+	h.update(b, value, incr)
+}
+
+// downscale subtracts `change` from the current mapping scale.
+func (h *Histogram[N]) downscale(change int32) {
+	if change == 0 {
+		return
+	}
+	if change < 0 {
+		panic(fmt.Sprint("impossible change of scale: ", change))
+	}
+	newScale := h.mapping.Scale() - change
+
+	h.positive.downscale(change)
+	h.negative.downscale(change)
+	var err error
+	h.mapping, err = newMapping(newScale)
+	if err != nil {
+		panic(fmt.Sprint("impossible scale: ", newScale))
+	}
+}
+
+// changeScale computes how much downscaling is needed by shifting the
+// high and low values until they are separated by no more than size.
+func changeScale(hl highLow, size int32) int32 {
+	var change int32
+	for hl.high-hl.low >= size {
+		hl.high >>= 1
+		hl.low >>= 1
+		change++
+	}
+	return change
+}
+
+// update increments the appropriate buckets for a given absolute
+// value by the provided increment.
+func (h *Histogram[N]) update(b *Buckets, value float64, incr uint64) {
+	index := h.mapping.MapToIndex(value)
+
+	hl, success := h.incrementIndexBy(b, index, incr)
+	if success {
+		return
+	}
+
+	h.downscale(changeScale(hl, h.maxSize))
+
+	index = h.mapping.MapToIndex(value)
+	if _, success := h.incrementIndexBy(b, index, incr); !success {
+		panic("downscale logic error")
+	}
+}
+
+// incrementIndexBy determines if the index lies inside the current
+// range [indexStart, indexEnd], growing the backing array (up to
+// maxSize) if necessary to satisfy the new value. When the index
+// cannot be accommodated at the current scale, the combined index
+// range is returned with false, indicating that a rescale is needed.
+func (h *Histogram[N]) incrementIndexBy(b *Buckets, index int32, incr uint64) (highLow, bool) {
+	if incr == 0 {
+		// Skip the work of a 0 increment. This happens when
+		// merging sparse data, for example, and when
+		// UpdateByIncr is called with a 0 increment; it can
+		// be safely skipped.
+		return highLow{}, true
+	}
+	if b.Len() == 0 {
+		if b.backing == nil {
+			b.backing = &bucketsVarwidth[uint8]{
+				counts: []uint8{0},
+			}
+		}
+		b.indexStart = index
+		b.indexEnd = b.indexStart
+		b.indexBase = b.indexStart
+	} else if index < b.indexStart {
+		if span := b.indexEnd - index; span >= h.maxSize {
+			// rescale needed: new value is below the current range
+			return highLow{
+				low:  index,
+				high: b.indexEnd,
+			}, false
+		} else if span >= b.backing.size() {
+			h.grow(b, span+1)
+		}
+		b.indexStart = index
+	} else if index > b.indexEnd {
+		if span := index - b.indexStart; span >= h.maxSize {
+			// rescale needed: new value is above the current range
+			return highLow{
+				low:  b.indexStart,
+				high: index,
+			}, false
+		} else if span >= b.backing.size() {
+			h.grow(b, span+1)
+		}
+		b.indexEnd = index
+	}
+
+	bucketIndex := index - b.indexBase
+	if bucketIndex < 0 {
+		bucketIndex += b.backing.size()
+	}
+	b.incrementBucket(bucketIndex, incr)
+	return highLow{}, true
+}
+
+// powTwoRoundedUp computes the next largest power-of-two, which
+// ensures power-of-two slices are allocated.
+func powTwoRoundedUp(v int32) int32 {
+	// The following expression computes the least power-of-two
+	// that is >= v. There are a number of tricky ways to
+	// do this, see https://stackoverflow.com/questions/466204/rounding-up-to-next-power-of-2
+	//
+	// One equivalent expression:
+	//
+	// v = int32(1) << (32 - bits.LeadingZeros32(uint32(v-1)))
+	v--
+	v |= v >> 1
+	v |= v >> 2
+	v |= v >> 4
+	v |= v >> 8
+	v |= v >> 16
+	v++
+	return v
+}
+
+// grow resizes the backing array by doubling in size, up to maxSize.
+// This extends the array with zeros and copies the existing counts to
+// their same logical positions.
+func (h *Histogram[N]) grow(b *Buckets, needed int32) {
+	size := b.backing.size()
+	bias := b.indexBase - b.indexStart
+	oldPositiveLimit := size - bias
+	newSize := powTwoRoundedUp(needed)
+	if newSize > h.maxSize {
+		newSize = h.maxSize
+	}
+	newPositiveLimit := newSize - bias
+	b.backing.growTo(newSize, oldPositiveLimit, newPositiveLimit)
+}
+
+// downscale first rotates, then collapses 2**`by`-to-1 buckets.
+func (b *Buckets) downscale(by int32) {
+	b.rotate()
+
+	size := 1 + b.indexEnd - b.indexStart
+	each := int64(1) << by
+	inpos := int32(0)
+	outpos := int32(0)
+
+	for pos := b.indexStart; pos <= b.indexEnd; {
+		mod := int64(pos) % each
+		if mod < 0 {
+			mod += each
+		}
+		for i := mod; i < each && inpos < size; i++ {
+			b.relocateBucket(outpos, inpos)
+			inpos++
+			pos++
+		}
+		outpos++
+	}
+
+	b.indexStart >>= by
+	b.indexEnd >>= by
+	b.indexBase = b.indexStart
+}
+
+// rotate shifts the backing array contents so that indexStart ==
+// indexBase to simplify the downscale logic.
+func (b *Buckets) rotate() {
+	bias := b.indexBase - b.indexStart
+
+	if bias == 0 {
+		return
+	}
+
+	// Rotate the array so that indexBase == indexStart.
+	b.indexBase = b.indexStart
+
+	b.backing.reverse(0, b.backing.size())
+	b.backing.reverse(0, bias)
+	b.backing.reverse(bias, b.backing.size())
+}
+
+// relocateBucket adds the count in counts[src] to counts[dest] and
+// resets counts[src] to zero.
+func (b *Buckets) relocateBucket(dest, src int32) {
+	if dest == src {
+		return
+	}
+
+	b.incrementBucket(dest, b.backing.emptyBucket(src))
+}
+
+// incrementBucket increments the count at the given backing array
+// position by `incr`, widening the backing width on overflow.
+func (b *Buckets) incrementBucket(bucketIndex int32, incr uint64) {
+	for {
+		if b.backing.tryIncrement(bucketIndex, incr) {
+			return
+		}
+
+		switch bt := b.backing.(type) {
+		case *bucketsVarwidth[uint8]:
+			b.backing = widenBuckets[uint8, uint16](bt)
+		case *bucketsVarwidth[uint16]:
+			b.backing = widenBuckets[uint16, uint32](bt)
+		case *bucketsVarwidth[uint32]:
+			b.backing = widenBuckets[uint32, uint64](bt)
+		case *bucketsVarwidth[uint64]:
+			// Problem. The exponential histogram has overflowed a uint64.
+			// However, this shouldn't happen because the total count would
+			// overflow first.
+			panic("bucket overflow must be avoided")
+		}
+	}
+}
+
+// MergeFrom combines data from `o` into `h`.
+func (h *Histogram[N]) MergeFrom(o *Histogram[N]) {
+	if h.count == 0 {
+		h.min = o.min
+		h.max = o.max
+	} else if o.count != 0 {
+		if o.min < h.min {
+			h.min = o.min
+		}
+		if o.max > h.max {
+			h.max = o.max
+		}
+	}
+
+	// Note: Not checking for overflow here. TODO.
+	h.sum += o.sum
+	h.count += o.count
+	h.zeroCount += o.zeroCount
+
+	minScale := int32min(h.Scale(), o.Scale())
+
+	hlp := h.highLowAtScale(&h.positive, minScale)
+	hlp = hlp.with(o.highLowAtScale(&o.positive, minScale))
+
+	hln := h.highLowAtScale(&h.negative, minScale)
+	hln = hln.with(o.highLowAtScale(&o.negative, minScale))
+
+	minScale = int32min(
+		minScale-changeScale(hlp, h.maxSize),
+		minScale-changeScale(hln, h.maxSize),
+	)
+
+	h.downscale(h.Scale() - minScale)
+
+	h.mergeBuckets(&h.positive, o, &o.positive, minScale)
+	h.mergeBuckets(&h.negative, o, &o.negative, minScale)
+}
+
+// mergeBuckets translates index values from another histogram into
+// the corresponding buckets of this histogram.
+func (h *Histogram[N]) mergeBuckets(mine *Buckets, other *Histogram[N], theirs *Buckets, scale int32) {
+	theirOffset := theirs.Offset()
+	theirChange := other.Scale() - scale
+
+	for i := uint32(0); i < theirs.Len(); i++ {
+		_, success := h.incrementIndexBy(
+			mine,
+			(theirOffset+int32(i))>>theirChange,
+			theirs.At(i),
+		)
+		if !success {
+			panic("incorrect merge scale")
+		}
+	}
+}
+
+// highLowAtScale is an accessory for MergeFrom() to calculate the
+// ideal combined scale.
+func (h *Histogram[N]) highLowAtScale(b *Buckets, scale int32) highLow {
+	if b.Len() == 0 {
+		return highLow{
+			low:  0,
+			high: -1,
+		}
+	}
+	shift := h.Scale() - scale
+	return highLow{
+		low:  b.indexStart >> shift,
+		high: b.indexEnd >> shift,
+	}
+}
+
+// with is an accessory for MergeFrom() to calculate the ideal
+// combined scale.
+func (h *highLow) with(o highLow) highLow {
+	if o.empty() {
+		return *h
+	}
+	if h.empty() {
+		return o
+	}
+	return highLow{
+		low:  int32min(h.low, o.low),
+		high: int32max(h.high, o.high),
+	}
+}
+
+// empty reports whether there are no values in the highLow range.
+func (h *highLow) empty() bool {
+	return h.low > h.high
+}
+
+func int32min(a, b int32) int32 {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+func int32max(a, b int32) int32 {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+// bucketsVarwidth[]
+//
+// Each of the methods below is generic with respect to the underlying
+// backing array. See the interface-level comments.
+ +func (b *bucketsVarwidth[N]) countAt(pos uint32) uint64 { + return uint64(b.counts[pos]) +} + +func (b *bucketsVarwidth[N]) reset() { + for i := range b.counts { + b.counts[i] = 0 + } +} + +func (b *bucketsVarwidth[N]) size() int32 { + return int32(len(b.counts)) +} + +func (b *bucketsVarwidth[N]) growTo(newSize, oldPositiveLimit, newPositiveLimit int32) { + tmp := make([]N, newSize) + copy(tmp[newPositiveLimit:], b.counts[oldPositiveLimit:]) + copy(tmp[0:oldPositiveLimit], b.counts[0:oldPositiveLimit]) + b.counts = tmp +} + +func (b *bucketsVarwidth[N]) reverse(from, limit int32) { + num := ((from + limit) / 2) - from + for i := int32(0); i < num; i++ { + b.counts[from+i], b.counts[limit-i-1] = b.counts[limit-i-1], b.counts[from+i] + } +} + +func (b *bucketsVarwidth[N]) emptyBucket(src int32) uint64 { + tmp := b.counts[src] + b.counts[src] = 0 + return uint64(tmp) +} + +func (b *bucketsVarwidth[N]) tryIncrement(bucketIndex int32, incr uint64) bool { + var limit = uint64(N(0) - 1) + if uint64(b.counts[bucketIndex])+incr <= limit { + b.counts[bucketIndex] += N(incr) + return true + } + return false +} + +func widenBuckets[From, To bucketsCount](in *bucketsVarwidth[From]) *bucketsVarwidth[To] { + tmp := make([]To, len(in.counts)) + for i := range in.counts { + tmp[i] = To(in.counts[i]) + } + return &bucketsVarwidth[To]{counts: tmp} +} diff --git a/sdk/metric/aggregator/exponential/structure/exponential_test.go b/sdk/metric/aggregator/exponential/structure/exponential_test.go new file mode 100644 index 00000000000..7a786ae5303 --- /dev/null +++ b/sdk/metric/aggregator/exponential/structure/exponential_test.go @@ -0,0 +1,853 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package structure // import "go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/structure" + +import ( + "fmt" + "math" + "math/rand" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/mapping" + "go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/mapping/exponent" + "go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/mapping/logarithm" +) + +const ( + plusOne = 1 + minusOne = -1 +) + +type printableBucket struct { + index int32 + count uint64 + lower float64 +} + +func (h *Histogram[N]) printBuckets(b *Buckets) (r []printableBucket) { + for i := uint32(0); i < b.Len(); i++ { + lower, _ := h.mapping.LowerBoundary(b.Offset() + int32(i)) + r = append(r, printableBucket{ + index: b.Offset() + int32(i), + count: b.At(i), + lower: lower, + }) + } + return r +} + +func getCounts(b *Buckets) (r []uint64) { + for i := uint32(0); i < b.Len(); i++ { + r = append(r, b.At(i)) + } + return r +} + +func (b printableBucket) String() string { + return fmt.Sprintf("%v=%v(%.2g)", b.index, b.count, b.lower) +} + +// requireEqual is a helper used to require that two aggregators +// should have equal contents. 
Because the backing array is cyclic,
// the two are expected to have different underlying
// representations. This method is more useful than RequireEqualValues
// for debugging the internals, because it prints numeric boundaries.
+func requireEqual(t *testing.T, a, b *Histogram[float64]) {
+	aSum := a.Sum()
+	bSum := b.Sum()
+	if aSum == 0 || bSum == 0 {
+		require.InDelta(t, aSum, bSum, 1e-6)
+	} else {
+		require.InEpsilon(t, aSum, bSum, 1e-6)
+	}
+	require.Equal(t, a.Count(), b.Count())
+	require.Equal(t, a.ZeroCount(), b.ZeroCount())
+	require.Equal(t, a.Scale(), b.Scale())
+
+	bstr := func(data *Buckets) string {
+		var sb strings.Builder
+		sb.WriteString(fmt.Sprintln("[@", data.Offset()))
+		for i := uint32(0); i < data.Len(); i++ {
+			sb.WriteString(fmt.Sprintln(data.At(i)))
+		}
+		sb.WriteString("]\n")
+		return sb.String()
+	}
+	require.Equal(t, bstr(&a.positive), bstr(&b.positive), "positive %v %v", a.printBuckets(&a.positive), a.printBuckets(&b.positive))
+	require.Equal(t, bstr(&a.negative), bstr(&b.negative), "negative %v %v", a.printBuckets(&a.negative), a.printBuckets(&b.negative))
+}
+
+// centerVal returns the midpoint of the histogram bucket with index
+// `x`, used in tests to avoid rounding errors that happen near the
+// bucket boundaries.
+func centerVal(mapper mapping.Mapping, x int32) float64 {
+	lb, err1 := mapper.LowerBoundary(x)
+	ub, err2 := mapper.LowerBoundary(x + 1)
+	if err1 != nil || err2 != nil {
+		panic(fmt.Sprintf("unexpected errors: %v %v", err1, err2))
+	}
+	return (lb + ub) / 2
+}
+
+// Tests insertion of [2, 4, 1]. The index of 2 (i.e., 0) becomes
+// `indexBase`, the 4 goes to its right and the 1 goes in the last
+// position of the backing array. With 3 binary orders of magnitude
+// and MaxSize=4, this must finish with scale=0; with minimum value 1
+// this must finish with offset=-1 (all scales).
+func TestAlternatingGrowth1(t *testing.T) {
+	agg := NewFloat64(NewConfig(WithMaxSize(4)))
+	agg.Update(2)
+	agg.Update(4)
+	agg.Update(1)
+
+	require.Equal(t, int32(-1), agg.Positive().Offset())
+	require.Equal(t, int32(0), agg.Scale())
+	require.Equal(t, []uint64{1, 1, 1}, getCounts(agg.Positive()))
+}
+
+// Tests insertion of [2, 2, 2, 1, 8, 0.5]. The test proceeds as
+// above but then downscales once further to scale=-1, thus index -1
+// holds range (0.25, 1], index 0 holds range (1, 4], index 1 holds
+// range (4, 16].
+func TestAlternatingGrowth2(t *testing.T) {
+	agg := NewFloat64(NewConfig(WithMaxSize(4)))
+	agg.Update(2)
+	agg.Update(2)
+	agg.Update(2)
+	agg.Update(1)
+	agg.Update(8)
+	agg.Update(0.5)
+
+	require.Equal(t, int32(-1), agg.Positive().Offset())
+	require.Equal(t, int32(-1), agg.Scale())
+	require.Equal(t, []uint64{2, 3, 1}, getCounts(agg.Positive()))
+}
+
+// Tests that every permutation of {1/2, 1, 2} with maxSize=2 results
+// in the same scale=-1 histogram.
+func TestScaleNegOneCentered(t *testing.T) {
+	for j, order := range [][]float64{
+		{1, 0.5, 2},
+		{1, 2, 0.5},
+		{2, 0.5, 1},
+		{2, 1, 0.5},
+		{0.5, 1, 2},
+		{0.5, 2, 1},
+	} {
+		t.Run(fmt.Sprint(j), func(t *testing.T) {
+			agg := NewFloat64(NewConfig(WithMaxSize(2)), order...)
+
+			// After three updates: scale set to -1; expect
+			// counts[0] == 2 (the 1/2 and 1), counts[1] == 1
+			// (the 2).
+
+			require.Equal(t, int32(-1), agg.Scale())
+			require.Equal(t, int32(-1), agg.Positive().Offset())
+			require.Equal(t, uint32(2), agg.Positive().Len())
+			require.Equal(t, uint64(2), agg.Positive().At(0))
+			require.Equal(t, uint64(1), agg.Positive().At(1))
+		})
+	}
+}
+
+// Tests that every permutation of {1, 2, 4} with maxSize=2 results in
+// the same scale=-1 histogram.
+func TestScaleNegOnePositive(t *testing.T) {
+	for j, order := range [][]float64{
+		{1, 2, 4},
+		{1, 4, 2},
+		{2, 4, 1},
+		{2, 1, 4},
+		{4, 1, 2},
+		{4, 2, 1},
+	} {
+		t.Run(fmt.Sprint(j), func(t *testing.T) {
+			agg := NewFloat64(NewConfig(WithMaxSize(2)), order...)
+
+			// After three updates: scale set to -1; expect
+			// counts[0] == 1 (the 1), counts[1] == 2 (the 2
+			// and 4).
+			require.Equal(t, int32(-1), agg.Scale())
+			require.Equal(t, int32(-1), agg.Positive().Offset())
+			require.Equal(t, uint32(2), agg.Positive().Len())
+			require.Equal(t, uint64(1), agg.Positive().At(0))
+			require.Equal(t, uint64(2), agg.Positive().At(1))
+		})
+	}
+}
+
+// Tests that every permutation of {1, 1/2, 1/4} with maxSize=2
+// results in the same scale=-1 histogram.
+func TestScaleNegOneNegative(t *testing.T) {
+	for j, order := range [][]float64{
+		{1, 0.5, 0.25},
+		{1, 0.25, 0.5},
+		{0.5, 0.25, 1},
+		{0.5, 1, 0.25},
+		{0.25, 1, 0.5},
+		{0.25, 0.5, 1},
+	} {
+		t.Run(fmt.Sprint(j), func(t *testing.T) {
+			agg := NewFloat64(NewConfig(WithMaxSize(2)), order...)
+
+			// After three updates: scale set to -1; expect
+			// counts[0] == 1 (the 1/4), counts[1] == 2 (the
+			// 1/2 and 1).
+			require.Equal(t, int32(-1), agg.Scale())
+			require.Equal(t, int32(-2), agg.Positive().Offset())
+			require.Equal(t, uint32(2), agg.Positive().Len())
+			require.Equal(t, uint64(1), agg.Positive().At(0))
+			require.Equal(t, uint64(2), agg.Positive().At(1))
+		})
+	}
+}
+
+// Tests a variety of ascending sequences, calculated using known
+// index ranges. For example, with maxSize=3, using scale=0 and
+// offset -5, add a sequence of numbers. Because the numbers have
+// known range, we know the expected scale.
+func TestAscendingSequence(t *testing.T) {
+	for _, maxSize := range []int32{3, 4, 6, 9} {
+		t.Run(fmt.Sprintf("maxSize=%d", maxSize), func(t *testing.T) {
+			for offset := int32(-5); offset <= 5; offset++ {
+				for _, initScale := range []int32{
+					0, 4,
+				} {
+					testAscendingSequence(t, maxSize, offset, initScale)
+				}
+			}
+		})
+	}
+}
+
+func testAscendingSequence(t *testing.T, maxSize, offset, initScale int32) {
+	for step := maxSize; step < 4*maxSize; step++ {
+		agg := NewFloat64(NewConfig(WithMaxSize(maxSize)))
+		mapper, err := newMapping(initScale)
+		require.NoError(t, err)
+
+		minVal := centerVal(mapper, offset)
+		maxVal := centerVal(mapper, offset+step)
+		sum := 0.0
+
+		for i := int32(0); i < maxSize; i++ {
+			value := centerVal(mapper, offset+i)
+			agg.Update(value)
+			sum += value
+		}
+
+		require.Equal(t, initScale, agg.Scale())
+		require.Equal(t, offset, agg.Positive().Offset())
+
+		agg.Update(maxVal)
+		sum += maxVal
+
+		// The zeroth bucket is not empty.
+		require.NotEqual(t, uint64(0), agg.Positive().At(0))
+
+		// The maximum-index filled bucket is at or above the
+		// mid-point (otherwise we downscaled too much).
+		maxFill := uint32(0)
+		totalCount := uint64(0)
+
+		for i := uint32(0); i < agg.Positive().Len(); i++ {
+			totalCount += agg.Positive().At(i)
+			if agg.Positive().At(i) != 0 {
+				maxFill = i
+			}
+		}
+		require.GreaterOrEqual(t, maxFill, uint32(maxSize)/2)
+
+		// Count is correct.
+		require.GreaterOrEqual(t, uint64(maxSize+1), totalCount)
+		require.GreaterOrEqual(t, uint64(maxSize+1), agg.Count())
+		// Sum is correct.
+		require.GreaterOrEqual(t, sum, agg.Sum())
+
+		// The offset is correct at the computed scale.
+		mapper, err = newMapping(agg.Scale())
+		require.NoError(t, err)
+		idx := mapper.MapToIndex(minVal)
+		require.Equal(t, int32(idx), agg.Positive().Offset())
+
+		// The maximum range is correct at the computed scale.
+		idx = mapper.MapToIndex(maxVal)
+		require.Equal(t, int32(idx), agg.Positive().Offset()+int32(agg.Positive().Len())-1)
+	}
+}
+
+// Tests a simple case of merging [2, 4, 8, 16] with [1, 1/2, 1/4, 1/8].
+func TestMergeSimpleEven(t *testing.T) {
+	agg0 := NewFloat64(NewConfig(WithMaxSize(4)))
+	agg1 := NewFloat64(NewConfig(WithMaxSize(4)))
+	agg2 := NewFloat64(NewConfig(WithMaxSize(4)))
+
+	for i := 0; i < 4; i++ {
+		f1 := float64(int64(2) << i)   // 2, 4, 8, 16
+		f2 := 1 / float64(int64(1)<<i) // 1, 1/2, 1/4, 1/8
+
+		agg0.Update(f1)
+		agg1.Update(f2)
+		agg2.Update(f1)
+		agg2.Update(f2)
+	}
+
+	agg0.MergeFrom(agg1)
+	requireEqual(t, agg2, agg0)
+}
+
+		} else if lb > value {
+			below++
+		}
+	}
+
+	// The sample results here are not guaranteed. Test that this is
+	// approximately unbiased.
+	// (Results on dev machine: 1015 above, 1007 below, 24 equal, total = 2046.)
+	require.InEpsilon(t, 0.5, float64(above)/float64(total), 0.05)
+	require.InEpsilon(t, 0.5, float64(below)/float64(total), 0.06)
+	}
+}
diff --git a/sdk/metric/aggregator/exponential/structure/test.go b/sdk/metric/aggregator/exponential/structure/test.go
new file mode 100644
index 00000000000..a4f7bfa9cc7
--- /dev/null
+++ b/sdk/metric/aggregator/exponential/structure/test.go
@@ -0,0 +1,36 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package structure // import "go.opentelemetry.io/otel/sdk/metric/aggregator/exponential/structure"
+
+// NewFloat64 is a test helper for constructing float64-valued histograms.
+func NewFloat64(cfg Config, values ...float64) *Float64 {
+	return newHist[float64](cfg, values)
+}
+
+// NewInt64 is a test helper for constructing int64-valued histograms.
+func NewInt64(cfg Config, values ...int64) *Int64 {
+	return newHist[int64](cfg, values)
+}
+
+func newHist[N ValueType](cfg Config, values []N) *Histogram[N] {
+	state := &Histogram[N]{}
+
+	state.Init(cfg)
+
+	for _, val := range values {
+		state.Update(val)
+	}
+	return state
+}
diff --git a/sdk/metric/go.mod b/sdk/metric/go.mod
index 480390821fd..db94923327d 100644
--- a/sdk/metric/go.mod
+++ b/sdk/metric/go.mod
@@ -1,6 +1,6 @@
 module go.opentelemetry.io/otel/sdk/metric
 
-go 1.17
+go 1.18
 
 replace go.opentelemetry.io/otel => ../..