Skip to content

Commit

Permalink
NodeDirection / IfDirection
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Mar 12, 2024
1 parent 50ea5cc commit 219c84c
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 154 deletions.
43 changes: 28 additions & 15 deletions config/sample-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ loki:
- DstK8S_Type
- K8S_FlowLayer
- FlowDirection
# - Duplicate
# - _RecordType
# - K8S_ClusterName
# - SrcK8S_Zone
Expand Down Expand Up @@ -443,23 +444,30 @@ frontend:
default: false
width: 10
- id: FlowDirection
name: Direction
tooltip: The direction of the flow observed at the Node observation point.
name: Node Direction
tooltip: The interpreted direction of the flow observed at the Node observation point.
field: FlowDirection
filter: direction
filter: flowDirection
default: false
width: 10
- id: Interface
name: Interface
tooltip: The network interface of the Flow.
field: Interface
filter: interface
- id: Interfaces
name: Interfaces
tooltip: The network interfaces of the Flow.
field: Interfaces
filter: interfaces
default: false
width: 10
- id: IfDirections
name: Interface Directions
tooltip: The directions of the Flow observed at the network interface observation point.
field: IfDirections
filter: ifdirections
default: false
width: 10
- id: FlowDirInts
name: Interfaces and Directions
tooltip: Pairs of network interface and direction of the Flow observed at the Node observation point.
field: FlowDirection
tooltip: Pairs of network interface and direction of the Flow observed at the network interface observation point.
field: Interfaces
default: false
width: 15
- id: Bytes
Expand Down Expand Up @@ -819,21 +827,26 @@ frontend:
name: ICMP code
component: number
hint: Specify an ICMP code value as integer number.
- id: direction
name: Direction
- id: flowDirection
name: Node Direction
component: autocomplete
placeholder: 'E.g: Ingress, Egress, Inner'
hint: Specify the direction of the Flow observed at the Node observation point.
hint: Specify the interpreted direction of the Flow observed at the Node observation point.
- id: flow_layer
name: Flow layer
component: text
placeholder: 'Either infra or app'
hint: Specify the layer of Flow.
- id: interface
name: Network interface
- id: interfaces
name: Network interfaces
component: text
placeholder: 'E.g: br-ex, ovn-k8s-mp0'
hint: Specify a network interface.
- id: ifdirections
name: Interface Directions
component: autocomplete
placeholder: 'E.g: Ingress, Egress'
hint: Specify the direction of the Flow observed at the network interface observation point.
- id: id
name: Conversation Id
component: text
Expand Down
7 changes: 3 additions & 4 deletions pkg/handler/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
if err != nil {
return nil, http.StatusBadRequest, err
}
if shouldMergeReporters(metricType, cfg.Deduper) {
if shouldMergeReporters(metricType) {
filterGroups = expandReportersMergeQueries(filterGroups)
}

Expand Down Expand Up @@ -135,9 +135,8 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
return qr, http.StatusOK, nil
}

func shouldMergeReporters(metricType constants.MetricType, deduper loki.Deduper) bool {
return !deduper.Merge && deduper.Mark && (metricType == constants.MetricTypeBytes ||
metricType == constants.MetricTypePackets)
func shouldMergeReporters(metricType constants.MetricType) bool {
return metricType == constants.MetricTypeBytes || metricType == constants.MetricTypePackets
}

func expandReportersMergeQueries(queries filters.MultiQueries) filters.MultiQueries {
Expand Down
12 changes: 9 additions & 3 deletions pkg/loki/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,16 @@ func (f *labelFilter) writeInto(sb *strings.Builder) {
sb.WriteString(`ip("`)
sb.WriteString(f.value)
sb.WriteString(`")`)
case typeRegexContains, typeRegexArrayContains:
case typeRegexContains:
// match any case
sb.WriteString("`(?i).*")
sb.WriteString(f.value)
sb.WriteString(".*`")
case typeRegexArrayContains:
// match any case and ensure we stay inside the array
sb.WriteString("`(?i)[^]]*")
sb.WriteString(f.value)
sb.WriteString("[^]]*`")
default:
panic(fmt.Sprint("wrong filter value type", int(f.valueType)))
}
Expand Down Expand Up @@ -312,9 +318,9 @@ func (f *lineFilter) writeInto(sb *strings.Builder) {
sb.WriteString(`.*"`)
// for array, we ensure it starts by [ and ends by ]
case typeRegexArrayContains:
sb.WriteString(`\[(?i).*`)
sb.WriteString(`\[(?i)[^]]*`)
sb.WriteString(valueReplacer.Replace(v.value))
sb.WriteString(`.*]`)
sb.WriteString(`[^]]*]`)
}
}
}
Expand Down
12 changes: 1 addition & 11 deletions pkg/loki/flow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,12 @@ func (q *FlowQueryBuilder) addLineFilters(key string, values []string, not bool,
return
}

