Skip to content

Commit

Permalink
Native histograms support in querier remote read (#4425)
Browse files Browse the repository at this point in the history
* Native histograms sample support in querier remote read

Signed-off-by: Justin Lei <justin.lei@grafana.com>
Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
Co-authored-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
leizor and krajorama authored Mar 9, 2023
1 parent 6605490 commit 1e26275
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 24 deletions.
36 changes: 25 additions & 11 deletions pkg/querier/remote_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,24 +189,38 @@ func seriesSetToQueryResponse(s storage.SeriesSet) (*client.QueryResponse, error
for s.Next() {
series := s.At()
samples := []mimirpb.Sample{}
histograms := []mimirpb.Histogram{}
it = series.Iterator(it)
for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() {
if valType != chunkenc.ValFloat {
return nil, fmt.Errorf("unsupported value type %v", valType)
switch valType {
case chunkenc.ValFloat:
t, v := it.At()
samples = append(samples, mimirpb.Sample{
TimestampMs: t,
Value: v,
})
case chunkenc.ValHistogram:
t, h := it.AtHistogram()
histograms = append(histograms, mimirpb.FromHistogramToHistogramProto(t, h))
case chunkenc.ValFloatHistogram:
t, h := it.AtFloatHistogram()
histograms = append(histograms, mimirpb.FromFloatHistogramToHistogramProto(t, h))
default:
return nil, fmt.Errorf("unsupported value type: %v", valType)
}
t, v := it.At()
samples = append(samples, mimirpb.Sample{
TimestampMs: t,
Value: v,
})
}

if err := it.Err(); err != nil {
return nil, err
}
result.Timeseries = append(result.Timeseries, mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(series.Labels()),
Samples: samples,
})

ts := mimirpb.TimeSeries{
Labels: mimirpb.FromLabelsToLabelAdapters(series.Labels()),
Samples: samples,
Histograms: histograms,
}

result.Timeseries = append(result.Timeseries, ts)
}

return result, s.Err()
Expand Down
26 changes: 13 additions & 13 deletions pkg/querier/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/storage/series"
"github.com/grafana/mimir/pkg/util/test"
)

type mockSampleAndChunkQueryable struct {
Expand All @@ -44,14 +45,14 @@ func (m mockSampleAndChunkQueryable) ChunkQuerier(ctx context.Context, mint, max

type mockQuerier struct {
storage.Querier
matrix model.Matrix
seriesSet storage.SeriesSet
}

func (m mockQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
if sp == nil {
panic("mockQuerier: select params must be set")
}
return series.MatrixToSeriesSet(m.matrix)
return m.seriesSet
}

type mockChunkQuerier struct {
Expand All @@ -70,17 +71,13 @@ func TestSampledRemoteRead(t *testing.T) {
q := &mockSampleAndChunkQueryable{
queryableFn: func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return mockQuerier{
matrix: model.Matrix{
{
Metric: model.Metric{"foo": "bar"},
Values: []model.SamplePair{
{Timestamp: 0, Value: 0},
{Timestamp: 1, Value: 1},
{Timestamp: 2, Value: 2},
{Timestamp: 3, Value: 3},
},
},
},
seriesSet: series.NewConcreteSeriesSet([]storage.Series{
series.NewConcreteSeries(
labels.FromStrings("foo", "bar"),
[]model.SamplePair{{Timestamp: 0, Value: 0}, {Timestamp: 1, Value: 1}, {Timestamp: 2, Value: 2}, {Timestamp: 3, Value: 3}},
[]mimirpb.Histogram{mimirpb.FromHistogramToHistogramProto(4, test.GenerateTestHistogram(4))},
),
}),
}, nil
},
}
Expand Down Expand Up @@ -124,6 +121,9 @@ func TestSampledRemoteRead(t *testing.T) {
{Value: 2, TimestampMs: 2},
{Value: 3, TimestampMs: 3},
},
Histograms: []mimirpb.Histogram{
mimirpb.FromHistogramToHistogramProto(4, test.GenerateTestHistogram(4)),
},
},
},
},
Expand Down

0 comments on commit 1e26275

Please sign in to comment.