Skip to content

Commit

Permalink
Merge pull request #700 from rancher/slo-with-endpoints
Browse files Browse the repository at this point in the history
SLOs support attaching alerting endpoints to them
  • Loading branch information
alexandreLamarre authored Oct 25, 2022
2 parents c9136c4 + dfd3f03 commit ffa614c
Show file tree
Hide file tree
Showing 12 changed files with 540 additions and 323 deletions.
35 changes: 35 additions & 0 deletions pkg/alerting/backend/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"fmt"
"time"

alertingv1 "github.com/rancher/opni/pkg/apis/alerting/v1"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"

"github.com/go-openapi/strfmt"
)

Expand Down Expand Up @@ -203,3 +206,35 @@ func (p *PostableSilence) Must() error {
}
return nil
}

func ConvertEndpointIdsToRoutingNode(
endpointList *alertingv1.AlertEndpointList,
req *alertingv1.AttachedEndpoints,
conditionId string,

) (*alertingv1.RoutingNode, error) {
routingNode := &alertingv1.RoutingNode{
ConditionId: &corev1.Reference{Id: conditionId},
FullAttachedEndpoints: &alertingv1.FullAttachedEndpoints{
Items: []*alertingv1.FullAttachedEndpoint{},
InitialDelay: req.InitialDelay,
RepeatInterval: req.RepeatInterval,
ThrottlingDuration: req.ThrottlingDuration,
Details: req.Details,
},
}
for _, endpointItem := range endpointList.Items {
for _, expectedEndpoint := range req.Items {
if endpointItem.Id.Id == expectedEndpoint.EndpointId {
routingNode.FullAttachedEndpoints.Items = append(
routingNode.FullAttachedEndpoints.Items,
&alertingv1.FullAttachedEndpoint{
EndpointId: endpointItem.Id.Id,
AlertEndpoint: endpointItem.Endpoint,
Details: req.Details,
})
}
}
}
return routingNode, nil
}
4 changes: 4 additions & 0 deletions pkg/alerting/metrics/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ func (a *AlertingRule) Build(id string) (*rulefmt.Rule, error) {
return promRule, nil
}

func WithSloId(sloId, alertType, suffix string) string {
return fmt.Sprintf("%s-%s%s", sloId, alertType, suffix)
}

// Pretty simple durations for prometheus.
func timeDurationToModelDuration(t time.Duration) model.Duration {
return model.Duration(t)
Expand Down
27 changes: 26 additions & 1 deletion pkg/apis/alerting/v1/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func (a *AlertTypeDetails) ListTemplates() []string {
}
panic("No templates returned for alert type")
}

func (a *AlertConditionSystem) ListTemplates() []string {
return []string{
"agentId",
Expand Down Expand Up @@ -114,3 +113,29 @@ func (r *RoutingRelationships) InvolvedConditionsForEndpoint(endpointId string)
}
return res
}

func ShouldCreateRoutingNode(new, old *AttachedEndpoints) bool {
// only create if we go from having no endpoints to having some
if new == nil || len(new.Items) == 0 {
return false
} else if (old == nil || len(old.Items) == 0) && new != nil && len(new.Items) > 0 {
return true
}
return false // should update pre-existing routing-node
}

func ShouldUpdateRoutingNode(new, old *AttachedEndpoints) bool {
// only update if both are specified
if new != nil && len(new.Items) > 0 && old != nil && len(old.Items) > 0 {
return true
}
return false
}