var isArray bool
if q.config.Deduper.Merge {
switch key {
case "FlowDirection", "Interface":
key = fmt.Sprintf("%ss", key)
isArray = true
default:
isArray = false
}
}

lf := lineFilter{
key: key,
not: not,
moreThan: moreThan,
}
isArray := fields.IsArray(key)
isNumeric := fields.IsNumeric(key)
emptyMatches := false
for _, value := range values {
Expand Down
13 changes: 13 additions & 0 deletions pkg/model/fields/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
DSCP = "Dscp"
PktDropBytes = "PktDropBytes"
FlowDirection = "FlowDirection"
Interfaces = "Interfaces"
IfDirections = "IfDirections"
DNSID = "DnsId"
DNSLatency = "DnsLatencyMs"
DNSErrNo = "DnsErrno"
Expand Down Expand Up @@ -80,3 +82,14 @@ func IsIP(f string) bool {
return false
}
}

func IsArray(v string) bool {
switch v {
case
IfDirections,
Interfaces:
return true
default:
return false
}
}
54 changes: 0 additions & 54 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,60 +396,6 @@ func TestLokiConfigurationForTopology(t *testing.T) {
assert.NotNil(t, qr.Result)
}

func TestLokiConfigurationForDeduperMerge(t *testing.T) {
// GIVEN a Loki service
lokiMock := httpMock{}
lokiMock.On("ServeHTTP", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
_, _ = args.Get(0).(http.ResponseWriter).Write([]byte(`{"status":"","data":{"resultType":"matrix","result":[]}}`))
})
lokiSvc := httptest.NewServer(&lokiMock)
defer lokiSvc.Close()
authM := &authMock{}
authM.MockGranted()
lokiURL, err := url.Parse(lokiSvc.URL)
require.NoError(t, err)

// THAT is accessed behind the NOO console plugin backend
backendRoutes := setupRoutes(&Config{
Loki: loki.Config{
URL: lokiURL,
Timeout: time.Second,
Labels: utils.GetMapInterface([]string{fields.SrcNamespace, fields.DstNamespace, fields.SrcOwnerName, fields.DstOwnerName}),
Deduper: loki.Deduper{
Mark: false,
Merge: true,
},
},
}, authM)
backendSvc := httptest.NewServer(backendRoutes)
defer backendSvc.Close()

// WHEN the Loki flows endpoint is queried in the backend
resp, err := backendSvc.Client().Get(backendSvc.URL + "/api/loki/flow/metrics")
require.NoError(t, err)

