Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection tracking - hash mechanism #201

Merged
merged 18 commits into from
May 24, 2022
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ kubernetes
kind
create-kind-cluster Create cluster
delete-kind-cluster Delete cluster
kind-load-image Load image to kind

metrics
generate-configuration Generate metrics configuration
Expand Down
53 changes: 53 additions & 0 deletions pkg/api/conn_track.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package api

// ConnTrack is the YAML-facing configuration of the connection tracking
// module: which fields identify a connection, which record types it emits,
// and how the per-connection output fields are aggregated.
type ConnTrack struct {
	KeyFields         KeyFields     `yaml:"keyFields" doc:"fields that are used to identify the connection"`
	OutputRecordTypes []string      `yaml:"outputRecordTypes" doc:"output record types to emit"`
	OutputFields      []OutputField `yaml:"outputFields" doc:"list of output fields"`
}

type KeyFields struct {
eranra marked this conversation as resolved.
Show resolved Hide resolved
FieldGroups []FieldGroup `yaml:"fieldGroups" doc:"list of field group definitions"`
Hash ConnTrackHash `yaml:"hash" doc:"how to build the connection hash"`
}

// FieldGroup is a named set of flow-log fields that are hashed together
// and can be referenced by name from ConnTrackHash.
type FieldGroup struct {
	Name   string   `yaml:"name" doc:"field group name"`
	Fields []string `yaml:"fields" doc:"list of fields in the group"`
}

// ConnTrackHash determines how to compute the connection hash.
// A and B are treated as the endpoints of the connection.
// When FieldGroupARef and FieldGroupBRef are set, the hash is computed in a way
// that flow logs from A to B will have the same hash as flow logs from B to A.
// When they are not set, a different hash will be computed for A->B and B->A,
// and they are tracked as different connections.
// Each reference is expected to match the Name of a FieldGroup in
// KeyFields.FieldGroups.
type ConnTrackHash struct {
	FieldGroupRefs []string      `yaml:"fieldGroupRefs" doc:"list of field group names to build the hash"`
	FieldGroupARef string        `yaml:"fieldGroupARef" doc:"field group name of endpoint A"`
	FieldGroupBRef string        `yaml:"fieldGroupBRef" doc:"field group name of endpoint B"`
}

// OutputField describes a single aggregated output field of a tracked
// connection (e.g. a sum of bytes, optionally split per direction).
type OutputField struct {
	Name      string `yaml:"name" doc:"output field name"`
	Operation string `yaml:"operation" doc:"aggregate operation on the field value"`
	SplitAB   bool   `yaml:"splitAB" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."`
	Input     string `yaml:"input" doc:"The input field to base the operation on. When omitted, 'name' is used"`
}
92 changes: 92 additions & 0 deletions pkg/pipeline/conntrack/hash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
KalmanMeth marked this conversation as resolved.
Show resolved Hide resolved
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package conntrack

import (
"bytes"
"encoding/gob"
"fmt"
"hash/fnv"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
log "github.com/sirupsen/logrus"
)

