conntrack: handle TCP flags (#391)
* Update ebpf-agent dependency

go get github.com/netobserv/netobserv-ebpf-agent@latest
go get github.com/netobserv/flowlogs-pipeline/pkg/pipeline
go mod vendor

* Fix Generic related build issues

* Add Flags fields to decode_protobuf

* Rename test functions

* Handle FIN_ACK

* Add a test for MoveToFront

* Validate TCPFlags field name is not empty

* Add correct direction

* Add test case for mismatch of AB field count

* Add operational metric for tcp flags

* Rename CorrectDirection -> SwapAB

* Change test

* Update README

* Add json tag to conntrack api

* Update docs

* Rename variable

* Make linter happy

* Make linter happy

Subjects() has been deprecated in Go 1.18:
golang/go#46287

* Enable SwapAB only when the feature flag is set

* Fix rebase errors

* NETOBSERV-838 fix IsDuplicate

* Add missing 'omitempty'

* Add parenthesis for clarity

* Add tests for IsDuplicate()

---------

Co-authored-by: Julien Pinsonneau <jpinsonn@redhat.com>
ronensc and jpinsonneau authored Mar 1, 2023
1 parent 203a766 commit 9740aa3
Showing 36 changed files with 804 additions and 410 deletions.
3 changes: 3 additions & 0 deletions README.md
@@ -567,6 +567,9 @@ While, in bidirectional setting, they are grouped together.
Bidirectional setting requires defining both `fieldGroupARef` and `fieldGroupBRef` sections to allow the connection
tracking module to identify which set of fields can swap values and still be considered as the same connection.
The pairs of fields that can swap are determined by their order in the fieldGroup.
In the example below, `SrcAddr` and `DstAddr` are first in their fieldGroup, so they are swappable.
The same is true for `SrcPort` and `DstPort` which are second.
The configuration example below defines a bidirectional setting. So flow-logs that have the values of `SrcAddr` and `SrcPort`
swapped with `DstAddr` and `DstPort` are grouped together as long as they have the same `Proto` field.
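The swap rule described above can be illustrated with a small Go sketch. It is not the connection tracking module's actual hashing code: `flowLog`, `groupValues` and `bidiKey` are hypothetical names, and the key is a plain string rather than a hash. It only shows why pairing fields by their position in each fieldGroup makes a flow log and its reverse land in the same group, while `Proto` must still match.

```go
package main

import (
	"fmt"
	"strings"
)

// flowLog is a hypothetical stand-in for a flow log record.
type flowLog map[string]string

// groupValues joins the values of the given fields, in fieldGroup order.
func groupValues(fl flowLog, fields []string) string {
	vals := make([]string, 0, len(fields))
	for _, f := range fields {
		vals = append(vals, fl[f])
	}
	return strings.Join(vals, ",")
}

// bidiKey builds a key that is identical for A->B and B->A flow logs.
// The two endpoint strings are put in a fixed order, so swapping the
// endpoints does not change the result. The common fields are not swapped.
func bidiKey(fl flowLog, fieldsA, fieldsB, common []string) string {
	a := groupValues(fl, fieldsA)
	b := groupValues(fl, fieldsB)
	if a > b {
		a, b = b, a
	}
	return groupValues(fl, common) + "|" + a + "|" + b
}

func main() {
	fieldsA := []string{"SrcAddr", "SrcPort"}
	fieldsB := []string{"DstAddr", "DstPort"}
	common := []string{"Proto"}

	request := flowLog{"SrcAddr": "10.0.0.1", "SrcPort": "34567", "DstAddr": "10.0.0.2", "DstPort": "80", "Proto": "6"}
	reply := flowLog{"SrcAddr": "10.0.0.2", "SrcPort": "80", "DstAddr": "10.0.0.1", "DstPort": "34567", "Proto": "6"}

	// Both directions produce the same key, so they are grouped together.
	fmt.Println(bidiKey(request, fieldsA, fieldsB, common) == bidiKey(reply, fieldsA, fieldsB, common))
}
```

Because the pairing is positional, both fieldGroups need the same number of fields, which is what the new field-count validation in this commit checks.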
4 changes: 4 additions & 0 deletions docs/api.md
@@ -237,6 +237,10 @@ Following is the supported API format for specifying connection tracking:
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
heartbeatInterval: duration of time to wait between heartbeat reports of a connection
maxConnectionsTracked: maximum number of connections we keep in our cache (0 means no limit)
tcpFlags: settings for handling TCP flags
fieldName: name of the field containing TCP flags
detectEndConnection: detect end connections by FIN_ACK flag
swapAB: swap source and destination when the first flowlog contains the SYN_ACK flag
</pre>
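As a rough illustration of the three `tcpFlags` settings, the Go sketch below decides when to swap A and B and when to end a connection. It is a sketch under assumptions, not the pipeline's implementation: `tcpFlagsConfig`, `shouldSwapAB` and `shouldEndConnection` are hypothetical names, and the flags value is assumed to carry the standard TCP header bits (FIN 0x01, SYN 0x02, ACK 0x10). The eBPF agent may encode SYN_ACK and FIN_ACK differently, so check its definitions before relying on these masks.

```go
package main

import "fmt"

// Standard TCP header flag bits (RFC 9293). Assumption: the flags field
// of a flow log uses these bit positions.
const (
	flagFIN uint32 = 0x01
	flagSYN uint32 = 0x02
	flagACK uint32 = 0x10
)

// tcpFlagsConfig is a hypothetical local mirror of the documented settings.
type tcpFlagsConfig struct {
	FieldName           string
	DetectEndConnection bool
	SwapAB              bool
}

// hasAll reports whether every bit of mask is set in flags.
func hasAll(flags, mask uint32) bool { return flags&mask == mask }

// shouldSwapAB: the first flow log of a connection carries SYN+ACK, so it
// was sent by the server and source/destination should be swapped.
func shouldSwapAB(cfg tcpFlagsConfig, flags uint32, isFirstFlowLog bool) bool {
	return cfg.SwapAB && isFirstFlowLog && hasAll(flags, flagSYN|flagACK)
}

// shouldEndConnection: FIN+ACK was seen, so the connection can be ended
// without waiting for endConnectionTimeout.
func shouldEndConnection(cfg tcpFlagsConfig, flags uint32) bool {
	return cfg.DetectEndConnection && hasAll(flags, flagFIN|flagACK)
}

func main() {
	cfg := tcpFlagsConfig{FieldName: "Flags", DetectEndConnection: true, SwapAB: true}
	fmt.Println(shouldSwapAB(cfg, flagSYN|flagACK, true))  // first flow log is a SYN+ACK
	fmt.Println(shouldSwapAB(cfg, flagSYN, true))          // plain SYN: direction is already correct
	fmt.Println(shouldEndConnection(cfg, flagFIN|flagACK)) // FIN+ACK ends the connection
}
```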
## Time-based Filters API
Following is the supported API format for specifying metrics time-based filters:
8 changes: 8 additions & 0 deletions docs/operational-metrics.md
@@ -31,6 +31,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Labels** | type |


### conntrack_tcp_flags
| **Name** | conntrack_tcp_flags |
|:---|:---|
| **Description** | The total number of actions taken based on TCP flags. |
| **Type** | counter |
| **Labels** | action |


### encode_prom_errors
| **Name** | encode_prom_errors |
|:---|:---|
6 changes: 3 additions & 3 deletions go.mod
@@ -1,6 +1,6 @@
module github.com/netobserv/flowlogs-pipeline

go 1.17
go 1.18

require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
@@ -13,9 +13,9 @@ require (
github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
github.com/minio/minio-go/v7 v7.0.44
github.com/mitchellh/mapstructure v1.4.3
github.com/netobserv/gopipes v0.2.0
github.com/netobserv/gopipes v0.3.0
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500
github.com/netobserv/netobserv-ebpf-agent v0.2.4-0.20221220155455-aa7838d82f4d
github.com/netobserv/netobserv-ebpf-agent v0.2.4
github.com/netsampler/goflow2 v1.1.1-0.20220509155230-5300494e4785
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
4 changes: 4 additions & 0 deletions go.sum
@@ -788,10 +788,14 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/netobserv/gopipes v0.2.0 h1:CnJQq32+xNuM85eVYy/HOf+StTJdh2K6RdaEg7NAJDg=
github.com/netobserv/gopipes v0.2.0/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI=
github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+60=
github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 h1:RmnoJe/ci5q+QdM7upFdxiU+D8F3L3qTd5wXCwwHefw=
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E=
github.com/netobserv/netobserv-ebpf-agent v0.2.4-0.20221220155455-aa7838d82f4d h1:1tn5bJBtv44nglFmMlAbxWYf78Lz3RFxqTxZUsrXKaQ=
github.com/netobserv/netobserv-ebpf-agent v0.2.4-0.20221220155455-aa7838d82f4d/go.mod h1:sKo7bEgMHchUkD+0c0qTQXZxQDRdgiPzKD9kgdRxgLU=
github.com/netobserv/netobserv-ebpf-agent v0.2.4 h1:I8dLLF4NJvnO8HPcZIzgEutMh9sP2YHWZcaZa+3osX8=
github.com/netobserv/netobserv-ebpf-agent v0.2.4/go.mod h1:IRGkUU+tFKr7mbaT/KyQDmuY3Nk4V1IwEZkNezzYLj0=
github.com/netobserv/prometheus-common v0.31.2-0.20220720134304-43e74fd22881 h1:hx5bi6xBovRjmwUoVJBzhJ3EDo4K4ZUsqqKrJuQ2vMI=
github.com/netobserv/prometheus-common v0.31.2-0.20220720134304-43e74fd22881/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/netsampler/goflow2 v1.1.1-0.20220509155230-5300494e4785 h1:qhDrIMXlk8YV7BxwA6UR/dQVdUzohjLlmrUXymsBx6g=
96 changes: 68 additions & 28 deletions pkg/api/conntrack.go
@@ -28,33 +28,33 @@ const (
)

type ConnTrack struct {
// TODO: should by a pointer instead?
KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields,omitempty" doc:"list of output fields"`
Scheduling []ConnTrackSchedulingGroup `yaml:"scheduling,omitempty" doc:"list of timeouts and intervals to apply per selector"`
MaxConnectionsTracked int `yaml:"maxConnectionsTracked,omitempty" doc:"maximum number of connections we keep in our cache (0 means no limit)"`
KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" json:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes,omitempty" json:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields,omitempty" json:"outputFields,omitempty" doc:"list of output fields"`
Scheduling []ConnTrackSchedulingGroup `yaml:"scheduling,omitempty" json:"scheduling,omitempty" doc:"list of timeouts and intervals to apply per selector"`
MaxConnectionsTracked int `yaml:"maxConnectionsTracked,omitempty" json:"maxConnectionsTracked,omitempty" doc:"maximum number of connections we keep in our cache (0 means no limit)"`
TCPFlags ConnTrackTCPFlags `yaml:"tcpFlags,omitempty" json:"tcpFlags,omitempty" doc:"settings for handling TCP flags"`
}

type ConnTrackOutputRecordTypeEnum struct {
NewConnection string `yaml:"newConnection" doc:"New connection"`
EndConnection string `yaml:"endConnection" doc:"End connection"`
Heartbeat string `yaml:"heartbeat" doc:"Heartbeat"`
FlowLog string `yaml:"flowLog" doc:"Flow log"`
NewConnection string `yaml:"newConnection" json:"newConnection" doc:"New connection"`
EndConnection string `yaml:"endConnection" json:"endConnection" doc:"End connection"`
Heartbeat string `yaml:"heartbeat" json:"heartbeat" doc:"Heartbeat"`
FlowLog string `yaml:"flowLog" json:"flowLog" doc:"Flow log"`
}

func ConnTrackOutputRecordTypeName(operation string) string {
return GetEnumName(ConnTrackOutputRecordTypeEnum{}, operation)
}

type KeyDefinition struct {
FieldGroups []FieldGroup `yaml:"fieldGroups,omitempty" doc:"list of field group definitions"`
Hash ConnTrackHash `yaml:"hash,omitempty" doc:"how to build the connection hash"`
FieldGroups []FieldGroup `yaml:"fieldGroups,omitempty" json:"fieldGroups,omitempty" doc:"list of field group definitions"`
Hash ConnTrackHash `yaml:"hash,omitempty" json:"hash,omitempty" doc:"how to build the connection hash"`
}

type FieldGroup struct {
Name string `yaml:"name,omitempty" doc:"field group name"`
Fields []string `yaml:"fields" doc:"list of fields in the group"`
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"field group name"`
Fields []string `yaml:"fields" json:"fields" doc:"list of fields in the group"`
}

// ConnTrackHash determines how to compute the connection hash.
@@ -64,35 +64,41 @@ type FieldGroup struct {
// When they are not set, a different hash will be computed for A->B and B->A,
// and they are tracked as different connections.
type ConnTrackHash struct {
FieldGroupRefs []string `yaml:"fieldGroupRefs,omitempty" doc:"list of field group names to build the hash"`
FieldGroupARef string `yaml:"fieldGroupARef,omitempty" doc:"field group name of endpoint A"`
FieldGroupBRef string `yaml:"fieldGroupBRef,omitempty" doc:"field group name of endpoint B"`
FieldGroupRefs []string `yaml:"fieldGroupRefs,omitempty" json:"fieldGroupRefs,omitempty" doc:"list of field group names to build the hash"`
FieldGroupARef string `yaml:"fieldGroupARef,omitempty" json:"fieldGroupARef,omitempty" doc:"field group name of endpoint A"`
FieldGroupBRef string `yaml:"fieldGroupBRef,omitempty" json:"fieldGroupBRef,omitempty" doc:"field group name of endpoint B"`
}

type OutputField struct {
Name string `yaml:"name,omitempty" doc:"output field name"`
Operation string `yaml:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"`
Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
}

type ConnTrackOperationEnum struct {
Sum string `yaml:"sum" doc:"sum"`
Count string `yaml:"count" doc:"count"`
Min string `yaml:"min" doc:"min"`
Max string `yaml:"max" doc:"max"`
Sum string `yaml:"sum" json:"sum" doc:"sum"`
Count string `yaml:"count" json:"count" doc:"count"`
Min string `yaml:"min" json:"min" doc:"min"`
Max string `yaml:"max" json:"max" doc:"max"`
}

type ConnTrackSchedulingGroup struct {
Selector map[string]interface{} `yaml:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"`
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"`
Selector map[string]interface{} `yaml:"selector,omitempty" json:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"`
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" json:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" json:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"`
}

func ConnTrackOperationName(operation string) string {
return GetEnumName(ConnTrackOperationEnum{}, operation)
}

type ConnTrackTCPFlags struct {
FieldName string `yaml:"fieldName,omitempty" json:"fieldName,omitempty" doc:"name of the field containing TCP flags"`
DetectEndConnection bool `yaml:"detectEndConnection,omitempty" json:"detectEndConnection,omitempty" doc:"detect end connections by FIN_ACK flag"`
SwapAB bool `yaml:"swapAB,omitempty" json:"swapAB,omitempty" doc:"swap source and destination when the first flowlog contains the SYN_ACK flag"`
}

func (ct *ConnTrack) Validate() error {
isGroupAEmpty := ct.KeyDefinition.Hash.FieldGroupARef == ""
isGroupBEmpty := ct.KeyDefinition.Hash.FieldGroupBRef == ""
@@ -201,9 +207,40 @@ func (ct *ConnTrack) Validate() error {
msg: fmt.Errorf("found %v default selectors. There should be exactly 1", numOfDefault)}
}

if len(ct.TCPFlags.FieldName) == 0 && (ct.TCPFlags.DetectEndConnection || ct.TCPFlags.SwapAB) {
return conntrackInvalidError{emptyTCPFlagsField: true,
msg: fmt.Errorf("TCPFlags.FieldName is empty although DetectEndConnection or SwapAB are enabled")}
}
if ct.TCPFlags.SwapAB && !isBidi {
return conntrackInvalidError{swapABWithNoBidi: true,
msg: fmt.Errorf("SwapAB is enabled although bidirection is not enabled (fieldGroupARef is empty)")}
}

fieldsA, fieldsB := ct.GetABFields()
if len(fieldsA) != len(fieldsB) {
return conntrackInvalidError{mismatchABFieldsCount: true,
msg: fmt.Errorf("mismatch between the field count of fieldGroupARef %v and fieldGroupBRef %v", len(fieldsA), len(fieldsB))}
}

return nil
}

func (ct *ConnTrack) GetABFields() ([]string, []string) {
endpointAFieldGroupName := ct.KeyDefinition.Hash.FieldGroupARef
endpointBFieldGroupName := ct.KeyDefinition.Hash.FieldGroupBRef
var endpointAFields []string
var endpointBFields []string
for _, fg := range ct.KeyDefinition.FieldGroups {
if fg.Name == endpointAFieldGroupName {
endpointAFields = fg.Fields
}
if fg.Name == endpointBFieldGroupName {
endpointBFields = fg.Fields
}
}
return endpointAFields, endpointBFields
}

// addToSet adds an item to a set and returns true if it's a new item. Otherwise, it returns false.
func addToSet(set map[string]struct{}, item string) bool {
if _, found := set[item]; found {
@@ -253,6 +290,9 @@ type conntrackInvalidError struct {
undefinedSelectorKey bool
defaultGroupAndNotLast bool
exactlyOneDefaultSelector bool
swapABWithNoBidi bool
emptyTCPFlagsField bool
mismatchABFieldsCount bool
}

func (err conntrackInvalidError) Error() string {
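The three validation checks added to `Validate()` in pkg/api/conntrack.go can be tried in isolation with the simplified sketch below. The types here are hypothetical, flattened stand-ins for `ConnTrack`, `FieldGroup` and `ConnTrackTCPFlags`. The sketch also assumes that "bidirectional" simply means `fieldGroupARef` is set, which is what the error message in the diff suggests but which is not shown in the hunk.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical, simplified mirrors of the API types in the diff.
type fieldGroup struct {
	Name   string
	Fields []string
}

type tcpFlags struct {
	FieldName           string
	DetectEndConnection bool
	SwapAB              bool
}

type connTrack struct {
	FieldGroups []fieldGroup
	GroupARef   string
	GroupBRef   string
	TCPFlags    tcpFlags
}

// abFields returns the field lists of endpoint A and endpoint B.
func (ct connTrack) abFields() (a, b []string) {
	for _, fg := range ct.FieldGroups {
		if fg.Name == ct.GroupARef {
			a = fg.Fields
		}
		if fg.Name == ct.GroupBRef {
			b = fg.Fields
		}
	}
	return a, b
}

// validate applies the three new checks in the same order as the diff.
func (ct connTrack) validate() error {
	if ct.TCPFlags.FieldName == "" && (ct.TCPFlags.DetectEndConnection || ct.TCPFlags.SwapAB) {
		return errors.New("tcpFlags.fieldName is empty although detectEndConnection or swapAB is enabled")
	}
	isBidi := ct.GroupARef != "" // assumption: bidirectional means group A is referenced
	if ct.TCPFlags.SwapAB && !isBidi {
		return errors.New("swapAB is enabled although bidirectional hashing is not")
	}
	a, b := ct.abFields()
	if len(a) != len(b) {
		return fmt.Errorf("field count mismatch between group A (%d) and group B (%d)", len(a), len(b))
	}
	return nil
}

func main() {
	mismatched := connTrack{
		FieldGroups: []fieldGroup{
			{Name: "src", Fields: []string{"SrcIP", "SrcPort"}},
			{Name: "dst", Fields: []string{"DstIP"}},
		},
		GroupARef: "src",
		GroupBRef: "dst",
	}
	fmt.Println(mismatched.validate())
	fmt.Println(connTrack{TCPFlags: tcpFlags{SwapAB: true}}.validate())
}
```

The empty-field-name check comes first, so a configuration that enables `swapAB` without a field name fails on that check even when it is also not bidirectional. This matches the "Empty TCPFlags field name 2" test case in the diff.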
63 changes: 63 additions & 0 deletions pkg/api/conntrack_test.go
@@ -212,6 +212,45 @@ func TestConnTrackValidate(t *testing.T) {
},
conntrackInvalidError{defaultGroupAndNotLast: true},
},
{
"Empty TCPFlags field name 1",
ConnTrack{
Scheduling: []ConnTrackSchedulingGroup{{Selector: map[string]interface{}{}}},
TCPFlags: ConnTrackTCPFlags{DetectEndConnection: true},
},
conntrackInvalidError{emptyTCPFlagsField: true},
},
{
"Empty TCPFlags field name 2",
ConnTrack{
Scheduling: []ConnTrackSchedulingGroup{{Selector: map[string]interface{}{}}},
TCPFlags: ConnTrackTCPFlags{SwapAB: true},
},
conntrackInvalidError{emptyTCPFlagsField: true},
},
{
"Mismatch between field count of FieldGroupARef and FieldGroupBRef",
ConnTrack{
KeyDefinition: KeyDefinition{
FieldGroups: []FieldGroup{
{
Name: "src",
Fields: []string{"SrcIP", "SrcPort"},
},
{
Name: "dst",
Fields: []string{"DstIP"},
},
},
Hash: ConnTrackHash{
FieldGroupARef: "src",
FieldGroupBRef: "dst",
},
},
Scheduling: []ConnTrackSchedulingGroup{{Selector: map[string]interface{}{}}},
},
conntrackInvalidError{mismatchABFieldsCount: true},
},
}

for _, tt := range tests {
@@ -221,3 +260,27 @@ func TestConnTrackValidate(t *testing.T) {
})
}
}

func TestGetABFields(t *testing.T) {
fieldsA := []string{"SrcIP", "SrcPort"}
fieldsB := []string{"DstIP", "DstPort"}
conf := ConnTrack{
Scheduling: []ConnTrackSchedulingGroup{{Selector: map[string]interface{}{}}},
KeyDefinition: KeyDefinition{
FieldGroups: []FieldGroup{
{Name: "src", Fields: fieldsA},
{Name: "dst", Fields: fieldsB},
},
Hash: ConnTrackHash{
FieldGroupARef: "src",
FieldGroupBRef: "dst",
},
},
}

require.NoError(t, conf.Validate())

actualA, actualB := conf.GetABFields()
require.Equal(t, fieldsA, actualA)
require.Equal(t, fieldsB, actualB)
}
6 changes: 4 additions & 2 deletions pkg/config/generic_map.go
@@ -21,6 +21,8 @@ import "github.com/netobserv/flowlogs-pipeline/pkg/utils"

type GenericMap map[string]interface{}

const duplicateFieldName = "Duplicate"

// Copy will create a flat copy of GenericMap
func (m GenericMap) Copy() GenericMap {
result := make(GenericMap, len(m))
@@ -33,8 +35,8 @@ func (m GenericMap) Copy() GenericMap {
}

func (m GenericMap) IsDuplicate() bool {
if duplicate, hasKey := m["Duplicate"]; hasKey {
if isDuplicate, err := utils.ConvertToBool(duplicate); err != nil {
if duplicate, hasKey := m[duplicateFieldName]; hasKey {
if isDuplicate, err := utils.ConvertToBool(duplicate); err == nil {
return isDuplicate
}
}
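The `IsDuplicate` fix above flips the error check from `err != nil` to `err == nil`. With the old condition the converted value was returned only when the conversion had failed, so a valid `Duplicate: true` field fell through to the default. The self-contained sketch below reproduces the corrected logic. `convertToBool` is a hypothetical stand-in for `utils.ConvertToBool`, whose real behaviour may cover more input types.

```go
package main

import (
	"fmt"
	"strconv"
)

type genericMap map[string]interface{}

const duplicateFieldName = "Duplicate"

// convertToBool is a hypothetical stand-in for utils.ConvertToBool.
// It accepts bool values and strings understood by strconv.ParseBool.
func convertToBool(v interface{}) (bool, error) {
	switch b := v.(type) {
	case bool:
		return b, nil
	case string:
		return strconv.ParseBool(b)
	default:
		return false, fmt.Errorf("cannot convert %T to bool", v)
	}
}

// isDuplicate returns the converted field value only when the conversion
// succeeded (err == nil). A missing or unparsable field yields false.
func (m genericMap) isDuplicate() bool {
	if v, ok := m[duplicateFieldName]; ok {
		if dup, err := convertToBool(v); err == nil {
			return dup
		}
	}
	return false
}

func main() {
	fmt.Println(genericMap{duplicateFieldName: true}.isDuplicate())
	fmt.Println(genericMap{duplicateFieldName: "true"}.isDuplicate())
	fmt.Println(genericMap{duplicateFieldName: "maybe"}.isDuplicate())
	fmt.Println(genericMap{}.isDuplicate())
}
```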
48 changes: 48 additions & 0 deletions pkg/config/generic_map_test.go
@@ -3,6 +3,8 @@ package config
import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func BenchmarkGenericMap_Copy(b *testing.B) {
@@ -15,3 +17,49 @@ func BenchmarkGenericMap_Copy(b *testing.B) {
_ = m.Copy()
}
}

func TestGenericMap_IsDuplicate(t *testing.T) {
table := []struct {
name string
input GenericMap
expected bool
}{
{
"Duplicate: true",
GenericMap{duplicateFieldName: true},
true,
},
{
"Duplicate: false",
GenericMap{duplicateFieldName: false},
false,
},
{
"Missing field",
GenericMap{},
false,
},
{
"Convert 'true'",
GenericMap{duplicateFieldName: "true"},
true,
},
{
"Convert 'false'",
GenericMap{duplicateFieldName: "false"},
false,
},
{
"Conversion failure: 'maybe'",
GenericMap{duplicateFieldName: "maybe"},
false,
},
}

for _, testCase := range table {
t.Run(testCase.name, func(tt *testing.T) {
actual := testCase.input.IsDuplicate()
require.Equal(tt, testCase.expected, actual)
})
}
}
2 changes: 1 addition & 1 deletion pkg/config/pipeline_builder_test.go
@@ -152,7 +152,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[2])
require.NoError(t, err)
require.JSONEq(t, `{"name":"conntrack","extract":{"type":"conntrack","conntrack":{"KeyDefinition":{"FieldGroups":null,"Hash":{"FieldGroupRefs":null,"FieldGroupARef":"","FieldGroupBRef":""}},"OutputRecordTypes":null,"MaxConnectionsTracked":0,"OutputFields":null,"Scheduling":null}}}`, string(b))
require.JSONEq(t, `{"name":"conntrack","extract":{"type":"conntrack","conntrack":{"keyDefinition":{"hash":{}},"tcpFlags":{}}}}`, string(b))

b, err = json.Marshal(params[3])
require.NoError(t, err)
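The shorter expected JSON in this test follows from the `json:"...,omitempty"` tags added to the conntrack API. In Go's `encoding/json`, `omitempty` drops nil slices, empty strings and zero numbers, but it never drops a struct value, which is why `keyDefinition`, `hash` and `tcpFlags` still appear as empty objects. The sketch below uses trimmed-down, hypothetical copies of the API types to show the effect.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Trimmed-down, hypothetical copies of the conntrack API types.
type hash struct {
	FieldGroupRefs []string `json:"fieldGroupRefs,omitempty"`
	FieldGroupARef string   `json:"fieldGroupARef,omitempty"`
	FieldGroupBRef string   `json:"fieldGroupBRef,omitempty"`
}

type keyDefinition struct {
	FieldGroups []string `json:"fieldGroups,omitempty"`
	Hash        hash     `json:"hash,omitempty"`
}

type tcpFlags struct {
	FieldName string `json:"fieldName,omitempty"`
	SwapAB    bool   `json:"swapAB,omitempty"`
}

type connTrack struct {
	KeyDefinition         keyDefinition `json:"keyDefinition,omitempty"`
	MaxConnectionsTracked int           `json:"maxConnectionsTracked,omitempty"`
	TCPFlags              tcpFlags      `json:"tcpFlags,omitempty"`
}

// marshalZero marshals a zero-value configuration.
func marshalZero() string {
	b, err := json.Marshal(connTrack{})
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// Scalars and slices are omitted; nested structs remain as {}.
	fmt.Println(marshalZero())
}
```

Making a nested struct disappear entirely would require a pointer field, which is the question the `TODO` comment in `ConnTrack` raises.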
1 change: 1 addition & 0 deletions pkg/pipeline/decode/decode_protobuf.go
@@ -52,6 +52,7 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
"TimeReceived": time.Now().Unix(),
"Interface": flow.Interface,
"AgentIP": ipToStr(flow.AgentIp),
"Flags": flow.Flags,
}
return out
}