Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: Extend and improve json-iterator usage #1225

Merged
merged 1 commit into from
Mar 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 263 additions & 21 deletions api/prometheus/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ import (
)

func init() {
json.RegisterTypeEncoderFunc("model.SamplePair", marshalPointJSON, marshalPointJSONIsEmpty)
json.RegisterTypeDecoderFunc("model.SamplePair", unMarshalPointJSON)
json.RegisterTypeEncoderFunc("model.SamplePair", marshalSamplePairJSON, marshalJSONIsEmpty)
json.RegisterTypeDecoderFunc("model.SamplePair", unmarshalSamplePairJSON)
json.RegisterTypeEncoderFunc("model.SampleHistogramPair", marshalSampleHistogramPairJSON, marshalJSONIsEmpty)
json.RegisterTypeDecoderFunc("model.SampleHistogramPair", unmarshalSampleHistogramPairJSON)
json.RegisterTypeEncoderFunc("model.SampleStream", marshalSampleStreamJSON, marshalJSONIsEmpty) // Only needed for benchmark.
json.RegisterTypeDecoderFunc("model.SampleStream", unmarshalSampleStreamJSON) // Only needed for benchmark.
}

func unMarshalPointJSON(ptr unsafe.Pointer, iter *json.Iterator) {
func unmarshalSamplePairJSON(ptr unsafe.Pointer, iter *json.Iterator) {
p := (*model.SamplePair)(ptr)
if !iter.ReadArray() {
iter.ReportError("unmarshal model.SamplePair", "SamplePair must be [timestamp, value]")
Expand Down Expand Up @@ -68,12 +72,165 @@ func unMarshalPointJSON(ptr unsafe.Pointer, iter *json.Iterator) {
}
}

func marshalPointJSON(ptr unsafe.Pointer, stream *json.Stream) {
func marshalSamplePairJSON(ptr unsafe.Pointer, stream *json.Stream) {
p := *((*model.SamplePair)(ptr))
stream.WriteArrayStart()
marshalTimestamp(p.Timestamp, stream)
stream.WriteMore()
marshalFloat(float64(p.Value), stream)
stream.WriteArrayEnd()
}

func unmarshalSampleHistogramPairJSON(ptr unsafe.Pointer, iter *json.Iterator) {
p := (*model.SampleHistogramPair)(ptr)
if !iter.ReadArray() {
iter.ReportError("unmarshal model.SampleHistogramPair", "SampleHistogramPair must be [timestamp, {histogram}]")
return
}
t := iter.ReadNumber()
if err := p.Timestamp.UnmarshalJSON([]byte(t)); err != nil {
iter.ReportError("unmarshal model.SampleHistogramPair", err.Error())
return
}
if !iter.ReadArray() {
iter.ReportError("unmarshal model.SampleHistogramPair", "SamplePair missing histogram")
return
}
h := &model.SampleHistogram{}
p.Histogram = h
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
switch key {
case "count":
f, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
iter.ReportError("unmarshal model.SampleHistogramPair", "count of histogram is not a float")
return
}
h.Count = model.FloatString(f)
case "sum":
f, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
iter.ReportError("unmarshal model.SampleHistogramPair", "sum of histogram is not a float")
return
}
h.Sum = model.FloatString(f)
case "buckets":
for iter.ReadArray() {
b, err := unmarshalHistogramBucket(iter)
if err != nil {
iter.ReportError("unmarshal model.HistogramBucket", err.Error())
return
}
h.Buckets = append(h.Buckets, b)
}
default:
iter.ReportError("unmarshal model.SampleHistogramPair", fmt.Sprint("unexpected key in histogram:", key))
return
}
}
if iter.ReadArray() {
iter.ReportError("unmarshal model.SampleHistogramPair", "SampleHistogramPair has too many values, must be [timestamp, {histogram}]")
return
}
}

func marshalSampleHistogramPairJSON(ptr unsafe.Pointer, stream *json.Stream) {
p := *((*model.SampleHistogramPair)(ptr))
stream.WriteArrayStart()
marshalTimestamp(p.Timestamp, stream)
stream.WriteMore()
marshalHistogram(*p.Histogram, stream)
stream.WriteArrayEnd()
}

func unmarshalSampleStreamJSON(ptr unsafe.Pointer, iter *json.Iterator) {
ss := (*model.SampleStream)(ptr)
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
switch key {
case "metric":
metricString := iter.ReadAny().ToString()
if err := json.UnmarshalFromString(metricString, &ss.Metric); err != nil {
iter.ReportError("unmarshal model.SampleStream", err.Error())
return
}
case "values":
for iter.ReadArray() {
v := model.SamplePair{}
unmarshalSamplePairJSON(unsafe.Pointer(&v), iter)
ss.Values = append(ss.Values, v)
}
case "histograms":
for iter.ReadArray() {
h := model.SampleHistogramPair{}
unmarshalSampleHistogramPairJSON(unsafe.Pointer(&h), iter)
ss.Histograms = append(ss.Histograms, h)
}
default:
iter.ReportError("unmarshal model.SampleStream", fmt.Sprint("unexpected key:", key))
return
}
}
}

