Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NETOBSERV-1703 Add enrichment in packet capture #364

Merged
merged 2 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/packetcapture-dump/client/packetcapture-client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"os"
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/exporter"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"

"github.com/google/gopacket/layers"
)
Expand Down Expand Up @@ -67,7 +67,7 @@ func main() {
os.Exit(1)
}
// write pcap file header
_, err = f.Write(exporter.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
_, err = f.Write(utils.GetPCAPFileHeader(snapshotlen, layers.LinkTypeEthernet))
if err != nil {
fmt.Println("Write file header failed:", err.Error())
os.Exit(1)
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func buildFlowExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*fl
case "ipfix+tcp":
return buildIPFIXExporter(cfg, "tcp")
case "direct-flp":
return buildDirectFLPExporter(cfg)
return buildFlowDirectFLPExporter(cfg)
default:
return nil, fmt.Errorf("wrong flow export type %s", cfg.Export)
}
Expand All @@ -323,7 +323,7 @@ func buildGRPCExporter(cfg *Config, m *metrics.Metrics) (node.TerminalFunc[[]*fl
return grpcExporter.ExportFlows, nil
}

func buildDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
func buildFlowDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.Record], error) {
flpExporter, err := exporter.StartDirectFLP(cfg.FLPConfig, cfg.BuffersLength)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Config struct {
AgentIPType string `env:"AGENT_IP_TYPE" envDefault:"any"`
// Export selects the exporter protocol.
// Accepted values for Flows are: grpc (default), kafka, ipfix+udp, ipfix+tcp or direct-flp.
// Accepted values for Packets are: grpc (default) or tcp
// Accepted values for Packets are: grpc (default) or direct-flp
Export string `env:"EXPORT" envDefault:"grpc"`
// Host is the host name or IP of the flow or packet collector, when the EXPORT variable is
// set to "grpc"
Expand Down
10 changes: 10 additions & 0 deletions pkg/agent/packets_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,21 @@ func buildPacketExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord],
switch cfg.Export {
case "grpc":
return buildGRPCPacketExporter(cfg)
case "direct-flp":
return buildPacketDirectFLPExporter(cfg)
default:
return nil, fmt.Errorf("unsupported packet export type %s", cfg.Export)
}
}

func buildPacketDirectFLPExporter(cfg *Config) (node.TerminalFunc[[]*flow.PacketRecord], error) {
flpExporter, err := exporter.StartDirectFLP(cfg.FLPConfig, cfg.BuffersLength)
if err != nil {
return nil, err
}
return flpExporter.ExportPackets, nil
}

