From bb37d296bed3704f2ad817488769cecbdf02c589 Mon Sep 17 00:00:00 2001
From: Mustafain Ali Khan
Date: Thu, 21 Mar 2024 14:01:10 -0700
Subject: [PATCH] Add integration test for keep_firing_for field

Signed-off-by: Mustafain Ali Khan
---
 integration/ruler_test.go | 170 +++++++++++++++++++++++++++++++++++++-
 1 file changed, 166 insertions(+), 4 deletions(-)

diff --git a/integration/ruler_test.go b/integration/ruler_test.go
index cd2b37a29a..8cc8da26ad 100644
--- a/integration/ruler_test.go
+++ b/integration/ruler_test.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"crypto/x509"
 	"crypto/x509/pkix"
+	"encoding/json"
 	"fmt"
 	"math"
 	"math/rand"
@@ -19,10 +20,7 @@
 	"testing"
 	"time"
 
-	"github.com/cortexproject/cortex/pkg/ruler"
-
-	"github.com/cortexproject/cortex/pkg/storage/tsdb"
-
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/rulefmt"
@@ -37,6 +35,8 @@ import (
 	"github.com/cortexproject/cortex/integration/e2e"
 	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
 	"github.com/cortexproject/cortex/integration/e2ecortex"
+	"github.com/cortexproject/cortex/pkg/ruler"
+	"github.com/cortexproject/cortex/pkg/storage/tsdb"
 )
 
 func TestRulerAPI(t *testing.T) {
@@ -1038,6 +1038,168 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
 	})
 }
 
+func TestRulerKeepFiring(t *testing.T) {
+	s, err := e2e.NewScenario(networkName)
+	require.NoError(t, err)
+	defer s.Close()
+
+	// Start dependencies.
+	consul := e2edb.NewConsul()
+	minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
+	require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+	// Configure the ruler.
+	flags := mergeFlags(
+		BlocksStorageFlags(),
+		RulerFlags(),
+		map[string]string{
+			// The rules in this test don't read any stored data, so the store-gateway
+			// address doesn't need to point at a running store-gateway.
+			"-querier.store-gateway-addresses": "localhost:12345",
+			// Enable the bucket index so we can skip the initial bucket scan.
+			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
+			// Evaluate rules often, so that we don't need to wait for metrics to show up.
+			"-ruler.evaluation-interval": "2s",
+			"-ruler.poll-interval":       "2s",
+			// No evaluation delay.
+			"-ruler.evaluation-delay-duration": "0",
+
+			"-blocks-storage.tsdb.block-ranges-period":   "1h",
+			"-blocks-storage.bucket-store.sync-interval": "1s",
+			"-blocks-storage.tsdb.retention-period":      "2h",
+
+			// We run a single ingester only, no replication.
+			"-distributor.replication-factor": "1",
+
+			"-querier.max-fetched-chunks-per-query": "50",
+		},
+	)
+
+	const namespace = "test"
+	const user = "user"
+
+	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
+	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+	require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler))
+
+	// Wait until both the distributor and the ruler have updated the ring, so that writes
+	// can be pushed and rules can be evaluated against the single ingester.
+	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
+	require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
+
+	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
+	require.NoError(t, err)
+
+	expression := "vector(1) > 0" // Alert will fire.
+	groupName := "rule_group_1"
+	ruleName := "rule_keep_firing"
+
+	require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))
+
+	m := ruleGroupMatcher(user, namespace, groupName)
+
+	// Wait until the ruler has loaded the group.
+	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+	// Wait until the rule group has tried to evaluate the rule.
+	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+
+	groups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
+		RuleNames: []string{ruleName},
+	})
+	require.NoError(t, err)
+	require.NotEmpty(t, groups)
+	require.Equal(t, 1, len(groups[0].Rules))
+	alert := parseAlertFromRule(t, groups[0].Rules[0])
+	require.Equal(t, float64(10), alert.KeepFiringFor)
+	require.Equal(t, 1, len(alert.Alerts))
+	require.Empty(t, alert.Alerts[0].KeepFiringSince) // Alert expression is not resolved yet, so KeepFiringSince should be empty.
+
+	expression = "vector(1) > 1" // Resolve the alert; it should keep firing for the configured duration.
+	ts := time.Now()
+	require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))
+	// Wait until the rule group has tried to evaluate the rule.
+	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
+
+	updatedGroups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
+		RuleNames: []string{ruleName},
+	})
+	require.NoError(t, err)
+	require.NotEmpty(t, updatedGroups)
+	require.Equal(t, 1, len(updatedGroups[0].Rules))
+
+	alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
+	require.Equal(t, "firing", alert.State)
+	require.Equal(t, float64(10), alert.KeepFiringFor)
+	require.Equal(t, 1, len(alert.Alerts))
+	require.NotEmpty(t, alert.Alerts[0].KeepFiringSince)
+	require.Greater(t, alert.Alerts[0].KeepFiringSince.UnixNano(), ts.UnixNano(), "KeepFiringSince value should be set after the expression is resolved")
+
+	time.Sleep(10 * time.Second) // Sleep beyond the keep_firing_for duration.
+	updatedGroups, err = c.GetPrometheusRules(e2ecortex.RuleFilter{
+		RuleNames: []string{ruleName},
+	})
+	require.NoError(t, err)
+	require.NotEmpty(t, updatedGroups)
+	require.Equal(t, 1, len(updatedGroups[0].Rules))
+	alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
+	require.Equal(t, 0, len(alert.Alerts)) // The alert should be resolved once the keep_firing_for duration expires.
+}
+
+func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
+	responseJson, err := json.Marshal(rules)
+	require.NoError(t, err)
+
+	alertResp := &alertingRule{}
+	require.NoError(t, json.Unmarshal(responseJson, alertResp))
+	return alertResp
+}
+
+type alertingRule struct {
+	// State can be "pending", "firing", "inactive".
+	State          string        `json:"state"`
+	Name           string        `json:"name"`
+	Query          string        `json:"query"`
+	Duration       float64       `json:"duration"`
+	KeepFiringFor  float64       `json:"keepFiringFor"`
+	Labels         labels.Labels `json:"labels"`
+	Annotations    labels.Labels `json:"annotations"`
+	Alerts         []*Alert      `json:"alerts"`
+	Health         string        `json:"health"`
+	LastError      string        `json:"lastError"`
+	Type           v1.RuleType   `json:"type"`
+	LastEvaluation time.Time     `json:"lastEvaluation"`
+	EvaluationTime float64       `json:"evaluationTime"`
+}
+
+// Alert has info for an alert.
+type Alert struct {
+	Labels          labels.Labels `json:"labels"`
+	Annotations     labels.Labels `json:"annotations"`
+	State           string        `json:"state"`
+	ActiveAt        *time.Time    `json:"activeAt"`
+	KeepFiringSince *time.Time    `json:"keepFiringSince,omitempty"`
+	Value           string        `json:"value"`
+}
+
+func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup {
+	var alertNode = yaml.Node{}
+	var exprNode = yaml.Node{}
+
+	alertNode.SetString(ruleName)
+	exprNode.SetString(expression)
+
+	return rulefmt.RuleGroup{
+		Name:     groupName,
+		Interval: 10,
+		Rules: []rulefmt.RuleNode{{
+			Alert:         alertNode,
+			Expr:          exprNode,
+			KeepFiringFor: keepFiring,
+		}},
+	}
+}
+
 func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
 	return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
 }
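For reference, alertRuleWithKeepFiringFor builds a rulefmt.RuleGroup containing a single
alerting rule with keep_firing_for set, and the e2ecortex client submits that group to the
ruler's rule API. The standalone sketch below (not part of the patch) shows roughly what that
group looks like once serialized to Prometheus rule-file YAML. It reuses the rulefmt and
yaml.v3 packages the test already depends on; the main package, the hard-coded rule name and
expression, and the explicit yaml.Marshal call are illustrative only.

// Minimal sketch: build the same kind of rule group the test installs and print
// the resulting rule-file YAML, including the keep_firing_for field.
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/rulefmt"
	"gopkg.in/yaml.v3"
)

func main() {
	// Same shape as alertRuleWithKeepFiringFor("rule_group_1", "rule_keep_firing",
	// "vector(1) > 0", model.Duration(10*time.Second)) in the patch above.
	var alertNode, exprNode yaml.Node
	alertNode.SetString("rule_keep_firing")
	exprNode.SetString("vector(1) > 0")

	group := rulefmt.RuleGroup{
		Name: "rule_group_1",
		Rules: []rulefmt.RuleNode{{
			Alert:         alertNode,
			Expr:          exprNode,
			KeepFiringFor: model.Duration(10 * time.Second),
		}},
	}

	// Marshal the group as a rule file; the output should contain a
	// "keep_firing_for: 10s" entry for the alerting rule.
	out, err := yaml.Marshal(rulefmt.RuleGroups{Groups: []rulefmt.RuleGroup{group}})
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out))
}

With the first expression ("vector(1) > 0") the alert fires and KeepFiringSince stays empty;
after the group is re-uploaded with "vector(1) > 1" the expression no longer matches, so the
alert keeps firing with KeepFiringSince set until the 10s keep_firing_for window elapses.
That is the sequence the assertions in TestRulerKeepFiring walk through.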