Skip to content

Commit

Permalink
feat(monitor): support sorting reduced result (#21633)
Browse files Browse the repository at this point in the history
  • Loading branch information
zexi authored Nov 20, 2024
1 parent 17364c8 commit 7096c5b
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 15 deletions.
21 changes: 15 additions & 6 deletions pkg/apis/monitor/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,28 @@ func (s AlertSetting) IsZero() bool {
}

type AlertCondition struct {
Type string `json:"type"`
Query AlertQuery `json:"query"`
Reducer Condition `json:"reducer"`
Evaluator Condition `json:"evaluator"`
Operator string `json:"operator"`
Type string `json:"type"`
Query AlertQuery `json:"query"`
Reducer Condition `json:"reducer"`
ReducerOrder ResultReducerOrder `json:"reducer_order"`
Evaluator Condition `json:"evaluator"`
Operator string `json:"operator"`
}

type ResultReducerOrder string

const (
RESULT_REDUCER_ORDER_ASC ResultReducerOrder = "asc"
RESULT_REDUCER_ORDER_DESC ResultReducerOrder = "desc"
)

type AlertQuery struct {
Model MetricQuery `json:"model"`
From string `json:"from"`
To string `json:"to"`
// 查询结果 reducer,执行 p95 这些操作
ResultReducer *Condition `json:"result_reducer"`
ResultReducer *Condition `json:"result_reducer"`
ResultReducerOrder ResultReducerOrder `json:"result_reducer_order"`
}

type AlertCreateInput struct {
Expand Down
19 changes: 13 additions & 6 deletions pkg/mcclient/modules/monitor/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,11 @@ type AlertQuery struct {
policy string
resultFormat string

selects *AlertQuerySelects
where *AlertQueryWhere
groupBy *AlertQueryGroupBy
resultReducer *monitor.Condition
selects *AlertQuerySelects
where *AlertQueryWhere
groupBy *AlertQueryGroupBy
resultReducer *monitor.Condition
resultReducerOrder monitor.ResultReducerOrder
}

func NewAlertQuery(database string, measurement string) *AlertQuery {
Expand Down Expand Up @@ -404,6 +405,11 @@ func (q *AlertQuery) Reducer(rType string, params []float64) *AlertQuery {
return q
}

func (q *AlertQuery) ReducerOrder(rType monitor.ResultReducerOrder) *AlertQuery {
q.resultReducerOrder = rType
return q
}

type AlertQuerySelects struct {
parts []*AlertQuerySelect
}
Expand Down Expand Up @@ -675,8 +681,9 @@ func (input *MetricQueryInput) ToQueryData() *monitor.MetricQueryInput {
}
data.MetricQuery = []*monitor.AlertQuery{
{
Model: input.query.ToMetricQuery(),
ResultReducer: input.query.resultReducer,
Model: input.query.ToMetricQuery(),
ResultReducer: input.query.resultReducer,
ResultReducerOrder: input.query.resultReducerOrder,
},
}

Expand Down
57 changes: 54 additions & 3 deletions pkg/monitor/alerting/conditions/metricquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package conditions

import (
gocontext "context"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -71,13 +72,54 @@ func NewMetricQueryCondition(models []*monitor.AlertCondition) (*MetricQueryCond
return nil, errors.Wrapf(err, "NewAlertReducer")
}
qc.Reducer = reducer
qc.ReducerOrder = model.ReducerOrder
qc.setResType()
cond.QueryCons = append(cond.QueryCons, *qc)
}

return cond, nil
}

type queryReducedResultSorter struct {
r *queryResult
order monitor.ResultReducerOrder
}

func newQueryReducedResultSorter(
r *queryResult,
order monitor.ResultReducerOrder) *queryReducedResultSorter {
return &queryReducedResultSorter{
r: r,
order: order,
}
}

func (q *queryReducedResultSorter) Len() int {
return len(q.r.reducedResult.Result)
}

func (q *queryReducedResultSorter) Less(i, j int) bool {
vi, vj := q.r.reducedResult.Result[i], q.r.reducedResult.Result[j]
if q.order == monitor.RESULT_REDUCER_ORDER_ASC {
if vi < vj {
return true
}
return false
}
if vi > vj {
return true
}
return false
}

func (q *queryReducedResultSorter) Swap(i, j int) {
q.r.reducedResult.Result[i], q.r.reducedResult.Result[j] = q.r.reducedResult.Result[j], q.r.reducedResult.Result[i]
q.r.series[i], q.r.series[j] = q.r.series[j], q.r.series[i]
if len(q.r.metas) > i && len(q.r.metas) > j {
q.r.metas[i], q.r.metas[j] = q.r.metas[j], q.r.metas[i]
}
}

func (query *MetricQueryCondition) ExecuteQuery(userCred mcclient.TokenCredential, scope string, skipCheckSeries bool) (*monitor.MetricsQueryResult, error) {
firstCond := query.QueryCons[0]
timeRange := tsdb.NewTimeRange(firstCond.Query.From, firstCond.Query.To)
Expand All @@ -97,6 +139,15 @@ func (query *MetricQueryCondition) ExecuteQuery(userCred mcclient.TokenCredentia
return nil, errors.Wrapf(err, "Can't find default datasource")
}

sortMetrics := func(metrics *queryResult, order monitor.ResultReducerOrder) *queryResult {
if metrics.reducedResult != nil && len(metrics.reducedResult.Result) > 0 {
s := newQueryReducedResultSorter(metrics, order)
sort.Sort(s)
return s.r
}
return metrics
}

queryTSDB := func() (*queryResult, error) {
startTime := time.Now()

Expand All @@ -122,7 +173,7 @@ func (query *MetricQueryCondition) ExecuteQuery(userCred mcclient.TokenCredentia
}
}
}
return queryResult, nil
return sortMetrics(queryResult, firstCond.ReducerOrder), nil
}

noCheck := query.noCheckSeries(skipCheckSeries)
Expand All @@ -131,12 +182,12 @@ func (query *MetricQueryCondition) ExecuteQuery(userCred mcclient.TokenCredentia
if err != nil {
return nil, errors.Wrap(err, "queryTSDB")
}
metrics := monitor.MetricsQueryResult{
metrics := &monitor.MetricsQueryResult{
Series: qr.series,
Metas: qr.metas,
ReducedResult: qr.reducedResult,
}
return &metrics, nil
return metrics, nil
}

qInfluxdbCh := make(chan bool, 0)
Expand Down
2 changes: 2 additions & 0 deletions pkg/monitor/alerting/conditions/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type QueryCondition struct {
Index int
Query AlertQuery
Reducer Reducer
ReducerOrder monitor.ResultReducerOrder
Evaluator AlertEvaluator
Operator string
HandleRequest tsdb.HandleRequestFunc
Expand Down Expand Up @@ -507,6 +508,7 @@ func newQueryCondition(model *monitor.AlertCondition, index int) (*QueryConditio
return nil, fmt.Errorf("error in condition %v: %v", index, err)
}
cond.Reducer = reducer
cond.ReducerOrder = model.ReducerOrder
evaluator, err := NewAlertEvaluator(&model.Evaluator)
if err != nil {
return nil, fmt.Errorf("error in condition %v: %v", index, err)
Expand Down
1 change: 1 addition & 0 deletions pkg/monitor/models/unifiedmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func doQuery(userCred mcclient.TokenCredential, query monitor.MetricQueryInput)
}
if q.ResultReducer != nil {
condition.Reducer = *q.ResultReducer
condition.ReducerOrder = q.ResultReducerOrder
}
conds = append(conds, &condition)
}
Expand Down

0 comments on commit 7096c5b

Please sign in to comment.