Fix support for keep_firing_for field in alert rules (#5823)
* Support keep_firing_for field for alert rules

Signed-off-by: Mustafain Ali Khan <mustalik@amazon.com>

* Include keepFiringFor and keepFiringSince in API response

Signed-off-by: Mustafain Ali Khan <mustalik@amazon.com>

* Add integration test for keep_firing_for field

Signed-off-by: Mustafain Ali Khan <mustalik@amazon.com>

---------

Signed-off-by: Mustafain Ali Khan <mustalik@amazon.com>
mustafain117 authored Mar 23, 2024
1 parent 4b75a5c commit 5a7ba2f
Showing 7 changed files with 384 additions and 98 deletions.
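For context: `keep_firing_for` (introduced upstream in Prometheus 2.42) keeps an alert in the firing state for the configured duration after its expression stops returning samples, rather than resolving it immediately. Below is a minimal sketch of building such a rule group programmatically with the Prometheus libraries this repo vendors — it mirrors the `alertRuleWithKeepFiringFor` test helper added in this commit, with illustrative names and durations:

```go
package main

import (
    "fmt"
    "time"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/model/rulefmt"
    "gopkg.in/yaml.v3"
)

func main() {
    var alertNode, exprNode yaml.Node
    alertNode.SetString("rule_keep_firing")
    exprNode.SetString("vector(1) > 0")

    group := rulefmt.RuleGroup{
        Name:     "rule_group_1",
        Interval: model.Duration(10 * time.Second),
        Rules: []rulefmt.RuleNode{{
            Alert:         alertNode,
            Expr:          exprNode,
            KeepFiringFor: model.Duration(10 * time.Second),
        }},
    }

    // Serializes via rulefmt's YAML tags; the rule gains a
    // `keep_firing_for: 10s` entry in the output.
    out, err := yaml.Marshal(group)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out))
}
```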
170 changes: 166 additions & 4 deletions integration/ruler_test.go
@@ -8,6 +8,7 @@ import (
"context"
"crypto/x509"
"crypto/x509/pkix"
"encoding/json"
"fmt"
"math"
"math/rand"
@@ -19,10 +20,7 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ruler"

"github.com/cortexproject/cortex/pkg/storage/tsdb"

v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
@@ -37,6 +35,8 @@ import (
"github.com/cortexproject/cortex/integration/e2e"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/ruler"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestRulerAPI(t *testing.T) {
@@ -1038,6 +1038,168 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
})
}

func TestRulerKeepFiring(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, bucketName, rulestoreBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Configure the ruler.
flags := mergeFlags(
BlocksStorageFlags(),
RulerFlags(),
map[string]string{
// This test's only rule evaluates a vector() expression and never queries the
// store-gateway, so it doesn't need to be configured to a valid address.
"-querier.store-gateway-addresses": "localhost:12345",
// Enable the bucket index so we can skip the initial bucket scan.
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
// Evaluate rules often, so that we don't need to wait for metrics to show up.
"-ruler.evaluation-interval": "2s",
"-ruler.poll-interval": "2s",
// No delay
"-ruler.evaluation-delay-duration": "0",

"-blocks-storage.tsdb.block-ranges-period": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "2h",

// We run single ingester only, no replication.
"-distributor.replication-factor": "1",

"-querier.max-fetched-chunks-per-query": "50",
},
)

const namespace = "test"
const user = "user"

distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
ruler := e2ecortex.NewRuler("ruler", consul.NetworkHTTPEndpoint(), flags, "")
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(distributor, ingester, ruler))

// Wait until both the distributor and ruler have updated the ring. The querier will also watch
// the store-gateway ring if blocks sharding is enabled.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, ruler.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", ruler.HTTPEndpoint(), user)
require.NoError(t, err)

expression := "vector(1) > 0" // Alert will fire
groupName := "rule_group_1"
ruleName := "rule_keep_firing"

require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))

m := ruleGroupMatcher(user, namespace, groupName)

// Wait until ruler has loaded the group.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_prometheus_rule_group_rules"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(1), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

groups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
RuleNames: []string{ruleName},
})
require.NoError(t, err)
require.NotEmpty(t, groups)
require.Equal(t, 1, len(groups[0].Rules))
alert := parseAlertFromRule(t, groups[0].Rules[0])
require.Equal(t, float64(10), alert.KeepFiringFor)
require.Equal(t, 1, len(alert.Alerts))
require.Empty(t, alert.Alerts[0].KeepFiringSince) // Alert expression not resolved; keepFiringSince should be empty

expression = "vector(1) > 1" // Resolve, should keep firing for set duration
ts := time.Now()
require.NoError(t, c.SetRuleGroup(alertRuleWithKeepFiringFor(groupName, ruleName, expression, model.Duration(10*time.Second)), namespace))
// Wait until rule group has tried to evaluate the rule.
require.NoError(t, ruler.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(2), []string{"cortex_prometheus_rule_evaluations_total"}, e2e.WithLabelMatchers(m), e2e.WaitMissingMetrics))

updatedGroups, err := c.GetPrometheusRules(e2ecortex.RuleFilter{
RuleNames: []string{ruleName},
})
require.NoError(t, err)
require.NotEmpty(t, updatedGroups)
require.Equal(t, 1, len(updatedGroups[0].Rules))

alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
require.Equal(t, "firing", alert.State)
require.Equal(t, float64(10), alert.KeepFiringFor)
require.Equal(t, 1, len(alert.Alerts))
require.NotEmpty(t, alert.Alerts[0].KeepFiringSince)
require.Greater(t, alert.Alerts[0].KeepFiringSince.UnixNano(), ts.UnixNano(), "KeepFiringSince value should be after expression is resolved")

time.Sleep(10 * time.Second) // Sleep beyond keepFiringFor time
updatedGroups, err = c.GetPrometheusRules(e2ecortex.RuleFilter{
RuleNames: []string{ruleName},
})
require.NoError(t, err)
require.NotEmpty(t, updatedGroups)
require.Equal(t, 1, len(updatedGroups[0].Rules))
alert = parseAlertFromRule(t, updatedGroups[0].Rules[0])
require.Equal(t, 0, len(alert.Alerts)) // alert should be resolved once keepFiringFor time expires
}

func parseAlertFromRule(t *testing.T, rules interface{}) *alertingRule {
responseJson, err := json.Marshal(rules)
require.NoError(t, err)

alertResp := &alertingRule{}
require.NoError(t, json.Unmarshal(responseJson, alertResp))
return alertResp
}

type alertingRule struct {
// State can be "pending", "firing", "inactive".
State string `json:"state"`
Name string `json:"name"`
Query string `json:"query"`
Duration float64 `json:"duration"`
KeepFiringFor float64 `json:"keepFiringFor"`
Labels labels.Labels `json:"labels"`
Annotations labels.Labels `json:"annotations"`
Alerts []*Alert `json:"alerts"`
Health string `json:"health"`
LastError string `json:"lastError"`
Type v1.RuleType `json:"type"`
LastEvaluation time.Time `json:"lastEvaluation"`
EvaluationTime float64 `json:"evaluationTime"`
}

// Alert has info for an alert.
type Alert struct {
Labels labels.Labels `json:"labels"`
Annotations labels.Labels `json:"annotations"`
State string `json:"state"`
ActiveAt *time.Time `json:"activeAt"`
KeepFiringSince *time.Time `json:"keepFiringSince,omitempty"`
Value string `json:"value"`
}

func alertRuleWithKeepFiringFor(groupName string, ruleName string, expression string, keepFiring model.Duration) rulefmt.RuleGroup {
var recordNode = yaml.Node{}
var exprNode = yaml.Node{}

recordNode.SetString(ruleName)
exprNode.SetString(expression)

return rulefmt.RuleGroup{
Name: groupName,
Interval: 10,
Rules: []rulefmt.RuleNode{{
Alert: recordNode,
Expr: exprNode,
KeepFiringFor: keepFiring,
}},
}
}
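Pushed through `SetRuleGroup`, this helper should serialize (via `rulefmt`'s YAML tags) to a rule carrying a `keep_firing_for: 10s` entry, which the ruler then parses back server-side.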

func ruleGroupMatcher(user, namespace, groupName string) *labels.Matcher {
return labels.MustNewMatcher(labels.MatchEqual, "rule_group", fmt.Sprintf("/rules/%s/%s;%s", user, namespace, groupName))
}
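The `alertingRule` and `Alert` structs above are local mirrors of the response types in `pkg/ruler/api.go` (changed below). Here is a self-contained sketch of decoding a rules-API payload into them; the JSON is illustrative, not captured from a real response:

```go
package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// Trimmed to the fields this commit is concerned with.
type alert struct {
    State           string     `json:"state"`
    ActiveAt        *time.Time `json:"activeAt"`
    KeepFiringSince *time.Time `json:"keepFiringSince,omitempty"`
}

type alertingRule struct {
    Name          string   `json:"name"`
    State         string   `json:"state"`
    KeepFiringFor float64  `json:"keepFiringFor"` // seconds
    Alerts        []*alert `json:"alerts"`
}

func main() {
    // Illustrative payload: the expression has resolved, but the rule is held
    // in the firing state, so keepFiringSince is populated.
    payload := `{
        "name": "rule_keep_firing",
        "state": "firing",
        "keepFiringFor": 10,
        "alerts": [{
            "state": "firing",
            "activeAt": "2024-03-23T10:00:00Z",
            "keepFiringSince": "2024-03-23T10:05:00Z"
        }]
    }`

    var r alertingRule
    if err := json.Unmarshal([]byte(payload), &r); err != nil {
        panic(err)
    }
    fmt.Println(r.Name, r.KeepFiringFor, r.Alerts[0].KeepFiringSince)
}
```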
29 changes: 20 additions & 9 deletions pkg/ruler/api.go
@@ -46,11 +46,12 @@ type AlertDiscovery struct {

// Alert has info for an alert.
type Alert struct {
Labels labels.Labels `json:"labels"`
Annotations labels.Labels `json:"annotations"`
State string `json:"state"`
ActiveAt *time.Time `json:"activeAt"`
Value string `json:"value"`
Labels labels.Labels `json:"labels"`
Annotations labels.Labels `json:"annotations"`
State string `json:"state"`
ActiveAt *time.Time `json:"activeAt"`
KeepFiringSince *time.Time `json:"keepFiringSince,omitempty"`
Value string `json:"value"`
}

// RuleDiscovery has info for all rules
@@ -80,6 +81,7 @@ type alertingRule struct {
Name string `json:"name"`
Query string `json:"query"`
Duration float64 `json:"duration"`
KeepFiringFor float64 `json:"keepFiringFor"`
Labels labels.Labels `json:"labels"`
Annotations labels.Labels `json:"annotations"`
Alerts []*Alert `json:"alerts"`
@@ -211,13 +213,17 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
if g.ActiveRules[i].Rule.Alert != "" {
alerts := make([]*Alert, 0, len(rl.Alerts))
for _, a := range rl.Alerts {
alerts = append(alerts, &Alert{
alert := &Alert{
Labels: cortexpb.FromLabelAdaptersToLabels(a.Labels),
Annotations: cortexpb.FromLabelAdaptersToLabels(a.Annotations),
State: a.GetState(),
ActiveAt: &a.ActiveAt,
Value: strconv.FormatFloat(a.Value, 'e', -1, 64),
})
}
if !a.KeepFiringSince.IsZero() {
alert.KeepFiringSince = &a.KeepFiringSince
}
alerts = append(alerts, alert)
}
grp.Rules[i] = alertingRule{
State: rl.GetState(),
Expand All @@ -232,6 +238,7 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {
LastEvaluation: rl.GetEvaluationTimestamp(),
EvaluationTime: rl.GetEvaluationDuration().Seconds(),
Type: v1.RuleTypeAlerting,
KeepFiringFor: rl.Rule.KeepFiringFor.Seconds(),
}
} else {
grp.Rules[i] = recordingRule{
@@ -296,13 +303,17 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {
for _, rl := range g.ActiveRules {
if rl.Rule.Alert != "" {
for _, a := range rl.Alerts {
alerts = append(alerts, &Alert{
alert := &Alert{
Labels: cortexpb.FromLabelAdaptersToLabels(a.Labels),
Annotations: cortexpb.FromLabelAdaptersToLabels(a.Annotations),
State: a.GetState(),
ActiveAt: &a.ActiveAt,
Value: strconv.FormatFloat(a.Value, 'e', -1, 64),
})
}
if !a.KeepFiringSince.IsZero() {
alert.KeepFiringSince = &a.KeepFiringSince
}
alerts = append(alerts, alert)
}
}
}
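With both handlers updated, a populated `keepFiringSince` is visible to any API consumer. A rough client-side sketch — the base URL, HTTP prefix, and tenant ID are assumptions; the actual path depends on how the ruler's API is exposed:

```go
package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "time"
)

type alertsResponse struct {
    Data struct {
        Alerts []struct {
            State           string     `json:"state"`
            ActiveAt        *time.Time `json:"activeAt"`
            KeepFiringSince *time.Time `json:"keepFiringSince,omitempty"`
        } `json:"alerts"`
    } `json:"data"`
}

func main() {
    // Hypothetical ruler address and prefix; X-Scope-OrgID selects the tenant.
    req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/alerts", nil)
    if err != nil {
        panic(err)
    }
    req.Header.Set("X-Scope-OrgID", "user")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var ar alertsResponse
    if err := json.NewDecoder(resp.Body).Decode(&ar); err != nil {
        panic(err)
    }
    for _, a := range ar.Data.Alerts {
        if a.KeepFiringSince != nil {
            fmt.Printf("alert resolved but held firing since %s\n", a.KeepFiringSince)
        }
    }
}
```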
19 changes: 10 additions & 9 deletions pkg/ruler/ruler.go
@@ -829,15 +829,16 @@ func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest) ([]*Grou
alerts := []*AlertStateDesc{}
for _, a := range rule.ActiveAlerts() {
alerts = append(alerts, &AlertStateDesc{
State: a.State.String(),
Labels: cortexpb.FromLabelsToLabelAdapters(a.Labels),
Annotations: cortexpb.FromLabelsToLabelAdapters(a.Annotations),
Value: a.Value,
ActiveAt: a.ActiveAt,
FiredAt: a.FiredAt,
ResolvedAt: a.ResolvedAt,
LastSentAt: a.LastSentAt,
ValidUntil: a.ValidUntil,
State: a.State.String(),
Labels: cortexpb.FromLabelsToLabelAdapters(a.Labels),
Annotations: cortexpb.FromLabelsToLabelAdapters(a.Annotations),
Value: a.Value,
ActiveAt: a.ActiveAt,
FiredAt: a.FiredAt,
ResolvedAt: a.ResolvedAt,
LastSentAt: a.LastSentAt,
ValidUntil: a.ValidUntil,
KeepFiringSince: a.KeepFiringSince,
})
}
ruleDesc = &RuleStateDesc{
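For context: AlertStateDesc is part of the payload rulers exchange when listing rule state across replicas, so carrying KeepFiringSince here is what lets one ruler's API report it for alerts evaluated on another; presumably the corresponding protobuf definition is updated in one of the changed files not shown here.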
(Diffs for the remaining 4 changed files are not shown.)