Skip to content

Commit

Permalink
Merge pull request #1225 from prometheus/beorn7/api
Browse files Browse the repository at this point in the history
api: Extend and improve json-iterator usage
  • Loading branch information
bwplotka committed Mar 1, 2023
2 parents 66687e5 + 2236d78 commit ffbbe80
Show file tree
Hide file tree
Showing 5 changed files with 555 additions and 66 deletions.
284 changes: 263 additions & 21 deletions api/prometheus/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,15 @@ import (
)

func init() {
json.RegisterTypeEncoderFunc("model.SamplePair", marshalPointJSON, marshalPointJSONIsEmpty)
json.RegisterTypeDecoderFunc("model.SamplePair", unMarshalPointJSON)
json.RegisterTypeEncoderFunc("model.SamplePair", marshalSamplePairJSON, marshalJSONIsEmpty)
json.RegisterTypeDecoderFunc("model.SamplePair", unmarshalSamplePairJSON)
json.RegisterTypeEncoderFunc("model.SampleHistogramPair", marshalSampleHistogramPairJSON, marshalJSONIsEmpty)
json.RegisterTypeDecoderFunc("model.SampleHistogramPair", unmarshalSampleHistogramPairJSON)
json.RegisterTypeEncoderFunc("model.SampleStream", marshalSampleStreamJSON, marshalJSONIsEmpty) // Only needed for benchmark.
json.RegisterTypeDecoderFunc("model.SampleStream", unmarshalSampleStreamJSON) // Only needed for benchmark.
}

func unMarshalPointJSON(ptr unsafe.Pointer, iter *json.Iterator) {
func unmarshalSamplePairJSON(ptr unsafe.Pointer, iter *json.Iterator) {
p := (*model.SamplePair)(ptr)
if !iter.ReadArray() {
iter.ReportError("unmarshal model.SamplePair", "SamplePair must be [timestamp, value]")
Expand Down Expand Up @@ -68,12 +72,165 @@ func unMarshalPointJSON(ptr unsafe.Pointer, iter *json.Iterator) {
}
}

func marshalPointJSON(ptr unsafe.Pointer, stream *json.Stream) {
func marshalSamplePairJSON(ptr unsafe.Pointer, stream *json.Stream) {
p := *((*model.SamplePair)(ptr))
stream.WriteArrayStart()
marshalTimestamp(p.Timestamp, stream)
stream.WriteMore()
marshalFloat(float64(p.Value), stream)
stream.WriteArrayEnd()
}

func unmarshalSampleHistogramPairJSON(ptr unsafe.Pointer, iter *json.Iterator) {
p := (*model.SampleHistogramPair)(ptr)
if !iter.ReadArray() {
iter.ReportError("unmarshal model.SampleHistogramPair", "SampleHistogramPair must be [timestamp, {histogram}]")
return
}
t := iter.ReadNumber()
if err := p.Timestamp.UnmarshalJSON([]byte(t)); err != nil {
iter.ReportError("unmarshal model.SampleHistogramPair", err.Error())
return
}
if !iter.ReadArray() {
iter.ReportError("unmarshal model.SampleHistogramPair", "SamplePair missing histogram")
return
}
h := &model.SampleHistogram{}
p.Histogram = h
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
switch key {
case "count":
f, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
iter.ReportError("unmarshal model.SampleHistogramPair", "count of histogram is not a float")
return
}
h.Count = model.FloatString(f)
case "sum":
f, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
iter.ReportError("unmarshal model.SampleHistogramPair", "sum of histogram is not a float")
return
}
h.Sum = model.FloatString(f)
case "buckets":
for iter.ReadArray() {
b, err := unmarshalHistogramBucket(iter)
if err != nil {
iter.ReportError("unmarshal model.HistogramBucket", err.Error())
return
}
h.Buckets = append(h.Buckets, b)
}
default:
iter.ReportError("unmarshal model.SampleHistogramPair", fmt.Sprint("unexpected key in histogram:", key))
return
}
}
if iter.ReadArray() {
iter.ReportError("unmarshal model.SampleHistogramPair", "SampleHistogramPair has too many values, must be [timestamp, {histogram}]")
return
}
}

