Skip to content

Commit

Permalink
Query Frontend: add query instant tripperware for query sharding (#5561)
Browse files Browse the repository at this point in the history
* add query instant tripperware for query sharding

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix lint

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* working version for instant query sharding

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* tidy up

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* lint

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix unit test

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* remove ioutil

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix E2E test

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* add e2e test for instant query sharding

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

Signed-off-by: Ben Ye <ben.ye@bytedance.com>
  • Loading branch information
Ben Ye authored Aug 17, 2022
1 parent ddf3d77 commit 13a2eb8
Show file tree
Hide file tree
Showing 16 changed files with 2,838 additions and 1,215 deletions.
2 changes: 1 addition & 1 deletion internal/cortex/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package cortexpb;

option go_package = "cortexpb";

import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "gogoproto/gogo.proto";

option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
Expand Down
138 changes: 130 additions & 8 deletions internal/cortex/querier/queryrange/query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -103,6 +104,8 @@ type Response interface {
proto.Message
// GetHeaders returns the HTTP headers in the response.
GetHeaders() []*PrometheusResponseHeader
// GetStats returns the Prometheus query stats in the response.
GetStats() *PrometheusResponseStats
}

type prometheusCodec struct{}
Expand Down Expand Up @@ -156,6 +159,14 @@ func (resp *PrometheusResponse) minTime() int64 {
return result[0].Samples[0].TimestampMs
}

func (resp *PrometheusResponse) GetStats() *PrometheusResponseStats {
return resp.Data.Stats
}

func (resp *PrometheusInstantQueryResponse) GetStats() *PrometheusResponseStats {
return resp.Data.Stats
}

// NewEmptyPrometheusResponse returns an empty successful Prometheus query range response.
func NewEmptyPrometheusResponse() *PrometheusResponse {
return &PrometheusResponse{
Expand All @@ -167,6 +178,19 @@ func NewEmptyPrometheusResponse() *PrometheusResponse {
}
}

// NewEmptyPrometheusInstantQueryResponse returns an empty successful Prometheus query range response.
func NewEmptyPrometheusInstantQueryResponse() *PrometheusInstantQueryResponse {
return &PrometheusInstantQueryResponse{
Status: StatusSuccess,
Data: PrometheusInstantQueryData{
ResultType: model.ValVector.String(),
Result: PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Samples{},
},
},
}
}

func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
if len(responses) == 0 {
return NewEmptyPrometheusResponse(), nil
Expand All @@ -189,7 +213,7 @@ func (prometheusCodec) MergeResponse(responses ...Response) (Response, error) {
Data: PrometheusData{
ResultType: model.ValMatrix.String(),
Result: matrixMerge(promResponses),
Stats: statsMerge(promResponses),
Stats: StatsMerge(responses),
},
}

Expand Down Expand Up @@ -302,7 +326,7 @@ func (prometheusCodec) DecodeResponse(ctx context.Context, r *http.Response, _ R
log, ctx := spanlogger.New(ctx, "ParseQueryRangeResponse") //nolint:ineffassign,staticcheck
defer log.Finish()

buf, err := bodyBuffer(r)
buf, err := BodyBuffer(r)
if err != nil {
log.Error(err)
return nil, err
Expand All @@ -326,7 +350,7 @@ type Buffer interface {
Bytes() []byte
}

func bodyBuffer(res *http.Response) ([]byte, error) {
func BodyBuffer(res *http.Response) ([]byte, error) {
// Attempt to cast the response body to a Buffer and use it if possible.
// This is because the frontend may have already read the body and buffered it.
if buffer, ok := res.Body.(Buffer); ok {
Expand Down Expand Up @@ -398,22 +422,120 @@ func (s *SampleStream) MarshalJSON() ([]byte, error) {
return json.Marshal(stream)
}

// statsMerge merge the stats from 2 responses
// UnmarshalJSON implements json.Unmarshaler.
func (s *Sample) UnmarshalJSON(data []byte) error {
var sample struct {
Metric model.Metric `json:"metric"`
Value cortexpb.Sample `json:"value"`
}
if err := json.Unmarshal(data, &sample); err != nil {
return err
}
s.Labels = cortexpb.FromMetricsToLabelAdapters(sample.Metric)
s.Sample = sample.Value
return nil
}

// MarshalJSON implements json.Marshaler.
func (s *Sample) MarshalJSON() ([]byte, error) {
sample := struct {
Metric model.Metric `json:"metric"`
Value cortexpb.Sample `json:"value"`
}{
Metric: cortexpb.FromLabelAdaptersToMetric(s.Labels),
Value: s.Sample,
}
return json.Marshal(sample)
}

// UnmarshalJSON implements json.Unmarshaler.
func (s *PrometheusInstantQueryData) UnmarshalJSON(data []byte) error {
var queryData struct {
ResultType string `json:"resultType"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}

if err := json.Unmarshal(data, &queryData); err != nil {
return err
}
s.ResultType = queryData.ResultType
s.Stats = queryData.Stats
switch s.ResultType {
case model.ValVector.String():
var result struct {
Vector []*Sample `json:"result"`
}
if err := json.Unmarshal(data, &result); err != nil {
return err
}
s.Result = PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Samples{Samples: &Vector{
Result: result.Vector,
}},
}
case model.ValMatrix.String():
return errors.New("matrix result type not supported for PrometheusInstantQueryData")
default:
var result struct {
Sample cortexpb.Sample `json:"result"`
}
if err := json.Unmarshal(data, &result); err != nil {
return err
}
s.Result = PrometheusInstantQueryResult{
Result: &PrometheusInstantQueryResult_Sample{Sample: &result.Sample},
}
}
return nil
}

// MarshalJSON implements json.Marshaler.
func (s *PrometheusInstantQueryData) MarshalJSON() ([]byte, error) {
if s.ResultType == model.ValVector.String() {
res := struct {
ResultType string `json:"resultType"`
Data []*Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetSamples().Result,
Stats: s.Stats,
}
return json.Marshal(res)
} else if s.ResultType == model.ValMatrix.String() {
return nil, errors.New("matrix result type not supported for PrometheusInstantQueryData")
}

// Other cases only include scalar and string.
res := struct {
ResultType string `json:"resultType"`
Data *cortexpb.Sample `json:"result"`
Stats *PrometheusResponseStats `json:"stats,omitempty"`
}{
ResultType: s.ResultType,
Data: s.Result.GetSample(),
Stats: s.Stats,
}
return json.Marshal(res)
}

// StatsMerge merge the stats from 2 responses
// this function is similar to matrixMerge
func statsMerge(resps []*PrometheusResponse) *PrometheusResponseStats {
func StatsMerge(resps []Response) *PrometheusResponseStats {
output := map[int64]*PrometheusResponseQueryableSamplesStatsPerStep{}
hasStats := false
for _, resp := range resps {
if resp.Data.Stats == nil {
stats := resp.GetStats()
if stats == nil {
continue
}

hasStats = true
if resp.Data.Stats.Samples == nil {
if stats.Samples == nil {
continue
}

for _, s := range resp.Data.Stats.Samples.TotalQueryableSamplesPerStep {
for _, s := range stats.Samples.TotalQueryableSamplesPerStep {
output[s.GetTimestampMs()] = s
}
}
Expand Down
Loading

0 comments on commit 13a2eb8

Please sign in to comment.