Skip to content

Commit

Permalink
Implement metadata API limit in Ingester
Browse files Browse the repository at this point in the history
Signed-off-by: 🌲 Harry 🌊 John 🏔 <johrry@amazon.com>
  • Loading branch information
harry671003 committed Jul 30, 2024
1 parent 4e7dcfd commit 8671cac
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 129 deletions.
35 changes: 18 additions & 17 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring
})
}

func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, limit int, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelValues", opentracing.Tags{
"name": labelName,
"start": from.Unix(),
Expand All @@ -1033,7 +1033,7 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
return nil, err
}

req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, matchers)
req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, limit, matchers)
if err != nil {
return nil, err
}
Expand All @@ -1058,8 +1058,8 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
}

// LabelValuesForLabelName returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, limit int, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelValues(ctx, req)
if err != nil {
Expand All @@ -1071,8 +1071,8 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
}

// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, limit int, matchers ...*labels.Matcher) ([]string, error) {
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelValuesStream(ctx, req)
if err != nil {
Expand All @@ -1096,7 +1096,7 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
}, matchers...)
}

func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, limit int, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelNames", opentracing.Tags{
"start": from.Unix(),
"end": to.Unix(),
Expand All @@ -1110,6 +1110,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
req := &ingester_client.LabelNamesRequest{
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Limit: int64(limit),
}
resps, err := f(ctx, replicationSet, req)
if err != nil {
Expand All @@ -1131,8 +1132,8 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
return r, nil
}

func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, limit int) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.LabelNamesStream(ctx, req)
if err != nil {
Expand All @@ -1157,8 +1158,8 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time)
}

// LabelNames returns all the label names.
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, limit int) ([]string, error) {
return d.LabelNamesCommon(ctx, from, to, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.LabelNames(ctx, req)
if err != nil {
Expand All @@ -1170,8 +1171,8 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
}

// MetricsForLabelMatchers gets the metrics that match said matchers
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, limit int, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
resp, err := client.MetricsForLabelMatchers(ctx, req)
if err != nil {
Expand Down Expand Up @@ -1199,8 +1200,8 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
}, matchers...)
}

func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, limit int, matchers ...*labels.Matcher) ([]model.Metric, error) {
return d.metricsForLabelMatchersCommon(ctx, from, through, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
if err != nil {
Expand Down Expand Up @@ -1239,14 +1240,14 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
}, matchers...)
}

func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]model.Metric, error) {
func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, limit int, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]model.Metric, error) {
replicationSet, err := d.GetIngestersForMetadata(ctx)
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
if err != nil {
return nil, err
}

req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers)
req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, limit, matchers)
if err != nil {
return nil, err
}
Expand Down
18 changes: 10 additions & 8 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func ToQueryResponse(matrix model.Matrix) *QueryResponse {
}

// ToMetricsForLabelMatchersRequest builds a MetricsForLabelMatchersRequest proto
func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
func ToMetricsForLabelMatchersRequest(from, to model.Time, limit int, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
ms, err := toLabelMatchers(matchers)
if err != nil {
return nil, err
Expand All @@ -124,6 +124,7 @@ func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Ma
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
MatchersSet: []*LabelMatchers{{Matchers: ms}},
Limit: int64(limit),
}, nil
}

Expand Down Expand Up @@ -174,22 +175,22 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
}

// FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) {
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
for _, matchers := range req.MatchersSet {
matchers, err := FromLabelMatchers(matchers.Matchers)
if err != nil {
return 0, 0, nil, err
return 0, 0, 0, nil, err
}
matchersSet = append(matchersSet, matchers)
}
from := model.Time(req.StartTimestampMs)
to := model.Time(req.EndTimestampMs)
return from, to, matchersSet, nil
return from, to, int(req.Limit), matchersSet, nil
}

// ToLabelValuesRequest builds a LabelValuesRequest proto
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, limit int, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
ms, err := toLabelMatchers(matchers)
if err != nil {
return nil, err
Expand All @@ -200,22 +201,23 @@ func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matche
StartTimestampMs: int64(from),
EndTimestampMs: int64(to),
Matchers: &LabelMatchers{Matchers: ms},
Limit: int64(limit),
}, nil
}

// FromLabelValuesRequest unpacks a LabelValuesRequest proto
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, []*labels.Matcher, error) {
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) {
var err error
var matchers []*labels.Matcher

if req.Matchers != nil {
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
if err != nil {
return "", 0, 0, nil, err
return "", 0, 0, 0, nil, err
}
}

return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, matchers, nil
return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
}

func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
Expand Down
Loading

0 comments on commit 8671cac

Please sign in to comment.