Add cortex_ruler_rule_groups_in_store metric
Signed-off-by: Emmanuel Lodovice <lodovice@amazon.com>
emanlodovice committed Apr 19, 2024
1 parent 00ffb3c commit 80f2933
Showing 5 changed files with 109 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -36,6 +36,7 @@
* [ENHANCEMENT] Ruler: Improve GetRules response time by refactoring mutexes and introducing a temporary rules cache in `ruler/manager.go`. #5805
* [ENHANCEMENT] Querier: Add context error check when merging slices from ingesters for GetLabel operations. #5837
* [ENHANCEMENT] Ring: Add experimental `-ingester.tokens-generator-strategy=minimize-spread` flag to enable the new minimize spread token generator strategy. #5855
* [ENHANCEMENT] Ruler: Add new ruler metric `cortex_ruler_rule_groups_in_store`. #5869
* [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
* [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
* [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
3 changes: 3 additions & 0 deletions integration/ruler_test.go
@@ -382,6 +382,9 @@ func TestRulerSharding(t *testing.T) {
// between the two rulers.
require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
// Even with rules sharded, we expect each ruler to report the same cortex_ruler_rule_groups_in_store value, since the metric counts all rule groups in the store
require.NoError(t, ruler1.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))
require.NoError(t, ruler2.WaitSumMetrics(e2e.Equals(numRulesGroups), "cortex_ruler_rule_groups_in_store"))

// Fetch the rules and ensure they match the configured ones.
actualGroups, err := c.GetPrometheusRules(e2ecortex.DefaultFilter)
41 changes: 41 additions & 0 deletions pkg/ruler/manager_metrics.go
@@ -1,6 +1,8 @@
package ruler

import (
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

@@ -271,3 +273,42 @@ func (m *RuleEvalMetrics) deletePerUserMetrics(userID string) {
m.RulerQuerySeconds.DeleteLabelValues(userID)
}
}

type RuleGroupMetrics struct {
mtx sync.Mutex
RuleGroupsInStore *prometheus.GaugeVec
tenants map[string]struct{}
allowedTenants *util.AllowedTenants
}

func NewRuleGroupMetrics(reg prometheus.Registerer, allowedTenants *util.AllowedTenants) *RuleGroupMetrics {
m := &RuleGroupMetrics{
RuleGroupsInStore: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_ruler_rule_groups_in_store",
Help: "The number of rule groups a tenant has in store.",
}, []string{"user"}),
allowedTenants: allowedTenants,
}
return m
}

// UpdateRuleGroupsInStore updates the cortex_ruler_rule_groups_in_store metric with the provided number of rule
// groups per tenant and removes the metrics for tenants that are no longer present.
func (r *RuleGroupMetrics) UpdateRuleGroupsInStore(ruleGroupsCount map[string]int) {
r.mtx.Lock()
defer r.mtx.Unlock()
tenants := make(map[string]struct{}, len(ruleGroupsCount))
for userID, count := range ruleGroupsCount {
if !r.allowedTenants.IsAllowed(userID) { // if the tenant is disabled just ignore its rule groups
continue
}
tenants[userID] = struct{}{}
r.RuleGroupsInStore.WithLabelValues(userID).Set(float64(count))
}
for userID := range r.tenants {
if _, ok := tenants[userID]; !ok {
r.RuleGroupsInStore.DeleteLabelValues(userID)
}
}
r.tenants = tenants
}
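
A minimal usage sketch (not part of the commit) showing how the new RuleGroupMetrics type behaves: counts are set per tenant, and a later update that omits a tenant deletes its series. The registry and tenant names below are illustrative only.

	reg := prometheus.NewRegistry()
	// nil, nil means no enabled/disabled lists, so every tenant is allowed.
	metrics := NewRuleGroupMetrics(reg, util.NewAllowedTenants(nil, nil))
	// First sync: two tenants have rule groups in the store.
	metrics.UpdateRuleGroupsInStore(map[string]int{"tenant-a": 3, "tenant-b": 7})
	// Second sync: tenant-b is gone, so its cortex_ruler_rule_groups_in_store series is deleted.
	metrics.UpdateRuleGroupsInStore(map[string]int{"tenant-a": 3})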
48 changes: 48 additions & 0 deletions pkg/ruler/manager_metrics_test.go
@@ -8,6 +8,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
dto "github.com/prometheus/client_model/go"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -595,3 +596,50 @@ func TestRuleEvalMetricsDeletePerUserMetrics(t *testing.T) {
require.Contains(t, mfm[name].String(), "value:\"fake2\"")
}
}

func TestRuleGroupMetrics(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
m := NewRuleGroupMetrics(reg, util.NewAllowedTenants(nil, []string{"fake3"}))
m.UpdateRuleGroupsInStore(map[string]int{
"fake1": 10,
"fake2": 20,
"fake3": 30,
})
gm, err := reg.Gather()
require.NoError(t, err)
mfm, err := util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.Equal(t, 2, len(mfm["cortex_ruler_rule_groups_in_store"].Metric))
requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[0], map[string]string{
"user": "fake1",
}, float64(10))
requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[1], map[string]string{
"user": "fake2",
}, float64(20))
m.UpdateRuleGroupsInStore(map[string]int{
"fake2": 30,
})
gm, err = reg.Gather()
require.NoError(t, err)
mfm, err = util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.Equal(t, 1, len(mfm["cortex_ruler_rule_groups_in_store"].Metric))
requireMetricEqual(t, mfm["cortex_ruler_rule_groups_in_store"].Metric[0], map[string]string{
"user": "fake2",
}, float64(30))
m.UpdateRuleGroupsInStore(make(map[string]int))
gm, err = reg.Gather()
require.NoError(t, err)
mfm, err = util.NewMetricFamilyMap(gm)
require.NoError(t, err)
require.Nil(t, mfm["cortex_ruler_rule_groups_in_store"])
}

func requireMetricEqual(t *testing.T, m *io_prometheus_client.Metric, labels map[string]string, value float64) {
l := m.GetLabel()
require.Equal(t, len(labels), len(l))
for _, pair := range l {
require.Equal(t, labels[*pair.Name], *pair.Value)
}
require.Equal(t, value, *m.Gauge.Value)
}
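
As a lighter-weight check than Gather plus MetricFamilyMap, a single tenant's gauge value can also be read directly with client_golang's testutil package (already imported above); a hedged sketch, assuming the same state as after the second update in the test:

	// ToFloat64 collects exactly one metric; here it reads the gauge for one tenant.
	v := testutil.ToFloat64(m.RuleGroupsInStore.WithLabelValues("fake2"))
	require.Equal(t, float64(30), v)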
16 changes: 16 additions & 0 deletions pkg/ruler/ruler.go
@@ -270,6 +270,7 @@ type Ruler struct {
rulerSync *prometheus.CounterVec
ruleGroupStoreLoadDuration prometheus.Gauge
ruleGroupSyncDuration prometheus.Gauge
ruleGroupMetrics *RuleGroupMetrics

allowedTenants *util.AllowedTenants

@@ -313,6 +314,7 @@ func newRuler(cfg Config, manager MultiTenantManager, reg prometheus.Registerer,
Help: "The duration in seconds required to sync and load rule groups from storage.",
}),
}
ruler.ruleGroupMetrics = NewRuleGroupMetrics(reg, ruler.allowedTenants)

if len(cfg.EnabledTenants) > 0 {
level.Info(ruler.logger).Log("msg", "ruler using enabled users", "enabled", strings.Join(cfg.EnabledTenants, ", "))
@@ -606,7 +608,9 @@ func (r *Ruler) listRulesNoSharding(ctx context.Context) (map[string]rulespb.Rul
if err != nil {
return nil, err
}
ruleGroupCounts := make(map[string]int, len(allRuleGroups))
for userID, groups := range allRuleGroups {
ruleGroupCounts[userID] = len(groups)
disabledRuleGroupsForUser := r.limits.DisabledRuleGroups(userID)
if len(disabledRuleGroupsForUser) == 0 {
continue
@@ -621,6 +625,7 @@
}
allRuleGroups[userID] = filteredGroupsForUser
}
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return allRuleGroups, nil
}

@@ -630,13 +635,16 @@ func (r *Ruler) listRulesShardingDefault(ctx context.Context) (map[string]rulesp
return nil, err
}

ruleGroupCounts := make(map[string]int, len(configs))
filteredConfigs := make(map[string]rulespb.RuleGroupList)
for userID, groups := range configs {
ruleGroupCounts[userID] = len(groups)
filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), r.ring, r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(filtered) > 0 {
filteredConfigs[userID] = filtered
}
}
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return filteredConfigs, nil
}

@@ -664,6 +672,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
}

if len(userRings) == 0 {
r.ruleGroupMetrics.UpdateRuleGroupsInStore(make(map[string]int))
return nil, nil
}

@@ -675,6 +684,8 @@

mu := sync.Mutex{}
result := map[string]rulespb.RuleGroupList{}
gLock := sync.Mutex{}
ruleGroupCounts := make(map[string]int, len(userRings))

concurrency := loadRulesConcurrency
if len(userRings) < concurrency {
@@ -690,6 +701,10 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
return errors.Wrapf(err, "failed to fetch rule groups for user %s", userID)
}

gLock.Lock()
ruleGroupCounts[userID] = len(groups)
gLock.Unlock()

filtered := filterRuleGroups(userID, groups, r.limits.DisabledRuleGroups(userID), userRings[userID], r.lifecycler.GetInstanceAddr(), r.logger, r.ringCheckErrors)
if len(filtered) == 0 {
continue
@@ -704,6 +719,7 @@ func (r *Ruler) listRulesShuffleSharding(ctx context.Context) (map[string]rulesp
}

err = g.Wait()
r.ruleGroupMetrics.UpdateRuleGroupsInStore(ruleGroupCounts)
return result, err
}
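
The shuffle-sharding path above counts rule groups inside concurrently running goroutines, so the shared ruleGroupCounts map is guarded by gLock and the gauge is updated once after g.Wait(). A self-contained sketch of that pattern; the names collectCounts and fetch are illustrative and not part of the commit:

	package sketch

	import (
		"sync"

		"golang.org/x/sync/errgroup"
	)

	// collectCounts runs one goroutine per user and records each per-user count
	// under a mutex, mirroring how ruleGroupCounts is populated above.
	func collectCounts(users []string, fetch func(string) (int, error)) (map[string]int, error) {
		var (
			mtx    sync.Mutex
			g      errgroup.Group
			counts = make(map[string]int, len(users))
		)
		for _, u := range users {
			u := u // capture the loop variable for the goroutine
			g.Go(func() error {
				n, err := fetch(u)
				if err != nil {
					return err
				}
				mtx.Lock()
				counts[u] = n
				mtx.Unlock()
				return nil
			})
		}
		// As in the ruler, return whatever was collected together with the first error.
		return counts, g.Wait()
	}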
