Skip to content

Commit

Permalink
NETOBSERV-1227 NETOBSERV-1388: Max / P90 / P99 graphs (#412)
Browse files Browse the repository at this point in the history
* min / max / 90 / 99 percentiles

* addressed feedback

* flex donut when single
  • Loading branch information
jpinsonneau authored Dec 11, 2023
1 parent 78be350 commit 1d3b420
Show file tree
Hide file tree
Showing 59 changed files with 2,427 additions and 1,997 deletions.
16 changes: 11 additions & 5 deletions pkg/handler/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
)

const (
metricTypeKey = "type"
metricTypeKey = "type"
metricFunctionKey = "function"

aggregateByKey = "aggregateBy"
groupsKey = "groups"
rateIntervalKey = "rateInterval"
Expand Down Expand Up @@ -72,6 +74,10 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
if err != nil {
return nil, http.StatusBadRequest, err
}
metricFunction, err := getMetricFunction(params)
if err != nil {
return nil, http.StatusBadRequest, err
}
recordType, err := getRecordType(params)
if err != nil {
return nil, http.StatusBadRequest, err
Expand All @@ -96,7 +102,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
// match any, and multiple filters => run in parallel then aggregate
var queries []string
for _, group := range filterGroups {
query, code, err := buildTopologyQuery(cfg, group, start, end, limit, rateInterval, step, metricType, recordType, packetLoss, aggregate, groups)
query, code, err := buildTopologyQuery(cfg, group, start, end, limit, rateInterval, step, metricType, metricFunction, recordType, packetLoss, aggregate, groups)
if err != nil {
return nil, code, errors.New("Can't build query: " + err.Error())
}
Expand All @@ -112,7 +118,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
if len(filterGroups) > 0 {
filters = filterGroups[0]
}
query, code, err := buildTopologyQuery(cfg, filters, start, end, limit, rateInterval, step, metricType, recordType, packetLoss, aggregate, groups)
query, code, err := buildTopologyQuery(cfg, filters, start, end, limit, rateInterval, step, metricType, metricFunction, recordType, packetLoss, aggregate, groups)
if err != nil {
return nil, code, err
}
Expand Down Expand Up @@ -148,8 +154,8 @@ func expandReportersMergeQueries(queries filters.MultiQueries) filters.MultiQuer
return out
}

func buildTopologyQuery(cfg *loki.Config, queryFilters filters.SingleQuery, start, end, limit, rateInterval, step string, metricType constants.MetricType, recordType constants.RecordType, packetLoss constants.PacketLoss, aggregate, groups string) (string, int, error) {
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, rateInterval, step, metricType, recordType, packetLoss, aggregate, groups)
func buildTopologyQuery(cfg *loki.Config, queryFilters filters.SingleQuery, start, end, limit, rateInterval, step string, metricType constants.MetricType, metricFunction constants.MetricFunction, recordType constants.RecordType, packetLoss constants.PacketLoss, aggregate, groups string) (string, int, error) {
qb, err := loki.NewTopologyQuery(cfg, start, end, limit, rateInterval, step, metricType, metricFunction, recordType, packetLoss, aggregate, groups)
if err != nil {
return "", http.StatusBadRequest, err
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/handler/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ func getMetricType(params url.Values) (constants.MetricType, error) {
return "", fmt.Errorf("invalid metric type: %s", mt)
}

// getMetricFunction extracts and validates the "function" query parameter.
// An absent parameter is valid and yields the empty MetricFunction, letting
// the caller fall back to a per-metric-type default. Any other value must be
// one of the supported aggregations (avg, min, max, p90, p99).
func getMetricFunction(params url.Values) (constants.MetricFunction, error) {
	mf := params.Get(metricFunctionKey)
	if mf == "" {
		// No explicit function requested.
		return "", nil
	}
	metricFunction := constants.MetricFunction(mf)
	switch metricFunction {
	case constants.MetricFunctionAvg,
		constants.MetricFunctionMin,
		constants.MetricFunctionMax,
		constants.MetricFunctionP90,
		constants.MetricFunctionP99:
		return metricFunction, nil
	}
	return "", fmt.Errorf("invalid metric function: %s", mf)
}

func getRateInterval(params url.Values) (string, error) {
rateInterval := params.Get(rateIntervalKey)
if rateInterval == "" {
Expand Down
19 changes: 14 additions & 5 deletions pkg/loki/flow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,19 +265,28 @@ func (q *FlowQueryBuilder) appendPktDropCauseFilter(sb *strings.Builder) {
}

// appendDNSFilter appends line filters requiring the DnsId, DnsLatencyMs and
// DnsFlagsResponseCode field markers, so that flows carrying only DnsErrno are
// excluded. Produces:
//   |~`"DnsId`|~`"DnsLatencyMs`|~`"DnsFlagsResponseCode"`
func (q *FlowQueryBuilder) appendDNSFilter(sb *strings.Builder) {
	sb.WriteString("|~`")
	sb.WriteString(`"DnsId`)
	sb.WriteString("`")
	sb.WriteString("|~`")
	sb.WriteString(`"DnsLatencyMs`)
	sb.WriteString("`")
	sb.WriteString("|~`")
	sb.WriteString(`"DnsFlagsResponseCode"`)
	sb.WriteString("`")
}

// appendDNSLatencyFilter appends line filters ensuring the DnsLatencyMs field
// is present and its value is not zero. Produces:
//   |~`"DnsLatencyMs`!~`"DnsLatencyMs":0[,}]`
func (q *FlowQueryBuilder) appendDNSLatencyFilter(sb *strings.Builder) {
	sb.WriteString("|~`")
	sb.WriteString(`"DnsLatencyMs`)
	sb.WriteString("`")
	sb.WriteString("!~`")
	sb.WriteString(`"DnsLatencyMs":0[,}]`)
	sb.WriteString("`")
}

func (q *FlowQueryBuilder) appendDNSRCodeFilter(sb *strings.Builder) {
Expand Down
140 changes: 94 additions & 46 deletions pkg/loki/topology_query.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package loki

import (
"fmt"
"strings"

"github.com/netobserv/network-observability-console-plugin/pkg/utils"
Expand All @@ -17,13 +18,14 @@ type Topology struct {
step string
function string
dataField string
fields string
labels string
skipEmptyDropState bool
skipEmptyDropCause bool
skipNonDNS bool
skipEmptyDNSLatency bool
skipEmptyDNSRCode bool
skipEmptyRTT bool
scalar string
factor string
}

Expand All @@ -33,71 +35,49 @@ type TopologyQueryBuilder struct {
}

// NewTopologyQuery builds a TopologyQueryBuilder for the given time range,
// metric type and function, record type and packet-loss filter, aggregating
// results by the labels derived from aggregate/groups. An empty limit falls
// back to topologyDefaultLimit.
func NewTopologyQuery(cfg *Config, start, end, limit, rateInterval, step string, metricType constants.MetricType,
	metricFunction constants.MetricFunction, recordType constants.RecordType, packetLoss constants.PacketLoss,
	aggregate, groups string) (*TopologyQueryBuilder, error) {
	l := limit
	if len(l) == 0 {
		l = topologyDefaultLimit
	}

	labels := getLabels(aggregate, groups)
	field, factor := getFieldsAndFactor(metricType)
	f, scalar := getFunctionWithScalar(metricType, metricFunction)

	// Connection-type records are queried without deduplication, against
	// "endConnection" records; plain flow logs are deduplicated.
	var dedup bool
	var rt constants.RecordType
	if utils.Contains(constants.AnyConnectionType, string(recordType)) {
		dedup = false
		rt = "endConnection"
	} else {
		dedup = true
		rt = "flowLog"
	}

	return &TopologyQueryBuilder{
		FlowQueryBuilder: NewFlowQueryBuilder(cfg, start, end, limit, dedup, rt, packetLoss),
		topology: &Topology{
			rateInterval:        rateInterval,
			step:                step,
			limit:               l,
			function:            f,
			dataField:           field,
			factor:              factor,
			labels:              labels,
			skipEmptyDropState:  aggregate == "droppedState",
			skipEmptyDropCause:  aggregate == "droppedCause",
			skipNonDNS:          metricType == constants.MetricTypeCountDNS,
			skipEmptyDNSLatency: metricType == constants.MetricTypeDNSLatencies,
			skipEmptyDNSRCode:   aggregate == "dnsRCode",
			skipEmptyRTT:        metricType == constants.MetricTypeFlowRTT,
			scalar:              scalar,
		},
	}, nil
}

func getFields(aggregate, groups string) string {
func getLabels(aggregate, groups string) string {
var fields []string
switch aggregate {
case "app":
Expand Down Expand Up @@ -141,10 +121,60 @@ func getFields(aggregate, groups string) string {
return strings.Join(fields[:], ",")
}

// getFieldsAndFactor returns the flow field to unwrap for the given metric
// type, plus an optional conversion factor suffix appended to the query
// (e.g. "/1000000" to convert RTT nanoseconds to milliseconds). Count-based
// metrics unwrap no field. It panics on an unexpected metric type, which
// indicates a programming error since the type is validated upstream.
func getFieldsAndFactor(metricType constants.MetricType) (string, string) {
	switch metricType {
	case constants.MetricTypeDroppedPackets:
		return "PktDropPackets", ""
	case constants.MetricTypePackets:
		return "Packets", ""
	case constants.MetricTypeDroppedBytes:
		return "PktDropBytes", ""
	case constants.MetricTypeBytes:
		return "Bytes", ""
	case constants.MetricTypeDNSLatencies:
		return "DnsLatencyMs", ""
	case constants.MetricTypeFlowRTT:
		return "TimeFlowRttNs", "/1000000" // nanoseconds to milliseconds
	case constants.MetricTypeCount, constants.MetricTypeCountDNS:
		return "", ""
	default:
		// Sprintf with %s: fmt.Sprint would glue the value onto "provided"
		// with no separator (both operands are strings).
		panic(fmt.Sprintf("wrong metricType for fields and factor provided: %s", metricType))
	}
}

// getFunctionWithScalar maps the requested metric function to a LogQL
// range-aggregation function plus an optional scalar argument (the quantile
// for quantile_over_time). When no explicit function is requested, it falls
// back to a default per metric type: "rate" for byte/packet counters,
// "count_over_time" for the count-based and latency metrics. It panics on an
// unexpected metric type, which indicates a programming error since the type
// is validated upstream.
func getFunctionWithScalar(metricType constants.MetricType, metricFunction constants.MetricFunction) (string, string) {
	switch metricFunction {
	case constants.MetricFunctionMax:
		return "max_over_time", ""
	case constants.MetricFunctionMin:
		return "min_over_time", ""
	case constants.MetricFunctionAvg:
		return "avg_over_time", ""
	case constants.MetricFunctionP90:
		return "quantile_over_time", "0.9"
	case constants.MetricFunctionP99:
		return "quantile_over_time", "0.99"
	default:
		switch metricType {
		case constants.MetricTypeBytes,
			constants.MetricTypePackets,
			constants.MetricTypeDroppedBytes,
			constants.MetricTypeDroppedPackets:
			return "rate", ""
		case constants.MetricTypeCount, constants.MetricTypeCountDNS, constants.MetricTypeFlowRTT, constants.MetricTypeDNSLatencies:
			return "count_over_time", ""
		default:
			// Sprintf with %s: fmt.Sprint would glue the value onto
			// "provided" with no separator (both operands are strings).
			panic(fmt.Sprintf("wrong metricType for function with scalar provided: %s", metricType))
		}
	}
}

func (q *TopologyQueryBuilder) Build() string {
sumBy := q.topology.function == "rate" || q.topology.function == "count_over_time"

// Build topology query like:
// /<url path>?query=
// topk(
// topk | bottomk(
// <k>,
// <sum | avg> by(<aggregations>) (
// <function>(
Expand All @@ -155,19 +185,28 @@ func (q *TopologyQueryBuilder) Build() string {
// )
// &<query params>&step=<step>
sb := q.createStringBuilderURL()
sb.WriteString("topk(")
if q.topology.function == "min_over_time" {
sb.WriteString("bottomk")
} else {
sb.WriteString("topk")
}
sb.WriteRune('(')
sb.WriteString(q.topology.limit)
sb.WriteRune(',')
if q.topology.function == "avg_over_time" {
sb.WriteString("avg")
} else {
sb.WriteString("sum")

if sumBy {
sb.WriteString("sum by(")
sb.WriteString(q.topology.labels)
sb.WriteRune(')')
}
sb.WriteString(" by(")
sb.WriteString(q.topology.fields)
sb.WriteString(") (")

sb.WriteRune('(')
sb.WriteString(q.topology.function)
sb.WriteString("(")
if len(q.topology.scalar) > 0 {
sb.WriteString(q.topology.scalar)
sb.WriteRune(',')
}
q.appendLabels(sb)
q.appendLineFilters(sb)

Expand Down Expand Up @@ -202,10 +241,19 @@ func (q *TopologyQueryBuilder) Build() string {
sb.WriteString(q.topology.rateInterval)
}
sb.WriteString("])")

if !sumBy {
sb.WriteString(" by(")
sb.WriteString(q.topology.labels)
sb.WriteRune(')')
}
sb.WriteRune(')')

if len(q.topology.factor) > 0 {
sb.WriteString(q.topology.factor)
}
sb.WriteString("))")
sb.WriteRune(')')

q.appendQueryParams(sb)
sb.WriteString("&step=")
sb.WriteString(q.topology.step)
Expand Down
6 changes: 3 additions & 3 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func TestLokiConfigurationForTopology(t *testing.T) {
req2 := lokiMock.Calls[1].Arguments[1].(*http.Request)
queries := []string{req1.URL.Query().Get("query"), req2.URL.Query().Get("query")}
expected := []string{
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (rate({app="netobserv-flowcollector",FlowDirection=~"^0$|^2$"}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (rate({app="netobserv-flowcollector",FlowDirection="1",DstK8S_OwnerName=""}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(rate({app="netobserv-flowcollector",FlowDirection=~"^0$|^2$"}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(rate({app="netobserv-flowcollector",FlowDirection="1",DstK8S_OwnerName=""}|~` + "`" + `Duplicate":false` + "`" + `|json|unwrap Bytes|__error__=""[1m])))`,
}
// We don't predict the order so sort both actual and expected
sort.Strings(queries)
Expand Down Expand Up @@ -434,7 +434,7 @@ func TestLokiConfigurationForTableHistogram(t *testing.T) {
req1 := lokiMock.Calls[0].Arguments[1].(*http.Request)
query := req1.URL.Query().Get("query")
expected :=
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName) (count_over_time({app="netobserv-flowcollector"}|~` + "`" + `Duplicate":false` + "`" + `|json[30s])))`
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(count_over_time({app="netobserv-flowcollector"}|~` + "`" + `Duplicate":false` + "`" + `|json[30s])))`
assert.Equal(t, expected, query)

// without any multi-tenancy header
Expand Down
Loading

0 comments on commit 1d3b420

Please sign in to comment.