func marshalSampleHistogramPairJSON(ptr unsafe.Pointer, stream *json.Stream) {
p := *((*model.SampleHistogramPair)(ptr))
stream.WriteArrayStart()
marshalTimestamp(p.Timestamp, stream)
stream.WriteMore()
marshalHistogram(*p.Histogram, stream)
stream.WriteArrayEnd()
}

func unmarshalSampleStreamJSON(ptr unsafe.Pointer, iter *json.Iterator) {
ss := (*model.SampleStream)(ptr)
for key := iter.ReadObject(); key != ""; key = iter.ReadObject() {
switch key {
case "metric":
metricString := iter.ReadAny().ToString()
if err := json.UnmarshalFromString(metricString, &ss.Metric); err != nil {
iter.ReportError("unmarshal model.SampleStream", err.Error())
return
}
case "values":
for iter.ReadArray() {
v := model.SamplePair{}
unmarshalSamplePairJSON(unsafe.Pointer(&v), iter)
ss.Values = append(ss.Values, v)
}
case "histograms":
for iter.ReadArray() {
h := model.SampleHistogramPair{}
unmarshalSampleHistogramPairJSON(unsafe.Pointer(&h), iter)
ss.Histograms = append(ss.Histograms, h)
}
default:
iter.ReportError("unmarshal model.SampleStream", fmt.Sprint("unexpected key:", key))
return
}
}
}

func marshalSampleStreamJSON(ptr unsafe.Pointer, stream *json.Stream) {
ss := *((*model.SampleStream)(ptr))
stream.WriteObjectStart()
stream.WriteObjectField(`metric`)
m, err := json.ConfigCompatibleWithStandardLibrary.Marshal(ss.Metric)
if err != nil {
stream.Error = err
return
}
stream.SetBuffer(append(stream.Buffer(), m...))
if len(ss.Values) > 0 {
stream.WriteMore()
stream.WriteObjectField(`values`)
stream.WriteArrayStart()
for i, v := range ss.Values {
if i > 0 {
stream.WriteMore()
}
marshalSamplePairJSON(unsafe.Pointer(&v), stream)
}
stream.WriteArrayEnd()
}
if len(ss.Histograms) > 0 {
stream.WriteMore()
stream.WriteObjectField(`histograms`)
stream.WriteArrayStart()
for i, h := range ss.Histograms {
if i > 0 {
stream.WriteMore()
}
marshalSampleHistogramPairJSON(unsafe.Pointer(&h), stream)
}
stream.WriteArrayEnd()
}
stream.WriteObjectEnd()
}

func marshalFloat(v float64, stream *json.Stream) {
stream.WriteRaw(`"`)
// Taken from https://github.com/json-iterator/go/blob/master/stream_float.go#L71 as a workaround
// to https://github.com/json-iterator/go/issues/365 (json-iterator, to follow json standard, doesn't allow inf/nan).
buf := stream.Buffer()
abs := math.Abs(v)
fmt := byte('f')
// Note: Must use float32 comparisons for underlying float32 value to get precise cutoffs right.
if abs != 0 {
if abs < 1e-6 || abs >= 1e21 {
fmt = 'e'
}
}
buf = strconv.AppendFloat(buf, v, fmt, -1, 64)
stream.SetBuffer(buf)
stream.WriteRaw(`"`)
}

func marshalTimestamp(timestamp model.Time, stream *json.Stream) {
t := int64(timestamp)
// Write out the timestamp as a float divided by 1000.
// This is ~3x faster than converting to a float.
t := int64(p.Timestamp)
if t < 0 {
stream.WriteRaw(`-`)
t = -t
Expand All @@ -90,28 +247,113 @@ func marshalPointJSON(ptr unsafe.Pointer, stream *json.Stream) {
}
stream.WriteInt64(fraction)
}
stream.WriteMore()
stream.WriteRaw(`"`)
}