// Run a Packets agent. The function will keep running in the same thread
// until the passed context is canceled
func (p *Packets) Run(ctx context.Context) error {
Expand Down
68 changes: 68 additions & 0 deletions pkg/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package decode

import (
"encoding/base64"
"fmt"
"syscall"
"time"
Expand All @@ -9,6 +10,8 @@ import (
"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/mdlayher/ethernet"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -139,6 +142,71 @@ func RecordToMap(fr *flow.Record) config.GenericMap {
return out
}

func PacketToMap(pr *flow.PacketRecord) config.GenericMap {
out := config.GenericMap{}

if pr == nil {
return out
}

packet := gopacket.NewPacket(pr.Stream, layers.LayerTypeEthernet, gopacket.Lazy)
if ethLayer := packet.Layer(layers.LayerTypeEthernet); ethLayer != nil {
eth, _ := ethLayer.(*layers.Ethernet)
out["SrcMac"] = eth.SrcMAC.String()
out["DstMac"] = eth.DstMAC.String()
}

if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil {
tcp, _ := tcpLayer.(*layers.TCP)
out["SrcPort"] = tcp.SrcPort.String()
out["DstPort"] = tcp.DstPort.String()
} else if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
udp, _ := udpLayer.(*layers.UDP)
out["SrcPort"] = udp.SrcPort.String()
out["DstPort"] = udp.DstPort.String()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are missing SCTP protocol support

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure I can add it !

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

} else if sctpLayer := packet.Layer(layers.LayerTypeSCTP); sctpLayer != nil {
sctp, _ := sctpLayer.(*layers.SCTP)
out["SrcPort"] = sctp.SrcPort.String()
out["DstPort"] = sctp.DstPort.String()
}

if ipv4Layer := packet.Layer(layers.LayerTypeIPv4); ipv4Layer != nil {
ipv4, _ := ipv4Layer.(*layers.IPv4)
out["SrcAddr"] = ipv4.SrcIP.String()
out["DstAddr"] = ipv4.DstIP.String()
out["Proto"] = ipv4.Protocol
} else if ipv6Layer := packet.Layer(layers.LayerTypeIPv6); ipv6Layer != nil {
ipv6, _ := ipv6Layer.(*layers.IPv6)
out["SrcAddr"] = ipv6.SrcIP.String()
out["DstAddr"] = ipv6.DstIP.String()
out["Proto"] = ipv6.NextHeader
}

if icmpv4Layer := packet.Layer(layers.LayerTypeICMPv4); icmpv4Layer != nil {
icmpv4, _ := icmpv4Layer.(*layers.ICMPv4)
out["IcmpType"] = icmpv4.TypeCode.Type()
out["IcmpCode"] = icmpv4.TypeCode.Code()
} else if icmpv6Layer := packet.Layer(layers.LayerTypeICMPv6); icmpv6Layer != nil {
icmpv6, _ := icmpv6Layer.(*layers.ICMPv6)
out["IcmpType"] = icmpv6.TypeCode.Type()
out["IcmpCode"] = icmpv6.TypeCode.Code()
}

if dnsLayer := packet.Layer(layers.LayerTypeDNS); dnsLayer != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we allow DNS in packet capture mode ? pca code doesn't enable any feature including DNS

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We capture any packets as far as I saw

image

dns, _ := dnsLayer.(*layers.DNS)
out["DnsId"] = dns.ID
out["DnsFlagsResponseCode"] = dns.ResponseCode.String()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DNS enrichement not enabled with pca

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//TODO: add DNS questions / answers / authorities
}

out["Bytes"] = len(pr.Stream)
// Data is base64 encoded to avoid marshal / unmarshal issues
out["Data"] = base64.StdEncoding.EncodeToString(packet.Data())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this efficient I didn't look for any better alternative did u get a chance to explore other options ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm considering to update the gRPC message to send flows and packets as two separate byte array but during the pipeline lifetime I feel that's the easiest solution.

On my local kind cluster it's super fast so for now I'm not looking deeper in this.

out["Time"] = pr.Time.Unix()

return out
}

// TCPStateToStr is based on kernel TCP state definition
// https://elixir.bootlin.com/linux/v6.3/source/include/net/tcp_states.h#L12
func TCPStateToStr(state uint32) string {
Expand Down
12 changes: 12 additions & 0 deletions pkg/exporter/direct_flp.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ func (d *DirectFLP) ExportFlows(input <-chan []*flow.Record) {
}
}

// ExportPackets accepts slices of *flow.PacketRecord by its input channel, converts them
// to *pbflow.Records instances, and submits them to the collector.
func (d *DirectFLP) ExportPackets(input <-chan []*flow.PacketRecord) {
for inputPackets := range input {
for _, packet := range inputPackets {
if len(packet.Stream) != 0 {
d.fwd <- decode.PacketToMap(packet)
}
}
}
}

func (d *DirectFLP) Close() {
close(d.fwd)
}
31 changes: 8 additions & 23 deletions pkg/exporter/grpc_packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ package exporter

import (
"context"
"fmt"
"time"

"github.com/netobserv/netobserv-ebpf-agent/pkg/flow"
grpc "github.com/netobserv/netobserv-ebpf-agent/pkg/grpc/packet"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbpacket"
"github.com/netobserv/netobserv-ebpf-agent/pkg/utils"

"github.com/google/gopacket"
"github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/anypb"
)
Expand All @@ -22,22 +22,14 @@ type GRPCPacketProto struct {
var gplog = logrus.WithField("component", "packet/GRPCPackets")

// WritePacket writes the given packet data out to gRPC.
func writeGRPCPacket(ci gopacket.CaptureInfo, data []byte, conn *grpc.ClientConnection) error {
if ci.CaptureLength != len(data) {
return fmt.Errorf("capture length %d does not match data length %d", ci.CaptureLength, len(data))
}
if ci.CaptureLength > ci.Length {
return fmt.Errorf("invalid capture info %+v: capture length > length", ci)
}
gplog.Debugf("Sending Packet to client. Length: %d", len(data))
b, err := GetPacketHeader(ci)
func writeGRPCPacket(time time.Time, data []byte, conn *grpc.ClientConnection) error {
bytes, err := utils.GetPacketBytesWithHeader(time, data)
if err != nil {
return fmt.Errorf("error writing packet header: %w", err)
return err
}
// write 16 byte packet header & data all at once
_, err = conn.Client().Send(context.TODO(), &pbpacket.Packet{
Pcap: &anypb.Any{
Value: append(b, data...),
Value: bytes,
},
})
return err
Expand All @@ -59,15 +51,8 @@ func (p *GRPCPacketProto) ExportGRPCPackets(in <-chan []*flow.PacketRecord) {
for packetRecord := range in {
var errs []error
for _, packet := range packetRecord {
packetStream := packet.Stream
packetTimestamp := packet.Time
if len(packetStream) != 0 {
captureInfo := gopacket.CaptureInfo{
Timestamp: packetTimestamp,
CaptureLength: len(packetStream),
Length: len(packetStream),
}
if err := writeGRPCPacket(captureInfo, packetStream, p.clientConn); err != nil {
if len(packet.Stream) != 0 {
if err := writeGRPCPacket(packet.Time, packet.Stream, p.clientConn); err != nil {
errs = append(errs, err)
}
}
Expand Down
23 changes: 22 additions & 1 deletion pkg/exporter/packets_proto.go → pkg/utils/packets.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package exporter
package utils

import (
"encoding/binary"
"fmt"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/layers"
Expand Down Expand Up @@ -38,3 +39,23 @@ func GetPacketHeader(ci gopacket.CaptureInfo) ([]byte, error) {
binary.LittleEndian.PutUint32(buf[12:16], uint32(ci.Length))
return buf[:], nil
}

func GetPacketBytesWithHeader(time time.Time, data []byte) ([]byte, error) {
ci := gopacket.CaptureInfo{
Timestamp: time,
CaptureLength: len(data),
Length: len(data),
}
if ci.CaptureLength != len(data) {
return nil, fmt.Errorf("capture length %d does not match data length %d", ci.CaptureLength, len(data))
}
if ci.CaptureLength > ci.Length {
return nil, fmt.Errorf("invalid capture info %+v: capture length > length", ci)
}
b, err := GetPacketHeader(ci)
if err != nil {
return nil, fmt.Errorf("error writing packet header: %w", err)
}
// append 16 byte packet header & data all at once
return append(b, data...), nil
}
Loading