Skip to content

Commit

Permalink
Merge pull request netobserv#203 from jotak/pb-decode-ms
Browse files Browse the repository at this point in the history
[breaking change] Fix protobuf decode for new time fields in ms
  • Loading branch information
jotak authored May 13, 2022
2 parents 0b25b2f + a03ea25 commit 3f5317c
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 43 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,7 @@ kubernetes
kind
create-kind-cluster Create cluster
delete-kind-cluster Delete cluster
kind-load-image Load image to kind
metrics
generate-configuration Generate metrics configuration
Expand Down
30 changes: 15 additions & 15 deletions pkg/pipeline/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,21 @@ func pbFlowToMap(flow *pbflow.Record) config.GenericMap {
return config.GenericMap{}
}
out := config.GenericMap{
"FlowDirection": int(flow.Direction.Number()),
"Bytes": flow.Bytes,
"SrcAddr": ipToStr(flow.Network.GetSrcAddr()),
"DstAddr": ipToStr(flow.Network.GetDstAddr()),
"SrcMac": macToStr(flow.DataLink.GetSrcMac()),
"DstMac": macToStr(flow.DataLink.GetDstMac()),
"SrcPort": flow.Transport.GetSrcPort(),
"DstPort": flow.Transport.GetDstPort(),
"Etype": flow.EthProtocol,
"Packets": flow.Packets,
"Proto": flow.Transport.GetProtocol(),
"TimeFlowStart": flow.TimeFlowStart.GetSeconds(),
"TimeFlowEnd": flow.TimeFlowEnd.GetSeconds(),
"TimeReceived": time.Now().Unix(),
"Interface": flow.Interface,
"FlowDirection": int(flow.Direction.Number()),
"Bytes": flow.Bytes,
"SrcAddr": ipToStr(flow.Network.GetSrcAddr()),
"DstAddr": ipToStr(flow.Network.GetDstAddr()),
"SrcMac": macToStr(flow.DataLink.GetSrcMac()),
"DstMac": macToStr(flow.DataLink.GetDstMac()),
"SrcPort": flow.Transport.GetSrcPort(),
"DstPort": flow.Transport.GetDstPort(),
"Etype": flow.EthProtocol,
"Packets": flow.Packets,
"Proto": flow.Transport.GetProtocol(),
"TimeFlowStartMs": flow.TimeFlowStart.AsTime().UnixMilli(),
"TimeFlowEndMs": flow.TimeFlowEnd.AsTime().UnixMilli(),
"TimeReceived": time.Now().Unix(),
"Interface": flow.Interface,
}
return out
}
Expand Down
28 changes: 14 additions & 14 deletions pkg/pipeline/decode/decode_protobuf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ func TestDecodePBFlows(t *testing.T) {
assert.NotZero(t, out[0]["TimeReceived"])
delete(out[0], "TimeReceived")
assert.Equal(t, config.GenericMap{
"FlowDirection": 1,
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"SrcPort": uint32(23000),
"DstPort": uint32(443),
"Etype": uint32(2048),
"Packets": uint64(123),
"Proto": uint32(1),
"TimeFlowStart": someTime.Unix(),
"TimeFlowEnd": someTime.Unix(),
"Interface": "eth0",
"FlowDirection": 1,
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"SrcPort": uint32(23000),
"DstPort": uint32(443),
"Etype": uint32(2048),
"Packets": uint64(123),
"Proto": uint32(1),
"TimeFlowStartMs": someTime.UnixMilli(),
"TimeFlowEndMs": someTime.UnixMilli(),
"Interface": "eth0",
}, out[0])

}
28 changes: 14 additions & 14 deletions pkg/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,20 +227,20 @@ parameters:
assert.NotZero(t, capturedRecord["TimeReceived"])
delete(capturedRecord, "TimeReceived")
assert.EqualValues(t, map[string]interface{}{
"FlowDirection": float64(1),
"Bytes": float64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"SrcPort": float64(23000),
"DstPort": float64(443),
"Etype": float64(2048),
"Packets": float64(123),
"Proto": float64(1),
"TimeFlowStart": float64(startTime.Unix()),
"TimeFlowEnd": float64(endTime.Unix()),
"Interface": "eth0",
"FlowDirection": float64(1),
"Bytes": float64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"SrcPort": float64(23000),
"DstPort": float64(443),
"Etype": float64(2048),
"Packets": float64(123),
"Proto": float64(1),
"TimeFlowStartMs": float64(startTime.UnixMilli()),
"TimeFlowEndMs": float64(endTime.UnixMilli()),
"Interface": "eth0",
}, capturedRecord)
}

Expand Down

0 comments on commit 3f5317c

Please sign in to comment.