Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1466 NodeDirections / IfDirections #482

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions config/sample-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ loki:
- DstK8S_Type
- K8S_FlowLayer
- FlowDirection
# - Duplicate
# - _RecordType
# - K8S_ClusterName
# - SrcK8S_Zone
Expand Down Expand Up @@ -443,23 +444,30 @@ frontend:
default: false
width: 10
- id: FlowDirection
name: Direction
tooltip: The direction of the flow observed at the Node observation point.
name: Node Direction
tooltip: The interpreted direction of the flow observed at the Node observation point.
field: FlowDirection
filter: direction
filter: flowDirection
default: false
width: 10
- id: Interface
name: Interface
tooltip: The network interface of the Flow.
field: Interface
filter: interface
- id: Interfaces
name: Interfaces
tooltip: The network interfaces of the Flow.
field: Interfaces
filter: interfaces
default: false
width: 10
- id: IfDirections
name: Interface Directions
tooltip: The directions of the Flow observed at the network interface observation point.
field: IfDirections
filter: ifdirections
default: false
width: 10
- id: FlowDirInts
name: Interfaces and Directions
tooltip: Pairs of network interface and direction of the Flow observed at the Node observation point.
field: FlowDirection
tooltip: Pairs of network interface and direction of the Flow observed at the network interface observation point.
field: Interfaces
default: false
width: 15
- id: Bytes
Expand Down Expand Up @@ -819,21 +827,26 @@ frontend:
name: ICMP code
component: number
hint: Specify an ICMP code value as integer number.
- id: direction
name: Direction
- id: flowDirection
name: Node Direction
component: autocomplete
placeholder: 'E.g: Ingress, Egress, Inner'
hint: Specify the direction of the Flow observed at the Node observation point.
hint: Specify the interpreted direction of the Flow observed at the Node observation point.
- id: flow_layer
name: Flow layer
component: text
placeholder: 'Either infra or app'
hint: Specify the layer of Flow.
- id: interface
name: Network interface
- id: interfaces
name: Network interfaces
component: text
placeholder: 'E.g: br-ex, ovn-k8s-mp0'
hint: Specify a network interface.
- id: ifdirections
name: Interface Directions
component: autocomplete
placeholder: 'E.g: Ingress, Egress'
hint: Specify the direction of the Flow observed at the network interface observation point.
- id: id
name: Conversation Id
component: text
Expand Down
7 changes: 3 additions & 4 deletions pkg/handler/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
if err != nil {
return nil, http.StatusBadRequest, err
}
if shouldMergeReporters(metricType, cfg.Deduper) {
if shouldMergeReporters(metricType) {
filterGroups = expandReportersMergeQueries(filterGroups)
}

Expand Down Expand Up @@ -135,9 +135,8 @@ func getTopologyFlows(cfg *loki.Config, client httpclient.Caller, params url.Val
return qr, http.StatusOK, nil
}

func shouldMergeReporters(metricType constants.MetricType, deduper loki.Deduper) bool {
return !deduper.Merge && deduper.Mark && (metricType == constants.MetricTypeBytes ||
metricType == constants.MetricTypePackets)
func shouldMergeReporters(metricType constants.MetricType) bool {
return metricType == constants.MetricTypeBytes || metricType == constants.MetricTypePackets
}

func expandReportersMergeQueries(queries filters.MultiQueries) filters.MultiQueries {
Expand Down
12 changes: 9 additions & 3 deletions pkg/loki/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,16 @@ func (f *labelFilter) writeInto(sb *strings.Builder) {
sb.WriteString(`ip("`)
sb.WriteString(f.value)
sb.WriteString(`")`)
case typeRegexContains, typeRegexArrayContains:
case typeRegexContains:
// match any case
sb.WriteString("`(?i).*")
sb.WriteString(f.value)
sb.WriteString(".*`")
case typeRegexArrayContains:
// match any case and ensure we stay inside the array
sb.WriteString("`(?i)[^]]*")
sb.WriteString(f.value)
sb.WriteString("[^]]*`")
default:
panic(fmt.Sprint("wrong filter value type", int(f.valueType)))
}
Expand Down Expand Up @@ -312,9 +318,9 @@ func (f *lineFilter) writeInto(sb *strings.Builder) {
sb.WriteString(`.*"`)
// for array, we ensure it starts by [ and ends by ]
case typeRegexArrayContains:
sb.WriteString(`\[(?i).*`)
sb.WriteString(`\[(?i)[^]]*`)
sb.WriteString(valueReplacer.Replace(v.value))
sb.WriteString(`.*]`)
sb.WriteString(`[^]]*]`)
}
}
}
Expand Down
12 changes: 1 addition & 11 deletions pkg/loki/flow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,22 +181,12 @@ func (q *FlowQueryBuilder) addLineFilters(key string, values []string, not bool,
return
}

var isArray bool
if q.config.Deduper.Merge {
switch key {
case "FlowDirection", "Interface":
key = fmt.Sprintf("%ss", key)
isArray = true
default:
isArray = false
}
}

lf := lineFilter{
key: key,
not: not,
moreThan: moreThan,
}
isArray := fields.IsArray(key)
isNumeric := fields.IsNumeric(key)
emptyMatches := false
for _, value := range values {
Expand Down
13 changes: 13 additions & 0 deletions pkg/model/fields/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ const (
DSCP = "Dscp"
PktDropBytes = "PktDropBytes"
FlowDirection = "FlowDirection"
Interfaces = "Interfaces"
IfDirections = "IfDirections"
DNSID = "DnsId"
DNSLatency = "DnsLatencyMs"
DNSErrNo = "DnsErrno"
Expand Down Expand Up @@ -80,3 +82,14 @@ func IsIP(f string) bool {
return false
}
}

func IsArray(v string) bool {
switch v {
case
IfDirections,
Interfaces:
return true
default:
return false
}
}
54 changes: 0 additions & 54 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,60 +396,6 @@ func TestLokiConfigurationForTopology(t *testing.T) {
assert.NotNil(t, qr.Result)
}

func TestLokiConfigurationForDeduperMerge(t *testing.T) {
// GIVEN a Loki service
lokiMock := httpMock{}
lokiMock.On("ServeHTTP", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
_, _ = args.Get(0).(http.ResponseWriter).Write([]byte(`{"status":"","data":{"resultType":"matrix","result":[]}}`))
})
lokiSvc := httptest.NewServer(&lokiMock)
defer lokiSvc.Close()
authM := &authMock{}
authM.MockGranted()
lokiURL, err := url.Parse(lokiSvc.URL)
require.NoError(t, err)

// THAT is accessed behind the NOO console plugin backend
backendRoutes := setupRoutes(&Config{
Loki: loki.Config{
URL: lokiURL,
Timeout: time.Second,
Labels: utils.GetMapInterface([]string{fields.SrcNamespace, fields.DstNamespace, fields.SrcOwnerName, fields.DstOwnerName}),
Deduper: loki.Deduper{
Mark: false,
Merge: true,
},
},
}, authM)
backendSvc := httptest.NewServer(backendRoutes)
defer backendSvc.Close()

// WHEN the Loki flows endpoint is queried in the backend
resp, err := backendSvc.Client().Get(backendSvc.URL + "/api/loki/flow/metrics")
require.NoError(t, err)

// THEN the query has been properly forwarded to Loki
// Two queries for dedup
assert.Len(t, lokiMock.Calls, 1)
req1 := lokiMock.Calls[0].Arguments[1].(*http.Request)
queries := []string{req1.URL.Query().Get("query")}
expected := []string{
`topk(100,sum by(SrcK8S_Name,SrcK8S_Type,SrcK8S_OwnerName,SrcK8S_OwnerType,SrcK8S_Namespace,SrcAddr,SrcK8S_HostName,DstK8S_Name,DstK8S_Type,DstK8S_OwnerName,DstK8S_OwnerType,DstK8S_Namespace,DstAddr,DstK8S_HostName)(rate({app="netobserv-flowcollector"}|json|unwrap Bytes|__error__=""[1m])))`,
}
assert.Equal(t, expected, queries)

// without any multi-tenancy header
assert.Empty(t, req1.Header.Get("X-Scope-OrgID"))

// AND the response is sent back to the client
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
var qr model.AggregatedQueryResponse
err = json.Unmarshal(body, &qr)
require.NoError(t, err)
assert.NotNil(t, qr.Result)
}

func TestLokiConfigurationForTableHistogram(t *testing.T) {
// GIVEN a Loki service
lokiMock := httpMock{}
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions web/locales/en/plugin__netobserv-plugin.json
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@
"Examples": "Examples",
"ICMP type provided but protocol is {{proto}}": "ICMP type provided but protocol is {{proto}}",
"ICMP code provided but protocol is {{proto}}": "ICMP code provided but protocol is {{proto}}",
"Invalid data provided. Check JSON for details.": "Invalid data provided. Check JSON for details.",
"dropped": "dropped",
"dropped by": "dropped by",
"sent": "sent",
Expand Down
22 changes: 6 additions & 16 deletions web/src/api/ipfix.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export interface Labels {
SrcK8S_Type?: string;
/** Kind of the destination matched Kubernetes object, such as Pod name, Service name, etc. */
DstK8S_Type?: string;
/** Flow direction from the node observation point, only when using eBPF deduper "JustMark" */
/** Flow direction from the node observation point*/
FlowDirection?: FlowDirection;
/** Type of record: 'flowLog' for regular flow logs, or 'allConnections',
* 'newConnection', 'heartbeat', 'endConnection' for conversation tracking */
Expand All @@ -57,7 +57,7 @@ export enum FlowDirection {
Inner = '2'
}

export const getFlowDirectionDisplayString = (value: FlowDirection, t: TFunction) => {
export const getDirectionDisplayString = (value: FlowDirection, t: TFunction) => {
return value === FlowDirection.Ingress
? t('Ingress')
: value === FlowDirection.Egress
Expand All @@ -67,11 +67,7 @@ export const getFlowDirectionDisplayString = (value: FlowDirection, t: TFunction
: t('n/a');
};

export const getFlowDirection = (flow: Record): FlowDirection => {
return String(flow.labels.FlowDirection || flow.fields.FlowDirection!) as FlowDirection;
};

export enum InterfaceDirection {
export enum IfDirection {
/** Incoming traffic, from the network interface observation point */
Ingress = '0',
/** Outgoing traffic, from the network interface observation point */
Expand Down Expand Up @@ -115,16 +111,10 @@ export interface Fields {
K8S_ClusterName?: string;
/** L4 protocol */
Proto: number;
/** Flow direction of the first flow captured, only when using eBPF deduper 'merge' mode */
FlowDirection?: FlowDirection;
/** Flow direction array, only when using eBPF deduper 'merge' mode */
FlowDirections?: number[];
/** Network interface */
Interface?: string;
/** Network interface array, only when using eBPF deduper 'merge' mode */
/** Network interface array */
Interfaces?: string[];
/** Flow direction from the network interface observation point */
IfDirection?: InterfaceDirection;
/** Flow direction array from the network interface observation point */
IfDirections?: IfDirection[];
/** Logical OR combination of unique TCP flags comprised in the flow, as per RFC-9293, with additional custom flags to represent the following per-packet combinations: SYN+ACK (0x100), FIN+ACK (0x200) and RST+ACK (0x400). */
Flags?: number;
/** Number of packets */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ describe('<RecordPanel />', () => {
type: 'flowLog',
canSwitchTypes: false,
allowPktDrops: false,
deduperMerge: false,
setFilters: jest.fn(),
setRange: jest.fn(),
setType: jest.fn(),
Expand Down
41 changes: 29 additions & 12 deletions web/src/components/netflow-record/record-field.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { FilterIcon, GlobeAmericasIcon, TimesIcon, ToggleOffIcon, ToggleOnIcon }
import * as React from 'react';
import { useTranslation } from 'react-i18next';
import { Link } from 'react-router-dom';
import { FlowDirection, getFlowDirection, getFlowDirectionDisplayString, Record } from '../../api/ipfix';
import { FlowDirection, getDirectionDisplayString, Record } from '../../api/ipfix';
import { Column, ColumnsId, getFullColumnName } from '../../utils/columns';
import { dateFormatter, getFormattedDate, timeMSFormatter, utcDateTimeFormatter } from '../../utils/datetime';
import { DNS_CODE_NAMES, DNS_ERRORS_VALUES, getDNSErrorDescription, getDNSRcodeDescription } from '../../utils/dns';
Expand Down Expand Up @@ -485,21 +485,43 @@ export const RecordField: React.FC<{
}
return singleContainer(child);
}
case ColumnsId.flowdir: {
return singleContainer(simpleTextWithTooltip(getFlowDirectionDisplayString(String(value) as FlowDirection, t)));
case ColumnsId.nodedir:
case ColumnsId.ifdirs: {
if (Array.isArray(value)) {
return nthContainer(
value.map(dir => simpleTextWithTooltip(getDirectionDisplayString(String(dir) as FlowDirection, t))),
true,
false,
false
);
}
return singleContainer(simpleTextWithTooltip(getDirectionDisplayString(String(value) as FlowDirection, t)));
}
case ColumnsId.interfaces: {
if (Array.isArray(value)) {
return nthContainer(
value.map(iName => simpleTextWithTooltip(String(iName))),
true,
false,
false
);
}
return singleContainer(simpleTextWithTooltip(String(value)));
}
case ColumnsId.flowdirints: {
if (
flow.fields.Interfaces &&
flow.fields.FlowDirections &&
flow.fields.Interfaces.length === flow.fields.FlowDirections.length
flow.fields.IfDirections &&
Array.isArray(flow.fields.Interfaces) &&
Array.isArray(flow.fields.IfDirections) &&
flow.fields.Interfaces.length === flow.fields.IfDirections.length
) {
return nthContainer(
flow.fields.Interfaces.map((iName, i) =>
sideBySideContainer(
simpleTextWithTooltip(iName),
simpleTextWithTooltip(
getFlowDirectionDisplayString(String(flow.fields.FlowDirections![i]) as FlowDirection, t)
getDirectionDisplayString(String(flow.fields.IfDirections![i]) as FlowDirection, t)
)
)
),
Expand All @@ -508,12 +530,7 @@ export const RecordField: React.FC<{
false
);
} else {
return singleContainer(
sideBySideContainer(
simpleTextWithTooltip(flow.fields.Interface),
simpleTextWithTooltip(getFlowDirectionDisplayString(getFlowDirection(flow), t))
)
);
return singleContainer(emptyText(t('Invalid data provided. Check JSON for details.')));
}
}
case ColumnsId.packets:
Expand Down
Loading
Loading