func ShouldDeleteRoutingNode(new, old *AttachedEndpoints) bool {
// only delete if we go from having endpoints to having none
if (new == nil || len(new.Items) > 0) && old != nil && len(old.Items) > 0 {
return true
}
return false
}
2 changes: 1 addition & 1 deletion plugins/alerting/pkg/alerting/api_conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (p *Plugin) DeleteAlertCondition(ctx context.Context, ref *corev1.Reference
return nil, err
}
lg.Debugf("Deleted condition %s must clean up its existing endpoint implementation", ref.Id)
if !(existing.AttachedEndpoints == nil || len(existing.AttachedEndpoints.Items) == 0) {
if alertingv1.ShouldDeleteRoutingNode(nil, existing.AttachedEndpoints) {
_, err = p.DeleteConditionRoutingNode(ctx, ref)
if err != nil {
return nil, err
Expand Down
591 changes: 299 additions & 292 deletions plugins/slo/pkg/apis/slo/slo.pb.go

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion plugins/slo/pkg/apis/slo/slo.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import "google/api/http.proto";
import "google/api/annotations.proto";
import "github.com/rancher/opni/pkg/apis/core/v1/core.proto";
import "google/rpc/status.proto";
import "github.com/rancher/opni/pkg/apis/alerting/v1/alerting.proto";

package slo;

service SLO {
Expand Down Expand Up @@ -176,7 +178,7 @@ message ServiceLevelObjective {
google.protobuf.Duration budgetingInterval = 10; // budgeting interval should be between 1m and 60m
Target target = 11;
repeated Label labels = 12;
repeated Alert alertTargets = 13;
alerting.AttachedEndpoints attachedEndpoints = 13;
}

message CreateSLORequest {
Expand Down
5 changes: 5 additions & 0 deletions plugins/slo/pkg/apis/slo/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (slo *ServiceLevelObjective) Validate() error {
if interval.AsDuration() < time.Minute || interval.AsDuration() > time.Hour {
return validation.Error("budgetingInterval must be between 1 minute and 1 hour")
}
if slo.AttachedEndpoints != nil && len(slo.AttachedEndpoints.Items) > 0 {
if err := slo.AttachedEndpoints.Validate(); err != nil {
return err
}
}
return nil
}

Expand Down
65 changes: 64 additions & 1 deletion plugins/slo/pkg/slo/cortex_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"fmt"
"time"

"github.com/rancher/opni/pkg/alerting/backend"
alertingv1 "github.com/rancher/opni/pkg/apis/alerting/v1"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"

"emperror.dev/errors"

"github.com/prometheus/common/model"
Expand All @@ -20,6 +24,66 @@ import (

var instantMaskDisabled = true

func createRoutingNode(p *Plugin, ctx context.Context, req *alertingv1.AttachedEndpoints, alertId string) error {
ctxTimeout, cancelFunc := context.WithTimeout(ctx, 20*time.Second)
defer cancelFunc()
alertEndpointClient, err := p.alertEndpointClient.GetContext(ctxTimeout)
if err != nil {
return err
}
eList, err := alertEndpointClient.ListAlertEndpoints(ctx, &alertingv1.ListAlertEndpointsRequest{})
if err != nil {
return err
}
routingNode, err := backend.ConvertEndpointIdsToRoutingNode(eList, req, alertId)
if err != nil {
p.logger.Error(err)
return err
}
_, err = alertEndpointClient.CreateConditionRoutingNode(ctx, routingNode)
if err != nil {
p.logger.Error(err)
}
return nil
}

func updateRoutingNode(p *Plugin, ctx context.Context, req *alertingv1.AttachedEndpoints, alertId string) error {
ctxTimeout, cancelFunc := context.WithTimeout(ctx, 20*time.Second)
defer cancelFunc()
alertEndpointClient, err := p.alertEndpointClient.GetContext(ctxTimeout)
if err != nil {
return err
}
eList, err := alertEndpointClient.ListAlertEndpoints(ctx, &alertingv1.ListAlertEndpointsRequest{})
if err != nil {
return err
}
routingNode, err := backend.ConvertEndpointIdsToRoutingNode(eList, req, alertId)
if err != nil {
p.logger.Error(err)
return err
}
_, err = alertEndpointClient.UpdateConditionRoutingNode(ctx, routingNode)
if err != nil {
p.logger.Error(err)
}
return nil
}

func deleteRoutingNode(p *Plugin, ctx context.Context, alertId string) error {
ctxTimeout, cancelFunc := context.WithTimeout(ctx, 20*time.Second)
defer cancelFunc()
alertEndpointClient, err := p.alertEndpointClient.GetContext(ctxTimeout)
if err != nil {
return err
}
_, err = alertEndpointClient.DeleteConditionRoutingNode(ctx, &corev1.Reference{Id: alertId})
if err != nil {
p.logger.Error(err)
}
return nil
}

func createGrafanaSLOMask(p *Plugin, ctx context.Context, clusterId string, ruleId string) error {
p.logger.With("sloId", ruleId, "clusterId", clusterId).Debugf("creating grafana mask")
if !instantMaskDisabled {
Expand Down Expand Up @@ -79,7 +143,6 @@ func tryApplyThenDeleteCortexRules(p *Plugin, lg *zap.SugaredLogger, ctx context
lg.Errorf("creating grafana mask failed %s", err)
errArr = append(errArr, err)
}

}

return errors.Combine(errArr...)
Expand Down
79 changes: 68 additions & 11 deletions plugins/slo/pkg/slo/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
promql "github.com/cortexproject/cortex/pkg/configs/legacy_promql"
"github.com/google/uuid"
prommodel "github.com/prometheus/common/model"
alertingv1 "github.com/rancher/opni/pkg/apis/alerting/v1"
corev1 "github.com/rancher/opni/pkg/apis/core/v1"
managementv1 "github.com/rancher/opni/pkg/apis/management/v1"
"github.com/rancher/opni/pkg/util"
Expand All @@ -35,7 +36,19 @@ func (s SLOMonitoring) Create() (*corev1.Reference, error) {
toApply := []RuleGroupYAMLv2{rrecording, rmetadata, ralerting}
ruleId := slo.GetId()
err := tryApplyThenDeleteCortexRules(s.p, s.p.logger, s.ctx, req.GetSlo().GetClusterId(), &ruleId, toApply)
return &corev1.Reference{Id: slo.GetId()}, err
if err != nil {
return nil, err
}
for _, rule := range ralerting.Rules {
ae := req.Slo.AttachedEndpoints
if rule.Alert != "" && alertingv1.ShouldCreateRoutingNode(ae, nil) {
err := createRoutingNode(s.p, s.ctx, ae, rule.Alert)
if err != nil {
s.p.logger.Errorf("creating routing node failed %s", err)
}
}
}
return &corev1.Reference{Id: slo.GetId()}, nil
}

func (s SLOMonitoring) Update(existing *sloapi.SLOData) (*sloapi.SLOData, error) {
Expand All @@ -54,24 +67,68 @@ func (s SLOMonitoring) Update(existing *sloapi.SLOData) (*sloapi.SLOData, error)
err))
}
}
for _, rule := range ralerting.Rules {
newAE := existing.SLO.AttachedEndpoints
oldAe := incomingSLO.SLO.AttachedEndpoints
if rule.Alert != "" {
if alertingv1.ShouldCreateRoutingNode(newAE, oldAe) {
err := createRoutingNode(s.p, s.ctx, newAE, rule.Alert)
if err != nil {
s.p.logger.Errorf("creating routing node failed %s", err)
}
} else if alertingv1.ShouldUpdateRoutingNode(newAE, oldAe) {
err := updateRoutingNode(s.p, s.ctx, newAE, rule.Alert)
if err != nil {
s.p.logger.Errorf("updating routing node failed %s", err)
}
} else if alertingv1.ShouldDeleteRoutingNode(newAE, oldAe) {
err := deleteRoutingNode(s.p, s.ctx, rule.Alert)
if err != nil {
s.p.logger.Errorf("deleting routing node failed %s", err)
}
}
}
}
return incomingSLO, err
}

func (s SLOMonitoring) Delete(existing *sloapi.SLOData) error {
id, clusterId := existing.Id, existing.SLO.ClusterId
//err := deleteCortexSLORules(s.p, id, clusterId, s.ctx, s.lg)
errArr := []error{}
toApply := []string{id + RecordingRuleSuffix, id + MetadataRuleSuffix, id + AlertRuleSuffix}
slo := SLODataToStruct(existing)
rrecording, rmetadata, ralerting := slo.ConstructCortexRules(nil)
toApply := []RuleGroupYAMLv2{rrecording, rmetadata, ralerting}
for _, ruleName := range toApply {
err := deleteCortexSLORules(
s.p,
s.p.logger,
s.ctx,
clusterId,
ruleName,
)
if err != nil {
errArr = append(errArr, err)
for _, rule := range ruleName.Rules {
if rule.Alert != "" {
err := deleteRoutingNode(s.p, s.ctx, rule.Alert)
if err != nil {
s.p.logger.Errorf("deleting routing node failed %s", err)
}
err = deleteCortexSLORules(
s.p,
s.p.logger,
s.ctx,
clusterId,
rule.Alert,
)
if err != nil {
errArr = append(errArr, err)
}
}
if rule.Record != "" {
err := deleteCortexSLORules(
s.p,
s.p.logger,
s.ctx,
clusterId,
rule.Record,
)
if err != nil {
errArr = append(errArr, err)
}
}
}
}
err := createGrafanaSLOMask(s.p, s.ctx, clusterId, id)
Expand Down
25 changes: 15 additions & 10 deletions plugins/slo/pkg/slo/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package slo

import (
"context"
"github.com/rancher/opni/plugins/alerting/pkg/apis/server/endpoint"

"go.uber.org/zap"

Expand All @@ -20,11 +21,14 @@ import (
type Plugin struct {
sloapi.UnsafeSLOServer
system.UnimplementedSystemPluginClient
ctx context.Context
logger *zap.SugaredLogger
storage future.Future[StorageAPIs]
mgmtClient future.Future[managementv1.ManagementClient]
adminClient future.Future[cortexadmin.CortexAdminClient]

ctx context.Context
logger *zap.SugaredLogger

storage future.Future[StorageAPIs]
mgmtClient future.Future[managementv1.ManagementClient]
adminClient future.Future[cortexadmin.CortexAdminClient]
alertEndpointClient future.Future[endpoint.AlertEndpointsClient]
}

type StorageAPIs struct {
Expand All @@ -35,11 +39,12 @@ type StorageAPIs struct {

func NewPlugin(ctx context.Context) *Plugin {
return &Plugin{
ctx: ctx,
logger: logger.NewPluginLogger().Named("slo"),
storage: future.New[StorageAPIs](),
mgmtClient: future.New[managementv1.ManagementClient](),
adminClient: future.New[cortexadmin.CortexAdminClient](),
ctx: ctx,
logger: logger.NewPluginLogger().Named("slo"),
storage: future.New[StorageAPIs](),
mgmtClient: future.New[managementv1.ManagementClient](),
adminClient: future.New[cortexadmin.CortexAdminClient](),
alertEndpointClient: future.New[endpoint.AlertEndpointsClient](),
}
}

Expand Down
Loading

0 comments on commit ffa614c

Please sign in to comment.