// ComputeHash computes the hash of a flow log according to keyFields.
// Two flow logs will have the same hash if they belong to the same connection.
func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields) ([]byte, error) {
type hashType []byte
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ronensc If you externalize the hashType type you will be able to also return hashType and not []byte

fieldGroup2hash := make(map[string]hashType)

// Compute the hash of each field group
for _, fg := range keyFields.FieldGroups {
h, err := computeHashFields(flowLog, fg.Fields)
if err != nil {
return nil, fmt.Errorf("compute hash: %w", err)
}
fieldGroup2hash[fg.Name] = h
}

// Compute the total hash
hash := fnv.New32a()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we "trick" somehow (maybe interface?) so that it will be a parameter to the functions what length of Hash to use >???

@mariomac maybe you have an idea ??? ^^^

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, actually the fnv.New32a and rest of hashers implement the io.Writer interface. You can change the function signature to something like:

func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher io.Writer) ([]byte, error) {

and invoke it like:

h, err := computeHashFields(flowLog, fg.Fields, fnv.New32a())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or, if you want to be even more restrictive in what you can pass as argument, you can use the hash.Hash interface instead of io.Writer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

for _, fgName := range keyFields.Hash.FieldGroupRefs {
hash.Write(fieldGroup2hash[fgName])
}
if keyFields.Hash.FieldGroupARef != "" {
hashA := fieldGroup2hash[keyFields.Hash.FieldGroupARef]
hashB := fieldGroup2hash[keyFields.Hash.FieldGroupBRef]
// Determine order between A's and B's hash to get the same hash for both flow logs from A to B and from B to A.
if bytes.Compare(hashA, hashB) < 0 {
eranra marked this conversation as resolved.
Show resolved Hide resolved
hash.Write(hashA)
hash.Write(hashB)
} else {
hash.Write(hashB)
hash.Write(hashA)
}
}
return hash.Sum([]byte{}), nil
}

func computeHashFields(flowLog config.GenericMap, fieldNames []string) ([]byte, error) {
h := fnv.New32a()
for _, fn := range fieldNames {
f, ok := flowLog[fn]
if !ok {
log.Warningf("Missing field %v", fn)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be risky. It could end up flooding the log file. Is there a way to avoid repeated logs?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can replace it with a prometheus metric. WDYT?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe this needs to be tracked as a metric.  It doesn't seem like it's likely to occur but can if someone wants to be malicious.  For now, maybe let it go, but we should have a general solution to avoid spamming the log file.

continue
}
bytes, err := toBytes(f)
if err != nil {
return nil, err
}
h.Write(bytes)
}
return h.Sum([]byte{}), nil
}

func toBytes(data interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(data)
if err != nil {
return nil, err
}
bytes := buf.Bytes()
return bytes, nil
}
212 changes: 212 additions & 0 deletions pkg/pipeline/conntrack/hash_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright (C) 2022 IBM, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package conntrack

import (
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/stretchr/testify/require"
)

// NewFlowLog builds a flow-log record carrying the classic 5-tuple plus byte
// and packet counters, for use by the hash tests.
func NewFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytes, packets int) config.GenericMap {
	fl := config.GenericMap{}
	fl["SrcAddr"] = srcIP
	fl["SrcPort"] = srcPort
	fl["DstAddr"] = dstIP
	fl["DstPort"] = dstPort
	fl["Proto"] = protocol
	fl["Bytes"] = bytes
	fl["Packets"] = packets
	return fl
}

