Add cortex_ruler_rule_groups_in_store metric #5869

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## master / unreleased

* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store`, the total number of rule groups per tenant in the store. It can be compared with `cortex_prometheus_rule_group_rules` to count the rule groups that are not loaded by any ruler. #5869

## 1.18.0 in progress

3 changes: 3 additions & 0 deletions integration/ruler_test.go
@@ -267,6 +267,9 @@ func TestRulerSharding(t *testing.T) {
// between the two rulers.
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
// Even with rules sharded, we expect rulers to have the same cortex_ruler_rule_groups_in_store metric values
require.NoError(t, ruler1.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))
require.NoError(t, ruler2.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))

// Fetch the rules and ensure they match the configured ones.
actualGroups, err := c.GetPrometheusRules(e2ecortex.DefaultFilter)
36 changes: 36 additions & 0 deletions pkg/ruler/manager_metrics.go
@@ -271,3 +271,39 @@ func (m *RuleEvalMetrics) deletePerUserMetrics(userID string) {
m.RulerQuerySeconds.DeleteLabelValues(userID)
}
}

type RuleGroupMetrics struct {
RuleGroupsInStore *prometheus.GaugeVec
tenants map[string]struct{}
allowedTenants *util.AllowedTenants
}

func NewRuleGroupMetrics(reg prometheus.Registerer, allowedTenants *util.AllowedTenants) *RuleGroupMetrics {
m := &RuleGroupMetrics{
RuleGroupsInStore: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_ruler_rule_groups_in_store",
Help: "The number of rule groups a tenant has in store.",
}, []string{"user"}),
allowedTenants: allowedTenants,
}
return m
}

// UpdateRuleGroupsInStore updates the cortex_ruler_rule_groups_in_store metric with the provided number of rule
// groups per tenant and removes the metrics for tenants that are no longer present.
func (r *RuleGroupMetrics) UpdateRuleGroupsInStore(ruleGroupsCount map[string]int) {
tenants := make(map[string]struct{}, len(ruleGroupsCount))
for userID, count := range ruleGroupsCount {
if !r.allowedTenants.IsAllowed(userID) { // if the tenant is disabled just ignore its rule groups
continue
}
tenants[userID] = struct{}{}
r.RuleGroupsInStore.WithLabelValues(userID).Set(float64(count))
}
for userID := range r.tenants {
if _, ok := tenants[userID]; !ok {
r.RuleGroupsInStore.DeleteLabelValues(userID)
}
}
r.tenants = tenants
}
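
For context while reading the diff, here is a minimal usage sketch of the new type (not part of the PR; the function name and tenant labels are illustrative). It shows that each call overwrites the per-tenant gauges and deletes the series of tenants that are missing from the latest snapshot:

```go
package ruler

import (
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/util"
)

// exampleRuleGroupMetricsUsage is an illustrative sketch of how a caller is
// expected to drive RuleGroupMetrics on every sync of the rule store.
func exampleRuleGroupMetricsUsage() {
	reg := prometheus.NewRegistry()
	m := NewRuleGroupMetrics(reg, util.NewAllowedTenants(nil, nil))

	// First sync: two tenants have rule groups in the store.
	m.UpdateRuleGroupsInStore(map[string]int{"tenant-a": 5, "tenant-b": 2})

	// Later sync: tenant-b is gone, so its gauge series is deleted rather than
	// being left at a stale value.
	m.UpdateRuleGroupsInStore(map[string]int{"tenant-a": 7})
}
```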
38 changes: 38 additions & 0 deletions pkg/ruler/manager_metrics_test.go
@@ -595,3 +595,41 @@ func TestRuleEvalMetricsDeletePerUserMetrics(t *testing.T) {
require.Contains(t, mfm[name].String(), "value:\"fake2\"")
}
}

func TestRuleGroupMetrics(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
m := NewRuleGroupMetrics(reg, util.NewAllowedTenants(nil, []string{"fake3"}))
m.UpdateRuleGroupsInStore(map[string]int{
"fake1": 10,
"fake2": 20,
"fake3": 30,
})
gm, err := reg.Gather()
require.NoError(t, err)
mfm, err := util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.Equal(t, 2, len(mfm["cortex_ruler_rule_groups_in_store"].Metric))
requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[0], map[string]string{
"user": "fake1",
}, float64(10))
requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[1], map[string]string{
"user": "fake2",
}, float64(20))
m.UpdateRuleGroupsInStore(map[string]int{
"fake2": 30,
})
gm, err = reg.Gather()
require.NoError(t, err)
mfm, err = util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.Equal(t, 1, len(mfm["cortex_ruler_rule_groups_in_store"].Metric))
requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[0], map[string]string{
"user": "fake2",
}, float64(30))
m.UpdateRuleGroupsInStore(make(map[string]int))
gm, err = reg.Gather()
require.NoError(t, err)
mfm, err = util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.Nil(t, mfm["cortex_ruler_rule_groups_in_store"])
}
15 changes: 15 additions & 0 deletions pkg/ruler/ruler.go
@@ -294,6 +294,7 @@ type Ruler struct {
ruleGroupStoreLoadDuration prometheus.Gauge
ruleGroupSyncDuration prometheus.Gauge
rulerGetRulesFailures *prometheus.CounterVec
ruleGroupMetrics *RuleGroupMetrics

allowedTenants *util.AllowedTenants

@@ -342,6 +343,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
Help: "The total number of failed rules request sent to rulers in getShardedRules.",
}, []string{"ruler"}),
}
ruler.ruleGroupMetrics = NewRuleGroupMetrics(reg, ruler.allowedTenants)

if len(cfg.EnabledTenants) > 0 {
level.Info(ruler.logger).Log("msg", "ruler using enabled users", "enabled", strings.Join(cfg.EnabledTenants, ", "))
@@ -667,7 +669,9 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul
if err != nil {
return nil, nil, err
}
ruleGroupCounts := make(map[string]int, len(allRuleGroups))
for userID, groups := range allRuleGroups {
ruleGroupCounts[userID] = len(groups)
disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID)
if len(disabledRuleGroupsForUser) == 0 {
continue
@@ -682,6 +686,7 @@ }
}
allRuleGroups[userID] = filteredGroupsForUser
}
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return allRuleGroups, nil, nil
}

@@ -691,9 +696,11 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
return nil, nil, err
}

ruleGroupCounts := make(map[string]int, len(configs))
ownedConfigs := make(map[string]rulespb.RuleGroupList)
backedUpConfigs := make(map[string]rulespb.RuleGroupList)
for userID, groups := range configs {
ruleGroupCounts[userID] = len(groups)
owned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(owned) > 0 {
ownedConfigs[userID] = owned
@@ -705,6 +712,7 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
}
}
}
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return ownedConfigs, backedUpConfigs, nil
}

@@ -732,6 +740,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
}

if len(userRings) == 0 {
r.ruleGroupMetrics.UpdateRuleGroupsInStore(make(map[string]int))
return nil, nil, nil
}

@@ -744,6 +753,8 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
mu := sync.Mutex{}
owned := map[string]rulespb.RuleGroupList{}
backedUp := map[string]rulespb.RuleGroupList{}
gLock := sync.Mutex{}
@rapphil (Contributor) commented on Apr 25, 2024:

by now you must realize that I don't like mutexes 😅 (not that I don't like, but I try to minimize the use and prefer lock free whenever possible).

I think you can create a slice of maps of size concurrency and give one dedicated map per goroutine and merge all maps from the slice after the wait group. The same approach could be used with owned and backedUp to get rid of the other mutex.

@emanlodovice (Contributor, Author) replied on Apr 25, 2024:

I think the code will be much more complex in that approach. The use of mutex here seems to be acceptable considering the small scope of the lock. Maybe @yeya24 you can share your opinion on this matter.

A contributor replied:

Not a blocker for me, but a nice to have, therefore I will approve.

A contributor replied:

I am fine with the lock here as setting the map is a very lightweight operation
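
To make the alternative discussed in this thread concrete, here is a minimal sketch of the per-worker-map approach @rapphil describes: each goroutine writes to its own map and the maps are merged after the wait group, so no mutex is needed. It assumes `golang.org/x/sync/errgroup`; `countGroupsLockFree` and `fetchGroups` are hypothetical names, and this is not the code that was merged:

```go
// countGroupsLockFree gives each worker a private map and merges them after
// g.Wait(), avoiding the shared-map mutex.
func countGroupsLockFree(ctx context.Context, userIDs []string, concurrency int,
	fetchGroups func(ctx context.Context, userID string) (rulespb.RuleGroupList, error)) (map[string]int, error) {

	counts := make([]map[string]int, concurrency) // one private map per worker
	for i := range counts {
		counts[i] = make(map[string]int)
	}

	userCh := make(chan string, len(userIDs))
	for _, userID := range userIDs {
		userCh <- userID
	}
	close(userCh)

	g, gctx := errgroup.WithContext(ctx)
	for i := 0; i < concurrency; i++ {
		i := i // capture the worker index for the closure
		g.Go(func() error {
			for userID := range userCh {
				groups, err := fetchGroups(gctx, userID)
				if err != nil {
					return err
				}
				counts[i][userID] = len(groups) // only worker i writes counts[i]
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Merge the per-worker maps single-threaded, after all goroutines are done.
	merged := make(map[string]int, len(userIDs))
	for _, m := range counts {
		for userID, n := range m {
			merged[userID] = n
		}
	}
	return merged, nil
}
```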

ruleGroupCounts := make(map[string]int, len(userRings))

concurrency := loadRulesConcurrency
if len(userRings) < concurrency {
@@ -758,6 +769,9 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
if err != nil {
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
}
gLock.Lock()
ruleGroupCounts[userID] = len(groups)
gLock.Unlock()

filterOwned := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
var filterBackup []*rulespb.RuleGroupDesc
@@ -781,6 +795,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
}

err = g.Wait()
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return owned, backedUp, err
}

19 changes: 16 additions & 3 deletions pkg/ruler/ruler_test.go
@@ -247,10 +247,23 @@ func buildRuler(t *testing.T, rulerConfig Config, querierTestConfig *querier.Tes
func newTestRuler(t *testing.T, rulerConfig Config, store rulestore.RuleStore, querierTestConfig *querier.TestConfig) *Ruler {
ruler, _ := buildRuler(t, rulerConfig, querierTestConfig, store, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ruler))
rgs, err := store.ListAllRuleGroups(context.Background())
require.NoError(t, err)

// Ensure all rules are loaded before usage
ruler.syncRules(context.Background(), rulerSyncReasonInitial)

// Wait to ensure syncRules has finished and all rules are loaded before usage
deadline := time.Now().Add(3 * time.Second)
for {
loaded := true
for tenantId := range rgs {
if len(ruler.manager.GetRules(tenantId)) == 0 {
loaded = false
}
}
if time.Now().After(deadline) || loaded {
break
}
time.Sleep(50 * time.Millisecond)
}
return ruler
}
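
As a side note on the polling loop above, the same wait can be expressed with testify's `require.Eventually`; this is a sketch assuming the same `rgs` and `ruler` variables are in scope, and unlike the hand-rolled loop it fails the test if the rules never load:

```go
// Wait up to 3s, checking every 50ms, until every tenant has loaded rules.
require.Eventually(t, func() bool {
	for tenantID := range rgs {
		if len(ruler.manager.GetRules(tenantID)) == 0 {
			return false
		}
	}
	return true
}, 3*time.Second, 50*time.Millisecond, "rules were not loaded in time")
```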
