Skip to content

Commit

Permalink
pcapng implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jpinsonneau committed Jul 8, 2024
1 parent cccd05d commit 4297ee6
Show file tree
Hide file tree
Showing 2,232 changed files with 991 additions and 437,826 deletions.
20 changes: 15 additions & 5 deletions cmd/flow_capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
func runFlowCapture(_ *cobra.Command, _ []string) {
go scanner()

captureType = "Flow"
wg := sync.WaitGroup{}
wg.Add(len(ports))
for i := range ports {
Expand Down Expand Up @@ -104,12 +105,14 @@ func runFlowCaptureOnAddr(port int, filename string) {
if stopReceived {
return
}
// parse and display flow async
go parseGenericMapAndDisplay(fp.GenericMap.Value)

// Write flows to sqlite DB
err = queryFlowDB(fp.GenericMap.Value, db)
if err != nil {
log.Error("Error while writing to DB:", err.Error())
}
go manageFlowsDisplay(fp.GenericMap.Value)
// append new line between each record to read file easilly
_, err = f.Write(append(fp.GenericMap.Value, []byte(",\n")...))
if err != nil {
Expand All @@ -118,20 +121,27 @@ func runFlowCaptureOnAddr(port int, filename string) {
}
}

func manageFlowsDisplay(line []byte) {
func parseGenericMapAndDisplay(bytes []byte) {
genericMap := config.GenericMap{}
err := json.Unmarshal(line, &genericMap)
err := json.Unmarshal(bytes, &genericMap)
if err != nil {
log.Error("Error while parsing json", err)
return
}

manageFlowsDisplay(genericMap)
}

func manageFlowsDisplay(genericMap config.GenericMap) {
// lock since we are updating lastFlows concurrently
mutex.Lock()

lastFlows = append(lastFlows, genericMap)
sort.Slice(lastFlows, func(i, j int) bool {
return lastFlows[i]["TimeFlowEndMs"].(float64) < lastFlows[j]["TimeFlowEndMs"].(float64)
if captureType == "Flow" {
return toFloat64(lastFlows[i], "TimeFlowEndMs") < toFloat64(lastFlows[j], "TimeFlowEndMs")
}
return toFloat64(lastFlows[i], "Time") < toFloat64(lastFlows[j], "Time")
})
if len(regexes) > 0 {
// regexes may change during the render so we make a copy first
Expand Down Expand Up @@ -191,7 +201,7 @@ func updateTable() {
resetTerminal()

if outputBuffer == nil {
fmt.Print("Running network-observability-cli as Flow Capture\n")
fmt.Printf("Running network-observability-cli as %s Capture\n", captureType)
fmt.Printf("Log level: %s\n", logLevel)
fmt.Printf("Collection filters: %s\n", filter)
fmt.Printf("Showing last: %d Use Up / Down keyboard arrows to increase / decrease limit\n", flowsToShow)
Expand Down
8 changes: 4 additions & 4 deletions cmd/flow_capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestFlowTableRefreshDelay(t *testing.T) {
buf := bytes.Buffer{}
setOutputBuffer(&buf)

manageFlowsDisplay([]byte(`{"TimeFlowEndMs": 1709741962017}`))
parseGenericMapAndDisplay([]byte(`{"TimeFlowEndMs": 1709741962017}`))

out := buf.String()
assert.Empty(t, out)
Expand All @@ -33,7 +33,7 @@ func TestFlowTableDefaultDisplay(t *testing.T) {
// add 1s to current time to avoid maxRefreshRate limit
tickTime()

manageFlowsDisplay([]byte(sampleFlow))
parseGenericMapAndDisplay([]byte(sampleFlow))

// get table output as string
rows := strings.Split(buf.String(), "\n")
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestFlowTableMultipleFlows(t *testing.T) {
bytes = bytes + 1000

// add flow to table
manageFlowsDisplay([]byte(fmt.Sprintf(`{
parseGenericMapAndDisplay([]byte(fmt.Sprintf(`{
"AgentIP":"10.0.1.1",
"Bytes":%d,
"DstAddr":"10.0.0.6",
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestFlowTableAdvancedDisplay(t *testing.T) {

// add one second to time and draw table
tickTime()
manageFlowsDisplay([]byte(sampleFlow))
parseGenericMapAndDisplay([]byte(sampleFlow))

// get table output per rows
return strings.Split(buf.String(), "\n")
Expand Down
20 changes: 18 additions & 2 deletions cmd/map_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,20 @@ func toText(genericMap config.GenericMap, fieldName string) interface{} {
return emptyText
}

func toFloat64(genericMap config.GenericMap, fieldName string) float64 {
v, ok := genericMap[fieldName]
if ok {
return v.(float64)
}
return 0
}

func toTimeString(genericMap config.GenericMap, fieldName string) string {
return time.UnixMilli(int64(genericMap[fieldName].(float64))).Format("15:04:05.000000")
v, ok := genericMap[fieldName]
if ok {
return time.UnixMilli(int64(v.(float64))).Format("15:04:05.000000")
}
return emptyText
}

func ToTableRow(genericMap config.GenericMap, cols []string) []interface{} {
Expand All @@ -411,7 +423,11 @@ func ToTableRow(genericMap config.GenericMap, cols []string) []interface{} {
// convert field name / value accordingly
switch col {
case "Time":
row = append(row, toTimeString(genericMap, "TimeFlowEndMs"))
if captureType == "Flow" {
row = append(row, toTimeString(genericMap, "TimeFlowEndMs"))
} else {
row = append(row, toTimeString(genericMap, "Time"))
}
case "SrcZone":
row = append(row, toText(genericMap, "SrcK8S_Zone"))
case "DstZone":
Expand Down
175 changes: 77 additions & 98 deletions cmd/packet_capture.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package cmd

import (
"encoding/base64"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"

"github.com/eiannone/keyboard"
"github.com/fatih/color"
"github.com/google/gopacket/layers"
"github.com/jpillora/sizestr"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"
"github.com/rodaine/table"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/write/grpc/genericmap"
"github.com/ryankurte/go-pcapng"
"github.com/ryankurte/go-pcapng/types"
"github.com/spf13/cobra"
)

Expand All @@ -32,14 +32,10 @@ type PcapResult struct {
ByteCount int64
}

var packets = []PcapResult{}

// Setting Snapshot length to 0 sets it to maximum packet size
var snapshotlen uint32

func runPacketCapture(_ *cobra.Command, _ []string) {
go packetCaptureScanner()
go scanner()

captureType = "Packet"
wg := sync.WaitGroup{}
wg.Add(len(ports))
for i := range ports {
Expand Down Expand Up @@ -67,19 +63,23 @@ func runPacketCaptureOnAddr(port int, filename string) {
log.Errorf("Create directory failed: %v", err.Error())
log.Fatal(err)
}
f, err = os.Create("./output/pcap/" + filename + ".pcap")
pw, err := pcapng.NewFileWriter("./output/pcap/" + filename + ".pcapng")
if err != nil {
log.Errorf("Create file %s failed: %v", filename, err.Error())
log.Fatal(err)
}
// write pcap file header
_, err = f.Write(exporter.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
so := types.SectionHeaderOptions{
Comment: filename,
Application: "netobserv-cli",
}
err = pw.WriteSectionHeader(so)
if err != nil {
log.Fatal(err)
}
defer f.Close()

flowPackets := make(chan *pbpacket.Packet, 100)
flowPackets := make(chan *genericmap.Flow, 100)
collector, err := grpc.StartCollector(port, flowPackets)
if err != nil {
log.Error("StartCollector failed:", err.Error())
Expand All @@ -94,96 +94,75 @@ func runPacketCaptureOnAddr(port int, filename string) {
if stopReceived {
return
}
go managePacketsDisplay(PcapResult{Name: filename, ByteCount: int64(len(fp.Pcap.Value)), PacketCount: 1})
// append new line between each record to read file easilly
_, err = f.Write(fp.Pcap.Value)
genericMap := config.GenericMap{}
err := json.Unmarshal(fp.GenericMap.Value, &genericMap)
if err != nil {
log.Fatal(err)
log.Error("Error while parsing json", err)
return
}
}
}

func managePacketsDisplay(result PcapResult) {
// lock since we are updating results concurrently
mutex.Lock()

// find result in array
found := false
for i, r := range packets {
if r.Name == result.Name {
found = true
// update existing result
packets[i].PacketCount += result.PacketCount
packets[i].ByteCount += result.ByteCount
break
data, ok := genericMap["Data"]
if ok {
// clear generic map data
delete(genericMap, "Data")

// display as flow async
go manageFlowsDisplay(genericMap)

// Get capture timestamp
ts := time.Unix(int64(genericMap["Time"].(float64)), 0)

// Decode b64 encoded data
b, err := base64.StdEncoding.DecodeString(data.(string))
if err != nil {
log.Error("Error while decoding data", err)
return
}

// write enriched data as interface
writeEnrichedData(pw, &genericMap)

// then append packet to file
err = pw.WriteEnhancedPacketBlock(0, ts, b, types.EnhancedPacketOptions{})
if err != nil {
log.Fatal(err)
}
} else {
// display as flow async
go manageFlowsDisplay(genericMap)
}
}
if !found {
packets = append(packets, result)
}

// don't refresh terminal too often to avoid blinking
now := currentTime()
if int(now.Sub(lastRefresh)) > int(maxRefreshRate) {
lastRefresh = now
resetTerminal()

if outputBuffer == nil {
log.Infof("Running network-observability-cli as Packet Capture\nLog level: %s\nFilters: %s\n", logLevel, filter)
}

// recreate table from scratch
headerFmt := color.New(color.BgHiBlue, color.Bold).SprintfFunc()
columnFmt := color.New(color.FgHiYellow).SprintfFunc()
tbl := table.New(
"Name",
"Packets",
"Bytes",
)
if outputBuffer != nil {
tbl.WithWriter(outputBuffer)
}
tbl.WithHeaderFormatter(headerFmt).WithFirstColumnFormatter(columnFmt).WithPadding(10)

for _, result := range packets {
tbl.AddRow(
result.Name,
result.PacketCount,
sizestr.ToString(result.ByteCount),
)
}

// print table
tbl.Print()
}

if len(keyboardError) > 0 {
fmt.Println(keyboardError)
}

// unlock
mutex.Unlock()
}

func packetCaptureScanner() {
if err := keyboard.Open(); err != nil {
keyboardError = fmt.Sprintf("Keyboard not supported %v", err)
return
}
defer func() {
_ = keyboard.Close()
}()

for {
_, key, err := keyboard.GetKey()
if err != nil {
panic(err)
func writeEnrichedData(pw *pcapng.FileWriter, genericMap *config.GenericMap) {
var io types.InterfaceOptions
srcType := toText(*genericMap, "SrcK8S_Type").(string)
if srcType != emptyText {
io = types.InterfaceOptions{
Name: fmt.Sprintf(
"%s: %s -> %s: %s",
srcType,
toText(*genericMap, "SrcK8S_Name"),
toText(*genericMap, "DstK8S_Type"),
toText(*genericMap, "DstK8S_Name")),
Description: fmt.Sprintf(
"%s: %s Namespace: %s -> %s: %s Namespace: %s",
toText(*genericMap, "SrcK8S_OwnerType"),
toText(*genericMap, "SrcK8S_OwnerName"),
toText(*genericMap, "SrcK8S_Namespace"),
toText(*genericMap, "DstK8S_OwnerType"),
toText(*genericMap, "DstK8S_OwnerName"),
toText(*genericMap, "DstK8S_Namespace"),
),
}
if key == keyboard.KeyCtrlC || stopReceived {
log.Info("Ctrl-C pressed, exiting program.")

// exit program
os.Exit(0)
} else {
io.Name = "Unknown resource"
io = types.InterfaceOptions{
Name: "Unknown kubernetes resource",
}
}
err := pw.WriteInterfaceDescription(uint16(layers.LinkTypeEthernet), io)
if err != nil {
log.Fatal(err)
}
}
Loading

0 comments on commit 4297ee6

Please sign in to comment.