Skip to content

Commit

Permalink
Implement timeline api, fix order of operations
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandreLamarre committed Oct 25, 2022
1 parent 3d42379 commit 7c73fde
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 39 deletions.
1 change: 1 addition & 0 deletions pkg/apis/alerting/v1/alerting.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/apis/alerting/v1/alerting.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ message ListAlertConditionSystem {
repeated string agentIds = 1;
}

// Requires kube state metrics, otherwise the list of choices will be empty
message AlertConditionKubeState {
string clusterId = 1;
// must be one of the listed kube objects
Expand Down
12 changes: 12 additions & 0 deletions plugins/alerting/pkg/alerting/alertstorage/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,15 @@ type AgentIncidentStep struct {
health.StatusUpdate `json:",inline"`
AlertFiring bool `json:"alertFiring"`
}

func (a AgentIncident) isEquivalentState(other AgentIncidentStep) bool {
if len(a.Steps) == 0 {
return false
}
cur := a.Steps[len(a.Steps)-1]
if cur.StatusUpdate.Status.Connected == other.StatusUpdate.Status.Connected && cur.AlertFiring == other.AlertFiring {
return true
}
// different enough that we should add a new step
return false
}
62 changes: 57 additions & 5 deletions plugins/alerting/pkg/alerting/alertstorage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package alertstorage
import (
"context"
"encoding/json"
"errors"
"path"
"sync"
"time"
Expand All @@ -14,6 +15,8 @@ import (
"github.com/rancher/opni/pkg/storage"
"github.com/rancher/opni/pkg/util/future"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"google.golang.org/protobuf/types/known/timestamppb"
)

const conditionPrefix = "/alerting/conditions"
Expand Down Expand Up @@ -282,6 +285,49 @@ func (s *StorageNode) GetAgentIncidentTracker(ctx context.Context, conditionId s
return &st, nil
}

func (s *StorageNode) GetActiveWindowsFromAgentIncidentTracker(
ctx context.Context,
conditionId string,
start,
end *timestamppb.Timestamp,
) ([]*alertingv1.ActiveWindow, error) {
ctxTimeout, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
incident, err := s.GetAgentIncidentTracker(ctxTimeout, conditionId)
if err != nil {
return nil, err
}
res := []*alertingv1.ActiveWindow{}
if len(incident.Steps) == 0 {
return res, nil
}
risingEdge := true
for _, step := range incident.Steps {
if step.Status.Timestamp == nil {
continue
}
if step.AlertFiring && risingEdge {
res = append(res, &alertingv1.ActiveWindow{
Start: step.Status.Timestamp,
End: timestamppb.Now(), // overwritten if it is found later
Type: alertingv1.TimelineType_Timeline_Alerting,
})
risingEdge = false
} else if !step.AlertFiring && !risingEdge {
res[len(res)-1].End = step.Status.Timestamp
risingEdge = true
}
}
pruneIdx := 0
for _, window := range res {
if window.End.AsTime().Before(start.AsTime()) {
pruneIdx++
}
}
res = slices.Delete(res, 0, pruneIdx)
return res, nil
}

func (s *StorageNode) ListAgentIncidentTrackers(ctx context.Context) ([]AgentIncident, error) {
ctxTimeout, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
Expand Down Expand Up @@ -328,7 +374,7 @@ func (s *StorageNode) AddToAgentIncidentTracker(ctx context.Context, conditionId
}
var agentEntry nats.KeyValueEntry
entry, err := sts.Get(conditionId)
if err != nil {
if errors.Is(err, nats.ErrKeyNotFound) {
err := s.CreateAgentIncidentTracker(ctx, conditionId, updateValue)
if err != nil {
return err
Expand All @@ -338,16 +384,22 @@ func (s *StorageNode) AddToAgentIncidentTracker(ctx context.Context, conditionId
return err
}
agentEntry = entry
} else if err != nil {
return err
} else {
agentEntry = entry
}
var st AgentIncident
err = json.Unmarshal(agentEntry.Value(), &st)
var prev AgentIncident
err = json.Unmarshal(agentEntry.Value(), &prev)
if err != nil {
return err
}
st.Steps = append(st.Steps, &updateValue)
data, err := json.Marshal(st)
if prev.isEquivalentState(updateValue) { // prevent filling up K,V with duplicate states
return nil
}

prev.Steps = append(prev.Steps, &updateValue)
data, err := json.Marshal(prev)
if err != nil {
return err
}
Expand Down
126 changes: 100 additions & 26 deletions plugins/alerting/pkg/alerting/api_conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"sync"
"time"

"github.com/prometheus/alertmanager/api/v2/models"
"github.com/prometheus/common/model"
"github.com/rancher/opni/pkg/alerting/backend"
"github.com/rancher/opni/pkg/metrics/unmarshal"
"github.com/rancher/opni/plugins/metrics/pkg/apis/cortexadmin"

"github.com/rancher/opni/pkg/util"
"github.com/rancher/opni/pkg/validation"
"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/google/uuid"
Expand Down Expand Up @@ -382,34 +388,102 @@ func (p *Plugin) Timeline(ctx context.Context, req *alertingv1.TimelineRequest)
resp := &alertingv1.TimelineResponse{
Items: make(map[string]*alertingv1.ActiveWindows),
}
for idx := range conditions {
resp.Items[ids[idx]] = &alertingv1.ActiveWindows{
requiresCortex := false
for idx, id := range ids {
if k := conditions[idx].GetAlertType().GetKubeState(); k != nil {
requiresCortex = true
}
resp.Items[id] = &alertingv1.ActiveWindows{
Windows: make([]*alertingv1.ActiveWindow, 0),
}
numWindows := rand.Intn(10) + 1
for i := 0; i < numWindows; i++ {
startTime := time.Now()
startTime.Add(-time.Duration(rand.Int31n(24)+1) * time.Hour)
endTime := time.Now()
endTime.Add(-time.Duration(rand.Int31n(24)+1) * time.Hour)
if endTime.UnixNano() < startTime.UnixNano() {
temp := startTime
startTime = endTime
endTime = temp
}
typeRandom := rand.Intn(4)
var t alertingv1.TimelineType
if typeRandom == 0 {
t = alertingv1.TimelineType_Timeline_Silenced
} else {
t = alertingv1.TimelineType_Timeline_Alerting
}
resp.Items[ids[idx]].Windows = append(resp.Items[ids[idx]].Windows, &alertingv1.ActiveWindow{
Start: timestamppb.New(startTime),
End: timestamppb.New(endTime),
Type: t,
})
}

var cortexAdminClient cortexadmin.CortexAdminClient
if requiresCortex {
ctxCa, cancel := context.WithCancel(ctx)
defer cancel()
adminClient, err := p.adminClient.GetContext(ctxCa)
if err != nil {
return nil, util.StatusError(codes.Code(code.Code_FAILED_PRECONDITION))
}
cortexAdminClient = adminClient
}

start := timestamppb.New(time.Now().Add(-req.LookbackWindow.AsDuration()))
end := timestamppb.Now()
cortexStep := durationpb.New(req.LookbackWindow.AsDuration() / 500)
var wg sync.WaitGroup
var addMu sync.Mutex
for idx := range conditions {
idx := idx // capture in closure
wg.Add(1)
go func() {
defer wg.Done()
condition := conditions[idx]
if s := condition.GetAlertType().GetSystem(); s != nil {
// check system tracker
activeWindows, err := p.storageNode.GetActiveWindowsFromAgentIncidentTracker(ctx, ids[idx], start, end)
if err != nil {
p.Logger.Errorf("failed to get active windows from agent incident tracker : %s", err)
return
}
addMu.Lock()
resp.Items[ids[idx]] = &alertingv1.ActiveWindows{
Windows: activeWindows,
}
addMu.Unlock()
} else if k := condition.GetAlertType().GetKubeState(); k != nil {
// do the raw quer
qr, err := cortexAdminClient.QueryRange(ctx, &cortexadmin.QueryRangeRequest{
Start: start,
End: end,
Step: cortexStep,
})
if err != nil {
p.Logger.Errorf("failed to query range : %s", err)
return
}
rawBytes := qr.Data
qres, err := unmarshal.UnmarshalPrometheusResponse(rawBytes)
if err != nil {
p.Logger.Errorf("failed to unmarshal prometheus response : %s", err)
return
}
dataMatrix, err := qres.GetMatrix()
if err != nil {
p.Logger.Errorf("failed to get matrix : %s", err)
return
}
isRising := true
isFiring := func(v model.SampleValue) bool {
return v > 0
}
activeWindows := alertingv1.ActiveWindows{
Windows: make([]*alertingv1.ActiveWindow, 0),
}
for _, row := range *dataMatrix {
for _, rowValue := range row.Values {
ts := time.Unix(rowValue.Timestamp.Unix(), 0)
if !isFiring(rowValue.Value) && isRising {
activeWindows.Windows = append(activeWindows.Windows, &alertingv1.ActiveWindow{
Start: timestamppb.New(ts),
End: end,
Type: alertingv1.TimelineType_Timeline_Alerting,
})
isRising = false
} else if isFiring(rowValue.Value) && !isRising {
activeWindows.Windows[len(activeWindows.Windows)].End = timestamppb.New(ts)
isRising = true
}
}
}
addMu.Lock()
resp.Items[ids[idx]] = &activeWindows
addMu.Unlock()
}
}()
}
wg.Wait()

return resp, nil
}
Loading

0 comments on commit 7c73fde

Please sign in to comment.