From 85b384a415cc83baa6372848736627faba487342 Mon Sep 17 00:00:00 2001
From: Anand Rajagopal
Date: Mon, 1 May 2023 16:56:50 +0000
Subject: [PATCH] Adding user label to metrics to be able to track these metrics at a user level

Signed-off-by: Anand Rajagopal
---
 CHANGELOG.md              |   1 +
 integration/ruler_test.go | 117 ++++++++++++++++++++++++++++++++++++--
 pkg/ruler/compat.go       |  21 ++++---
 3 files changed, 125 insertions(+), 14 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 39ce61d8f6..7275b72b63 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 # Changelog
 
 ## master / unreleased
+* [CHANGE] Ruler: Added user label to `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, and `cortex_ruler_queries_failed_total` metrics. #5312
 * [CHANGE] Alertmanager: Validating new fields on the PagerDuty AM config. #5290
 * [CHANGE] Ingester: Creating label `native-histogram-sample` on the `cortex_discarded_samples_total` to keep track of discarded native histogram samples. #5289
 * [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
diff --git a/integration/ruler_test.go b/integration/ruler_test.go
index 1dc6ca4bc5..ef830f9ab3 100644
--- a/integration/ruler_test.go
+++ b/integration/ruler_test.go
@@ -8,6 +8,7 @@ import (
 	"crypto/x509"
 	"crypto/x509/pkix"
 	"fmt"
+	"github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"math"
 	"net/http"
 	"os"
@@ -576,8 +577,8 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
 		require.Equal(t, 200, res.StatusCode)
 	}
 
-	totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
-	require.NoError(t, err)
+	matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
+	var totalQueries = []float64{0}
 
 	// Verify that user-failures don't increase cortex_ruler_queries_failed_total
 	for groupName, expression := range map[string]string{
@@ -601,7 +602,7 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
 			require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
 
 			// But these failures were not reported as "failed queries"
-			sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
+			sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher))
 			require.NoError(t, err)
 			require.Equal(t, float64(0), sum[0])
 
@@ -612,7 +613,7 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
 			require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics))
 
 			// Check that cortex_ruler_queries_total went up since last test.
-			newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
+			newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
 			require.NoError(t, err)
 			require.Greater(t, newTotalQueries[0], totalQueries[0])
 
@@ -637,7 +638,7 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
 		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
 
 		// Still no failures.
-		sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
+		sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher))
 		require.NoError(t, err)
 		require.Equal(t, float64(0), sum[0])
 
@@ -645,7 +646,111 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
 		require.NoError(t, s.Stop(ingester))
 
 		// We should start getting "real" failures now.
-		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}))
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher)))
+	})
+}
+
+func TestRulerMetricsWhenIngesterFails(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	const blockRangePeriod = 2 * time.Second
+	// Configure the ruler.
+	flags := mergeFlags(
+		BlocksStorageFlags(),
+		RulerFlags(),
+		map[string]string{
+			"-blocks-storage.tsdb.block-ranges-period":         blockRangePeriod.String(),
+			"-blocks-storage.tsdb.ship-interval":               "1s",
+			"-blocks-storage.bucket-store.sync-interval":       "1s",
+			"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
+			"-blocks-storage.tsdb.retention-period":            ((blockRangePeriod * 2) - 1).String(),
+
+			// Enable the bucket index so we can skip the initial bucket scan.
+			"-blocks-storage.bucket-store.bucket-index.enabled": "false",
+			// Evaluate rules often, so that we don't need to wait for metrics to show up.
+			"-ruler.evaluation-interval": "2s",
+			"-ruler.poll-interval":       "2s",
+			// No delay
+			"-ruler.evaluation-delay-duration": "0",
+
+			// We run single ingester only, no replication.
+			"-distributor.replication-factor": "1",
+
+			// Very low limit so that ruler hits it.
+			"-querier.max-fetched-chunks-per-query": "15",
+			"-querier.query-store-after":            (1 * time.Second).String(),
+			"-querier.query-ingesters-within":       (2 * time.Second).String(),
+		},
+	)
+
+	const namespace = "test"
+	const user = "user"
+
+	storeGateway := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+
+	flags = mergeFlags(flags, map[string]string{
+		"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
+	})
+
+	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
+	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, storeGateway))
+
+	// Wait until both the distributor and ruler have updated the ring. The querier will also watch
+	// the store-gateway ring if blocks sharding is enabled.
+	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
+	require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total"))
+
+	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
+	require.NoError(t, err)
+
+	matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
+	expression := "absent(sum_over_time(metric{}[2s] offset 1h))"
+
+	// Now let's upload a non-failing rule, and make sure that it works.
+	t.Run("real_error", func(t *testing.T) {
+		const groupName = "good_rule"
+
+		var ruleEvalCount float64
+		ruleGroup := ruleGroupWithRule(groupName, "rule", expression)
+		ruleGroup.Interval = 2
+		require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))
+		m := ruleGroupMatcher(user, namespace, groupName)
+
+		// Wait until the ruler has loaded the group.
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+
+		// Wait until the rule group has tried to evaluate the rule, and succeeded.
+		ruleEvalCount++
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(ruleEvalCount), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+
+		// Wait until the TSDB head is compacted and shipped to the storage.
+		// The shipped block contains the 1st series, while the 2nd series is in the head.
+		require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
+
+		// Now let's stop the ingester and recheck the metrics. This should increase cortex_ruler_write_requests_failed_total.
+		require.NoError(t, s.Stop(ingester))
+		ruleEvalCount++
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(ruleEvalCount), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
+
+		require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
 	})
 }
 
diff --git a/pkg/ruler/compat.go b/pkg/ruler/compat.go
index 70c2164a43..879f3f82e3 100644
--- a/pkg/ruler/compat.go
+++ b/pkg/ruler/compat.go
@@ -251,23 +251,23 @@ type RulesManager interface {
 type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager
 
 func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine v1.QueryEngine, overrides RulesLimits, reg prometheus.Registerer) ManagerFactory {
-	totalWrites := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+	totalWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: "cortex_ruler_write_requests_total",
 		Help: "Number of write requests to ingesters.",
-	})
-	failedWrites := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+	}, []string{"user"})
+	failedWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: "cortex_ruler_write_requests_failed_total",
 		Help: "Number of failed write requests to ingesters.",
-	})
+	}, []string{"user"})
 
-	totalQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+	totalQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: "cortex_ruler_queries_total",
 		Help: "Number of queries executed by ruler.",
-	})
-	failedQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+	}, []string{"user"})
+	failedQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 		Name: "cortex_ruler_queries_failed_total",
 		Help: "Number of failed queries by ruler.",
-	})
+	}, []string{"user"})
 	var rulerQuerySeconds *prometheus.CounterVec
 	if cfg.EnableQueryStats {
 		rulerQuerySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
@@ -287,6 +287,11 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
 			queryTime = rulerQuerySeconds.WithLabelValues(userID)
 		}
 
+		failedQueries := failedQueriesVec.WithLabelValues(userID)
+		totalQueries := totalQueriesVec.WithLabelValues(userID)
+		totalWrites := totalWritesVec.WithLabelValues(userID)
+		failedWrites := failedWritesVec.WithLabelValues(userID)
+
 		return rules.NewManager(&rules.ManagerOptions{
 			Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
 			Queryable:  q,
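
Implementation note: the pkg/ruler/compat.go change follows the standard client_golang pattern of registering one CounterVec keyed by a `user` label and binding a per-tenant child counter with WithLabelValues(userID), so the hot path only increments a pre-bound counter. Below is a minimal, self-contained sketch of that pattern; the metric name `example_ruler_queries_total` and the tenant IDs are illustrative placeholders, not names taken from the Cortex code.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Counter vector with a "user" label, mirroring the patch's NewCounterVec(..., []string{"user"}).
	queriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "example_ruler_queries_total", // illustrative name only
		Help: "Number of queries executed, per user.",
	}, []string{"user"})

	// Bind a child counter once per tenant (as DefaultTenantManagerFactory now does per userID),
	// then increment the pre-bound child for each query.
	for _, userID := range []string{"tenant-a", "tenant-b"} {
		queries := queriesVec.WithLabelValues(userID)
		queries.Inc()
	}

	// Gather and print what was recorded, one line per labelled child counter.
	families, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range families {
		for _, m := range mf.GetMetric() {
			fmt.Printf("%s %v = %v\n", mf.GetName(), m.GetLabel(), m.GetCounter().GetValue())
		}
	}
}

With the new label in place, per-tenant rates can be inspected with a query such as sum by (user) (rate(cortex_ruler_queries_failed_total[5m])).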