func TestComputeHash_Unidirectional(t *testing.T) {
keyFields := api.KeyFields{
FieldGroups: []api.FieldGroup{
{
Name: "src",
Fields: []string{
"SrcAddr",
"SrcPort",
},
},
{
Name: "dst",
Fields: []string{
"DstAddr",
"DstPort",
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you include the ephemeral port in the hash, it will count a lot of connections.  We need to get an agreement on what a "connection" is.  Example: Let's say you access a web page.  Is that one connection?  With this implementation, it could be anywhere from 1 to 6 connections.

A typical web page will refer to JavaScript files, CSS files, images, etc.  The browser will need to fetch these files.  Pretty much all modern browsers today will allow up to 6 simultaneous connections to the same domain, and will reuse these connections to fetch all the files.  Each of these connections will have a different ephemeral port.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I tried to make the decision of what is a connection configurable. But it's still not flexible enough to support this use-case.

This specific unit test configures the classic 5-tuple to distinguish between connections. The other unit test uses a slightly different configuration. It uses the same 5-tuple but includes both flow directions in the same connection.

},
},
{
Name: "protocol",
Fields: []string{
"Proto",
},
},
},
Hash: api.ConnTrackHash{
FieldGroupRefs: []string{"src", "dst", "protocol"},
},
}
ipA := "10.0.0.1"
ipB := "10.0.0.2"
portA := 9001
portB := 9002
protocolA := 6
protocolB := 7
table := []struct {
name string
flowLog1 config.GenericMap
flowLog2 config.GenericMap
sameHash bool
}{
{
"Same IP, port and protocol",
NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
NewFlowLog(ipA, portA, ipB, portB, protocolA, 222, 11),
true,
},
{
"Alternating ip+port",
NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
NewFlowLog(ipB, portB, ipA, portA, protocolA, 222, 11),
false,
},
{
"Alternating ip",
NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
NewFlowLog(ipB, portA, ipA, portB, protocolA, 222, 11),
false,
},
{
"Alternating port",
NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
NewFlowLog(ipA, portB, ipB, portA, protocolA, 222, 11),
false,
},
{
"Same IP+port, different protocol",
NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
NewFlowLog(ipA, portA, ipB, portB, protocolB, 222, 11),
false,
},
}
for _, test := range table {
t.Run(test.name, func(t *testing.T) {
h1, err1 := ComputeHash(test.flowLog1, keyFields)
h2, err2 := ComputeHash(test.flowLog2, keyFields)
require.NoError(t, err1)
require.NoError(t, err2)
if test.sameHash {
require.Equal(t, h1, h2)
} else {
require.NotEqual(t, h1, h2)
}
})
}
}

// TestComputeHash_Bidirectional verifies that when endpoint field groups A and
// B are set, flow logs of both directions of the same connection produce the
// same hash, while different connections still differ.
func TestComputeHash_Bidirectional(t *testing.T) {
	keyFields := api.KeyFields{
		FieldGroups: []api.FieldGroup{
			{Name: "src", Fields: []string{"SrcAddr", "SrcPort"}},
			{Name: "dst", Fields: []string{"DstAddr", "DstPort"}},
			{Name: "protocol", Fields: []string{"Proto"}},
		},
		Hash: api.ConnTrackHash{
			FieldGroupRefs: []string{"protocol"},
			FieldGroupARef: "src",
			FieldGroupBRef: "dst",
		},
	}
	ipA, ipB := "10.0.0.1", "10.0.0.2"
	portA, portB := 1, 9002
	protocolA, protocolB := 6, 7
	testCases := []struct {
		name     string
		flowLog1 config.GenericMap
		flowLog2 config.GenericMap
		sameHash bool
	}{
		{
			name:     "Same IP, port and protocol",
			flowLog1: NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
			flowLog2: NewFlowLog(ipA, portA, ipB, portB, protocolA, 222, 11),
			sameHash: true,
		},
		{
			name:     "Alternating ip+port",
			flowLog1: NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
			flowLog2: NewFlowLog(ipB, portB, ipA, portA, protocolA, 222, 11),
			sameHash: true,
		},
		{
			name:     "Alternating ip",
			flowLog1: NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
			flowLog2: NewFlowLog(ipB, portA, ipA, portB, protocolA, 222, 11),
			sameHash: false,
		},
		{
			name:     "Alternating port",
			flowLog1: NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
			flowLog2: NewFlowLog(ipA, portB, ipB, portA, protocolA, 222, 11),
			sameHash: false,
		},
		{
			name:     "Same IP+port, different protocol",
			flowLog1: NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22),
			flowLog2: NewFlowLog(ipA, portA, ipB, portB, protocolB, 222, 11),
			sameHash: false,
		},
	}
	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			h1, err1 := ComputeHash(tc.flowLog1, keyFields)
			h2, err2 := ComputeHash(tc.flowLog2, keyFields)
			require.NoError(t, err1)
			require.NoError(t, err2)
			if tc.sameHash {
				require.Equal(t, h1, h2)
			} else {
				require.NotEqual(t, h1, h2)
			}
		})
	}
}
eranra marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*
*/

// TODO: Delete this package once the connection tracking module is done.

package connection_tracking

import (
Expand Down