Skip to content

Commit

Permalink
api: Extend and improve json-iterator usage
Browse files Browse the repository at this point in the history
For one, this pulls up the histogram-related json-iterator usage from
prometheus/common into the API client. Previously, the only
json-iterater usage was here in the API client. But then json-iterator
was used for the native histogram additions directly in
prometheus/common, see
https://github.com/prometheus/common/pull/440/files . This however
meant that any user of prometheus/common/model would now link in
json-iterator, even if they are not using the JSON marshaling at
all. To keep prometheus/common/model more leightweight, this commit
moves all the json-iterator usage into the API client itself, as it
was done before for the normal float samples.

This commit also adds an unmarshaling function for native histograms,
which didn't even exist in prometheus/common/model so far.

It also adds json-iterator marshaling and un-marshaling for
model.SampleStream, which is only needed for the benchmark
(BenchmarkSamplesJsonSerialization). This fixes the benchmark such
that it actually compares json-iterator and std-lib json encoding
(which didn't work before because the custom marshaling methods of
model.SampleStream enforced std-lib json encoding for floats and
json-iterator encoding for histograms in all cases).

I expect this to fix #1179.

Signed-off-by: beorn7 <beorn@grafana.com>
  • Loading branch information
beorn7 committed Feb 23, 2023
1 parent 2771bcc commit 0d40e5c
Show file tree
Hide file tree
Showing 5 changed files with 468 additions and 49 deletions.
284 changes: 263 additions & 21 deletions api/prometheus/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ import (
)

func init() {
json.RegisterTypeEncoderFunc("model.SamplePair", marshalPointJSON, marshalPointJSONIsEmpty)
json.RegisterTypeDecoderFunc("model.SamplePair", unMarshalPointJSON)
json.RegisterTypeEncoderFunc("model.SamplePair", marshalSamplePairJSON, marshalJSONIsEmpty)
json.RegisterTypeDecoderFunc("model.SamplePair", unmarshalSamplePairJSON)
json.RegisterTypeEncoderFunc("model.SampleHistogramPair", marshalSampleHistogramPairJSON, marshalJSONIsEmpty)
json.RegisterTypeDecoderFunc("model.SampleHistogramPair", unmarshalSampleHistogramPairJSON)
json.RegisterTypeEncoderFunc("model.SampleStream", marshalSampleStreamJSON, marshalJSONIsEmpty) // Only needed for benchmark.
json.RegisterTypeDecoderFunc("model.SampleStream", unmarshalSampleStreamJSON) // Only needed for benchmark.
}

func unMarshalPointJSON(ptr unsafe.Pointer, iter *json.Iterator) {
func unmarshalSamplePairJSON(ptr unsafe.Pointer, iter *json.Iterator) {
p := (*model.SamplePair)(ptr)
if !iter.ReadArray() {
iter.ReportError("unmarshal model.SamplePair", "SamplePair must be [timestamp, value]")
Expand Down Expand Up @@ -68,12 +72,165 @@ func unMarshalPointJSON(ptr unsafe.Pointer, iter *json.Iterator) {
}
}

func marshalPointJSON(ptr unsafe.Pointer, stream *json.Stream) {
func marshalSamplePairJSON(ptr unsafe.Pointer, stream *json.Stream) {
p := *((*model.SamplePair)(ptr))
stream.WriteArrayStart()
marshalTimestamp(p.Timestamp, stream)
stream.WriteMore()
marshalFloat(float64(p.Value), stream)
stream.WriteArrayEnd()
}

func unmarshalSampleHistogramPairJSON(ptr unsafe.Pointer, iter *json.Iterator) {
p := (*model.SampleHistogramPair)(ptr)
if !iter.ReadArray() {
iter.ReportError("unmarshal model.SampleHistogramPair", "SampleHistogramPair must be [timestamp, {histogram}]")
return
}
t := iter.ReadNumber()
if err := p.Timestamp.UnmarshalJSON([]byte(t)); err != nil {
iter.ReportError("unmarshal model.SampleHistogramPair", err.Error())
return
}
if !iter.ReadArray() {
iter.ReportError("unmarshal model.SampleHistogramPair", "SamplePair missing histogram")
return
}
h := &model.SampleHistogram{}
p.Histogram = h
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
switch key {
case "count":
f, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
iter.ReportError("unmarshal model.SampleHistogramPair", "count of histogram is not a float")
return
}
h.Count = model.FloatString(f)
case "sum":
f, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
iter.ReportError("unmarshal model.SampleHistogramPair", "sum of histogram is not a float")
return
}
h.Sum = model.FloatString(f)
case "buckets":
for iter.ReadArray() {
b, err := unmarshalHistogramBucket(iter)
if err != nil {
iter.ReportError("unmarshal model.HistogramBucket", err.Error())
return
}
h.Buckets = append(h.Buckets, b)
}
default:
iter.ReportError("unmarshal model.SampleHistogramPair", fmt.Sprint("unexpected key in histogram:", key))
return
}
}
if iter.ReadArray() {
iter.ReportError("unmarshal model.SampleHistogramPair", "SampleHistogramPair has too many values, must be [timestamp, {histogram}]")
return
}
}

