Add user label to ruler query and write metrics #5312

Merged · 1 commit · May 4, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [CHANGE] Ruler: Added user label to `cortex_ruler_write_requests_total`, `cortex_ruler_write_requests_failed_total`, `cortex_ruler_queries_total`, and `cortex_ruler_queries_failed_total` metrics. #5312
* [CHANGE] Alertmanager: Validating new fields on the PagerDuty AM config. #5290
* [CHANGE] Ingester: Creating label `native-histogram-sample` on the `cortex_discarded_samples_total` to keep track of discarded native histogram samples. #5289
* [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request.
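The practical effect of the new `user` label is that these four ruler metrics can now be aggregated and alerted on per tenant. An illustrative PromQL sketch follows; the query itself is not part of this PR, only the metric names come from the changelog entry above:

```promql
# Per-tenant ratio of failed ruler queries over the last 5 minutes (illustrative).
sum by (user) (rate(cortex_ruler_queries_failed_total[5m]))
/
sum by (user) (rate(cortex_ruler_queries_total[5m]))
```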
118 changes: 112 additions & 6 deletions integration/ruler_test.go
@@ -17,6 +17,8 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/storage/tsdb"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
@@ -576,8 +578,8 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.Equal(t, 200, res.StatusCode)
}

- totalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
- require.NoError(t, err)
+ matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
+ var totalQueries = []float64{0}

// Verify that user-failures don't increase cortex_ruler_queries_failed_total
for groupName, expression := range map[string]string{
@@ -601,7 +603,7 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// But these failures were not reported as "failed queries"
- sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
+ sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

@@ -612,7 +614,7 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_group_rules"}, e2e.SkipMissingMetrics))

// Check that cortex_ruler_queries_total went up since last test.
- newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"})
+ newTotalQueries, err := ruler.SumMetrics([]string{"cortex_ruler_queries_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Greater(t, newTotalQueries[0], totalQueries[0])

@@ -637,15 +639,119 @@ func TestRulerMetricsForInvalidQueries(t *testing.T) {
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Still no failures.
- sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"})
+ sum, err := ruler.SumMetrics([]string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher))
require.NoError(t, err)
require.Equal(t, float64(0), sum[0])

// Now let's stop ingester, and recheck metrics. This should increase cortex_ruler_queries_failed_total failures.
require.NoError(t, s.Stop(ingester))

// We should start getting "real" failures now.
- require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}))
+ require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher)))
})
}

func TestRulerMetricsWhenIngesterFails(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

const blockRangePeriod = 2 * time.Second
// Configure the ruler.
flags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
map[string]string{
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),

// Disable the bucket index; blocks are discovered by scanning the bucket instead.
"-blocks-storage.bucket-store.bucket-index.enabled": "false",
// Evaluate rules often, so that we don't need to wait for metrics to show up.
"-ruler.evaluation-interval": "2s",
"-ruler.poll-interval": "2s",
// No delay
"-ruler.evaluation-delay-duration": "0",

// We run single ingester only, no replication.
"-distributor.replication-factor": "1",

// Very low limit so that ruler hits it.
"-querier.max-fetched-chunks-per-query": "15",
"-querier.query-store-after": (1 * time.Second).String(),
"-querier.query-ingesters-within": (2 * time.Second).String(),
},
)

const namespace = "test"
const user = "user"

storeGateway := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")

flags = mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
})

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler, storeGateway))

// Wait until both the distributor and ruler have updated the ring. The querier will also watch
// the store-gateway ring if blocks sharding is enabled.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(1024), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

matcher := labels.MustNewMatcher(labels.MatchEqual, "user", user)
expression := "absent(sum_over_time(metric{}[2s] offset 1h))"

// Now let's upload a non-failing rule, and make sure that it works.
t.Run("real_error", func(t *testing.T) {
const groupName = "good_rule"

var ruleEvalCount float64
ruleGroup := ruleGroupWithRule(groupName, "rule", expression)
ruleGroup.Interval = 2
require.NoError(t, c.SetRuleGroup(ruleGroup, namespace))
m := ruleGroupMatcher(user, namespace, groupName)

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

// Wait until rule group has tried to evaluate the rule, and succeeded.
ruleEvalCount++
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(ruleEvalCount), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_prometheus_rule_evaluation_failures_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))

// Wait until the TSDB head is compacted and shipped to the storage.
// The shipped block contains the 1st series, while the 2nd series is still in the head.
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))

// Now let's stop the ingester and recheck the metrics. This should increase cortex_ruler_write_requests_failed_total.
require.NoError(t, s.Stop(ingester))
ruleEvalCount++
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(ruleEvalCount), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_ruler_queries_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"cortex_ruler_write_requests_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))

require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_ruler_write_requests_failed_total"}, e2e.WithLabelMatchers(matcher), e2e.WaitMissingMetrics))
})
}

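For orientation, this is roughly what the ruler's `/metrics` output looks like for a single tenant once these counters carry the `user` label. The series names and help strings come from the PR, while the values are made up for illustration; the `labels.MustNewMatcher(labels.MatchEqual, "user", user)` matcher used throughout the tests above filters on exactly this label:

```
# HELP cortex_ruler_queries_total Number of queries executed by ruler.
# TYPE cortex_ruler_queries_total counter
cortex_ruler_queries_total{user="user"} 12
# HELP cortex_ruler_write_requests_failed_total Number of failed write requests to ingesters.
# TYPE cortex_ruler_write_requests_failed_total counter
cortex_ruler_write_requests_failed_total{user="user"} 3
```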
21 changes: 13 additions & 8 deletions pkg/ruler/compat.go
@@ -251,23 +251,23 @@ type RulesManager interface {
type ManagerFactory func(ctx context.Context, userID string, notifier *notifier.Manager, logger log.Logger, reg prometheus.Registerer) RulesManager

func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engine v1.QueryEngine, overrides RulesLimits, reg prometheus.Registerer) ManagerFactory {
- totalWrites := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+ totalWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_total",
Help: "Number of write requests to ingesters.",
- })
- failedWrites := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+ }, []string{"user"})
+ failedWritesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_write_requests_failed_total",
Help: "Number of failed write requests to ingesters.",
- })
+ }, []string{"user"})

- totalQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+ totalQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_total",
Help: "Number of queries executed by ruler.",
- })
- failedQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
+ }, []string{"user"})
+ failedQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ruler_queries_failed_total",
Help: "Number of failed queries by ruler.",
- })
+ }, []string{"user"})
var rulerQuerySeconds *prometheus.CounterVec
if cfg.EnableQueryStats {
rulerQuerySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
@@ -287,6 +287,11 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
queryTime = rulerQuerySeconds.WithLabelValues(userID)
}

failedQueries := failedQueriesVec.WithLabelValues(userID)
totalQueries := totalQueriesVec.WithLabelValues(userID)
totalWrites := totalWritesVec.WithLabelValues(userID)
failedWrites := failedWritesVec.WithLabelValues(userID)

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
Queryable: q,
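As a standalone sketch of the pattern this diff applies (plain `Counter`s replaced by `CounterVec`s keyed on `user`, with per-tenant children bound via `WithLabelValues` so the call sites stay unchanged), the following compiles against prometheus/client_golang. The registry setup, tenant IDs, and printed output are assumptions for the example, not Cortex code:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// One vector per metric, with "user" as the only variable label.
	totalQueriesVec := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_ruler_queries_total",
		Help: "Number of queries executed by ruler.",
	}, []string{"user"})

	// Each tenant gets its own child counter; increments are attributed
	// to that tenant without changing the code that calls Inc().
	for _, userID := range []string{"user-1", "user-2"} {
		totalQueries := totalQueriesVec.WithLabelValues(userID)
		totalQueries.Inc()
	}

	// Gather and print the per-user series to show the label in effect.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			fmt.Println(mf.GetName(), m.GetLabel(), m.GetCounter().GetValue())
		}
	}
}
```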