conntrack: handle TCP flags #391

Merged
merged 24 commits into from
Mar 1, 2023
3 changes: 3 additions & 0 deletions README.md
@@ -567,6 +567,9 @@ While, in bidirectional setting, they are grouped together.

Bidirectional setting requires defining both `fieldGroupARef` and `fieldGroupBRef` sections to allow the connection
tracking module to identify which set of fields can swap values and still be considered as the same connection.
The pairs of fields that can swap are determined by their order in the fieldGroup.
In the example below, `SrcAddr` and `DstAddr` are first in their fieldGroup, so they are swappable.
The same is true for `SrcPort` and `DstPort` which are second.

The configuration example below defines a bidirectional setting. So flow-logs that have the values of `SrcAddr` and `SrcPort`
swapped with `DstAddr` and `DstPort` are grouped together as long as they have the same `Proto` field.
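
As a rough illustration of the positional pairing described above, the sketch below swaps values pairwise between two field groups and checks that a flow and its reverse produce the same key. The names `flow`, `swapped`, `render`, and `connectionKey`, and the way the key is assembled, are assumptions made for this example; they are not the pipeline's actual hashing code.

```go
package main

import (
	"fmt"
	"strings"
)

// flow is a simplified flow log: field name -> value (hypothetical type).
type flow map[string]string

// swapped returns a copy of f in which the i-th field of groupA and the
// i-th field of groupB have exchanged values. Both groups must have the
// same length.
func swapped(f flow, groupA, groupB []string) flow {
	out := make(flow, len(f))
	for k, v := range f {
		out[k] = v
	}
	for i := range groupA {
		out[groupA[i]], out[groupB[i]] = f[groupB[i]], f[groupA[i]]
	}
	return out
}

// render joins the values of the listed fields, in order.
func render(f flow, fields []string) string {
	parts := make([]string, 0, len(fields))
	for _, name := range fields {
		parts = append(parts, f[name])
	}
	return strings.Join(parts, ",")
}

// connectionKey builds a direction-independent key: the two endpoint
// strings are sorted before being joined with the common fields.
func connectionKey(f flow, groupA, groupB, common []string) string {
	a, b := render(f, groupA), render(f, groupB)
	if b < a {
		a, b = b, a
	}
	return render(f, common) + "|" + a + "|" + b
}

func main() {
	groupA := []string{"SrcAddr", "SrcPort"}
	groupB := []string{"DstAddr", "DstPort"}
	common := []string{"Proto"}

	request := flow{"SrcAddr": "10.0.0.1", "SrcPort": "40000",
		"DstAddr": "10.0.0.2", "DstPort": "443", "Proto": "6"}
	reply := swapped(request, groupA, groupB)

	same := connectionKey(request, groupA, groupB, common) ==
		connectionKey(reply, groupA, groupB, common)
	fmt.Println("same connection:", same)
}
```

Sorting the two endpoint strings is only one way to make the key symmetric; any order-independent combination would serve the same purpose here.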
4 changes: 4 additions & 0 deletions docs/api.md
@@ -237,6 +237,10 @@ Following is the supported API format for specifying connection tracking:
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
heartbeatInterval: duration of time to wait between heartbeat reports of a connection
maxConnectionsTracked: maximum number of connections we keep in our cache (0 means no limit)
tcpFlags: settings for handling TCP flags
fieldName: name of the field containing TCP flags
detectEndConnection: detect end connections by FIN_ACK flag
swapAB: swap source and destination when the first flowlog contains the SYN_ACK flag
Comment on lines +242 to +243

Collaborator

I wonder whether these two should be the default behavior, as they're more reliable and convenient than the timeouts.

Collaborator Author

The swapAB feature isn't related to the timeouts.
The detectEndConnection feature works in addition to the timeouts. It can't replace them because we're not guaranteed to receive a flowlog with the FIN_ACK flag for every TCP connection (either because of sampling or because of a SYN attack). But it may allow us to increase the endConnectionTimeout for TCP.

</pre>
## Time-based Filters API
Following is the supported API format for specifying metrics time-based filters:
8 changes: 8 additions & 0 deletions docs/operational-metrics.md
@@ -31,6 +31,14 @@ Each table below provides documentation for an exported flowlogs-pipeline operat
| **Labels** | type |


### conntrack_tcp_flags
| **Name** | conntrack_tcp_flags |
|:---|:---|
| **Description** | The total number of actions taken based on TCP flags. |
| **Type** | counter |
| **Labels** | action |


### encode_prom_errors
| **Name** | encode_prom_errors |
|:---|:---|
6 changes: 3 additions & 3 deletions go.mod
@@ -1,6 +1,6 @@
module github.com/netobserv/flowlogs-pipeline

go 1.17
go 1.18

require (
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible
@@ -13,9 +13,9 @@ require (
github.com/mariomac/guara v0.0.0-20220523124851-5fc279816f1f
github.com/minio/minio-go/v7 v7.0.44
github.com/mitchellh/mapstructure v1.4.3
github.com/netobserv/gopipes v0.2.0
github.com/netobserv/gopipes v0.3.0
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500
github.com/netobserv/netobserv-ebpf-agent v0.2.4-0.20221220155455-aa7838d82f4d
github.com/netobserv/netobserv-ebpf-agent v0.2.4
github.com/netsampler/goflow2 v1.1.1-0.20220509155230-5300494e4785
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
4 changes: 4 additions & 0 deletions go.sum
@@ -788,10 +788,14 @@ github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/netobserv/gopipes v0.2.0 h1:CnJQq32+xNuM85eVYy/HOf+StTJdh2K6RdaEg7NAJDg=
github.com/netobserv/gopipes v0.2.0/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI=
github.com/netobserv/gopipes v0.3.0 h1:IYmPnnAVCdSK7VmHmpFhrVBOEm45qpgbZmJz1sSW+60=
github.com/netobserv/gopipes v0.3.0/go.mod h1:N7/Gz05EOF0CQQSKWsv3eof22Cj2PB08Pbttw98YFYU=
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500 h1:RmnoJe/ci5q+QdM7upFdxiU+D8F3L3qTd5wXCwwHefw=
github.com/netobserv/loki-client-go v0.0.0-20220927092034-f37122a54500/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E=
github.com/netobserv/netobserv-ebpf-agent v0.2.4-0.20221220155455-aa7838d82f4d h1:1tn5bJBtv44nglFmMlAbxWYf78Lz3RFxqTxZUsrXKaQ=
github.com/netobserv/netobserv-ebpf-agent v0.2.4-0.20221220155455-aa7838d82f4d/go.mod h1:sKo7bEgMHchUkD+0c0qTQXZxQDRdgiPzKD9kgdRxgLU=
github.com/netobserv/netobserv-ebpf-agent v0.2.4 h1:I8dLLF4NJvnO8HPcZIzgEutMh9sP2YHWZcaZa+3osX8=
github.com/netobserv/netobserv-ebpf-agent v0.2.4/go.mod h1:IRGkUU+tFKr7mbaT/KyQDmuY3Nk4V1IwEZkNezzYLj0=
github.com/netobserv/prometheus-common v0.31.2-0.20220720134304-43e74fd22881 h1:hx5bi6xBovRjmwUoVJBzhJ3EDo4K4ZUsqqKrJuQ2vMI=
github.com/netobserv/prometheus-common v0.31.2-0.20220720134304-43e74fd22881/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls=
github.com/netsampler/goflow2 v1.1.1-0.20220509155230-5300494e4785 h1:qhDrIMXlk8YV7BxwA6UR/dQVdUzohjLlmrUXymsBx6g=
96 changes: 68 additions & 28 deletions pkg/api/conntrack.go
@@ -28,33 +28,33 @@ const (
)

type ConnTrack struct {
// TODO: should by a pointer instead?
KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields,omitempty" doc:"list of output fields"`
Scheduling []ConnTrackSchedulingGroup `yaml:"scheduling,omitempty" doc:"list of timeouts and intervals to apply per selector"`
MaxConnectionsTracked int `yaml:"maxConnectionsTracked,omitempty" doc:"maximum number of connections we keep in our cache (0 means no limit)"`
KeyDefinition KeyDefinition `yaml:"keyDefinition,omitempty" json:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`
OutputRecordTypes []string `yaml:"outputRecordTypes,omitempty" json:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
OutputFields []OutputField `yaml:"outputFields,omitempty" json:"outputFields,omitempty" doc:"list of output fields"`
Scheduling []ConnTrackSchedulingGroup `yaml:"scheduling,omitempty" json:"scheduling,omitempty" doc:"list of timeouts and intervals to apply per selector"`
MaxConnectionsTracked int `yaml:"maxConnectionsTracked,omitempty" json:"maxConnectionsTracked,omitempty" doc:"maximum number of connections we keep in our cache (0 means no limit)"`
TCPFlags ConnTrackTCPFlags `yaml:"tcpFlags,omitempty" json:"tcpFlags,omitempty" doc:"settings for handling TCP flags"`
}

type ConnTrackOutputRecordTypeEnum struct {
NewConnection string `yaml:"newConnection" doc:"New connection"`
EndConnection string `yaml:"endConnection" doc:"End connection"`
Heartbeat string `yaml:"heartbeat" doc:"Heartbeat"`
FlowLog string `yaml:"flowLog" doc:"Flow log"`
NewConnection string `yaml:"newConnection" json:"newConnection" doc:"New connection"`
EndConnection string `yaml:"endConnection" json:"endConnection" doc:"End connection"`
Heartbeat string `yaml:"heartbeat" json:"heartbeat" doc:"Heartbeat"`
FlowLog string `yaml:"flowLog" json:"flowLog" doc:"Flow log"`
}

func ConnTrackOutputRecordTypeName(operation string) string {
return GetEnumName(ConnTrackOutputRecordTypeEnum{}, operation)
}

type KeyDefinition struct {
FieldGroups []FieldGroup `yaml:"fieldGroups,omitempty" doc:"list of field group definitions"`
Hash ConnTrackHash `yaml:"hash,omitempty" doc:"how to build the connection hash"`
FieldGroups []FieldGroup `yaml:"fieldGroups,omitempty" json:"fieldGroups,omitempty" doc:"list of field group definitions"`
Hash ConnTrackHash `yaml:"hash,omitempty" json:"hash,omitempty" doc:"how to build the connection hash"`
}

type FieldGroup struct {
Name string `yaml:"name,omitempty" doc:"field group name"`
Fields []string `yaml:"fields" doc:"list of fields in the group"`
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"field group name"`
Fields []string `yaml:"fields" json:"fields" doc:"list of fields in the group"`
}

// ConnTrackHash determines how to compute the connection hash.
@@ -64,35 +64,41 @@ type FieldGroup struct {
// When they are not set, a different hash will be computed for A->B and B->A,
// and they are tracked as different connections.
type ConnTrackHash struct {
FieldGroupRefs []string `yaml:"fieldGroupRefs,omitempty" doc:"list of field group names to build the hash"`
FieldGroupARef string `yaml:"fieldGroupARef,omitempty" doc:"field group name of endpoint A"`
FieldGroupBRef string `yaml:"fieldGroupBRef,omitempty" doc:"field group name of endpoint B"`
FieldGroupRefs []string `yaml:"fieldGroupRefs,omitempty" json:"fieldGroupRefs,omitempty" doc:"list of field group names to build the hash"`
FieldGroupARef string `yaml:"fieldGroupARef,omitempty" json:"fieldGroupARef,omitempty" doc:"field group name of endpoint A"`
FieldGroupBRef string `yaml:"fieldGroupBRef,omitempty" json:"fieldGroupBRef,omitempty" doc:"field group name of endpoint B"`
}

type OutputField struct {
Name string `yaml:"name,omitempty" doc:"output field name"`
Operation string `yaml:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
Name string `yaml:"name,omitempty" json:"name,omitempty" doc:"output field name"`
Operation string `yaml:"operation,omitempty" json:"operation,omitempty" enum:"ConnTrackOperationEnum" doc:"aggregate operation on the field value"`
SplitAB bool `yaml:"splitAB,omitempty" json:"splitAB,omitempty" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
Input string `yaml:"input,omitempty" json:"input,omitempty" doc:"The input field to base the operation on. When omitted, 'name' is used"`
}

type ConnTrackOperationEnum struct {
Sum string `yaml:"sum" doc:"sum"`
Count string `yaml:"count" doc:"count"`
Min string `yaml:"min" doc:"min"`
Max string `yaml:"max" doc:"max"`
Sum string `yaml:"sum" json:"sum" doc:"sum"`
Count string `yaml:"count" json:"count" doc:"count"`
Min string `yaml:"min" json:"min" doc:"min"`
Max string `yaml:"max" json:"max" doc:"max"`
}

type ConnTrackSchedulingGroup struct {
Selector map[string]interface{} `yaml:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"`
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"`
Selector map[string]interface{} `yaml:"selector,omitempty" json:"selector,omitempty" doc:"key-value map to match against connection fields to apply this scheduling"`
EndConnectionTimeout Duration `yaml:"endConnectionTimeout,omitempty" json:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
HeartbeatInterval Duration `yaml:"heartbeatInterval,omitempty" json:"heartbeatInterval,omitempty" doc:"duration of time to wait between heartbeat reports of a connection"`
}

func ConnTrackOperationName(operation string) string {
return GetEnumName(ConnTrackOperationEnum{}, operation)
}

type ConnTrackTCPFlags struct {
FieldName string `yaml:"fieldName,omitempty" json:"fieldName,omitempty" doc:"name of the field containing TCP flags"`
DetectEndConnection bool `yaml:"detectEndConnection,omitempty" json:"detectEndConnection,omitempty" doc:"detect end connections by FIN_ACK flag"`
SwapAB bool `yaml:"swapAB,omitempty" json:"swapAB,omitempty" doc:"swap source and destination when the first flowlog contains the SYN_ACK flag"`
}

func (ct *ConnTrack) Validate() error {
isGroupAEmpty := ct.KeyDefinition.Hash.FieldGroupARef == ""
isGroupBEmpty := ct.KeyDefinition.Hash.FieldGroupBRef == ""
@@ -201,9 +207,40 @@ func (ct *ConnTrack) Validate() error {
msg: fmt.Errorf("found %v default selectors. There should be exactly 1", numOfDefault)}
}

if len(ct.TCPFlags.FieldName) == 0 && (ct.TCPFlags.DetectEndConnection || ct.TCPFlags.SwapAB) {
return conntrackInvalidError{emptyTCPFlagsField: true,
msg: fmt.Errorf("TCPFlags.FieldName is empty although DetectEndConnection or SwapAB are enabled")}
}
if ct.TCPFlags.SwapAB && !isBidi {
return conntrackInvalidError{swapABWithNoBidi: true,
msg: fmt.Errorf("SwapAB is enabled although bidirection is not enabled (fieldGroupARef is empty)")}
}

fieldsA, fieldsB := ct.GetABFields()
if len(fieldsA) != len(fieldsB) {
return conntrackInvalidError{mismatchABFieldsCount: true,
msg: fmt.Errorf("mismatch between the field count of fieldGroupARef %v and fieldGroupBRef %v", len(fieldsA), len(fieldsB))}
}

return nil
}

func (ct *ConnTrack) GetABFields() ([]string, []string) {
endpointAFieldGroupName := ct.KeyDefinition.Hash.FieldGroupARef
endpointBFieldGroupName := ct.KeyDefinition.Hash.FieldGroupBRef
var endpointAFields []string
var endpointBFields []string
for _, fg := range ct.KeyDefinition.FieldGroups {
if fg.Name == endpointAFieldGroupName {
endpointAFields = fg.Fields
}
if fg.Name == endpointBFieldGroupName {
endpointBFields = fg.Fields
}
}
return endpointAFields, endpointBFields
}
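
The field-count check in `Validate` matters because the A and B fields are paired by index. A minimal sketch of how the two slices returned by `GetABFields` could drive a swap when the first flow log carries SYN and ACK is shown below. The helper `swapIfSynAck`, the `"Flags"` key as the flags field, and the bit values are assumptions for illustration, not the code added in this PR.

```go
package main

import "fmt"

// Standard TCP header control bits (assumption: the real flags field may be
// encoded differently).
const (
	flagSYN uint32 = 0x02
	flagACK uint32 = 0x10
)

// swapIfSynAck (hypothetical) exchanges the i-th A field with the i-th B
// field when the flow log's flags contain both SYN and ACK. It reports
// whether a swap happened. Mismatched slice lengths are rejected, which is
// the situation the Validate length check is meant to rule out up front.
func swapIfSynAck(flowLog map[string]interface{}, flagsField string, fieldsA, fieldsB []string) bool {
	if len(fieldsA) != len(fieldsB) {
		return false
	}
	flags, ok := flowLog[flagsField].(uint32)
	const synAck = flagSYN | flagACK
	if !ok || flags&synAck != synAck {
		return false
	}
	for i := range fieldsA {
		a, b := fieldsA[i], fieldsB[i]
		flowLog[a], flowLog[b] = flowLog[b], flowLog[a]
	}
	return true
}

func main() {
	// The server's SYN+ACK reply was observed first, so source and
	// destination appear reversed relative to the connection initiator.
	first := map[string]interface{}{
		"SrcIP": "10.0.0.2", "SrcPort": 443,
		"DstIP": "10.0.0.1", "DstPort": 40000,
		"Flags": flagSYN | flagACK,
	}
	swappedNow := swapIfSynAck(first, "Flags",
		[]string{"SrcIP", "SrcPort"}, []string{"DstIP", "DstPort"})
	fmt.Println(swappedNow, first["SrcIP"], first["SrcPort"])
}
```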

// addToSet adds an item to a set and returns true if it's a new item. Otherwise, it returns false.
func addToSet(set map[string]struct{}, item string) bool {
if _, found := set[item]; found {
@@ -253,6 +290,9 @@ type conntrackInvalidError struct {
undefinedSelectorKey bool
defaultGroupAndNotLast bool
exactlyOneDefaultSelector bool
swapABWithNoBidi bool
emptyTCPFlagsField bool
mismatchABFieldsCount bool
}

func (err conntrackInvalidError) Error() string {
63 changes: 63 additions & 0 deletions pkg/api/conntrack_test.go
@@ -212,6 +212,45 @@ func TestConnTrackValidate(t *testing.T) {
},
conntrackInvalidError{defaultGroupAndNotLast: true},
},
{
"Empty TCPFlags field name 1",
ConnTrack{
Scheduling: []ConnTrackSchedulingGroup{{Selector: map[string]interface{}{}}},
TCPFlags: ConnTrackTCPFlags{DetectEndConnection: true},
},
conntrackInvalidError{emptyTCPFlagsField: true},
},
{
"Empty TCPFlags field name 2",
ConnTrack{
Scheduling: []ConnTrackSchedulingGroup{{Selector: map[string]interface{}{}}},
TCPFlags: ConnTrackTCPFlags{SwapAB: true},
},
conntrackInvalidError{emptyTCPFlagsField: true},
},
{
"Mismatch between field count of FieldGroupARef and FieldGroupBRef",
ConnTrack{
KeyDefinition: KeyDefinition{
FieldGroups: []FieldGroup{
{
Name: "src",
Fields: []string{"SrcIP", "SrcPort"},
},
{
Name: "dst",
Fields: []string{"DstIP"},
},
},
Hash: ConnTrackHash{
FieldGroupARef: "src",
FieldGroupBRef: "dst",
},
},
Scheduling: []ConnTrackSchedulingGroup{{Selector: map[string]interface{}{}}},
},
conntrackInvalidError{mismatchABFieldsCount: true},
},
}

for _, tt := range tests {
@@ -221,3 +260,27 @@ func TestConnTrackValidate(t *testing.T) {
})
}
}

func TestGetABFields(t *testing.T) {
fieldsA := []string{"SrcIP", "SrcPort"}
fieldsB := []string{"DstIP", "DstPort"}
conf := ConnTrack{
Scheduling: []ConnTrackSchedulingGroup{{Selector: map[string]interface{}{}}},
KeyDefinition: KeyDefinition{
FieldGroups: []FieldGroup{
{Name: "src", Fields: fieldsA},
{Name: "dst", Fields: fieldsB},
},
Hash: ConnTrackHash{
FieldGroupARef: "src",
FieldGroupBRef: "dst",
},
},
}

require.NoError(t, conf.Validate())

actualA, actualB := conf.GetABFields()
require.Equal(t, fieldsA, actualA)
require.Equal(t, fieldsB, actualB)
}
6 changes: 4 additions & 2 deletions pkg/config/generic_map.go
@@ -21,6 +21,8 @@ import "github.com/netobserv/flowlogs-pipeline/pkg/utils"

type GenericMap map[string]interface{}

const duplicateFieldName = "Duplicate"

// Copy will create a flat copy of GenericMap
func (m GenericMap) Copy() GenericMap {
result := make(GenericMap, len(m))
@@ -33,8 +35,8 @@ func (m GenericMap) Copy() GenericMap {
}

func (m GenericMap) IsDuplicate() bool {
if duplicate, hasKey := m["Duplicate"]; hasKey {
if isDuplicate, err := utils.ConvertToBool(duplicate); err != nil {
if duplicate, hasKey := m[duplicateFieldName]; hasKey {
if isDuplicate, err := utils.ConvertToBool(duplicate); err == nil {
Collaborator

While testing this PR I noticed I've made a mistake here 👼
Sorry about that

Collaborator Author

Thanks for the fix! I missed that too...

return isDuplicate
}
}
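
The effect of the inverted error check fixed above can be reproduced in isolation. In the sketch below, `toBool` is a hypothetical stand-in for `utils.ConvertToBool` (its real behavior may differ), and the two `isDuplicate*` functions mirror the old and new conditions. With `err != nil`, the value is only returned when conversion failed, so the function never reports a duplicate.

```go
package main

import (
	"fmt"
	"strconv"
)

// toBool is a hypothetical stand-in for utils.ConvertToBool.
func toBool(v interface{}) (bool, error) {
	switch x := v.(type) {
	case bool:
		return x, nil
	case string:
		return strconv.ParseBool(x)
	default:
		return false, fmt.Errorf("cannot convert %T to bool", v)
	}
}

// isDuplicateBuggy mirrors the old condition: the converted value is used
// only when the conversion FAILED, so a valid "true" is ignored.
func isDuplicateBuggy(m map[string]interface{}) bool {
	if v, ok := m["Duplicate"]; ok {
		if b, err := toBool(v); err != nil {
			return b
		}
	}
	return false
}

// isDuplicateFixed mirrors the corrected condition: the converted value is
// used only when the conversion succeeded.
func isDuplicateFixed(m map[string]interface{}) bool {
	if v, ok := m["Duplicate"]; ok {
		if b, err := toBool(v); err == nil {
			return b
		}
	}
	return false
}

func main() {
	m := map[string]interface{}{"Duplicate": true}
	fmt.Println("buggy:", isDuplicateBuggy(m))
	fmt.Println("fixed:", isDuplicateFixed(m))
}
```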
48 changes: 48 additions & 0 deletions pkg/config/generic_map_test.go
@@ -3,6 +3,8 @@ package config
import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func BenchmarkGenericMap_Copy(b *testing.B) {
@@ -15,3 +17,49 @@ func BenchmarkGenericMap_Copy(b *testing.B) {
_ = m.Copy()
}
}

func TestGenericMap_IsDuplicate(t *testing.T) {
table := []struct {
name string
input GenericMap
expected bool
}{
{
"Duplicate: true",
GenericMap{duplicateFieldName: true},
true,
},
{
"Duplicate: false",
GenericMap{duplicateFieldName: false},
false,
},
{
"Missing field",
GenericMap{},
false,
},
{
"Convert 'true'",
GenericMap{duplicateFieldName: "true"},
true,
},
{
"Convert 'false'",
GenericMap{duplicateFieldName: "false"},
false,
},
{
"Conversion failure: 'maybe'",
GenericMap{duplicateFieldName: "maybe"},
false,
},
}

for _, testCase := range table {
t.Run(testCase.name, func(tt *testing.T) {
actual := testCase.input.IsDuplicate()
require.Equal(tt, testCase.expected, actual)
})
}
}
2 changes: 1 addition & 1 deletion pkg/config/pipeline_builder_test.go
@@ -152,7 +152,7 @@ func TestKafkaPromPipeline(t *testing.T) {

b, err = json.Marshal(params[2])
require.NoError(t, err)
require.JSONEq(t, `{"name":"conntrack","extract":{"type":"conntrack","conntrack":{"KeyDefinition":{"FieldGroups":null,"Hash":{"FieldGroupRefs":null,"FieldGroupARef":"","FieldGroupBRef":""}},"OutputRecordTypes":null,"MaxConnectionsTracked":0,"OutputFields":null,"Scheduling":null}}}`, string(b))
require.JSONEq(t, `{"name":"conntrack","extract":{"type":"conntrack","conntrack":{"keyDefinition":{"hash":{}},"tcpFlags":{}}}}`, string(b))

b, err = json.Marshal(params[3])
require.NoError(t, err)
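
The change in the expected JSON above follows from the `json:"...,omitempty"` tags added in `pkg/api/conntrack.go`. The sketch below uses trimmed-down, hypothetical struct types (not the real `ConnTrack` definitions) to show the two effects: tagged fields get lowerCamelCase names and empty slices, strings, and ints are dropped, while nested struct values such as `hash` are still emitted as `{}` because `omitempty` does not treat a struct as empty.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hashUntagged imitates a struct without json tags: Go field names are used
// as-is and zero values are always emitted.
type hashUntagged struct {
	FieldGroupRefs []string
	FieldGroupARef string
}

// hashTagged and confTagged imitate the tagged structs (hypothetical,
// reduced to a few fields).
type hashTagged struct {
	FieldGroupRefs []string `json:"fieldGroupRefs,omitempty"`
	FieldGroupARef string   `json:"fieldGroupARef,omitempty"`
}

type confTagged struct {
	Hash                  hashTagged `json:"hash,omitempty"`
	MaxConnectionsTracked int        `json:"maxConnectionsTracked,omitempty"`
}

// mustJSON marshals v and panics on error; it keeps the example short.
func mustJSON(v interface{}) string {
	b, err := json.Marshal(v)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	fmt.Println(mustJSON(hashUntagged{})) // zero values and Go names are kept
	fmt.Println(mustJSON(confTagged{}))   // only the nested struct survives
}
```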
1 change: 1 addition & 0 deletions pkg/pipeline/decode/decode_protobuf.go
@@ -52,6 +52,7 @@ func PBFlowToMap(flow *pbflow.Record) config.GenericMap {
"TimeReceived": time.Now().Unix(),
"Interface": flow.Interface,
"AgentIP": ipToStr(flow.AgentIp),
"Flags": flow.Flags,
}
return out
}