func marshalSampleHistogramPairJSON(ptr unsafe.Pointer, stream *json.Stream) {
p := *((*model.SampleHistogramPair)(ptr))
stream.WriteArrayStart()
marshalTimestamp(p.Timestamp, stream)
stream.WriteMore()
marshalHistogram(*p.Histogram, stream)
stream.WriteArrayEnd()
}

func unmarshalSampleStreamJSON(ptr unsafe.Pointer, iter *json.Iterator) {
ss := (*model.SampleStream)(ptr)
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
switch key {
case "metric":
metricString := iter.ReadAny().ToString()
if err := json.UnmarshalFromString(metricString, &ss.Metric); err != nil {
iter.ReportError("unmarshal model.SampleStream", err.Error())
return
}
case "values":
for iter.ReadArray() {
v := model.SamplePair{}
unmarshalSamplePairJSON(unsafe.Pointer(&v), iter)
ss.Values = append(ss.Values, v)
}
case "histograms":
for iter.ReadArray() {
h := model.SampleHistogramPair{}
unmarshalSampleHistogramPairJSON(unsafe.Pointer(&h), iter)
ss.Histograms = append(ss.Histograms, h)
}
default:
iter.ReportError("unmarshal model.SampleStream", fmt.Sprint("unexpected key:", key))
return
}
}
}

func marshalSampleStreamJSON(ptr unsafe.Pointer, stream *json.Stream) {
ss := *((*model.SampleStream)(ptr))
stream.WriteObjectStart()
stream.WriteObjectField(`metric`)
m, err := json.Marshal(ss.Metric)
if err != nil {
stream.Error = err
return
}
stream.SetBuffer(append(stream.Buffer(), m...))
if len(ss.Values) > 0 {
stream.WriteMore()
stream.WriteObjectField(`values`)
stream.WriteArrayStart()
for i, v := range ss.Values {
if i < 0 {
stream.WriteMore()
}
marshalSamplePairJSON(unsafe.Pointer(&v), stream)
}
stream.WriteArrayEnd()
}
if len(ss.Histograms) > 0 {
stream.WriteMore()
stream.WriteObjectField(`histograms`)
stream.WriteArrayStart()
for i, h := range ss.Histograms {
if i < 0 {
stream.WriteMore()
}
marshalSampleHistogramPairJSON(unsafe.Pointer(&h), stream)
}
stream.WriteArrayEnd()
}
stream.WriteObjectEnd()
}

func marshalFloat(v float64, stream *json.Stream) {
stream.WriteRaw(`"`)
// Taken from https://github.com/json-iterator/go/blob/master/stream_float.go#L71 as a workaround
// to https://github.com/json-iterator/go/issues/365 (json-iterator, to follow json standard, doesn't allow inf/nan).
buf := stream.Buffer()
abs := math.Abs(v)
fmt := byte('f')
// Note: Must use float32 comparisons for underlying float32 value to get precise cutoffs right.
if abs != 0 {
if abs < 1e-6 || abs >= 1e21 {
fmt = 'e'
}
}
buf = strconv.AppendFloat(buf, v, fmt, -1, 64)
stream.SetBuffer(buf)
stream.WriteRaw(`"`)
}