// THEN the query has been properly forwarded to Loki
// Two queries for dedup
assert.Len(t, lokiMock.Calls, 1)
req1 := lokiMock.Calls[0].Arguments[1].(*http.Request)
queries := []string{req1.URL.Query().Get("query")}
expected := []string{
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(rate({app="netobserv-flowcollector"}|json|unwrap Bytes|__error__=""[1m])))`,
}
assert.Equal(t, expected, queries)

// without any multi-tenancy header
assert.Empty(t, req1.Header.Get("X-Scope-OrgID"))

// AND the response is sent back to the client
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
var qr model.AggregatedQueryResponse
err = json.Unmarshal(body, &qr)
require.NoError(t, err)
assert.NotNil(t, qr.Result)
}

func TestLokiConfigurationForTableHistogram(t *testing.T) {
// GIVEN a Loki service
lokiMock := httpMock{}
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions web/locales/en/plugin__netobserv-plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@
"Examples": "Examples",
"ICMP type provided but protocol is {{proto}}": "ICMP type provided but protocol is {{proto}}",
"ICMP code provided but protocol is {{proto}}": "ICMP code provided but protocol is {{proto}}",
"Invalid data provided. Check JSON for details.": "Invalid data provided. Check JSON for details.",
"dropped": "dropped",
"dropped by": "dropped by",
"sent": "sent",
Expand Down
22 changes: 6 additions & 16 deletions web/src/api/ipfix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export interface Labels {
SrcK8S_Type?: string;
/** Kind of the destination matched Kubernetes object, such as Pod name, Service name, etc. */
DstK8S_Type?: string;
/** Flow direction from the node observation point, only when using eBPF deduper "JustMark" */
/** Flow direction from the node observation point*/
FlowDirection?: FlowDirection;
/** Type of record: 'flowLog' for regular flow logs, or 'allConnections',
* 'newConnection', 'heartbeat', 'endConnection' for conversation tracking */
Expand All @@ -57,7 +57,7 @@ export enum FlowDirection {
Inner = '2'
}

export const getFlowDirectionDisplayString = (value: FlowDirection, t: TFunction) => {
export const getDirectionDisplayString = (value: FlowDirection, t: TFunction) => {
return value === FlowDirection.Ingress
? t('Ingress')
: value === FlowDirection.Egress
Expand All @@ -67,11 +67,7 @@ export const getFlowDirectionDisplayString = (value: FlowDirection, t: TFunction
: t('n/a');
};

export const getFlowDirection = (flow: Record): FlowDirection => {
return String(flow.labels.FlowDirection || flow.fields.FlowDirection!) as FlowDirection;
};

export enum InterfaceDirection {
export enum IfDirection {
/** Incoming traffic, from the network interface observation point */
Ingress = '0',
/** Outgoing traffic, from the network interface observation point */
Expand Down Expand Up @@ -115,16 +111,10 @@ export interface Fields {
K8S_ClusterName?: string;
/** L4 protocol */
Proto: number;
/** Flow direction of the first flow captured, only when using eBPF deduper 'merge' mode */
FlowDirection?: FlowDirection;
/** Flow direction array, only when using eBPF deduper 'merge' mode */
FlowDirections?: number[];
/** Network interface */
Interface?: string;
/** Network interface array, only when using eBPF deduper 'merge' mode */
/** Network interface array */
Interfaces?: string[];
/** Flow direction from the network interface observation point */
IfDirection?: InterfaceDirection;
/** Flow direction array from the network interface observation point */
IfDirections?: IfDirection[];
/** Logical OR combination of unique TCP flags comprised in the flow, as per RFC-9293, with additional custom flags to represent the following per-packet combinations: SYN+ACK (0x100), FIN+ACK (0x200) and RST+ACK (0x400). */
Flags?: number;
/** Number of packets */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ describe('<RecordPanel />', () => {
type: 'flowLog',
canSwitchTypes: false,
allowPktDrops: false,
deduperMerge: false,
setFilters: jest.fn(),
setRange: jest.fn(),
setType: jest.fn(),
Expand Down
41 changes: 29 additions & 12 deletions web/src/components/netflow-record/record-field.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { FilterIcon, GlobeAmericasIcon, TimesIcon, ToggleOffIcon, ToggleOnIcon }
import * as React from 'react';
import { useTranslation } from 'react-i18next';
import { Link } from 'react-router-dom';
import { FlowDirection, getFlowDirection, getFlowDirectionDisplayString, Record } from '../../api/ipfix';
import { FlowDirection, getDirectionDisplayString, Record } from '../../api/ipfix';
import { Column, ColumnsId, getFullColumnName } from '../../utils/columns';
import { dateFormatter, getFormattedDate, timeMSFormatter, utcDateTimeFormatter } from '../../utils/datetime';
import { DNS_CODE_NAMES, DNS_ERRORS_VALUES, getDNSErrorDescription, getDNSRcodeDescription } from '../../utils/dns';
Expand Down Expand Up @@ -485,21 +485,43 @@ export const RecordField: React.FC<{
}
return singleContainer(child);
}
case ColumnsId.flowdir: {
return singleContainer(simpleTextWithTooltip(getFlowDirectionDisplayString(String(value) as FlowDirection, t)));
case ColumnsId.nodedir:
case ColumnsId.ifdirs: {
if (Array.isArray(value)) {
return nthContainer(
value.map(dir => simpleTextWithTooltip(getDirectionDisplayString(String(dir) as FlowDirection, t))),
true,
false,
false
);
}
return singleContainer(simpleTextWithTooltip(getDirectionDisplayString(String(value) as FlowDirection, t)));
}
case ColumnsId.interfaces: {
if (Array.isArray(value)) {
return nthContainer(
value.map(iName => simpleTextWithTooltip(String(iName))),
true,
false,
false
);
}
return singleContainer(simpleTextWithTooltip(String(value)));
}
case ColumnsId.flowdirints: {
if (
flow.fields.Interfaces &&
flow.fields.FlowDirections &&
flow.fields.Interfaces.length === flow.fields.FlowDirections.length
flow.fields.IfDirections &&
Array.isArray(flow.fields.Interfaces) &&
Array.isArray(flow.fields.IfDirections) &&
flow.fields.Interfaces.length === flow.fields.IfDirections.length
) {
return nthContainer(
flow.fields.Interfaces.map((iName, i) =>
sideBySideContainer(
simpleTextWithTooltip(iName),
simpleTextWithTooltip(
getFlowDirectionDisplayString(String(flow.fields.FlowDirections![i]) as FlowDirection, t)
getDirectionDisplayString(String(flow.fields.IfDirections![i]) as FlowDirection, t)
)
)
),
Expand All @@ -508,12 +530,7 @@ export const RecordField: React.FC<{
false
);
} else {
return singleContainer(
sideBySideContainer(
simpleTextWithTooltip(flow.fields.Interface),
simpleTextWithTooltip(getFlowDirectionDisplayString(getFlowDirection(flow), t))
)
);
return singleContainer(emptyText(t('Invalid data provided. Check JSON for details.')));
}
}
case ColumnsId.packets:
Expand Down
Loading

0 comments on commit 219c84c

Please sign in to comment.