Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Prometheus to LabelNames with matchers #4380

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
* `-store-gateway.sharding-ring.heartbeat-period`
* [ENHANCEMENT] Memberlist: optimized receive path for processing ring state updates, to help reduce CPU utilization in large clusters. #4345
* [ENHANCEMENT] Memberlist: expose configuration of memberlist packet compression via `-memberlist.compression=enabled`. #4346
* [ENHANCEMENT] Updated Prometheus to include changes from prometheus/prometheus#9083. Now whenever `/labels` API calls include matchers, blocks store is queried for `LabelNames` with matchers instead of `Series` calls which was inefficient. #4380
* [BUGFIX] HA Tracker: when cleaning up obsolete elected replicas from KV store, tracker didn't update number of cluster per user correctly. #4336
* [BUGFIX] Ruler: fixed counting of PromQL evaluation errors as user-errors when updating `cortex_ruler_queries_failed_total`. #4335
* [BUGFIX] Ingester: When using block storage, prevent any reads or writes while the ingester is stopping. This will prevent accessing TSDB blocks once they have been already closed. #4304
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.29.0
github.com/prometheus/prometheus v1.8.2-0.20210720084720-59d02b5ef003
github.com/prometheus/prometheus v1.8.2-0.20210720123808-b1ed4a0a663d
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e
github.com/sony/gobreaker v0.4.1
github.com/spf13/afero v1.2.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1529,8 +1529,8 @@ github.com/prometheus/prometheus v1.8.2-0.20210215121130-6f488061dfb4/go.mod h1:
github.com/prometheus/prometheus v1.8.2-0.20210315220929-1cba1741828b/go.mod h1:MS/bpdil77lPbfQeKk6OqVQ9OLnpN3Rszd0hka0EOWE=
github.com/prometheus/prometheus v1.8.2-0.20210324152458-c7a62b95cea0/go.mod h1:sf7j/iAbhZahjeC0s3wwMmp5dksrJ/Za1UKdR+j6Hmw=
github.com/prometheus/prometheus v1.8.2-0.20210421143221-52df5ef7a3be/go.mod h1:WbIKsp4vWCoPHis5qQfd0QimLOR7qe79roXN5O8U8bs=
github.com/prometheus/prometheus v1.8.2-0.20210720084720-59d02b5ef003 h1:MYbsDV+OIFLkqwea5oC3jIUp/HxFyXQrvXpw/hooMRQ=
github.com/prometheus/prometheus v1.8.2-0.20210720084720-59d02b5ef003/go.mod h1:o6V+A4iPEWjLG0rSEKeev3OzfBZwP+ay+4iS4dkfLI4=
github.com/prometheus/prometheus v1.8.2-0.20210720123808-b1ed4a0a663d h1:UnqZFF2qXa+ctCfbss/J4yn9rTVoTiuawjrokqwt4Hg=
github.com/prometheus/prometheus v1.8.2-0.20210720123808-b1ed4a0a663d/go.mod h1:o6V+A4iPEWjLG0rSEKeev3OzfBZwP+ay+4iS4dkfLI4=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rafaeljusto/redigomock v0.0.0-20190202135759-257e089e14a1/go.mod h1:JaY6n2sDr+z2WTsXkOmNRUfDy6FN0L6Nk7x06ndm4tY=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
Expand Down
6 changes: 4 additions & 2 deletions pkg/configs/legacy_promql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,10 @@ func (q *errQuerier) Select(bool, *storage.SelectHints, ...*labels.Matcher) stor
func (q *errQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, q.err
}
func (q *errQuerier) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, q.err }
func (q *errQuerier) Close() error { return q.err }
func (q *errQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, q.err
}
func (q *errQuerier) Close() error { return q.err }