func marshalTimestamp(timestamp model.Time, stream *json.Stream) {
t := int64(timestamp)
// Write out the timestamp as a float divided by 1000.
// This is ~3x faster than converting to a float.
t := int64(p.Timestamp)
if t < 0 {
stream.WriteRaw(`-`)
t = -t
Expand All @@ -90,28 +247,113 @@ func marshalPointJSON(ptr unsafe.Pointer, stream *json.Stream) {
}
stream.WriteInt64(fraction)
}
stream.WriteMore()
stream.WriteRaw(`"`)
}

// Taken from https://github.com/json-iterator/go/blob/master/stream_float.go#L71 as a workaround
// to https://github.com/json-iterator/go/issues/365 (jsoniter, to follow json standard, doesn't allow inf/nan)
buf := stream.Buffer()
abs := math.Abs(float64(p.Value))
fmt := byte('f')
// Note: Must use float32 comparisons for underlying float32 value to get precise cutoffs right.
if abs != 0 {
if abs < 1e-6 || abs >= 1e21 {
fmt = 'e'
}
func unmarshalHistogramBucket(iter *json.Iterator) (*model.HistogramBucket, error) {
b := model.HistogramBucket{}
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
buf = strconv.AppendFloat(buf, float64(p.Value), fmt, -1, 64)
stream.SetBuffer(buf)
boundaries, err := iter.ReadNumber().Int64()
if err != nil {
return nil, err
}
b.Boundaries = int32(boundaries)
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
f, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return nil, err
}
b.Lower = model.FloatString(f)
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
f, err = strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return nil, err
}
b.Upper = model.FloatString(f)
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
f, err = strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return nil, err
}
b.Count = model.FloatString(f)
if iter.ReadArray() {
return nil, errors.New("HistogramBucket has too many values, must be [boundaries, lower, upper, count]")
}
return &b, nil
}

stream.WriteRaw(`"`)
// marshalHistogramBucket writes something like: [ 3, "-0.25", "0.25", "3"]
// See marshalHistogram to understand what the numbers mean
func marshalHistogramBucket(b model.HistogramBucket, stream *json.Stream) {
stream.WriteArrayStart()
stream.WriteInt32(b.Boundaries)
stream.WriteMore()
marshalFloat(float64(b.Lower), stream)
stream.WriteMore()
marshalFloat(float64(b.Upper), stream)
stream.WriteMore()
marshalFloat(float64(b.Count), stream)
stream.WriteArrayEnd()
}

func marshalPointJSONIsEmpty(ptr unsafe.Pointer) bool {
// marshalHistogram writes something like:
//
// {
// "count": "42",
// "sum": "34593.34",
// "buckets": [
// [ 3, "-0.25", "0.25", "3"],
// [ 0, "0.25", "0.5", "12"],
// [ 0, "0.5", "1", "21"],
// [ 0, "2", "4", "6"]
// ]
// }
//
// The 1st element in each bucket array determines if the boundaries are
// inclusive (AKA closed) or exclusive (AKA open):
//
// 0: lower exclusive, upper inclusive
// 1: lower inclusive, upper exclusive
// 2: both exclusive
// 3: both inclusive
//
// The 2nd and 3rd elements are the lower and upper boundary. The 4th element is
// the bucket count.
func marshalHistogram(h model.SampleHistogram, stream *json.Stream) {
stream.WriteObjectStart()
stream.WriteObjectField(`count`)
marshalFloat(float64(h.Count), stream)
stream.WriteMore()
stream.WriteObjectField(`sum`)
marshalFloat(float64(h.Sum), stream)

bucketFound := false
for _, bucket := range h.Buckets {
if bucket.Count == 0 {
continue // No need to expose empty buckets in JSON.
}
stream.WriteMore()
if !bucketFound {
stream.WriteObjectField(`buckets`)
stream.WriteArrayStart()
}
bucketFound = true
marshalHistogramBucket(*bucket, stream)
}
if bucketFound {
stream.WriteArrayEnd()
}
stream.WriteObjectEnd()
}

func marshalJSONIsEmpty(ptr unsafe.Pointer) bool {
return false
}

Expand Down
Loading

0 comments on commit 0d40e5c

Please sign in to comment.