From da38508afc0d7b4a539f434b68c064fcf52c87fb Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 27 Apr 2022 11:44:15 +0300 Subject: [PATCH 01/18] conn_track config draft --- contrib/kubernetes/conn_track_config.yaml | 126 ++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 contrib/kubernetes/conn_track_config.yaml diff --git a/contrib/kubernetes/conn_track_config.yaml b/contrib/kubernetes/conn_track_config.yaml new file mode 100644 index 000000000..bd8677a62 --- /dev/null +++ b/contrib/kubernetes/conn_track_config.yaml @@ -0,0 +1,126 @@ +log-level: error +parameters: + - ingest: + ... + name: ingest_collector + - decode: + ... + name: decode_json + + - name: transform_generic + transform: + generic: + ... + type: generic + + - name: transform_network + transform: + network: + ... + type: network + + - name: connection_tracking + conn_track: # Or connTrack/connectionTracking/connection_tracking + + finishTimeouts: # Per protocol. What if protocol is not one of key fields? + tcp: 2m + udp: 30s + default: 5m + + updateInterval: 1m + updateFlowLogsCount: 100 + + outputRecordTypes: + - start # newConnection + - updates # updatedConnection + - end # finishedConnection + # Optional: we might not need the following option + - input # flows/originalFlowLogs + + + keyFields: + nonPairs: # think of a better name + - protocol + # If ObservationPoint is NOT set, then we'll have duplicate flow logs from different ObservationPoints in the + # same connection. Should we handle it somehow? + - ObservationPoint + # Need to verify that the following 2 flows will end up in different connections. + # srcIP | srcPort | dstIP | dstPort + # ---------------------------------- + # IP1 | port1 | IP2 | port2 + # IP2 | port2 | IP1 | port1 + + + # IP1 | port2 | IP2 | port1 + pairs: # key1 and key2 will be grouped across all pairs. + - name: IP + key1: srcIP + key2: dstIP + - name: port + key1: srcPort + key2: dstPort + # `direction` and `perObservationPoint`are redundant if keyFields is specified. + # In IPFIX, the terms "Unidirectional" and "bidirectional" are used instead of "one-way" and "two-way". + # https://datatracker.ietf.org/doc/html/rfc5103#section-2 + direction: one-way # two-way + perObservationPoint: true + + + # In case of two-way, each tracking field must be duplicated one for src->dst and one for dst->src. + # What prefix/suffix to use? + # Do we really want this complication? + # I guess summing bytes from the two directions makes no sense. + outputFields: + - name: bytes + operation: sum + - name: packets + operation: sum + # Number of flows in a connection + # The following is confusing because it says "count bytes" + - name: numFlowLogEntries # This could be any field in the flow log + operation: count + # If the operation is `count` we don't need an 'input' field + # add an example for input in a different field + input: bytes # if input is not specified, it's set the same as "name" + - name: startTime + operation: min + - name: endTime + operation: max + + - name: extract_aggregate + extract: + aggregates: + ... + type: aggregates + + - name: encode_prom + encode: + prom: + ... + type: prom + + + - name: write_none + write: + ... 
+ +pipeline: + - name: ingest_collector + - follows: ingest_collector + name: decode_json + - follows: decode_json + name: transform_generic + - follows: transform_generic + name: transform_network + + - follows: transform_network + name: connection_tracking + + - follows: connection_tracking + name: encode_prom + + - follows: extract_aggregate + name: encode_prom + - follows: encode_prom + name: write_none + From 957823e0eafb996230cdbea4cf0f7ca0c815e42b Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 27 Apr 2022 16:48:06 +0300 Subject: [PATCH 02/18] Change keyFields format and add TODOs --- contrib/kubernetes/conn_track_config.yaml | 51 +++++++++++------------ 1 file changed, 25 insertions(+), 26 deletions(-) diff --git a/contrib/kubernetes/conn_track_config.yaml b/contrib/kubernetes/conn_track_config.yaml index bd8677a62..89a330e16 100644 --- a/contrib/kubernetes/conn_track_config.yaml +++ b/contrib/kubernetes/conn_track_config.yaml @@ -30,40 +30,37 @@ parameters: updateInterval: 1m updateFlowLogsCount: 100 + # TODO: Add config option to set the field of the hash outputRecordTypes: - start # newConnection - updates # updatedConnection - end # finishedConnection - # Optional: we might not need the following option + # The input fields should include the hash of the connection - input # flows/originalFlowLogs + # All the key fields, along with the hash of the connection will be outputted as well. keyFields: - nonPairs: # think of a better name - - protocol - # If ObservationPoint is NOT set, then we'll have duplicate flow logs from different ObservationPoints in the - # same connection. Should we handle it somehow? - - ObservationPoint - # Need to verify that the following 2 flows will end up in different connections. - # srcIP | srcPort | dstIP | dstPort - # ---------------------------------- - # IP1 | port1 | IP2 | port2 - # IP2 | port2 | IP1 | port1 - - - # IP1 | port2 | IP2 | port1 - pairs: # key1 and key2 will be grouped across all pairs. - - name: IP - key1: srcIP - key2: dstIP - - name: port - key1: srcPort - key2: dstPort - # `direction` and `perObservationPoint`are redundant if keyFields is specified. - # In IPFIX, the terms "Unidirectional" and "bidirectional" are used instead of "one-way" and "two-way". - # https://datatracker.ietf.org/doc/html/rfc5103#section-2 - direction: one-way # two-way - perObservationPoint: true + fieldsGroup: + - name: src + fields: + - src_ip + - src_port + - InIf + - name: dst + fields: + - dst_ip + - dst_port + - OutIf + - name: protocol + fields: + - protocol_field + hash: + fieldGroups: + - protocol + fieldGroupPair: + keyA: dst + keyB: src # In case of two-way, each tracking field must be duplicated one for src->dst and one for dst->src. 
@@ -73,6 +70,8 @@ parameters: outputFields: - name: bytes operation: sum + # think of a different name for bidi + bidi: true # default: false - name: packets operation: sum # Number of flows in a connection From 1da5f2481152dd0b7219fe7e7b5ee31686096983 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 3 May 2022 11:48:34 +0300 Subject: [PATCH 03/18] WIP --- .../kubernetes/conn_track_config-temp.yaml | 114 ++++++++++++++++++ contrib/kubernetes/conn_track_config.yaml | 64 +++++----- 2 files changed, 142 insertions(+), 36 deletions(-) create mode 100644 contrib/kubernetes/conn_track_config-temp.yaml diff --git a/contrib/kubernetes/conn_track_config-temp.yaml b/contrib/kubernetes/conn_track_config-temp.yaml new file mode 100644 index 000000000..b8f53973e --- /dev/null +++ b/contrib/kubernetes/conn_track_config-temp.yaml @@ -0,0 +1,114 @@ +log-level: error +parameters: + - ingest: + ... + name: ingest_collector + - decode: + ... + name: decode_json + + - name: transform_generic + transform: + generic: + ... + type: generic + + - name: transform_network + transform: + network: + ... + type: network + + - name: connection_tracking + conn_track: # Or connTrack/connectionTracking/connection_tracking + finishTimeouts: # Per protocol. What if protocol is not one of key fields? + tcp: 2m + udp: 30s + default: 5m + updateInterval: 1m + updateFlowLogsCount: 100 + # TODO: Add config option to set the field of the hash + outputRecordTypes: + - start # newConnection + - updates # updatedConnection + - end # finishedConnection + # The input fields should include the hash of the connection + - input # flows/originalFlowLogs + + # All the key fields, along with the hash of the connection will be outputted as well. + keyFields: + fieldsGroup: + - name: src + fields: + - SrcAddr + - SrcPort + - InIf + - name: dst + fields: + - DstAddr + - DstPort + - OutIf + - name: protocol + fields: + - Proto + hash: + fieldGroups: + - protocol + fieldGroupPair: + keyA: dst + keyB: src + + outputFields: + - name: Bytes + operation: sum + splitBA: true # default: false + - name: Packets + operation: sum + splitBA: true + - name: numFlowLogEntries + operation: count + # If the operation is `count` we don't need an 'input' field + - name: StartTime + operation: min + input: TimeReceived + - name: EndTime + operation: max + input: TimeReceived + + - name: extract_aggregate + extract: + aggregates: + ... + type: aggregates + + - name: encode_prom + encode: + prom: + ... + type: prom + + + - name: write_none + write: + ... + +pipeline: + - name: ingest_collector + - follows: ingest_collector + name: decode_json + - follows: decode_json + name: transform_generic + - follows: transform_generic + name: transform_network + + - follows: transform_network + name: connection_tracking + + - follows: connection_tracking + name: encode_prom + + - follows: extract_aggregate + name: encode_prom + - follows: encode_prom + name: write_none + diff --git a/contrib/kubernetes/conn_track_config.yaml b/contrib/kubernetes/conn_track_config.yaml index 89a330e16..132c75eb9 100644 --- a/contrib/kubernetes/conn_track_config.yaml +++ b/contrib/kubernetes/conn_track_config.yaml @@ -21,70 +21,62 @@ parameters: - name: connection_tracking conn_track: # Or connTrack/connectionTracking/connection_tracking - - finishTimeouts: # Per protocol. What if protocol is not one of key fields? 
+ finishTimeouts: # Per protocol tcp: 2m udp: 30s default: 5m - updateInterval: 1m updateFlowLogsCount: 100 - # TODO: Add config option to set the field of the hash outputRecordTypes: - - start # newConnection - - updates # updatedConnection - - end # finishedConnection - # The input fields should include the hash of the connection - - input # flows/originalFlowLogs - + - newConnection + - updatedConnection + - endConnection + - flowlogs # All the key fields, along with the hash of the connection will be outputted as well. keyFields: fieldsGroup: - name: src fields: - - src_ip - - src_port + - SrcAddr + - SrcPort - InIf - name: dst fields: - - dst_ip - - dst_port + - DstAddr + - DstPort - OutIf - name: protocol fields: - - protocol_field + - Proto hash: fieldGroups: - protocol - fieldGroupPair: - keyA: dst - keyB: src - + # Option1 + fieldGroupPair: # fieldGroupAB + keyA: src + keyB: dst + # Option2 + fieldGroupA: src + fieldGroupB: dst - # In case of two-way, each tracking field must be duplicated one for src->dst and one for dst->src. - # What prefix/suffix to use? - # Do we really want this complication? - # I guess summing bytes from the two directions makes no sense. outputFields: - - name: bytes + - name: Bytes operation: sum - # think of a different name for bidi - bidi: true # default: false - - name: packets + splitAB: true # default: false + - name: Packets operation: sum - # Number of flows in a connection - # The following is confusing because it says "count bytes" - - name: numFlowLogEntries # This could be any field in the flow log - operation: count - # If the operation is `count` we don't need an 'input' field - # add an example for input in a different field - input: bytes # if input is not specified, it's set the same as "name" - - name: startTime + splitAB: true + - name: numFlowLogEntries + operation: count # If the operation is `count` we don't need an 'input' field + splitAB: true + - name: StartTime operation: min - - name: endTime + input: TimeReceived + - name: EndTime operation: max + input: TimeReceived - name: extract_aggregate extract: From a6776a07661a2950a86c6c3a70055725f1de5381 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 11 May 2022 11:59:21 +0300 Subject: [PATCH 04/18] Add hash --- pkg/api/conn_track.go | 49 ++++++ pkg/pipeline/conntrack/conntrack.go | 106 ++++++++++++ pkg/pipeline/conntrack/conntrack_test.go | 212 +++++++++++++++++++++++ 3 files changed, 367 insertions(+) create mode 100644 pkg/api/conn_track.go create mode 100644 pkg/pipeline/conntrack/conntrack.go create mode 100644 pkg/pipeline/conntrack/conntrack_test.go diff --git a/pkg/api/conn_track.go b/pkg/api/conn_track.go new file mode 100644 index 000000000..212c10426 --- /dev/null +++ b/pkg/api/conn_track.go @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package api + +type ConnTrack struct { + KeyFields KeyFields `yaml:"keyFields" doc:"fields that are used to identify the connection"` + OutputRecordTypes []string `yaml:"outputRecordTypes" doc:"output record types to emit"` + OutputFields []OutputField `yaml:"outputFields" doc:"list of output fields"` +} + +// TODO: add annotations + +type KeyFields struct { + FieldGroups []FieldGroup + Hash ConnTrackHash +} + +type FieldGroup struct { + Name string + Fields []string +} + +type ConnTrackHash struct { + FieldGroups []string + FieldGroupA string + FieldGroupB string +} + +type OutputField struct { + Name string `yaml:"name" doc:"entry input field"` + Operation string `yaml:"operation" doc:"entry output field"` + SplitAB bool `yaml:"splitAB" doc:"one of the following:"` + Input string `yaml:"input" doc:"parameters specific to type"` +} diff --git a/pkg/pipeline/conntrack/conntrack.go b/pkg/pipeline/conntrack/conntrack.go new file mode 100644 index 000000000..5b2c33d57 --- /dev/null +++ b/pkg/pipeline/conntrack/conntrack.go @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package contrack + +import ( + "bytes" + "encoding/gob" + "fmt" + "hash/fnv" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + log "github.com/sirupsen/logrus" +) + +type ConnectionTracker interface { + ConnectionTrack(in []config.GenericMap) []config.GenericMap +} + +type connectionTrackNone struct { +} + +// ConnectionTrack TODO +func (ct *connectionTrackNone) ConnectionTrack(f []config.GenericMap) []config.GenericMap { + return f +} + +// ComputeHash computes the hash of a flow log according to keyFields. +// 2 flow logs will have the same hash if they belong to the same connection. +func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields) ([]byte, error) { + type hashType []byte + fieldGroup2hash := make(map[string]hashType) + + // Compute the hash of each field group + for _, fg := range keyFields.FieldGroups { + h, err := computeHashFields(flowLog, fg.Fields) + if err != nil { + return nil, fmt.Errorf("compute hash: %w", err) + } + fieldGroup2hash[fg.Name] = h + } + + // Compute the total hash + hash := fnv.New32a() + for _, fgName := range keyFields.Hash.FieldGroups { + hash.Write(fieldGroup2hash[fgName]) + } + if keyFields.Hash.FieldGroupA != "" { + hashA := fieldGroup2hash[keyFields.Hash.FieldGroupA] + hashB := fieldGroup2hash[keyFields.Hash.FieldGroupB] + // Determine order between A's and B's hash to get the same hash for both flow logs from A to B and from B to A. 
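+		// bytes.Compare gives a stable ordering, so the lexicographically smaller
+		// hash is always written first, whichever endpoint the flow log came from.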
+ if bytes.Compare(hashA, hashB) < 0 { + hash.Write(hashA) + hash.Write(hashB) + } else { + hash.Write(hashB) + hash.Write(hashA) + } + } + return hash.Sum([]byte{}), nil +} + +func computeHashFields(flowLog config.GenericMap, fieldNames []string) ([]byte, error) { + h := fnv.New32a() + for _, fn := range fieldNames { + f := flowLog[fn] + bytes, err := toBytes(f) + if err != nil { + return nil, err + } + h.Write(bytes) + } + return h.Sum([]byte{}), nil +} + +func toBytes(data interface{}) ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(data) + if err != nil { + return nil, err + } + bytes := buf.Bytes() + return bytes, nil +} + +// NewConnectionTrackNone create a new ConnectionTrack +func NewConnectionTrackNone() (ConnectionTracker, error) { + log.Debugf("entering NewConnectionTrackNone") + return &connectionTrackNone{}, nil +} diff --git a/pkg/pipeline/conntrack/conntrack_test.go b/pkg/pipeline/conntrack/conntrack_test.go new file mode 100644 index 000000000..e97b60ae5 --- /dev/null +++ b/pkg/pipeline/conntrack/conntrack_test.go @@ -0,0 +1,212 @@ +/* + * Copyright (C) 2022 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package contrack + +import ( + "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/api" + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/stretchr/testify/require" +) + +func NewFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protocol int, bytes, packets int) config.GenericMap { + return config.GenericMap{ + "SrcAddr": srcIP, + "SrcPort": srcPort, + "DstAddr": dstIP, + "DstPort": dstPort, + "Proto": protocol, + "Bytes": bytes, + "Packets": packets, + } +} + +func TestComputeHash_Unidirectional(t *testing.T) { + keyFields := api.KeyFields{ + FieldGroups: []api.FieldGroup{ + { + Name: "src", + Fields: []string{ + "SrcAddr", + "SrcPort", + }, + }, + { + Name: "dst", + Fields: []string{ + "DstAddr", + "DstPort", + }, + }, + { + Name: "protocol", + Fields: []string{ + "Proto", + }, + }, + }, + Hash: api.ConnTrackHash{ + FieldGroups: []string{"src", "dst", "protocol"}, + }, + } + ipA := "10.0.0.1" + ipB := "10.0.0.2" + portA := 9001 + portB := 9002 + protocolA := 6 + protocolB := 7 + table := []struct { + name string + flowLog1 config.GenericMap + flowLog2 config.GenericMap + sameHash bool + }{ + { + "Same IP, port and protocol", + NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipA, portA, ipB, portB, protocolA, 222, 11), + true, + }, + { + "Alternating ip+port", + NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipB, portB, ipA, portA, protocolA, 222, 11), + false, + }, + { + "Alternating ip", + NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipB, portA, ipA, portB, protocolA, 222, 11), + false, + }, + { + "Alternating port", + NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipA, portB, ipB, portA, protocolA, 222, 11), + false, + }, + { + "Same IP+port, different protocol", + 
NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipA, portA, ipB, portB, protocolB, 222, 11), + false, + }, + } + for _, test := range table { + t.Run(test.name, func(t *testing.T) { + h1, err1 := ComputeHash(test.flowLog1, keyFields) + h2, err2 := ComputeHash(test.flowLog2, keyFields) + require.NoError(t, err1) + require.NoError(t, err2) + if test.sameHash { + require.Equal(t, h1, h2) + } else { + require.NotEqual(t, h1, h2) + } + }) + } +} + +func TestComputeHash_Bidirectional(t *testing.T) { + keyFields := api.KeyFields{ + FieldGroups: []api.FieldGroup{ + { + Name: "src", + Fields: []string{ + "SrcAddr", + "SrcPort", + }, + }, + { + Name: "dst", + Fields: []string{ + "DstAddr", + "DstPort", + }, + }, + { + Name: "protocol", + Fields: []string{ + "Proto", + }, + }, + }, + Hash: api.ConnTrackHash{ + FieldGroups: []string{"protocol"}, + FieldGroupA: "src", + FieldGroupB: "dst", + }, + } + ipA := "10.0.0.1" + ipB := "10.0.0.2" + portA := 1 + portB := 9002 + protocolA := 6 + protocolB := 7 + table := []struct { + name string + flowLog1 config.GenericMap + flowLog2 config.GenericMap + sameHash bool + }{ + { + "Same IP, port and protocol", + NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipA, portA, ipB, portB, protocolA, 222, 11), + true, + }, + { + "Alternating ip+port", + NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipB, portB, ipA, portA, protocolA, 222, 11), + true, + }, + { + "Alternating ip", + NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipB, portA, ipA, portB, protocolA, 222, 11), + false, + }, + { + "Alternating port", + NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipA, portB, ipB, portA, protocolA, 222, 11), + false, + }, + { + "Same IP+port, different protocol", + NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22), + NewFlowLog(ipA, portA, ipB, portB, protocolB, 222, 11), + false, + }, + } + for _, test := range table { + t.Run(test.name, func(t *testing.T) { + h1, err1 := ComputeHash(test.flowLog1, keyFields) + h2, err2 := ComputeHash(test.flowLog2, keyFields) + require.NoError(t, err1) + require.NoError(t, err2) + if test.sameHash { + require.Equal(t, h1, h2) + } else { + require.NotEqual(t, h1, h2) + } + }) + } +} From 3bd4bfc27fffa3c97da2d27160949e66a4e07d84 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 11 May 2022 12:05:00 +0300 Subject: [PATCH 05/18] Add annotations --- pkg/api/conn_track.go | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/pkg/api/conn_track.go b/pkg/api/conn_track.go index 212c10426..86fd5bcda 100644 --- a/pkg/api/conn_track.go +++ b/pkg/api/conn_track.go @@ -23,22 +23,20 @@ type ConnTrack struct { OutputFields []OutputField `yaml:"outputFields" doc:"list of output fields"` } -// TODO: add annotations - type KeyFields struct { - FieldGroups []FieldGroup - Hash ConnTrackHash + FieldGroups []FieldGroup `yaml:"fieldGroups" doc:"list of field groups"` + Hash ConnTrackHash `yaml:"hash" doc:"how to build the connection hash"` } type FieldGroup struct { - Name string - Fields []string + Name string `yaml:"name" doc:"field group name"` + Fields []string `yaml:"fields" doc:"list of fields in the group"` } type ConnTrackHash struct { - FieldGroups []string - FieldGroupA string - FieldGroupB string + FieldGroups []string `yaml:"fieldGroups" doc:"list of field groups"` + FieldGroupA string `yaml:"fieldGroupA" doc:"field group A"` + FieldGroupB string `yaml:"fieldGroupB" doc:"field group B"` } type OutputField 
struct { From 5c21b9f76077ed3288f82614e761987579a14cff Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 11 May 2022 12:09:05 +0300 Subject: [PATCH 06/18] Add a TODO comment --- .../transform/connection_tracking/connection_tracking.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/pipeline/transform/connection_tracking/connection_tracking.go b/pkg/pipeline/transform/connection_tracking/connection_tracking.go index b5fb5a67a..0e8f045b6 100644 --- a/pkg/pipeline/transform/connection_tracking/connection_tracking.go +++ b/pkg/pipeline/transform/connection_tracking/connection_tracking.go @@ -15,6 +15,8 @@ * */ +// TODO: Delete this package once the connection tracking module is done. + package connection_tracking import ( From ebe9aa8ee9accd452d71d1d4c3308335080fae21 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 11 May 2022 12:12:41 +0300 Subject: [PATCH 07/18] Cleanup and rename --- .../conntrack/{conntrack.go => hash.go} | 21 +------------------ .../{conntrack_test.go => hash_test.go} | 0 2 files changed, 1 insertion(+), 20 deletions(-) rename pkg/pipeline/conntrack/{conntrack.go => hash.go} (80%) rename pkg/pipeline/conntrack/{conntrack_test.go => hash_test.go} (100%) diff --git a/pkg/pipeline/conntrack/conntrack.go b/pkg/pipeline/conntrack/hash.go similarity index 80% rename from pkg/pipeline/conntrack/conntrack.go rename to pkg/pipeline/conntrack/hash.go index 5b2c33d57..2cc1474e2 100644 --- a/pkg/pipeline/conntrack/conntrack.go +++ b/pkg/pipeline/conntrack/hash.go @@ -25,23 +25,10 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" - log "github.com/sirupsen/logrus" ) -type ConnectionTracker interface { - ConnectionTrack(in []config.GenericMap) []config.GenericMap -} - -type connectionTrackNone struct { -} - -// ConnectionTrack TODO -func (ct *connectionTrackNone) ConnectionTrack(f []config.GenericMap) []config.GenericMap { - return f -} - // ComputeHash computes the hash of a flow log according to keyFields. -// 2 flow logs will have the same hash if they belong to the same connection. +// Two flow logs will have the same hash if they belong to the same connection. 
func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields) ([]byte, error) { type hashType []byte fieldGroup2hash := make(map[string]hashType) @@ -98,9 +85,3 @@ func toBytes(data interface{}) ([]byte, error) { bytes := buf.Bytes() return bytes, nil } - -// NewConnectionTrackNone create a new ConnectionTrack -func NewConnectionTrackNone() (ConnectionTracker, error) { - log.Debugf("entering NewConnectionTrackNone") - return &connectionTrackNone{}, nil -} diff --git a/pkg/pipeline/conntrack/conntrack_test.go b/pkg/pipeline/conntrack/hash_test.go similarity index 100% rename from pkg/pipeline/conntrack/conntrack_test.go rename to pkg/pipeline/conntrack/hash_test.go From a68f132239103bca636878bd14c86947584c0c82 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 11 May 2022 12:13:59 +0300 Subject: [PATCH 08/18] Cleanup --- .../kubernetes/conn_track_config-temp.yaml | 114 ----------------- contrib/kubernetes/conn_track_config.yaml | 117 ------------------ 2 files changed, 231 deletions(-) delete mode 100644 contrib/kubernetes/conn_track_config-temp.yaml delete mode 100644 contrib/kubernetes/conn_track_config.yaml diff --git a/contrib/kubernetes/conn_track_config-temp.yaml b/contrib/kubernetes/conn_track_config-temp.yaml deleted file mode 100644 index b8f53973e..000000000 --- a/contrib/kubernetes/conn_track_config-temp.yaml +++ /dev/null @@ -1,114 +0,0 @@ -log-level: error -parameters: - - ingest: - ... - name: ingest_collector - - decode: - ... - name: decode_json - - - name: transform_generic - transform: - generic: - ... - type: generic - - - name: transform_network - transform: - network: - ... - type: network - - - name: connection_tracking - conn_track: # Or connTrack/connectionTracking/connection_tracking - finishTimeouts: # Per protocol. What if protocol is not one of key fields? - tcp: 2m - udp: 30s - default: 5m - updateInterval: 1m - updateFlowLogsCount: 100 - # TODO: Add config option to set the field of the hash - outputRecordTypes: - - start # newConnection - - updates # updatedConnection - - end # finishedConnection - # The input fields should include the hash of the connection - - input # flows/originalFlowLogs - - # All the key fields, along with the hash of the connection will be outputted as well. - keyFields: - fieldsGroup: - - name: src - fields: - - SrcAddr - - SrcPort - - InIf - - name: dst - fields: - - DstAddr - - DstPort - - OutIf - - name: protocol - fields: - - Proto - hash: - fieldGroups: - - protocol - fieldGroupPair: - keyA: dst - keyB: src - - outputFields: - - name: Bytes - operation: sum - splitBA: true # default: false - - name: Packets - operation: sum - splitBA: true - - name: numFlowLogEntries - operation: count - # If the operation is `count` we don't need an 'input' field - - name: StartTime - operation: min - input: TimeReceived - - name: EndTime - operation: max - input: TimeReceived - - - name: extract_aggregate - extract: - aggregates: - ... - type: aggregates - - - name: encode_prom - encode: - prom: - ... - type: prom - - - - name: write_none - write: - ... 
- -pipeline: - - name: ingest_collector - - follows: ingest_collector - name: decode_json - - follows: decode_json - name: transform_generic - - follows: transform_generic - name: transform_network - - - follows: transform_network - name: connection_tracking - - - follows: connection_tracking - name: encode_prom - - - follows: extract_aggregate - name: encode_prom - - follows: encode_prom - name: write_none - diff --git a/contrib/kubernetes/conn_track_config.yaml b/contrib/kubernetes/conn_track_config.yaml deleted file mode 100644 index 132c75eb9..000000000 --- a/contrib/kubernetes/conn_track_config.yaml +++ /dev/null @@ -1,117 +0,0 @@ -log-level: error -parameters: - - ingest: - ... - name: ingest_collector - - decode: - ... - name: decode_json - - - name: transform_generic - transform: - generic: - ... - type: generic - - - name: transform_network - transform: - network: - ... - type: network - - - name: connection_tracking - conn_track: # Or connTrack/connectionTracking/connection_tracking - finishTimeouts: # Per protocol - tcp: 2m - udp: 30s - default: 5m - updateInterval: 1m - updateFlowLogsCount: 100 - # TODO: Add config option to set the field of the hash - outputRecordTypes: - - newConnection - - updatedConnection - - endConnection - - flowlogs - - # All the key fields, along with the hash of the connection will be outputted as well. - keyFields: - fieldsGroup: - - name: src - fields: - - SrcAddr - - SrcPort - - InIf - - name: dst - fields: - - DstAddr - - DstPort - - OutIf - - name: protocol - fields: - - Proto - hash: - fieldGroups: - - protocol - # Option1 - fieldGroupPair: # fieldGroupAB - keyA: src - keyB: dst - # Option2 - fieldGroupA: src - fieldGroupB: dst - - outputFields: - - name: Bytes - operation: sum - splitAB: true # default: false - - name: Packets - operation: sum - splitAB: true - - name: numFlowLogEntries - operation: count # If the operation is `count` we don't need an 'input' field - splitAB: true - - name: StartTime - operation: min - input: TimeReceived - - name: EndTime - operation: max - input: TimeReceived - - - name: extract_aggregate - extract: - aggregates: - ... - type: aggregates - - - name: encode_prom - encode: - prom: - ... - type: prom - - - - name: write_none - write: - ... 
- -pipeline: - - name: ingest_collector - - follows: ingest_collector - name: decode_json - - follows: decode_json - name: transform_generic - - follows: transform_generic - name: transform_network - - - follows: transform_network - name: connection_tracking - - - follows: connection_tracking - name: encode_prom - - - follows: extract_aggregate - name: encode_prom - - follows: encode_prom - name: write_none - From 8a0532bfc0ccdee246802200c207326079c62d51 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 11 May 2022 13:00:08 +0300 Subject: [PATCH 09/18] Update tags --- pkg/api/conn_track.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/api/conn_track.go b/pkg/api/conn_track.go index 86fd5bcda..ff144dc47 100644 --- a/pkg/api/conn_track.go +++ b/pkg/api/conn_track.go @@ -40,8 +40,8 @@ type ConnTrackHash struct { } type OutputField struct { - Name string `yaml:"name" doc:"entry input field"` - Operation string `yaml:"operation" doc:"entry output field"` - SplitAB bool `yaml:"splitAB" doc:"one of the following:"` - Input string `yaml:"input" doc:"parameters specific to type"` + Name string `yaml:"name" doc:"output field name"` + Operation string `yaml:"operation" doc:"aggregate operation on the field value"` + SplitAB bool `yaml:"splitAB" doc:"When true, 2 output fields will be created. One for A->B and one for B->A flows."` + Input string `yaml:"input" doc:"The input field to base the operation on. When omitted, 'name' is used"` } From 747a19baf4a5d6fac3b7c88ac630f0bbb56ae7b5 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 11 May 2022 13:00:22 +0300 Subject: [PATCH 10/18] Update docs --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index badc43868..1cec31441 100644 --- a/README.md +++ b/README.md @@ -655,6 +655,7 @@ kubernetes kind create-kind-cluster Create cluster delete-kind-cluster Delete cluster + kind-load-image Load image to kind metrics generate-configuration Generate metrics configuration From 74e8a18f327e67dc82dacaa1a98e3b8638ff6dd3 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Wed, 11 May 2022 14:34:41 +0300 Subject: [PATCH 11/18] Add a comment --- pkg/pipeline/conntrack/hash.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/pipeline/conntrack/hash.go b/pkg/pipeline/conntrack/hash.go index 2cc1474e2..02681ab57 100644 --- a/pkg/pipeline/conntrack/hash.go +++ b/pkg/pipeline/conntrack/hash.go @@ -65,6 +65,7 @@ func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields) ([]byte, er func computeHashFields(flowLog config.GenericMap, fieldNames []string) ([]byte, error) { h := fnv.New32a() for _, fn := range fieldNames { + // TODO: How should we handle a missing fieldName? 
f := flowLog[fn] bytes, err := toBytes(f) if err != nil { From 909ef581e43985c00e8379949cbf865b98bd2e26 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Thu, 12 May 2022 12:34:44 +0300 Subject: [PATCH 12/18] Rename fields and add comments --- pkg/api/conn_track.go | 14 ++++++++++---- pkg/pipeline/conntrack/hash.go | 8 ++++---- pkg/pipeline/conntrack/hash_test.go | 8 ++++---- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/pkg/api/conn_track.go b/pkg/api/conn_track.go index ff144dc47..b7f8b11dc 100644 --- a/pkg/api/conn_track.go +++ b/pkg/api/conn_track.go @@ -24,7 +24,7 @@ type ConnTrack struct { } type KeyFields struct { - FieldGroups []FieldGroup `yaml:"fieldGroups" doc:"list of field groups"` + FieldGroups []FieldGroup `yaml:"fieldGroups" doc:"list of field group definitions"` Hash ConnTrackHash `yaml:"hash" doc:"how to build the connection hash"` } @@ -33,10 +33,16 @@ type FieldGroup struct { Fields []string `yaml:"fields" doc:"list of fields in the group"` } +// ConnTrackHash determines how to compute the connection hash. +// A and B are treated as the endpoints of the connection. +// When FieldGroupARef and FieldGroupBRef are set, the hash is computed in a way +// that flow logs from A to B will have the same hash as flow logs from B to A. +// When they are not set, a different hash will be computed for A->B and B->A, +// and they are tracked as different connections. type ConnTrackHash struct { - FieldGroups []string `yaml:"fieldGroups" doc:"list of field groups"` - FieldGroupA string `yaml:"fieldGroupA" doc:"field group A"` - FieldGroupB string `yaml:"fieldGroupB" doc:"field group B"` + FieldGroupRefs []string `yaml:"fieldGroupRefs" doc:"list of field group names to build the hash"` + FieldGroupARef string `yaml:"fieldGroupARef" doc:"field group name of endpoint A"` + FieldGroupBRef string `yaml:"fieldGroupBRef" doc:"field group name of endpoint B"` } type OutputField struct { diff --git a/pkg/pipeline/conntrack/hash.go b/pkg/pipeline/conntrack/hash.go index 02681ab57..9c8b22a0a 100644 --- a/pkg/pipeline/conntrack/hash.go +++ b/pkg/pipeline/conntrack/hash.go @@ -44,12 +44,12 @@ func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields) ([]byte, er // Compute the total hash hash := fnv.New32a() - for _, fgName := range keyFields.Hash.FieldGroups { + for _, fgName := range keyFields.Hash.FieldGroupRefs { hash.Write(fieldGroup2hash[fgName]) } - if keyFields.Hash.FieldGroupA != "" { - hashA := fieldGroup2hash[keyFields.Hash.FieldGroupA] - hashB := fieldGroup2hash[keyFields.Hash.FieldGroupB] + if keyFields.Hash.FieldGroupARef != "" { + hashA := fieldGroup2hash[keyFields.Hash.FieldGroupARef] + hashB := fieldGroup2hash[keyFields.Hash.FieldGroupBRef] // Determine order between A's and B's hash to get the same hash for both flow logs from A to B and from B to A. 
if bytes.Compare(hashA, hashB) < 0 { hash.Write(hashA) diff --git a/pkg/pipeline/conntrack/hash_test.go b/pkg/pipeline/conntrack/hash_test.go index e97b60ae5..b492abeab 100644 --- a/pkg/pipeline/conntrack/hash_test.go +++ b/pkg/pipeline/conntrack/hash_test.go @@ -62,7 +62,7 @@ func TestComputeHash_Unidirectional(t *testing.T) { }, }, Hash: api.ConnTrackHash{ - FieldGroups: []string{"src", "dst", "protocol"}, + FieldGroupRefs: []string{"src", "dst", "protocol"}, }, } ipA := "10.0.0.1" @@ -148,9 +148,9 @@ func TestComputeHash_Bidirectional(t *testing.T) { }, }, Hash: api.ConnTrackHash{ - FieldGroups: []string{"protocol"}, - FieldGroupA: "src", - FieldGroupB: "dst", + FieldGroupRefs: []string{"protocol"}, + FieldGroupARef: "src", + FieldGroupBRef: "dst", }, } ipA := "10.0.0.1" From 1a052c56115bbb86fdfdd971591decde09d472cf Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Thu, 12 May 2022 12:55:20 +0300 Subject: [PATCH 13/18] Fix package name --- pkg/pipeline/conntrack/hash.go | 2 +- pkg/pipeline/conntrack/hash_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/pipeline/conntrack/hash.go b/pkg/pipeline/conntrack/hash.go index 9c8b22a0a..5881dd1d0 100644 --- a/pkg/pipeline/conntrack/hash.go +++ b/pkg/pipeline/conntrack/hash.go @@ -15,7 +15,7 @@ * */ -package contrack +package conntrack import ( "bytes" diff --git a/pkg/pipeline/conntrack/hash_test.go b/pkg/pipeline/conntrack/hash_test.go index b492abeab..64377dc8a 100644 --- a/pkg/pipeline/conntrack/hash_test.go +++ b/pkg/pipeline/conntrack/hash_test.go @@ -15,7 +15,7 @@ * */ -package contrack +package conntrack import ( "testing" From 487643c6107f1e289df313b958b670ffdff369bc Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Thu, 12 May 2022 16:54:07 +0300 Subject: [PATCH 14/18] Log missing field --- pkg/pipeline/conntrack/hash.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/pipeline/conntrack/hash.go b/pkg/pipeline/conntrack/hash.go index 5881dd1d0..0b1e7098d 100644 --- a/pkg/pipeline/conntrack/hash.go +++ b/pkg/pipeline/conntrack/hash.go @@ -25,6 +25,7 @@ import ( "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" + log "github.com/sirupsen/logrus" ) // ComputeHash computes the hash of a flow log according to keyFields. @@ -65,8 +66,11 @@ func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields) ([]byte, er func computeHashFields(flowLog config.GenericMap, fieldNames []string) ([]byte, error) { h := fnv.New32a() for _, fn := range fieldNames { - // TODO: How should we handle a missing fieldName? 
- f := flowLog[fn] + f, ok := flowLog[fn] + if !ok { + log.Warningf("Missing field %v", fn) + continue + } bytes, err := toBytes(f) if err != nil { return nil, err From 3e909c13f2d093a4b52f4583c6eacccfd99a4dc2 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Thu, 12 May 2022 17:08:07 +0300 Subject: [PATCH 15/18] Extract hash instance --- pkg/pipeline/conntrack/hash.go | 28 ++++++++++++++-------------- pkg/pipeline/conntrack/hash_test.go | 11 +++++++---- 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/pkg/pipeline/conntrack/hash.go b/pkg/pipeline/conntrack/hash.go index 0b1e7098d..800407cb6 100644 --- a/pkg/pipeline/conntrack/hash.go +++ b/pkg/pipeline/conntrack/hash.go @@ -21,7 +21,7 @@ import ( "bytes" "encoding/gob" "fmt" - "hash/fnv" + "hash" "github.com/netobserv/flowlogs-pipeline/pkg/api" "github.com/netobserv/flowlogs-pipeline/pkg/config" @@ -30,13 +30,13 @@ import ( // ComputeHash computes the hash of a flow log according to keyFields. // Two flow logs will have the same hash if they belong to the same connection. -func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields) ([]byte, error) { +func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields, hasher hash.Hash) ([]byte, error) { type hashType []byte fieldGroup2hash := make(map[string]hashType) // Compute the hash of each field group for _, fg := range keyFields.FieldGroups { - h, err := computeHashFields(flowLog, fg.Fields) + h, err := computeHashFields(flowLog, fg.Fields, hasher) if err != nil { return nil, fmt.Errorf("compute hash: %w", err) } @@ -44,27 +44,27 @@ func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields) ([]byte, er } // Compute the total hash - hash := fnv.New32a() + hasher.Reset() for _, fgName := range keyFields.Hash.FieldGroupRefs { - hash.Write(fieldGroup2hash[fgName]) + hasher.Write(fieldGroup2hash[fgName]) } if keyFields.Hash.FieldGroupARef != "" { hashA := fieldGroup2hash[keyFields.Hash.FieldGroupARef] hashB := fieldGroup2hash[keyFields.Hash.FieldGroupBRef] // Determine order between A's and B's hash to get the same hash for both flow logs from A to B and from B to A. 
if bytes.Compare(hashA, hashB) < 0 { - hash.Write(hashA) - hash.Write(hashB) + hasher.Write(hashA) + hasher.Write(hashB) } else { - hash.Write(hashB) - hash.Write(hashA) + hasher.Write(hashB) + hasher.Write(hashA) } } - return hash.Sum([]byte{}), nil + return hasher.Sum([]byte{}), nil } -func computeHashFields(flowLog config.GenericMap, fieldNames []string) ([]byte, error) { - h := fnv.New32a() +func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash) ([]byte, error) { + hasher.Reset() for _, fn := range fieldNames { f, ok := flowLog[fn] if !ok { @@ -75,9 +75,9 @@ func computeHashFields(flowLog config.GenericMap, fieldNames []string) ([]byte, if err != nil { return nil, err } - h.Write(bytes) + hasher.Write(bytes) } - return h.Sum([]byte{}), nil + return hasher.Sum([]byte{}), nil } func toBytes(data interface{}) ([]byte, error) { diff --git a/pkg/pipeline/conntrack/hash_test.go b/pkg/pipeline/conntrack/hash_test.go index 64377dc8a..af1c968d3 100644 --- a/pkg/pipeline/conntrack/hash_test.go +++ b/pkg/pipeline/conntrack/hash_test.go @@ -18,6 +18,7 @@ package conntrack import ( + "hash/fnv" "testing" "github.com/netobserv/flowlogs-pipeline/pkg/api" @@ -37,6 +38,8 @@ func NewFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protocol i } } +var hasher = fnv.New32a() + func TestComputeHash_Unidirectional(t *testing.T) { keyFields := api.KeyFields{ FieldGroups: []api.FieldGroup{ @@ -110,8 +113,8 @@ func TestComputeHash_Unidirectional(t *testing.T) { } for _, test := range table { t.Run(test.name, func(t *testing.T) { - h1, err1 := ComputeHash(test.flowLog1, keyFields) - h2, err2 := ComputeHash(test.flowLog2, keyFields) + h1, err1 := ComputeHash(test.flowLog1, keyFields, hasher) + h2, err2 := ComputeHash(test.flowLog2, keyFields, hasher) require.NoError(t, err1) require.NoError(t, err2) if test.sameHash { @@ -198,8 +201,8 @@ func TestComputeHash_Bidirectional(t *testing.T) { } for _, test := range table { t.Run(test.name, func(t *testing.T) { - h1, err1 := ComputeHash(test.flowLog1, keyFields) - h2, err2 := ComputeHash(test.flowLog2, keyFields) + h1, err1 := ComputeHash(test.flowLog1, keyFields, hasher) + h2, err2 := ComputeHash(test.flowLog2, keyFields, hasher) require.NoError(t, err1) require.NoError(t, err2) if test.sameHash { From acb30a0171856a755492ace8471262c21eb3ef84 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Thu, 12 May 2022 17:11:10 +0300 Subject: [PATCH 16/18] Externalize hashType --- pkg/pipeline/conntrack/hash.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/pipeline/conntrack/hash.go b/pkg/pipeline/conntrack/hash.go index 800407cb6..503f4b097 100644 --- a/pkg/pipeline/conntrack/hash.go +++ b/pkg/pipeline/conntrack/hash.go @@ -28,10 +28,11 @@ import ( log "github.com/sirupsen/logrus" ) +type hashType []byte + // ComputeHash computes the hash of a flow log according to keyFields. // Two flow logs will have the same hash if they belong to the same connection. 
-func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields, hasher hash.Hash) ([]byte, error) { - type hashType []byte +func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields, hasher hash.Hash) (hashType, error) { fieldGroup2hash := make(map[string]hashType) // Compute the hash of each field group @@ -63,7 +64,7 @@ func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields, hasher hash return hasher.Sum([]byte{}), nil } -func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash) ([]byte, error) { +func computeHashFields(flowLog config.GenericMap, fieldNames []string, hasher hash.Hash) (hashType, error) { hasher.Reset() for _, fn := range fieldNames { f, ok := flowLog[fn] From bb8c08af8abdc23a5065f6b0ac3ffb62a2d7b7d4 Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Thu, 12 May 2022 17:19:14 +0300 Subject: [PATCH 17/18] Rename keyFields -> keyDefinition --- pkg/api/conn_track.go | 4 ++-- pkg/pipeline/conntrack/hash.go | 14 +++++++------- pkg/pipeline/conntrack/hash_test.go | 12 ++++++------ 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/api/conn_track.go b/pkg/api/conn_track.go index b7f8b11dc..7b9517aeb 100644 --- a/pkg/api/conn_track.go +++ b/pkg/api/conn_track.go @@ -18,12 +18,12 @@ package api type ConnTrack struct { - KeyFields KeyFields `yaml:"keyFields" doc:"fields that are used to identify the connection"` + KeyDefinition KeyDefinition `yaml:"keyDefinition" doc:"fields that are used to identify the connection"` OutputRecordTypes []string `yaml:"outputRecordTypes" doc:"output record types to emit"` OutputFields []OutputField `yaml:"outputFields" doc:"list of output fields"` } -type KeyFields struct { +type KeyDefinition struct { FieldGroups []FieldGroup `yaml:"fieldGroups" doc:"list of field group definitions"` Hash ConnTrackHash `yaml:"hash" doc:"how to build the connection hash"` } diff --git a/pkg/pipeline/conntrack/hash.go b/pkg/pipeline/conntrack/hash.go index 503f4b097..331b94d7f 100644 --- a/pkg/pipeline/conntrack/hash.go +++ b/pkg/pipeline/conntrack/hash.go @@ -30,13 +30,13 @@ import ( type hashType []byte -// ComputeHash computes the hash of a flow log according to keyFields. +// ComputeHash computes the hash of a flow log according to keyDefinition. // Two flow logs will have the same hash if they belong to the same connection. 
-func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields, hasher hash.Hash) (hashType, error) { +func ComputeHash(flowLog config.GenericMap, keyDefinition api.KeyDefinition, hasher hash.Hash) (hashType, error) { fieldGroup2hash := make(map[string]hashType) // Compute the hash of each field group - for _, fg := range keyFields.FieldGroups { + for _, fg := range keyDefinition.FieldGroups { h, err := computeHashFields(flowLog, fg.Fields, hasher) if err != nil { return nil, fmt.Errorf("compute hash: %w", err) @@ -46,12 +46,12 @@ func ComputeHash(flowLog config.GenericMap, keyFields api.KeyFields, hasher hash // Compute the total hash hasher.Reset() - for _, fgName := range keyFields.Hash.FieldGroupRefs { + for _, fgName := range keyDefinition.Hash.FieldGroupRefs { hasher.Write(fieldGroup2hash[fgName]) } - if keyFields.Hash.FieldGroupARef != "" { - hashA := fieldGroup2hash[keyFields.Hash.FieldGroupARef] - hashB := fieldGroup2hash[keyFields.Hash.FieldGroupBRef] + if keyDefinition.Hash.FieldGroupARef != "" { + hashA := fieldGroup2hash[keyDefinition.Hash.FieldGroupARef] + hashB := fieldGroup2hash[keyDefinition.Hash.FieldGroupBRef] // Determine order between A's and B's hash to get the same hash for both flow logs from A to B and from B to A. if bytes.Compare(hashA, hashB) < 0 { hasher.Write(hashA) diff --git a/pkg/pipeline/conntrack/hash_test.go b/pkg/pipeline/conntrack/hash_test.go index af1c968d3..82ce545d3 100644 --- a/pkg/pipeline/conntrack/hash_test.go +++ b/pkg/pipeline/conntrack/hash_test.go @@ -41,7 +41,7 @@ func NewFlowLog(srcIP string, srcPort int, dstIP string, dstPort int, protocol i var hasher = fnv.New32a() func TestComputeHash_Unidirectional(t *testing.T) { - keyFields := api.KeyFields{ + keyDefinition := api.KeyDefinition{ FieldGroups: []api.FieldGroup{ { Name: "src", @@ -113,8 +113,8 @@ func TestComputeHash_Unidirectional(t *testing.T) { } for _, test := range table { t.Run(test.name, func(t *testing.T) { - h1, err1 := ComputeHash(test.flowLog1, keyFields, hasher) - h2, err2 := ComputeHash(test.flowLog2, keyFields, hasher) + h1, err1 := ComputeHash(test.flowLog1, keyDefinition, hasher) + h2, err2 := ComputeHash(test.flowLog2, keyDefinition, hasher) require.NoError(t, err1) require.NoError(t, err2) if test.sameHash { @@ -127,7 +127,7 @@ func TestComputeHash_Unidirectional(t *testing.T) { } func TestComputeHash_Bidirectional(t *testing.T) { - keyFields := api.KeyFields{ + keyDefinition := api.KeyDefinition{ FieldGroups: []api.FieldGroup{ { Name: "src", @@ -201,8 +201,8 @@ func TestComputeHash_Bidirectional(t *testing.T) { } for _, test := range table { t.Run(test.name, func(t *testing.T) { - h1, err1 := ComputeHash(test.flowLog1, keyFields, hasher) - h2, err2 := ComputeHash(test.flowLog2, keyFields, hasher) + h1, err1 := ComputeHash(test.flowLog1, keyDefinition, hasher) + h2, err2 := ComputeHash(test.flowLog2, keyDefinition, hasher) require.NoError(t, err1) require.NoError(t, err2) if test.sameHash { From 862a303aec92179ba2d4c11bf498164d8c52061f Mon Sep 17 00:00:00 2001 From: Ronen Schaffer Date: Tue, 17 May 2022 16:41:38 +0300 Subject: [PATCH 18/18] Add test --- pkg/pipeline/conntrack/hash_test.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/pkg/pipeline/conntrack/hash_test.go b/pkg/pipeline/conntrack/hash_test.go index 82ce545d3..68cb0839c 100644 --- a/pkg/pipeline/conntrack/hash_test.go +++ b/pkg/pipeline/conntrack/hash_test.go @@ -213,3 +213,32 @@ func TestComputeHash_Bidirectional(t *testing.T) { }) } } + +func 
TestComputeHash_MissingField(t *testing.T) { + keyDefinition := api.KeyDefinition{ + FieldGroups: []api.FieldGroup{ + { + Name: "src", + Fields: []string{ + "SrcAddr", + "Missing", + }, + }, + }, + Hash: api.ConnTrackHash{ + FieldGroupRefs: []string{"src"}, + }, + } + + ipA := "10.0.0.1" + ipB := "10.0.0.2" + portA := 1 + portB := 9002 + protocolA := 6 + + fl := NewFlowLog(ipA, portA, ipB, portB, protocolA, 111, 22) + + h, err := ComputeHash(fl, keyDefinition, hasher) + require.NoError(t, err) + require.NotNil(t, h) +}
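
For reference, a minimal, self-contained sketch of how the ComputeHash API shaped by this series might be exercised by a caller. The key definition mirrors the bidirectional case in hash_test.go; the package main wrapper, sample field values, and log.Fatal error handling are illustrative assumptions, not part of the patches.

package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"log"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
	"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/conntrack"
)

func main() {
	// Bidirectional key: src and dst are the connection endpoints, and the
	// protocol group is always folded into the hash.
	keyDefinition := api.KeyDefinition{
		FieldGroups: []api.FieldGroup{
			{Name: "src", Fields: []string{"SrcAddr", "SrcPort"}},
			{Name: "dst", Fields: []string{"DstAddr", "DstPort"}},
			{Name: "protocol", Fields: []string{"Proto"}},
		},
		Hash: api.ConnTrackHash{
			FieldGroupRefs: []string{"protocol"},
			FieldGroupARef: "src",
			FieldGroupBRef: "dst",
		},
	}
	hasher := fnv.New32a()

	// Two flow logs of the same TCP connection, one per direction.
	aToB := config.GenericMap{
		"SrcAddr": "10.0.0.1", "SrcPort": 9001,
		"DstAddr": "10.0.0.2", "DstPort": 9002,
		"Proto":   6,
	}
	bToA := config.GenericMap{
		"SrcAddr": "10.0.0.2", "SrcPort": 9002,
		"DstAddr": "10.0.0.1", "DstPort": 9001,
		"Proto":   6,
	}

	h1, err := conntrack.ComputeHash(aToB, keyDefinition, hasher)
	if err != nil {
		log.Fatal(err)
	}
	h2, err := conntrack.ComputeHash(bToA, keyDefinition, hasher)
	if err != nil {
		log.Fatal(err)
	}
	// With FieldGroupARef/FieldGroupBRef set, both directions hash identically.
	fmt.Printf("A->B %x, B->A %x, same connection: %v\n", h1, h2, bytes.Equal(h1, h2))
}

Reusing one hasher across calls is fine here because ComputeHash resets it before hashing each field group and again before combining the group hashes.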