Fix support for keep_firing_for field in alert rules #5823

Merged: 3 commits, Mar 23, 2024
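For context: keep_firing_for is the Prometheus alerting-rule field that keeps an already-firing alert in the firing state for the given duration after its expression stops returning results. The changes below surface it through the Cortex ruler API, as keepFiringFor on alerting rules and keepFiringSince on individual alerts.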
170 changes: 166 additions & 4 deletions integration/ruler_test.go
@@ -8,6 +8,7 @@ import (
 	"context"
 	"crypto/x509"
 	"crypto/x509/pkix"
+	"encoding/json"
 	"fmt"
 	"math"
 	"math/rand"
@@ -19,10 +20,7 @@ import (
 	"testing"
 	"time"
 
-	"github.com/cortexproject/cortex/pkg/ruler"
-
-	"github.com/cortexproject/cortex/pkg/storage/tsdb"
-
+	v1 "github.com/prometheus/client_golang/api/prometheus/v1"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/rulefmt"
@@ -37,6 +35,8 @@ import (
 	"github.com/cortexproject/cortex/integration/e2e"
 	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
 	"github.com/cortexproject/cortex/integration/e2ecortex"
+	"github.com/cortexproject/cortex/pkg/ruler"
+	"github.com/cortexproject/cortex/pkg/storage/tsdb"
 )

func TestRulerAPI(t *testing.T) {
@@ -1038,6 +1038,168 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
	})
}

func TestRulerKeepFiring(t *testing.T) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// Configure the ruler.
	flags := mergeFlags(
		BlocksStorageFlags(),
		RulerFlags(),
		map[string]string{
			// The rule in this test only evaluates vector(1), so the store-gateway
			// is never queried and doesn't need to be configured to a valid address.
			"-querier.store-gateway-addresses": "localhost:12345",
			// Enable the bucket index so we can skip the initial bucket scan.
			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
			// Evaluate rules often, so that we don't need to wait for metrics to show up.
			"-ruler.evaluation-interval": "2s",
			"-ruler.poll-interval":       "2s",
			// No delay
			"-ruler.evaluation-delay-duration": "0",

			"-blocks-storage.tsdb.block-ranges-period":   "1h",
			"-blocks-storage.bucket-store.sync-interval": "1s",
			"-blocks-storage.tsdb.retention-period":      "2h",

			// We run single ingester only, no replication.
			"-distributor.replication-factor": "1",

			"-querier.max-fetched-chunks-per-query": "50",
		},
	)

	const namespace = "test"
	const user = "user"

	distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
	ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
	require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler))
	// Wait until both the distributor and ruler have updated the ring.
	require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
	require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

	c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
	require.NoError(t, err)

	expression := "vector(1) > 0" // Alert will fire
	groupName := "rule_group_1"
	ruleName := "rule_keep_firing"

	require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))

	m := ruleGroupMatcher(user, namespace, groupName)

	// Wait until ruler has loaded the group.
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
	// Wait until rule group has tried to evaluate the rule.
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

	groups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
		RuleNames: []string{ruleName},
	})
	require.NoError(t, err)
	require.NotEmpty(t, groups)
	require.Equal(t, 1, len(groups[0].Rules))
	alert := parseAlertFromRule(t, groups[0].Rules[0])
	require.Equal(t, float64(10), alert.KeepFiringFor)
	require.Equal(t, 1, len(alert.Alerts))
	require.Empty(t, alert.Alerts[0].KeepFiringSince) // Alert expression not resolved yet, so KeepFiringSince should be empty

	expression = "vector(1) > 1" // Resolve, should keep firing for set duration
	ts := time.Now()
	require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))
	// Wait until rule group has tried to evaluate the rule.
	require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

	updatedGroups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
		RuleNames: []string{ruleName},
	})
	require.NoError(t, err)
	require.NotEmpty(t, updatedGroups)
	require.Equal(t, 1, len(updatedGroups[0].Rules))

	alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
	require.Equal(t, "firing", alert.State)
	require.Equal(t, float64(10), alert.KeepFiringFor)
	require.Equal(t, 1, len(alert.Alerts))
	require.NotEmpty(t, alert.Alerts[0].KeepFiringSince)
	require.Greater(t, alert.Alerts[0].KeepFiringSince.UnixNano(), ts.UnixNano(), "KeepFiringSince value should be after expression is resolved")

	time.Sleep(10 * time.Second) // Sleep beyond keepFiringFor time
	updatedGroups, err = c.GetPrometheusRules(e2ecortex.RuleFilter{
		RuleNames: []string{ruleName},
	})
	require.NoError(t, err)
	require.NotEmpty(t, updatedGroups)
	require.Equal(t, 1, len(updatedGroups[0].Rules))
	alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
	require.Equal(t, 0, len(alert.Alerts)) // Alert should be resolved once keepFiringFor time expires
}

func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
	responseJson, err := json.Marshal(rules)
	require.NoError(t, err)

	alertResp := &alertingRule{}
	require.NoError(t, json.Unmarshal(responseJson, alertResp))
	return alertResp
}

type alertingRule struct {
	// State can be "pending", "firing", "inactive".
	State          string        `json:"state"`
	Name           string        `json:"name"`
	Query          string        `json:"query"`
	Duration       float64       `json:"duration"`
	KeepFiringFor  float64       `json:"keepFiringFor"`
	Labels         labels.Labels `json:"labels"`
	Annotations    labels.Labels `json:"annotations"`
	Alerts         []*Alert      `json:"alerts"`
	Health         string        `json:"health"`
	LastError      string        `json:"lastError"`
	Type           v1.RuleType   `json:"type"`
	LastEvaluation time.Time     `json:"lastEvaluation"`
	EvaluationTime float64       `json:"evaluationTime"`
}

// Alert has info for an alert.
type Alert struct {
	Labels          labels.Labels `json:"labels"`
	Annotations     labels.Labels `json:"annotations"`
	State           string        `json:"state"`
	ActiveAt        *time.Time    `json:"activeAt"`
	KeepFiringSince *time.Time    `json:"keepFiringSince,omitempty"`
	Value           string        `json:"value"`
}

func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup {
Review comment (Contributor) on alertRuleWithKeepFiringFor:

	I'd love to see us avoid duplicating this code, since it's almost the same
	functionality as ruleGroupWithRule. But it's not a blocker; we can address it
	later with a builder pattern.

(A sketch of that builder approach follows the function below.)

	var recordNode = yaml.Node{}
	var exprNode = yaml.Node{}

	recordNode.SetString(ruleName)
	exprNode.SetString(expression)

	return rulefmt.RuleGroup{
		Name:     groupName,
		Interval: 10,
		Rules: []rulefmt.RuleNode{{
			Alert:         recordNode,
			Expr:          exprNode,
			KeepFiringFor: keepFiring,
		}},
	}
}
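As suggested in the review comment above, a small functional-options helper could fold alertRuleWithKeepFiringFor and ruleGroupWithRule together. Below is a minimal sketch of that idea, not part of this PR: alertRuleGroup, ruleGroupOption, and withKeepFiringFor are hypothetical names, and the snippet assumes the imports already present in ruler_test.go (rulefmt, model, yaml).

```go
// Hypothetical refactor sketch; not part of this PR.

// ruleGroupOption mutates a rule node before it is wrapped in a group.
type ruleGroupOption func(*rulefmt.RuleNode)

// withKeepFiringFor sets the keep_firing_for duration on the rule.
func withKeepFiringFor(d model.Duration) ruleGroupOption {
	return func(r *rulefmt.RuleNode) {
		r.KeepFiringFor = d
	}
}

// alertRuleGroup builds a single-rule alerting group, applying any options.
func alertRuleGroup(groupName, ruleName, expression string, opts ...ruleGroupOption) rulefmt.RuleGroup {
	var nameNode, exprNode yaml.Node
	nameNode.SetString(ruleName)
	exprNode.SetString(expression)

	rule := rulefmt.RuleNode{Alert: nameNode, Expr: exprNode}
	for _, opt := range opts {
		opt(&rule)
	}

	return rulefmt.RuleGroup{
		Name:     groupName,
		Interval: 10, // mirrors the interval used by the existing helpers
		Rules:    []rulefmt.RuleNode{rule},
	}
}
```

The test would then call c.SetRuleGroup(alertRuleGroup(groupName, ruleName, expression, withKeepFiringFor(model.Duration(10*time.Second))), namespace).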

func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
	return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
}
29 changes: 20 additions & 9 deletions pkg/ruler/api.go
@@ -46,11 +46,12 @@ type AlertDiscovery struct {
 
 // Alert has info for an alert.
 type Alert struct {
-	Labels      labels.Labels `json:"labels"`
-	Annotations labels.Labels `json:"annotations"`
-	State       string        `json:"state"`
-	ActiveAt    *time.Time    `json:"activeAt"`
-	Value       string        `json:"value"`
+	Labels          labels.Labels `json:"labels"`
+	Annotations     labels.Labels `json:"annotations"`
+	State           string        `json:"state"`
+	ActiveAt        *time.Time    `json:"activeAt"`
+	KeepFiringSince *time.Time    `json:"keepFiringSince,omitempty"`
+	Value           string        `json:"value"`
 }
 
 // RuleDiscovery has info for all rules
@@ -80,6 +81,7 @@ type alertingRule struct {
 	Name          string        `json:"name"`
 	Query         string        `json:"query"`
 	Duration      float64       `json:"duration"`
+	KeepFiringFor float64       `json:"keepFiringFor"`
 	Labels        labels.Labels `json:"labels"`
 	Annotations   labels.Labels `json:"annotations"`
 	Alerts        []*Alert      `json:"alerts"`
@@ -211,13 +213,17 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
 			if g.ActiveRules[i].Rule.Alert != "" {
 				alerts := make([]*Alert, 0, len(rl.Alerts))
 				for _, a := range rl.Alerts {
-					alerts = append(alerts, &Alert{
+					alert := &Alert{
 						Labels:      cortexpb.FromLabelAdaptersToLabels(a.Labels),
 						Annotations: cortexpb.FromLabelAdaptersToLabels(a.Annotations),
 						State:       a.GetState(),
 						ActiveAt:    &a.ActiveAt,
 						Value:       strconv.FormatFloat(a.Value, 'e', -1, 64),
-					})
+					}
+					if !a.KeepFiringSince.IsZero() {
+						alert.KeepFiringSince = &a.KeepFiringSince
+					}
+					alerts = append(alerts, alert)
 				}
 				grp.Rules[i] = alertingRule{
 					State: rl.GetState(),
@@ -232,6 +238,7 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
 					LastEvaluation: rl.GetEvaluationTimestamp(),
 					EvaluationTime: rl.GetEvaluationDuration().Seconds(),
 					Type:           v1.RuleTypeAlerting,
+					KeepFiringFor:  rl.Rule.KeepFiringFor.Seconds(),
 				}
 			} else {
 				grp.Rules[i] = recordingRule{
@@ -296,13 +303,17 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {
 		for _, rl := range g.ActiveRules {
 			if rl.Rule.Alert != "" {
 				for _, a := range rl.Alerts {
-					alerts = append(alerts, &Alert{
+					alert := &Alert{
 						Labels:      cortexpb.FromLabelAdaptersToLabels(a.Labels),
 						Annotations: cortexpb.FromLabelAdaptersToLabels(a.Annotations),
 						State:       a.GetState(),
 						ActiveAt:    &a.ActiveAt,
 						Value:       strconv.FormatFloat(a.Value, 'e', -1, 64),
-					})
+					}
+					if !a.KeepFiringSince.IsZero() {
+						alert.KeepFiringSince = &a.KeepFiringSince
+					}
+					alerts = append(alerts, alert)
 				}
 			}
 		}
19 changes: 10 additions & 9 deletions pkg/ruler/ruler.go
@@ -829,15 +829,16 @@ func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest) ([]*Grou
 			alerts := []*AlertStateDesc{}
 			for _, a := range rule.ActiveAlerts() {
 				alerts = append(alerts, &AlertStateDesc{
-					State:       a.State.String(),
-					Labels:      cortexpb.FromLabelsToLabelAdapters(a.Labels),
-					Annotations: cortexpb.FromLabelsToLabelAdapters(a.Annotations),
-					Value:       a.Value,
-					ActiveAt:    a.ActiveAt,
-					FiredAt:     a.FiredAt,
-					ResolvedAt:  a.ResolvedAt,
-					LastSentAt:  a.LastSentAt,
-					ValidUntil:  a.ValidUntil,
+					State:           a.State.String(),
+					Labels:          cortexpb.FromLabelsToLabelAdapters(a.Labels),
+					Annotations:     cortexpb.FromLabelsToLabelAdapters(a.Annotations),
+					Value:           a.Value,
+					ActiveAt:        a.ActiveAt,
+					FiredAt:         a.FiredAt,
+					ResolvedAt:      a.ResolvedAt,
+					LastSentAt:      a.LastSentAt,
+					ValidUntil:      a.ValidUntil,
+					KeepFiringSince: a.KeepFiringSince,
 				})
 			}
 			ruleDesc = &RuleStateDesc{