Skip to content

Commit

Permalink
Remove user specific evaluation metrics when rule manager is removed
Browse files Browse the repository at this point in the history
Signed-off-by: Emmanuel Lodovice <lodovice@amazon.com>
  • Loading branch information
emanlodovice committed Feb 14, 2024
1 parent 0a19a7d commit e1c1d01
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 54 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [CHANGE] Index Cache: Multi level cache backfilling operation becomes async. Added `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` and `-blocks-storage.bucket-store.index-cache.multilevel.max-async-buffer-size` configs and metric `cortex_store_multilevel_index_cache_backfill_dropped_items_total` for number of dropped items. #5661
* [CHANGE] Ingester: Disable uploading compacted blocks and overlapping compaction in ingester. #5735
* [CHANGE] Distributor: Count the number of rate-limited samples in `distributor_samples_in_total`. #5714
* [CHANGE] Ruler: Remove `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, `cortex_ruler_queries_failed_total`, and `cortex_ruler_query_seconds_total` metrics for the tenant when the ruler deletes the manager for the tenant. #5772
* [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
Expand Down
16 changes: 8 additions & 8 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,6 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
}

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
var totalQueries = []float64{0}

// Verify that user-failures don't increase cortex_ruler_queries_failed_total
for groupName, expression := range map[string]string{
Expand Down Expand Up @@ -769,19 +768,20 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

// Check that cortex_ruler_queries_total went up
totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Greater(t, totalQueries[0], float64(0))

// Delete rule before checking "cortex_ruler_queries_total", as we want to reuse value for next test.
require.NoError(t, c.DeleteRuleGroup(namespace, groupName))

// Wait until ruler has unloaded the group. We don't use any matcher, so there should be no groups (in fact, metric disappears).
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics))

// Check that cortex_ruler_queries_total went up since last test.
newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Greater(t, newTotalQueries[0], totalQueries[0])

// Remember totalQueries for next test.
totalQueries = newTotalQueries
// Deleting the rule group should clean up the cortex_ruler_queries_total metrics
_, err = ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
require.EqualError(t, err, "metric=cortex_ruler_queries_total service=ruler: metric not found")
})
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
}

t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
metrics := ruler.NewRuleEvalMetrics(t.Cfg.Ruler, prometheus.DefaultRegisterer)

if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
Expand Down Expand Up @@ -577,15 +578,15 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
queryEngine = promql.NewEngine(opts)
}

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
} else {
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
// TODO: Consider wrapping logger to differentiate from querier module logger
queryable, _, engine := querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, rulerRegisterer, util_log.Logger)

managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, prometheus.DefaultRegisterer, util_log.Logger)
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Distributor, queryable, engine, t.Overrides, metrics, prometheus.DefaultRegisterer)
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)
}

if err != nil {
Expand Down
40 changes: 7 additions & 33 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -279,47 +278,22 @@ type RulesManager interface {
// ManagerFactory is a function that creates new RulesManager for given user and notifier.Manager.
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager

func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine v1.QueryEngine, overrides RulesLimits, reg prometheus.Registerer) ManagerFactory {
totalWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_total",
Help: "Number of write requests to ingesters.",
}, []string{"user"})
failedWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_failed_total",
Help: "Number of failed write requests to ingesters.",
}, []string{"user"})

totalQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_total",
Help: "Number of queries executed by ruler.",
}, []string{"user"})
failedQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_failed_total",
Help: "Number of failed queries by ruler.",
}, []string{"user"})
var rulerQuerySeconds *prometheus.CounterVec
if cfg.EnableQueryStats {
rulerQuerySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_query_seconds_total",
Help: "Total amount of wall clock time spent processing queries by the ruler.",
}, []string{"user"})
}

func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine v1.QueryEngine, overrides RulesLimits, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer) ManagerFactory {
// Wrap errors returned by Queryable to our wrapper, so that we can distinguish between those errors
// and errors returned by PromQL engine. Errors from Queryable can be either caused by user (limits) or internal errors.
// Errors from PromQL are always "user" errors.
q = querier.NewErrorTranslateQueryableWithFn(q, WrapQueryableErrors)

return func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager {
var queryTime prometheus.Counter
if rulerQuerySeconds != nil {
queryTime = rulerQuerySeconds.WithLabelValues(userID)
if evalMetrics.RulerQuerySeconds != nil {
queryTime = evalMetrics.RulerQuerySeconds.WithLabelValues(userID)
}

failedQueries := failedQueriesVec.WithLabelValues(userID)
totalQueries := totalQueriesVec.WithLabelValues(userID)
totalWrites := totalWritesVec.WithLabelValues(userID)
failedWrites := failedWritesVec.WithLabelValues(userID)
failedQueries := evalMetrics.FailedQueriesVec.WithLabelValues(userID)
totalQueries := evalMetrics.TotalQueriesVec.WithLabelValues(userID)
totalWrites := evalMetrics.TotalWritesVec.WithLabelValues(userID)
failedWrites := evalMetrics.FailedWritesVec.WithLabelValues(userID)

engineQueryFunc := EngineQueryFunc(engine, q, overrides, userID)
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)
Expand Down
13 changes: 9 additions & 4 deletions pkg/ruler/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ import (
)

type DefaultMultiTenantManager struct {
cfg Config
notifierCfg *config.Config
managerFactory ManagerFactory
cfg Config
notifierCfg *config.Config
managerFactory ManagerFactory
ruleEvalMetrics *RuleEvalMetrics

mapper *mapper

Expand All @@ -51,7 +52,7 @@ type DefaultMultiTenantManager struct {
logger log.Logger
}

func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, evalMetrics *RuleEvalMetrics, reg prometheus.Registerer, logger log.Logger) (*DefaultMultiTenantManager, error) {
ncfg, err := buildNotifierConfig(&cfg)
if err != nil {
return nil, err
Expand All @@ -78,6 +79,7 @@ func NewDefaultMultiTenantManager(cfg Config, managerFactory ManagerFactory, reg
cfg: cfg,
notifierCfg: ncfg,
managerFactory: managerFactory,
ruleEvalMetrics: evalMetrics,
notifiers: map[string]*rulerNotifier{},
notifiersDiscoveryMetrics: notifiersDiscoveryMetrics,
mapper: newMapper(cfg.RulePath, logger),
Expand Down Expand Up @@ -130,6 +132,9 @@ func (r *DefaultMultiTenantManager) SyncRuleGroups(ctx context.Context, ruleGrou
r.lastReloadSuccessfulTimestamp.DeleteLabelValues(userID)
r.configUpdatesTotal.DeleteLabelValues(userID)
r.userManagerMetrics.RemoveUserRegistry(userID)
if r.ruleEvalMetrics != nil {
r.ruleEvalMetrics.deletePerUserMetrics(userID)
}
level.Info(r.logger).Log("msg", "deleted rule manager and local rule files", "user", userID)
}
}
Expand Down
49 changes: 49 additions & 0 deletions pkg/ruler/manager_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ruler

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/cortexproject/cortex/pkg/util"
)
Expand Down Expand Up @@ -222,3 +223,51 @@ func (m *ManagerMetrics) Collect(out chan<- prometheus.Metric) {
data.SendSumOfGaugesPerUser(out, m.NotificationQueueCapacity, "prometheus_notifications_queue_capacity")
data.SendSumOfGaugesPerUser(out, m.AlertmanagersDiscovered, "prometheus_notifications_alertmanagers_discovered")
}

// RuleEvalMetrics bundles the counter vectors that track the ruler's writes
// and queries. As built by NewRuleEvalMetrics, every vector carries a single
// "user" label.
type RuleEvalMetrics struct {
	// Back cortex_ruler_write_requests_total and
	// cortex_ruler_write_requests_failed_total respectively.
	TotalWritesVec, FailedWritesVec *prometheus.CounterVec
	// Back cortex_ruler_queries_total and cortex_ruler_queries_failed_total
	// respectively.
	TotalQueriesVec, FailedQueriesVec *prometheus.CounterVec
	// Backs cortex_ruler_query_seconds_total. NewRuleEvalMetrics leaves it nil
	// unless Config.EnableQueryStats is set, so check for nil before use.
	RulerQuerySeconds *prometheus.CounterVec
}

// NewRuleEvalMetrics builds the ruler's per-user evaluation counters, creating
// each one through promauto.With(reg). The query-seconds counter is only
// created when cfg.EnableQueryStats is true; otherwise RulerQuerySeconds stays
// nil in the returned value.
func NewRuleEvalMetrics(cfg Config, reg prometheus.Registerer) *RuleEvalMetrics {
	auto := promauto.With(reg)
	// Every counter here shares the same single "user" label.
	perUserCounter := func(name, help string) *prometheus.CounterVec {
		return auto.NewCounterVec(prometheus.CounterOpts{Name: name, Help: help}, []string{"user"})
	}

	evalMetrics := &RuleEvalMetrics{
		TotalWritesVec:   perUserCounter("cortex_ruler_write_requests_total", "Number of write requests to ingesters."),
		FailedWritesVec:  perUserCounter("cortex_ruler_write_requests_failed_total", "Number of failed write requests to ingesters."),
		TotalQueriesVec:  perUserCounter("cortex_ruler_queries_total", "Number of queries executed by ruler."),
		FailedQueriesVec: perUserCounter("cortex_ruler_queries_failed_total", "Number of failed queries by ruler."),
	}
	if !cfg.EnableQueryStats {
		return evalMetrics
	}

	evalMetrics.RulerQuerySeconds = perUserCounter("cortex_ruler_query_seconds_total", "Total amount of wall clock time spent processing queries by the ruler.")
	return evalMetrics
}

// deletePerUserMetrics removes the series labelled with userID from every
// evaluation counter held by m.
//
// It is safe to call on a nil receiver (it does nothing), so callers that
// treat the evaluation metrics as optional need no guard of their own.
// Counters that were never created, such as RulerQuerySeconds when query
// stats are disabled, are skipped.
func (m *RuleEvalMetrics) deletePerUserMetrics(userID string) {
	if m == nil {
		return
	}

	for _, vec := range []*prometheus.CounterVec{
		m.TotalWritesVec,
		m.FailedWritesVec,
		m.TotalQueriesVec,
		m.FailedQueriesVec,
		m.RulerQuerySeconds,
	} {
		// The fields are exported pointers, so any of them may be unset.
		if vec != nil {
			vec.DeleteLabelValues(userID)
		}
	}
}
39 changes: 39 additions & 0 deletions pkg/ruler/manager_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/util"
)

func TestManagerMetricsWithRuleGroupLabel(t *testing.T) {
Expand Down Expand Up @@ -556,3 +558,40 @@ func TestMetricsArePerUser(t *testing.T) {
assert.True(t, foundUserLabel, "user label not found for metric %s", desc.String())
}
}

// TestRuleEvalMetricsDeletePerUserMetrics populates every evaluation counter
// for two users, deletes one of them, and checks that only the other user's
// series remain in the gathered output.
func TestRuleEvalMetricsDeletePerUserMetrics(t *testing.T) {
	rulePath := t.TempDir()
	registry := prometheus.NewPedanticRegistry()
	evalMetrics := NewRuleEvalMetrics(Config{RulePath: rulePath, EnableQueryStats: true}, registry)

	// Give both users a sample on each counter so all series exist up front.
	for _, vec := range []*prometheus.CounterVec{
		evalMetrics.TotalWritesVec,
		evalMetrics.FailedWritesVec,
		evalMetrics.TotalQueriesVec,
		evalMetrics.FailedQueriesVec,
		evalMetrics.RulerQuerySeconds,
	} {
		for _, userID := range []string{"fake1", "fake2"} {
			vec.WithLabelValues(userID).Add(10)
		}
	}

	metricNames := []string{"cortex_ruler_write_requests_total", "cortex_ruler_write_requests_failed_total", "cortex_ruler_queries_total", "cortex_ruler_queries_failed_total", "cortex_ruler_query_seconds_total"}

	// assertSeries gathers the registry and checks, for every metric, that
	// fake2 is present and that fake1 is present or absent as requested.
	assertSeries := func(fake1Present bool) {
		gathered, err := registry.Gather()
		require.NoError(t, err)
		families, err := util.NewMetricFamilyMap(gathered)
		require.NoError(t, err)
		for _, name := range metricNames {
			if fake1Present {
				require.Contains(t, families[name].String(), "value:\"fake1\"")
			} else {
				require.NotContains(t, families[name].String(), "value:\"fake1\"")
			}
			require.Contains(t, families[name].String(), "value:\"fake2\"")
		}
	}

	assertSeries(true)

	evalMetrics.deletePerUserMetrics("fake1")
	assertSeries(false)
}
44 changes: 43 additions & 1 deletion pkg/ruler/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
"go.uber.org/atomic"

"github.com/cortexproject/cortex/pkg/ruler/rulespb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/test"
)

func TestSyncRuleGroups(t *testing.T) {
dir := t.TempDir()

m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, log.NewNopLogger())
m, err := NewDefaultMultiTenantManager(Config{RulePath: dir}, factory, nil, nil, log.NewNopLogger())
require.NoError(t, err)

const user = "testUser"
Expand Down Expand Up @@ -96,6 +97,47 @@ func TestSyncRuleGroups(t *testing.T) {
})
}

// TestSyncRuleGroupsCleanUpPerUserMetrics syncs one rule group for a user,
// then syncs with no groups, and checks that the user's series are gone from
// both the evaluation counters and the manager's own per-user metrics.
func TestSyncRuleGroupsCleanUpPerUserMetrics(t *testing.T) {
	const user = "testUser"

	rulePath := t.TempDir()
	registry := prometheus.NewPedanticRegistry()
	evalMetrics := NewRuleEvalMetrics(Config{RulePath: rulePath, EnableQueryStats: true}, registry)
	mgr, err := NewDefaultMultiTenantManager(Config{RulePath: rulePath}, factory, evalMetrics, registry, log.NewNopLogger())
	require.NoError(t, err)

	// Seed a series directly so the write counter exists for the user.
	evalMetrics.TotalWritesVec.WithLabelValues(user).Add(10)

	userLabel := "value:\"" + user + "\""
	// assertUserSeries gathers the registry and checks whether the user's
	// label appears in the two metrics under test.
	assertUserSeries := func(present bool) {
		gathered, err := registry.Gather()
		require.NoError(t, err)
		families, err := util.NewMetricFamilyMap(gathered)
		require.NoError(t, err)
		for _, name := range []string{"cortex_ruler_write_requests_total", "cortex_ruler_config_last_reload_successful"} {
			if present {
				require.Contains(t, families[name].String(), userLabel)
			} else {
				require.NotContains(t, families[name].String(), userLabel)
			}
		}
	}

	mgr.SyncRuleGroups(context.Background(), map[string]rulespb.RuleGroupList{
		user: {
			&rulespb.RuleGroupDesc{
				Name:      "group1",
				Namespace: "ns",
				Interval:  1 * time.Minute,
				User:      user,
			},
		},
	})
	assertUserSeries(true)

	// Passing empty map / nil stops all managers.
	mgr.SyncRuleGroups(context.Background(), nil)
	require.Nil(t, getManager(mgr, user))
	assertUserSeries(false)
}

func getManager(m *DefaultMultiTenantManager, user string) RulesManager {
m.userManagerMtx.Lock()
defer m.userManagerMtx.Unlock()
Expand Down
10 changes: 6 additions & 4 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ func testSetup(t *testing.T, querierTestConfig *querier.TestConfig) (*promql.Eng

func newManager(t *testing.T, cfg Config) *DefaultMultiTenantManager {
engine, queryable, pusher, logger, overrides, reg := testSetup(t, nil)
manager, err := NewDefaultMultiTenantManager(cfg, DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, nil), reg, logger)
metrics := NewRuleEvalMetrics(cfg, nil)
managerFactory := DefaultTenantManagerFactory(cfg, pusher, queryable, engine, overrides, metrics, nil)
manager, err := NewDefaultMultiTenantManager(cfg, managerFactory, metrics, reg, logger)
require.NoError(t, err)

return manager
Expand Down Expand Up @@ -221,9 +223,9 @@ func newMockClientsPool(cfg Config, logger log.Logger, reg prometheus.Registerer

func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.TestConfig, store rulestore.RuleStore, rulerAddrMap map[string]*Ruler) (*Ruler, *DefaultMultiTenantManager) {
engine, queryable, pusher, logger, overrides, reg := testSetup(t, querierTestConfig)

managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, reg)
manager, err := NewDefaultMultiTenantManager(rulerConfig, managerFactory, reg, log.NewNopLogger())
metrics := NewRuleEvalMetrics(rulerConfig, reg)
managerFactory := DefaultTenantManagerFactory(rulerConfig, pusher, queryable, engine, overrides, metrics, reg)
manager, err := NewDefaultMultiTenantManager(rulerConfig, managerFactory, metrics, reg, log.NewNopLogger())
require.NoError(t, err)

ruler, err := newRuler(
Expand Down

0 comments on commit e1c1d01

Please sign in to comment.