func TestQueryError(t *testing.T) {
engine := NewEngine(nil, nil, 10, 10*time.Second)
Expand Down
31 changes: 21 additions & 10 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,15 @@ type BlocksStoreQueryable struct {
subservicesWatcher *services.FailureWatcher
}

func NewBlocksStoreQueryable(stores BlocksStoreSet, finder BlocksFinder, consistency *BlocksConsistencyChecker, limits BlocksStoreLimits, queryStoreAfter time.Duration, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
func NewBlocksStoreQueryable(
stores BlocksStoreSet,
finder BlocksFinder,
consistency *BlocksConsistencyChecker,
limits BlocksStoreLimits,
queryStoreAfter time.Duration,
logger log.Logger,
reg prometheus.Registerer,
) (*BlocksStoreQueryable, error) {
manager, err := services.NewManager(stores, finder)
if err != nil {
return nil, errors.Wrap(err, "register blocks storage queryable subservices")
Expand Down Expand Up @@ -317,20 +325,21 @@ func (q *blocksStoreQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
return q.selectSorted(sp, matchers...)
}

func (q *blocksStoreQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (q *blocksStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
spanLog, spanCtx := spanlogger.New(q.ctx, "blocksStoreQuerier.LabelNames")
defer spanLog.Span.Finish()

minT, maxT := q.minT, q.maxT

var (
resMtx sync.Mutex
resNameSets = [][]string{}
resWarnings = storage.Warnings(nil)
resMtx sync.Mutex
resNameSets = [][]string{}
resWarnings = storage.Warnings(nil)
convertedMatchers = convertMatchersToLabelMatcher(matchers)
)

queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
nameSets, warnings, queriedBlocks, err := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT)
nameSets, warnings, queriedBlocks, err := q.fetchLabelNamesFromStore(spanCtx, clients, minT, maxT, convertedMatchers)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -693,6 +702,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
clients map[BlocksStoreClient][]ulid.ULID,
minT int64,
maxT int64,
matchers []storepb.LabelMatcher,
) ([][]string, storage.Warnings, []ulid.ULID, error) {
var (
reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, cortex_tsdb.TenantIDExternalLabel, q.userID)
Expand All @@ -711,7 +721,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore(
blockIDs := blockIDs

g.Go(func() error {
req, err := createLabelNamesRequest(minT, maxT, blockIDs)
req, err := createLabelNamesRequest(minT, maxT, blockIDs, matchers)
if err != nil {
return errors.Wrapf(err, "failed to create label names request")
}
Expand Down Expand Up @@ -870,10 +880,11 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip
}, nil
}

func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID) (*storepb.LabelNamesRequest, error) {
func createLabelNamesRequest(minT, maxT int64, blockIDs []ulid.ULID, matchers []storepb.LabelMatcher) (*storepb.LabelNamesRequest, error) {
req := &storepb.LabelNamesRequest{
Start: minT,
End: maxT,
Start: minT,
End: maxT,
Matchers: matchers,
}

// Selectively query only specific blocks.
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/chunk_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (q *chunkStoreQuerier) LabelValues(name string, labels ...*labels.Matcher)
return nil, nil, nil
}

func (q *chunkStoreQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (q *chunkStoreQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}

Expand Down
37 changes: 35 additions & 2 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,44 @@ func (q *distributorQuerier) LabelValues(name string, matchers ...*labels.Matche
return lvs, nil, err
}

func (q *distributorQuerier) LabelNames() ([]string, storage.Warnings, error) {
ln, err := q.distributor.LabelNames(q.ctx, model.Time(q.mint), model.Time(q.maxt))
func (q *distributorQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
if len(matchers) > 0 {
return q.labelNamesWithMatchers(matchers...)
}

log, ctx := spanlogger.New(q.ctx, "distributorQuerier.LabelNames")
defer log.Span.Finish()

ln, err := q.distributor.LabelNames(ctx, model.Time(q.mint), model.Time(q.maxt))
return ln, nil, err
}

// labelNamesWithMatchers performs the LabelNames call by calling ingester's MetricsForLabelMatchers method
func (q *distributorQuerier) labelNamesWithMatchers(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
log, ctx := spanlogger.New(q.ctx, "distributorQuerier.labelNamesWithMatchers")
defer log.Span.Finish()

ms, err := q.distributor.MetricsForLabelMatchers(ctx, model.Time(q.mint), model.Time(q.maxt), matchers...)
if err != nil {
return nil, nil, err
}
namesMap := make(map[string]struct{})

for _, m := range ms {
for name := range m.Metric {
namesMap[string(name)] = struct{}{}
}
}

names := make([]string, 0, len(namesMap))
for name := range namesMap {
names = append(names, name)
}
sort.Strings(names)

return names, nil, nil
}

func (q *distributorQuerier) Close() error {
return nil
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,30 @@ func verifySeries(t *testing.T, series storage.Series, l labels.Labels, samples
require.False(t, it.Next())
require.Nil(t, it.Err())
}
func TestDistributorQuerier_LabelNames(t *testing.T) {
someMatchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")}
labelNames := []string{"foo", "job"}

t.Run("with matchers", func(t *testing.T) {
metrics := []metric.Metric{
{Metric: model.Metric{"foo": "bar"}},
{Metric: model.Metric{"job": "baz"}},
{Metric: model.Metric{"job": "baz", "foo": "boom"}},
}
d := &mockDistributor{}
d.On("MetricsForLabelMatchers", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers).
Return(metrics, nil)

queryable := newDistributorQueryable(d, false, nil, 0)
querier, err := queryable.Querier(context.Background(), mint, maxt)
require.NoError(t, err)

names, warnings, err := querier.LabelNames(someMatchers...)
require.NoError(t, err)
assert.Empty(t, warnings)
assert.Equal(t, labelNames, names)
})
}

func convertToChunks(t *testing.T, samples []cortexpb.Sample) []client.Chunk {
// We need to make sure that there is atleast one chunk present,
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/duplicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (m testQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]st
return nil, nil, nil
}

func (m testQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (m testQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/error_translate_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ func (e errorTranslateQuerier) LabelValues(name string, matchers ...*labels.Matc
return values, warnings, e.fn(err)
}

func (e errorTranslateQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
func (e errorTranslateQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames(matchers...)
return values, warnings, e.fn(err)
}

Expand All @@ -149,8 +149,8 @@ func (e errorTranslateChunkQuerier) LabelValues(name string, matchers ...*labels
return values, warnings, e.fn(err)
}

func (e errorTranslateChunkQuerier) LabelNames() ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames()
func (e errorTranslateChunkQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
values, warnings, err := e.q.LabelNames(matchers...)
return values, warnings, e.fn(err)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/error_translate_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (t errorTestQuerier) LabelValues(name string, matchers ...*labels.Matcher)
return nil, nil, t.err
}

func (t errorTestQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (t errorTestQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, t.err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/lazyquery/lazyquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ func (l LazyQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]st
}

// LabelNames implements Storage.Querier
func (l LazyQuerier) LabelNames() ([]string, storage.Warnings, error) {
return l.next.LabelNames()
func (l LazyQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return l.next.LabelNames(matchers...)
}

// Close implements Storage.Querier
Expand Down
8 changes: 4 additions & 4 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,13 +431,13 @@ func (q querier) LabelValues(name string, matchers ...*labels.Matcher) ([]string
return strutil.MergeSlices(sets...), warnings, nil
}

func (q querier) LabelNames() ([]string, storage.Warnings, error) {
func (q querier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
if !q.queryStoreForLabels {
return q.metadataQuerier.LabelNames()
return q.metadataQuerier.LabelNames(matchers...)
}

if len(q.queriers) == 1 {
return q.queriers[0].LabelNames()
return q.queriers[0].LabelNames(matchers...)
}

var (
Expand All @@ -453,7 +453,7 @@ func (q querier) LabelNames() ([]string, storage.Warnings, error) {
querier := querier
g.Go(func() error {
// NB: Names are sorted in Cortex already.
myNames, myWarnings, err := querier.LabelNames()
myNames, myWarnings, err := querier.LabelNames(matchers...)
if err != nil {
return err
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,35 @@ func TestQuerier_ValidateQueryTimeRange_MaxQueryLookback(t *testing.T) {
}
})

t.Run("label names with matchers", func(t *testing.T) {
matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchNotEqual, "route", "get_user"),
}
distributor := &mockDistributor{}
distributor.On("MetricsForLabelMatchers", mock.Anything, mock.Anything, mock.Anything, matchers).Return([]metric.Metric{}, nil)

queryable, _, _ := New(cfg, overrides, distributor, queryables, purger.NewTombstonesLoader(nil, nil), nil, log.NewNopLogger())
q, err := queryable.Querier(ctx, util.TimeToMillis(testData.queryStartTime), util.TimeToMillis(testData.queryEndTime))
require.NoError(t, err)

_, _, err = q.LabelNames(matchers...)
require.NoError(t, err)

if !testData.expectedSkipped {
// Assert on the time range of the actual executed query (5s delta).
delta := float64(5000)
require.Len(t, distributor.Calls, 1)
assert.Equal(t, "MetricsForLabelMatchers", distributor.Calls[0].Method)
args := distributor.Calls[0].Arguments
assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataStartTime), int64(args.Get(1).(model.Time)), delta)
assert.InDelta(t, util.TimeToMillis(testData.expectedMetadataEndTime), int64(args.Get(2).(model.Time)), delta)
assert.Equal(t, matchers, args.Get(3).([]*labels.Matcher))
} else {
// Ensure no query has been executed executed (because skipped).
assert.Len(t, distributor.Calls, 0)
}
})

t.Run("label values", func(t *testing.T) {
distributor := &mockDistributor{}
distributor.On("LabelValuesForLabelName", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]string{}, nil)
Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/queryrange/promql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,8 +614,10 @@ func (m *testMatrix) Select(_ bool, selectParams *storage.SelectHints, matchers
func (m *testMatrix) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}
func (m *testMatrix) LabelNames() ([]string, storage.Warnings, error) { return nil, nil, nil }
func (m *testMatrix) Close() error { return nil }
func (m *testMatrix) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}
func (m *testMatrix) Close() error { return nil }

func newSeries(metric labels.Labels, generator func(float64) float64) *promql.StorageSeries {
sort.Sort(metric)
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (q *ShardedQuerier) LabelValues(name string, matchers ...*labels.Matcher) (
}

// LabelNames returns all the unique label names present in the block in sorted order.
func (q *ShardedQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (q *ShardedQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, errors.Errorf("unimplemented")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/queryrange/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (q *MockShardedQueryable) LabelValues(name string, matchers ...*labels.Matc
}

// LabelNames returns all the unique label names present in the block in sorted order.
func (q *MockShardedQueryable) LabelNames() ([]string, storage.Warnings, error) {
func (q *MockShardedQueryable) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, errors.Errorf("unimplemented")
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (m mockQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]st
return nil, nil, nil
}

func (m mockQuerier) LabelNames() ([]string, storage.Warnings, error) {
func (m mockQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
return nil, nil, nil
}

Expand Down
Loading