// Taken from https://github.com/json-iterator/go/blob/master/stream_float.go#L71 as a workaround
// to https://github.com/json-iterator/go/issues/365 (jsoniter, to follow json standard, doesn't allow inf/nan)
buf := stream.Buffer()
abs := math.Abs(float64(p.Value))
fmt := byte('f')
// Note: Must use float32 comparisons for underlying float32 value to get precise cutoffs right.
if abs != 0 {
if abs < 1e-6 || abs >= 1e21 {
fmt = 'e'
}
func unmarshalHistogramBucket(iter *json.Iterator) (*model.HistogramBucket, error) {
b := model.HistogramBucket{}
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
buf = strconv.AppendFloat(buf, float64(p.Value), fmt, -1, 64)
stream.SetBuffer(buf)
boundaries, err := iter.ReadNumber().Int64()
if err != nil {
return nil, err
}
b.Boundaries = int32(boundaries)
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
f, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return nil, err
}
b.Lower = model.FloatString(f)
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
f, err = strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return nil, err
}
b.Upper = model.FloatString(f)
if !iter.ReadArray() {
return nil, errors.New("HistogramBucket must be [boundaries, lower, upper, count]")
}
f, err = strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
return nil, err
}
b.Count = model.FloatString(f)
if iter.ReadArray() {
return nil, errors.New("HistogramBucket has too many values, must be [boundaries, lower, upper, count]")
}
return &b, nil
}

stream.WriteRaw(`"`)
// marshalHistogramBucket writes something like: [ 3, "-0.25", "0.25", "3"]
// See marshalHistogram to understand what the numbers mean
func marshalHistogramBucket(b model.HistogramBucket, stream *json.Stream) {
stream.WriteArrayStart()
stream.WriteInt32(b.Boundaries)
stream.WriteMore()
marshalFloat(float64(b.Lower), stream)
stream.WriteMore()
marshalFloat(float64(b.Upper), stream)
stream.WriteMore()
marshalFloat(float64(b.Count), stream)
stream.WriteArrayEnd()
}

func marshalPointJSONIsEmpty(ptr unsafe.Pointer) bool {
// marshalHistogram writes something like:
//
// {
// "count": "42",
// "sum": "34593.34",
// "buckets": [
// [ 3, "-0.25", "0.25", "3"],
// [ 0, "0.25", "0.5", "12"],
// [ 0, "0.5", "1", "21"],
// [ 0, "2", "4", "6"]
// ]
// }
//
// The 1st element in each bucket array determines if the boundaries are
// inclusive (AKA closed) or exclusive (AKA open):
//
// 0: lower exclusive, upper inclusive
// 1: lower inclusive, upper exclusive
// 2: both exclusive
// 3: both inclusive
//
// The 2nd and 3rd elements are the lower and upper boundary. The 4th element is
// the bucket count.
func marshalHistogram(h model.SampleHistogram, stream *json.Stream) {
stream.WriteObjectStart()
stream.WriteObjectField(`count`)
marshalFloat(float64(h.Count), stream)
stream.WriteMore()
stream.WriteObjectField(`sum`)
marshalFloat(float64(h.Sum), stream)

bucketFound := false
for _, bucket := range h.Buckets {
if bucket.Count == 0 {
continue // No need to expose empty buckets in JSON.
}
stream.WriteMore()
if !bucketFound {
stream.WriteObjectField(`buckets`)
stream.WriteArrayStart()
}
bucketFound = true
marshalHistogramBucket(*bucket, stream)
}
if bucketFound {
stream.WriteArrayEnd()
}
stream.WriteObjectEnd()
}

func marshalJSONIsEmpty(ptr unsafe.Pointer) bool {
return false
}

Expand Down
Loading

0 comments on commit ffbbe80

Please sign in to comment.