Skip to content

Commit

Permalink
filter toGenericMap instead of aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Aug 7, 2023
1 parent 35a89c6 commit 74611d2
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ Following is the supported API format for specifying connection tracking:
last: last
splitAB: When true, 2 output fields will be created. One for A->B and one for B->A flows.
input: The input field to base the operation on. When omitted, 'name' is used
reportMissing: When true, missing input will produce MissingFieldError metric and error logs
scheduling: list of timeouts and intervals to apply per selector
selector: key-value map to match against connection fields to apply this scheduling
endConnectionTimeout: duration of time to wait from the last flow log to end a connection
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/conntrack/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (agg *aMax) update(conn connection, flowLog config.GenericMap, d direction,
}

func (cp *aFirst) update(conn connection, flowLog config.GenericMap, d direction, isNew bool) {
if isNew && flowLog[cp.inputField] != nil {
if isNew {
conn.updateAggValue(cp.outputField, flowLog[cp.inputField])
}
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/pipeline/extract/conntrack/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package conntrack

import (
"fmt"
"reflect"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/utils"
Expand Down Expand Up @@ -100,7 +101,9 @@ func (c *connType) getNextHeartbeatTime() time.Time {
func (c *connType) toGenericMap() config.GenericMap {
gm := config.GenericMap{}
for k, v := range c.aggFields {
gm[k] = v
if v != nil && (reflect.TypeOf(v).Kind() != reflect.Float64 || v.(float64) != 0) {
gm[k] = v
}
}

// In case of a conflict between the keys and the aggFields / cpFields, the keys should prevail.
Expand Down
20 changes: 16 additions & 4 deletions pkg/pipeline/extract/conntrack/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,26 @@ func newMockRecordConnAB(srcIP string, srcPort int, dstIP string, dstPort int, p
"DstAddr": dstIP,
"DstPort": dstPort,
"Proto": protocol,
"Bytes_AB": bytesAB,
"Bytes_BA": bytesBA,
"Packets_AB": packetsAB,
"Packets_BA": packetsBA,
"numFlowLogs": numFlowLogs,
api.IsFirstFieldName: false,
},
}

if bytesAB != 0 {
mock.record["Bytes_AB"] = bytesAB
}

if bytesBA != 0 {
mock.record["Bytes_BA"] = bytesBA
}

if bytesAB != 0 {
mock.record["Packets_AB"] = packetsAB
}

if bytesBA != 0 {
mock.record["Packets_BA"] = packetsBA
}
return mock
}

Expand Down

0 comments on commit 74611d2

Please sign in to comment.