Adding user label to metrics to be able to track these metrics at a user level (#5312)

Signed-off-by: Anand Rajagopal <anrajag@amazon.com>
rajagopalanand authored May 4, 2023
1 parent 1b6968f commit 36d88c0
Showing 3 changed files with 126 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [CHANGE] Ruler: Added user label to `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, and `cortex_ruler_queries_failed_total` metrics. #5312
* [CHANGE] Alertmanager: Validating new fields on the PagerDuty AM config. #5290
* [CHANGE] Ingester: Creating label `native-histogram-sample` on the `cortex_discarded_samples_total` to keep track of discarded native histogram samples. #5289
* [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
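The changelog entry above means each of the four ruler counters now carries a "user" label, i.e. one exported time series per tenant. The following sketch shows the resulting metric shape with a standalone registry; it mirrors the NewCounterVec change in pkg/ruler/compat.go further down, but the main wrapper and the tenant IDs are purely illustrative.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// With this change the counter is registered as a vector keyed by "user",
	// so each tenant gets its own series.
	writes := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_ruler_write_requests_total",
		Help: "Number of write requests to ingesters.",
	}, []string{"user"})

	// Illustrative tenant IDs only.
	writes.WithLabelValues("tenant-a").Inc()
	writes.WithLabelValues("tenant-b").Add(3)

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			lbls := ""
			for _, lp := range m.GetLabel() {
				lbls += fmt.Sprintf("%s=%q ", lp.GetName(), lp.GetValue())
			}
			// Prints e.g.: cortex_ruler_write_requests_total user="tenant-a" 1
			fmt.Printf("%s %s%v\n", mf.GetName(), lbls, m.GetCounter().GetValue())
		}
	}
}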
118 changes: 112 additions & 6 deletions integration/ruler_test.go
@@ -17,6 +17,8 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/storage/tsdb"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
@@ -576,8 +578,8 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.Equal(t, 200, res.StatusCode)
}

totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
require.NoError(t, err)
matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
var totalQueries = []float64{0}

// Verify that user-failures don't increase cortex_ruler_queries_failed_total
for groupName, expression := range map[string]string{
@@ -601,7 +603,7 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// But these failures were not reported as "failed queries"
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

@@ -612,7 +614,7 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics))

// Check that cortex_ruler_queries_total went up since last test.
newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Greater(t, newTotalQueries[0], totalQueries[0])

@@ -637,15 +639,119 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Still no failures.
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

// Now let's stop ingester, and recheck metrics. This should increase cortex_ruler_queries_failed_total failures.
require.NoError(t, s.Stop(ingester))

// We should start getting "real" failures now.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}))
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher)))
})
}

func TestRulerMetricsWhenIngesterFails(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

const blockRangePeriod = 2 * time.Second
// Configure the ruler.
flags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),

// Disable the bucket index, so the store-gateway discovers blocks by scanning the bucket.
"-blocks-storage.bucket-store.bucket-index.enabled": "false",
// Evaluate rules often, so that we don't need to wait for metrics to show up.
"-ruler.evaluation-interval": "2s",
"-ruler.poll-interval": "2s",
// No delay
"-ruler.evaluation-delay-duration": "0",

// We run single ingester only, no replication.
"-distributor.replication-factor": "1",

// Very low limit so that ruler hits it.
"-querier.max-fetched-chunks-per-query": "15",
"-querier.query-store-after": (1 * time.Second).String(),
"-querier.query-ingesters-within": (2 * time.Second).String(),
},
)

const namespace = "test"
const user = "user"

storeGateway := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

flags = mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
})

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, storeGateway))

// Wait until both the distributor and ruler have updated the ring. The querier will also watch
// the store-gateway ring if blocks sharding is enabled.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
expression := "absent(sum_over_time(metric{}[2s] offset 1h))"

// Now let's upload a non-failing rule, and make sure that it works.
t.Run("real_error", func(t *testing.T) {
const groupName = "good_rule"

var ruleEvalCount float64
ruleGroup := ruleGroupWithRule(groupName, "rule", expression)
ruleGroup.Interval = 2
require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))
m := ruleGroupMatcher(user, namespace, groupName)

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Wait until rule group has tried to evaluate the rule, and succeeded.
ruleEvalCount++
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(ruleEvalCount), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))

// Wait until the TSDB head is compacted and shipped to the storage.
// The shipped block contains the 1st series, while the 2nd series is in the head.
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))

// Now let's stop ingester, and recheck metrics. This should increase cortex_ruler_write_requests_failed_total failures.
require.NoError(t, s.Stop(ingester))
ruleEvalCount++
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(ruleEvalCount), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
})
}

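All of the assertions added in this test file narrow the e2e metric sums to a single tenant by matching on the new "user" label. Below is a minimal sketch of that pattern as a reusable helper; the helper name, package clause, import paths, and the e2ecortex.CortexService type are assumptions based on the repository layout, not part of this commit.

package integration

import (
	"testing"

	"github.com/cortexproject/cortex/integration/e2e"
	"github.com/cortexproject/cortex/integration/e2ecortex"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"
)

// requirePerTenantSum is a hypothetical helper capturing the pattern used
// above: build a matcher on the "user" label and restrict the metric sum to
// that tenant's series.
func requirePerTenantSum(t *testing.T, svc *e2ecortex.CortexService, metric, user string, expected float64) {
	matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
	sum, err := svc.SumMetrics([]string{metric}, e2e.WithLabelMatchers(matcher))
	require.NoError(t, err)
	require.Equal(t, expected, sum[0])
}

With such a helper, the repeated SumMetrics calls in the tests above could read, for example, requirePerTenantSum(t, ruler, "cortex_ruler_queries_failed_total", user, 0).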
21 changes: 13 additions & 8 deletions pkg/ruler/compat.go
@@ -251,23 +251,23 @@ type RulesManager interface {
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager

func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine v1.QueryEngine, overrides RulesLimits, reg prometheus.Registerer) ManagerFactory {
totalWrites := promauto.With(reg).NewCounter(prometheus.CounterOpts{
totalWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_total",
Help: "Number of write requests to ingesters.",
})
failedWrites := promauto.With(reg).NewCounter(prometheus.CounterOpts{
}, []string{"user"})
failedWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_failed_total",
Help: "Number of failed write requests to ingesters.",
})
}, []string{"user"})

totalQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
totalQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_total",
Help: "Number of queries executed by ruler.",
})
failedQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
}, []string{"user"})
failedQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_failed_total",
Help: "Number of failed queries by ruler.",
})
}, []string{"user"})
var rulerQuerySeconds *prometheus.CounterVec
if cfg.EnableQueryStats {
rulerQuerySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
@@ -287,6 +287,11 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
queryTime = rulerQuerySeconds.WithLabelValues(userID)
}

failedQueries := failedQueriesVec.WithLabelValues(userID)
totalQueries := totalQueriesVec.WithLabelValues(userID)
totalWrites := totalWritesVec.WithLabelValues(userID)
failedWrites := failedWritesVec.WithLabelValues(userID)

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
Queryable: q,
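The factory change above follows the usual client_golang pattern for per-tenant metrics: register each CounterVec once when the factory is built, then curry it with WithLabelValues(userID) inside the per-tenant closure, so downstream code such as NewPusherAppendable keeps receiving a plain prometheus.Counter and needs no signature changes. A standalone sketch of that pattern, with hypothetical names standing in for the ruler types:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// consumer stands in for code like NewPusherAppendable that only needs a
// plain Counter and never sees the "user" label.
type consumer struct {
	failedWrites prometheus.Counter
}

// newFactory registers the vector once and returns a per-tenant constructor,
// mirroring how DefaultTenantManagerFactory curries its counters per userID.
func newFactory(reg prometheus.Registerer) func(userID string) *consumer {
	failedWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "example_write_requests_failed_total", // illustrative name
		Help: "Number of failed write requests.",
	}, []string{"user"})

	return func(userID string) *consumer {
		// WithLabelValues returns the tenant's child counter.
		return &consumer{failedWrites: failedWritesVec.WithLabelValues(userID)}
	}
}

func main() {
	factory := newFactory(prometheus.NewRegistry())
	factory("tenant-a").failedWrites.Inc()
	factory("tenant-b").failedWrites.Inc()
}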