func marshalSampleStreamJSON(ptr unsafe.Pointer, stream *json.Stream) {
ss := *((*model.SampleStream)(ptr))
stream.WriteObjectStart()
stream.WriteObjectField(`metric`)
m, err := json.ConfigCompatibleWithStandardLibrary.Marshal(ss.Metric)
if err != nil {
stream.Error = err
return
}
stream.SetBuffer(append(stream.Buffer(), m...))
if len(ss.Values) > 0 {
stream.WriteMore()
stream.WriteObjectField(`values`)
stream.WriteArrayStart()
for i, v := range ss.Values {
if i > 0 {
stream.WriteMore()
}
marshalSamplePairJSON(unsafe.Pointer(&v), stream)
}
stream.WriteArrayEnd()
}
if len(ss.Histograms) > 0 {
stream.WriteMore()
stream.WriteObjectField(`histograms`)
stream.WriteArrayStart()
for i, h := range ss.Histograms {
if i > 0 {
stream.WriteMore()
}
marshalSampleHistogramPairJSON(unsafe.Pointer(&h), stream)
}
stream.WriteArrayEnd()
}
stream.WriteObjectEnd()
}

func marshalFloat(v float64, stream *json.Stream) {
stream.WriteRaw(`"`)
// Taken from https://github.com/json-iterator/go/blob/master/stream_float.go#L71 as a workaround
// to https://github.com/json-iterator/go/issues/365 (json-iterator, to follow json standard, doesn't allow inf/nan).
buf := stream.Buffer()
abs := math.Abs(v)
fmt := byte('f')
// Note: Must use float32 comparisons for underlying float32 value to get precise cutoffs right.
if abs != 0 {
if abs < 1e-6 || abs >= 1e21 {
fmt = 'e'
}
}
buf = strconv.AppendFloat(buf, v, fmt, -1, 64)
stream.SetBuffer(buf)
stream.WriteRaw(`"`)
}

func marshalTimestamp(timestamp model.Time, stream *json.Stream) {
t := int64(timestamp)
// Write out the timestamp as a float divided by 1000.
// This is ~3x faster than converting to a float.
t := int64(p.Timestamp)
if t < 0 {
stream.WriteRaw(`-`)
t = -t
Expand All @@ -90,28 +247,113 @@ func marshalPointJSON(ptr unsafe.Pointer, stream *json.Stream) {
}
stream.WriteInt64(fraction)
}
stream.WriteMore()
stream.WriteRaw(`"`)
}

// Taken from https://github.com/json-iterator/go/blob/master/stream_float.go#L71 as a workaround
// to https://github.com/json-iterator/go/issues/365 (jsoniter, to follow json standard, doesn't allow inf/nan)
buf := stream.Buffer()
abs := math.Abs(float64(p.Value))
fmt := byte('f')
// Note: Must use float32 comparisons for underlying float32 value to get precise cutoffs right.
if abs != 0 {
if abs < 1e-6 || abs >= 1e21 {
fmt = 'e'
}
func unmarshalHistogramBucket(iter *json.Iterator) (*model.HistogramBucket, error) {
b := model.HistogramBucket{}
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
buf = strconv.AppendFloat(buf, float64(p.Value), fmt, -1, 64)
stream.SetBuffer(buf)
boundaries, err := iter.ReadNumber().Int64()
if err != nil {
return nil, err
}
b.Boundaries = int32(boundaries)
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
f, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return nil, err
}
b.Lower = model.FloatString(f)
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
f, err = strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return nil, err
}
b.Upper = model.FloatString(f)
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
f, err = strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return nil, err
}
b.Count = model.FloatString(f)
if iter.ReadArray() {
return nil, errors.New("HistogramBucket has too many values, must be [boundaries, lower, upper, count]")
}
return &b, nil
}

stream.WriteRaw(`"`)
// marshalHistogramBucket writes something like: [ 3, "-0.25", "0.25", "3"]
// See marshalHistogram to understand what the numbers mean
func marshalHistogramBucket(b model.HistogramBucket, stream *json.Stream) {
stream.WriteArrayStart()
stream.WriteInt32(b.Boundaries)
stream.WriteMore()
marshalFloat(float64(b.Lower), stream)
stream.WriteMore()
marshalFloat(float64(b.Upper), stream)
stream.WriteMore()
marshalFloat(float64(b.Count), stream)
stream.WriteArrayEnd()
}

func marshalPointJSONIsEmpty(ptr unsafe.Pointer) bool {
// marshalHistogram writes something like:
//
// {
// "count": "42",
// "sum": "34593.34",
// "buckets": [
// [ 3, "-0.25", "0.25", "3"],
// [ 0, "0.25", "0.5", "12"],
// [ 0, "0.5", "1", "21"],
// [ 0, "2", "4", "6"]
// ]
// }
//
// The 1st element in each bucket array determines if the boundaries are
// inclusive (AKA closed) or exclusive (AKA open):
//
// 0: lower exclusive, upper inclusive
// 1: lower inclusive, upper exclusive
// 2: both exclusive
// 3: both inclusive
//
// The 2nd and 3rd elements are the lower and upper boundary. The 4th element is
// the bucket count.
func marshalHistogram(h model.SampleHistogram, stream *json.Stream) {
stream.WriteObjectStart()
stream.WriteObjectField(`count`)
marshalFloat(float64(h.Count), stream)
stream.WriteMore()
stream.WriteObjectField(`sum`)
marshalFloat(float64(h.Sum), stream)

bucketFound := false
for _, bucket := range h.Buckets {
if bucket.Count == 0 {
continue // No need to expose empty buckets in JSON.
}
stream.WriteMore()
if !bucketFound {
stream.WriteObjectField(`buckets`)
stream.WriteArrayStart()
}
bucketFound = true
marshalHistogramBucket(*bucket, stream)
}
if bucketFound {
stream.WriteArrayEnd()
}
stream.WriteObjectEnd()
}

func marshalJSONIsEmpty(ptr unsafe.Pointer) bool {
return false
}

Expand Down
Loading