From d83f7b10305940d78e79129b6a0369ff4b5fe8fd Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 19 Jun 2024 12:48:28 -0700 Subject: [PATCH] Support ingesting native histograms in Ruler appender (#6029) * support ingesting native histogram samples in Ruler pusher Signed-off-by: Ben Ye * handle evaluation delay for native histograms Signed-off-by: Ben Ye * fix unit test Signed-off-by: Ben Ye --------- Signed-off-by: Ben Ye --- pkg/cortexpb/compat.go | 9 + pkg/cortexpb/timeseries.go | 16 +- pkg/ruler/compat.go | 35 +++- pkg/ruler/compat_test.go | 398 +++++++++++++++++++++++++++++++++---- pkg/util/http_test.go | 3 +- 5 files changed, 405 insertions(+), 56 deletions(-) diff --git a/pkg/cortexpb/compat.go b/pkg/cortexpb/compat.go index 07bd89f716..6de2423d56 100644 --- a/pkg/cortexpb/compat.go +++ b/pkg/cortexpb/compat.go @@ -44,6 +44,15 @@ func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMe return req } +func (w *WriteRequest) AddHistogramTimeSeries(lbls []labels.Labels, histograms []Histogram) { + for i := 0; i < len(lbls); i++ { + ts := TimeseriesFromPool() + ts.Labels = append(ts.Labels, FromLabelsToLabelAdapters(lbls[i])...) + ts.Histograms = append(ts.Histograms, histograms[i]) + w.Timeseries = append(w.Timeseries, PreallocTimeseries{TimeSeries: ts}) + } +} + // FromLabelAdaptersToLabels casts []LabelAdapter to labels.Labels. // It uses unsafe, but as LabelAdapter == labels.Label this should be safe. // This allows us to use labels.Labels directly in protos. diff --git a/pkg/cortexpb/timeseries.go b/pkg/cortexpb/timeseries.go index da1eff65df..db7354ffe4 100644 --- a/pkg/cortexpb/timeseries.go +++ b/pkg/cortexpb/timeseries.go @@ -12,10 +12,11 @@ import ( ) var ( - expectedTimeseries = 100 - expectedLabels = 20 - expectedSamplesPerSeries = 10 - expectedExemplarsPerSeries = 1 + expectedTimeseries = 100 + expectedLabels = 20 + expectedSamplesPerSeries = 10 + expectedExemplarsPerSeries = 1 + expectedHistogramsPerSeries = 1 /* We cannot pool these as pointer-to-slice because the place we use them is in WriteRequest which is generated from Protobuf @@ -31,9 +32,10 @@ var ( timeSeriesPool = sync.Pool{ New: func() interface{} { return &TimeSeries{ - Labels: make([]LabelAdapter, 0, expectedLabels), - Samples: make([]Sample, 0, expectedSamplesPerSeries), - Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries), + Labels: make([]LabelAdapter, 0, expectedLabels), + Samples: make([]Sample, 0, expectedSamplesPerSeries), + Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries), + Histograms: make([]Histogram, 0, expectedHistogramsPerSeries), } }, } diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go index 4a02540ef6..984b251864 100644 --- a/pkg/ruler/compat.go +++ b/pkg/ruler/compat.go @@ -43,12 +43,34 @@ type PusherAppender struct { pusher Pusher labels []labels.Labels samples []cortexpb.Sample + histogramLabels []labels.Labels + histograms []cortexpb.Histogram userID string evaluationDelay time.Duration } -func (a *PusherAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) { - return 0, errors.New("querying native histograms is not supported") +func (a *PusherAppender) AppendHistogram(_ storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) { + if h == nil && fh == nil { + return 0, errors.New("no histogram") + } + + if h != nil { + // A histogram sample is considered stale if its sum is set to NaN. + // https://github.com/prometheus/prometheus/blob/b6ef745016fa9472fdd0ae20f75a9682e01d1e5c/tsdb/head_append.go#L339-L346 + if a.evaluationDelay > 0 && (value.IsStaleNaN(h.Sum)) { + t -= a.evaluationDelay.Milliseconds() + } + a.histograms = append(a.histograms, cortexpb.HistogramToHistogramProto(t, h)) + } else { + // A histogram sample is considered stale if its sum is set to NaN. + // https://github.com/prometheus/prometheus/blob/b6ef745016fa9472fdd0ae20f75a9682e01d1e5c/tsdb/head_append.go#L339-L346 + if a.evaluationDelay > 0 && (value.IsStaleNaN(fh.Sum)) { + t -= a.evaluationDelay.Milliseconds() + } + a.histograms = append(a.histograms, cortexpb.FloatHistogramToHistogramProto(t, fh)) + } + a.histogramLabels = append(a.histogramLabels, l) + return 0, nil } func (a *PusherAppender) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) { @@ -85,10 +107,11 @@ func (a *PusherAppender) AppendExemplar(_ storage.SeriesRef, _ labels.Labels, _ func (a *PusherAppender) Commit() error { a.totalWrites.Inc() + req := cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE) + req.AddHistogramTimeSeries(a.histogramLabels, a.histograms) // Since a.pusher is distributor, client.ReuseSlice will be called in a.pusher.Push. // We shouldn't call client.ReuseSlice here. - _, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), cortexpb.ToWriteRequest(a.labels, a.samples, nil, nil, cortexpb.RULE)) - + _, err := a.pusher.Push(user.InjectOrgID(a.ctx, a.userID), req) if err != nil { // Don't report errors that ended with 4xx HTTP status code (series limits, duplicate samples, out of order, etc.) if resp, ok := httpgrpc.HTTPResponseFromError(err); !ok || resp.Code/100 != 4 { @@ -98,6 +121,8 @@ func (a *PusherAppender) Commit() error { a.labels = nil a.samples = nil + a.histogramLabels = nil + a.histograms = nil return err } @@ -108,6 +133,8 @@ func (a *PusherAppender) UpdateMetadata(_ storage.SeriesRef, _ labels.Labels, _ func (a *PusherAppender) Rollback() error { a.labels = nil a.samples = nil + a.histogramLabels = nil + a.histograms = nil return nil } diff --git a/pkg/ruler/compat_test.go b/pkg/ruler/compat_test.go index 71db759d64..8648ddd5f3 100644 --- a/pkg/ruler/compat_test.go +++ b/pkg/ruler/compat_test.go @@ -13,6 +13,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" @@ -20,6 +22,7 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/cortexpb" + histogram_util "github.com/cortexproject/cortex/pkg/util/histogram" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -38,63 +41,357 @@ func TestPusherAppendable(t *testing.T) { pusher := &fakePusher{} pa := NewPusherAppendable(pusher, "user-1", nil, prometheus.NewCounter(prometheus.CounterOpts{}), prometheus.NewCounter(prometheus.CounterOpts{})) + lbls1 := cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{labels.MetricName: "foo_bar"})) + lbls2 := cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{labels.MetricName: "ALERTS", labels.AlertName: "boop"})) + lbls3 := cortexpb.FromLabelsToLabelAdapters(labels.FromMap(map[string]string{labels.MetricName: "ALERTS_FOR_STATE", labels.AlertName: "boop"})) + + testHistogram := histogram_util.GenerateTestHistogram(1) + testFloatHistogram := histogram_util.GenerateTestFloatHistogram(2) + testHistogramWithNaN := histogram_util.GenerateTestHistogram(1) + testFloatHistogramWithNaN := histogram_util.GenerateTestFloatHistogram(1) + testHistogramWithNaN.Sum = math.Float64frombits(value.StaleNaN) + testFloatHistogramWithNaN.Sum = math.Float64frombits(value.StaleNaN) + for _, tc := range []struct { - name string - series string - evalDelay time.Duration - value float64 - expectedTS int64 + name string + series string + evalDelay time.Duration + value float64 + histogram *histogram.Histogram + floatHistogram *histogram.FloatHistogram + expectedReq *cortexpb.WriteRequest }{ { - name: "tenant without delay, normal value", - series: "foo_bar", - value: 1.234, - expectedTS: 120_000, + name: "tenant without delay, normal value", + series: "foo_bar", + value: 1.234, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Samples: []cortexpb.Sample{ + {Value: 1.234, TimestampMs: 120_000}, + }, + }, + }, + }, + Source: cortexpb.RULE, + }, + }, + { + name: "tenant without delay, stale nan value", + series: "foo_bar", + value: math.Float64frombits(value.StaleNaN), + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Samples: []cortexpb.Sample{ + {Value: math.Float64frombits(value.StaleNaN), TimestampMs: 120_000}, + }, + }, + }, + }, + Source: cortexpb.RULE, + }, + }, + { + name: "tenant with delay, normal value", + series: "foo_bar", + value: 1.234, + evalDelay: time.Minute, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Samples: []cortexpb.Sample{ + {Value: 1.234, TimestampMs: 120_000}, + }, + }, + }, + }, + Source: cortexpb.RULE, + }, + }, + { + name: "tenant with delay, stale nan value", + series: "foo_bar", + value: math.Float64frombits(value.StaleNaN), + evalDelay: time.Minute, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Samples: []cortexpb.Sample{ + {Value: math.Float64frombits(value.StaleNaN), TimestampMs: 60_000}, + }, + }, + }, + }, + Source: cortexpb.RULE, + }, + }, + { + name: "ALERTS without delay, normal value", + series: `ALERTS{alertname="boop"}`, + value: 1.234, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls2, + Samples: []cortexpb.Sample{ + {Value: 1.234, TimestampMs: 120_000}, + }, + }, + }, + }, + Source: cortexpb.RULE, + }, + }, + { + name: "ALERTS without delay, stale nan value", + series: `ALERTS{alertname="boop"}`, + value: math.Float64frombits(value.StaleNaN), + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls2, + Samples: []cortexpb.Sample{ + {Value: math.Float64frombits(value.StaleNaN), TimestampMs: 120_000}, + }, + }, + }, + }, + Source: cortexpb.RULE, + }, + }, + { + name: "ALERTS with delay, normal value", + series: `ALERTS{alertname="boop"}`, + value: 1.234, + evalDelay: time.Minute, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls2, + Samples: []cortexpb.Sample{ + {Value: 1.234, TimestampMs: 60_000}, + }, + }, + }, + }, + Source: cortexpb.RULE, + }, + }, + { + name: "ALERTS with delay, stale nan value", + series: `ALERTS_FOR_STATE{alertname="boop"}`, + value: math.Float64frombits(value.StaleNaN), + evalDelay: time.Minute, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls3, + Samples: []cortexpb.Sample{ + {Value: math.Float64frombits(value.StaleNaN), TimestampMs: 60_000}, + }, + }, + }, + }, + Source: cortexpb.RULE, + }, }, { - name: "tenant without delay, stale nan value", - series: "foo_bar", - value: math.Float64frombits(value.StaleNaN), - expectedTS: 120_000, + name: "tenant without delay, normal histogram", + series: "foo_bar", + histogram: testHistogram, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Histograms: []cortexpb.Histogram{ + cortexpb.HistogramToHistogramProto(120_000, testHistogram), + }, + }, + }, + }, + Source: cortexpb.RULE, + }, }, { - name: "tenant with delay, normal value", - series: "foo_bar", - value: 1.234, - expectedTS: 120_000, - evalDelay: time.Minute, + name: "tenant without delay, float histogram", + series: "foo_bar", + floatHistogram: testFloatHistogram, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Histograms: []cortexpb.Histogram{ + cortexpb.FloatHistogramToHistogramProto(120_000, testFloatHistogram), + }, + }, + }, + }, + Source: cortexpb.RULE, + }, }, { - name: "tenant with delay, stale nan value", - value: math.Float64frombits(value.StaleNaN), - expectedTS: 60_000, - evalDelay: time.Minute, + name: "tenant without delay, both sample and histogram", + series: "foo_bar", + value: 1.234, + histogram: testHistogram, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Samples: []cortexpb.Sample{ + {Value: 1.234, TimestampMs: 120_000}, + }, + }, + }, + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Histograms: []cortexpb.Histogram{ + cortexpb.HistogramToHistogramProto(120_000, testHistogram), + }, + }, + }, + }, + Source: cortexpb.RULE, + }, }, { - name: "ALERTS without delay, normal value", - series: `ALERTS{alertname="boop"}`, - value: 1.234, - expectedTS: 120_000, + name: "tenant without delay, both sample and float histogram", + series: "foo_bar", + value: 1.234, + floatHistogram: testFloatHistogram, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Samples: []cortexpb.Sample{ + {Value: 1.234, TimestampMs: 120_000}, + }, + }, + }, + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Histograms: []cortexpb.Histogram{ + cortexpb.FloatHistogramToHistogramProto(120_000, testFloatHistogram), + }, + }, + }, + }, + Source: cortexpb.RULE, + }, }, { - name: "ALERTS without delay, stale nan value", - series: `ALERTS{alertname="boop"}`, - value: math.Float64frombits(value.StaleNaN), - expectedTS: 120_000, + name: "tenant with delay and NaN sample, normal histogram", + series: "foo_bar", + value: math.Float64frombits(value.StaleNaN), + evalDelay: time.Minute, + histogram: testHistogram, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Samples: []cortexpb.Sample{ + {Value: math.Float64frombits(value.StaleNaN), TimestampMs: 60_000}, + }, + }, + }, + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Histograms: []cortexpb.Histogram{ + cortexpb.HistogramToHistogramProto(120_000, testHistogram), + }, + }, + }, + }, + Source: cortexpb.RULE, + }, }, { - name: "ALERTS with delay, normal value", - series: `ALERTS{alertname="boop"}`, - value: 1.234, - expectedTS: 60_000, - evalDelay: time.Minute, + name: "tenant with delay and NaN sample, float histogram", + series: "foo_bar", + value: math.Float64frombits(value.StaleNaN), + evalDelay: time.Minute, + floatHistogram: testFloatHistogram, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Samples: []cortexpb.Sample{ + {Value: math.Float64frombits(value.StaleNaN), TimestampMs: 60_000}, + }, + }, + }, + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Histograms: []cortexpb.Histogram{ + cortexpb.FloatHistogramToHistogramProto(120_000, testFloatHistogram), + }, + }, + }, + }, + Source: cortexpb.RULE, + }, }, { - name: "ALERTS with delay, stale nan value", - series: `ALERTS_FOR_STATE{alertname="boop"}`, - value: math.Float64frombits(value.StaleNaN), - expectedTS: 60_000, - evalDelay: time.Minute, + name: "tenant with delay, NaN histogram", + series: "foo_bar", + histogram: testHistogramWithNaN, + evalDelay: time.Minute, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Histograms: []cortexpb.Histogram{ + cortexpb.HistogramToHistogramProto(60_000, testHistogramWithNaN), + }, + }, + }, + }, + Source: cortexpb.RULE, + }, + }, + { + name: "tenant with delay, NaN float histogram", + series: "foo_bar", + floatHistogram: testFloatHistogramWithNaN, + evalDelay: time.Minute, + expectedReq: &cortexpb.WriteRequest{ + Timeseries: []cortexpb.PreallocTimeseries{ + { + TimeSeries: &cortexpb.TimeSeries{ + Labels: lbls1, + Histograms: []cortexpb.Histogram{ + cortexpb.FloatHistogramToHistogramProto(60_000, testFloatHistogramWithNaN), + }, + }, + }, + }, + Source: cortexpb.RULE, + }, }, } { t.Run(tc.name, func(t *testing.T) { @@ -108,13 +405,21 @@ func TestPusherAppendable(t *testing.T) { pusher.response = &cortexpb.WriteResponse{} a := pa.Appender(ctx) - _, err = a.Append(0, lbls, 120_000, tc.value) + // We don't ingest sample if value is set to 0 for testing purpose. + if tc.value != 0 { + _, err = a.Append(0, lbls, 120_000, tc.value) + require.NoError(t, err) + } + + if tc.histogram != nil { + _, err = a.AppendHistogram(0, lbls, 120_000, tc.histogram, nil) + } else if tc.floatHistogram != nil { + _, err = a.AppendHistogram(0, lbls, 120_000, nil, tc.floatHistogram) + } require.NoError(t, err) require.NoError(t, a.Commit()) - - require.Equal(t, tc.expectedTS, pusher.request.Timeseries[0].Samples[0].TimestampMs) - + require.Equal(t, tc.expectedReq.String(), pusher.request.String()) }) } } @@ -165,6 +470,11 @@ func TestPusherErrors(t *testing.T) { _, err = a.Append(0, lbls, int64(model.Now()), 123456) require.NoError(t, err) + _, err = a.AppendHistogram(0, lbls, int64(model.Now()), histogram_util.GenerateTestHistogram(1), nil) + require.NoError(t, err) + _, err = a.AppendHistogram(0, lbls, int64(model.Now()), nil, histogram_util.GenerateTestFloatHistogram(2)) + require.NoError(t, err) + require.Equal(t, tc.returnedError, a.Commit()) require.Equal(t, tc.expectedWrites, int(testutil.ToFloat64(writes))) diff --git a/pkg/util/http_test.go b/pkg/util/http_test.go index 4c399a8603..e4de5b6b96 100644 --- a/pkg/util/http_test.go +++ b/pkg/util/http_test.go @@ -153,7 +153,8 @@ func TestParseProtoReader(t *testing.T) { {Value: 20, TimestampMs: 2}, {Value: 30, TimestampMs: 3}, }, - Exemplars: []cortexpb.Exemplar{}, + Exemplars: []cortexpb.Exemplar{}, + Histograms: []cortexpb.Histogram{}, }, }, },