Skip to content

Commit

Permalink
Merge pull request netobserv#160 from mariomac/ebpf
Browse files Browse the repository at this point in the history
[NETOBSERV-237] gRPC+PB flow ingest/decoder for NetObserv eBPF Agent
  • Loading branch information
Mario Macias authored Mar 30, 2022
2 parents 485fa3c + 7f92efd commit 20dbc8d
Show file tree
Hide file tree
Showing 14 changed files with 383 additions and 208 deletions.
15 changes: 15 additions & 0 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ Following is the supported API format for the kafka ingest:
startoffset: FirstOffset (least recent - default) or LastOffset (most recent) offset available for a partition
batchreadtimeout: how often (in milliseconds) to process input
</pre>
## Ingest GRPC from Network Observability eBPF Agent
Following is the supported API format for the Network Observability eBPF ingest:

<pre>
grpc:
port: the port number to listen on
buffer_length: the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)
</pre>
## Aws ingest API
Following is the supported API format for Aws flow entries:

Expand Down Expand Up @@ -126,6 +134,13 @@ Following is the supported API format for writing to loki:
timestampLabel: label to use for time indexing
timestampScale: timestamp units scale (e.g. for UNIX = 1s)
</pre>
## Write Standard Output
Following is the supported API format for writing to standard output:

<pre>
stdout:
format: the format of each line: printf (default) or json
</pre>
## Aggregate metrics API
Following is the supported API format for specifying metrics aggregations:

Expand Down
13 changes: 5 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ require (
github.com/heptiolabs/healthcheck v0.0.0-20211123025425-613501dd5deb
github.com/ip2location/ip2location-go/v9 v9.2.0
github.com/json-iterator/go v1.1.12
github.com/mariomac/guara v0.0.0-20220321135847-54b7fb6a8657
github.com/mitchellh/mapstructure v1.4.3
github.com/netobserv/gopipes v0.1.0
github.com/netobserv/gopipes v0.1.1
github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9
github.com/netobserv/netobserv-agent v0.0.0-20220328101628-406b2999d580
github.com/netsampler/goflow2 v1.0.5-0.20220106210010-20e8e567090c
github.com/prometheus/client_golang v1.12.1
github.com/prometheus/common v0.32.1
Expand All @@ -20,7 +22,7 @@ require (
github.com/spf13/cobra v1.3.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.10.1
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/vladimirvivien/gexe v0.1.1
github.com/vmware/go-ipfix v0.5.12
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
Expand All @@ -34,13 +36,10 @@ require (
)

require (
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/alessio/shellescape v1.4.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/evanphx/json-patch/v5 v5.2.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/go-kit/log v0.2.0 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
Expand All @@ -57,7 +56,6 @@ require (
github.com/klauspost/compress v1.13.6 // indirect
github.com/libp2p/go-reuseport v0.1.0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
Expand All @@ -82,7 +80,7 @@ require (
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/grpc v1.43.0 // indirect
google.golang.org/grpc v1.45.0 // indirect
gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand All @@ -93,7 +91,6 @@ require (
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
sigs.k8s.io/controller-runtime v0.11.0 // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/kind v0.11.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
202 changes: 10 additions & 192 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ type API struct {
KafkaEncode EncodeKafka `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
IngestCollector IngestCollector `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the netflow collector:\n"`
IngestKafka IngestKafka `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
IngestGRPCProto IngestGRPCProto `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
DecodeAws DecodeAws `yaml:"aws" doc:"## Aws ingest API\nFollowing is the supported API format for Aws flow entries:\n"`
TransformGeneric TransformGeneric `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
TransformFilter TransformFilter `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
TransformNetwork TransformNetwork `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
WriteLoki WriteLoki `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
WriteStdout WriteStdout `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"`
}
6 changes: 6 additions & 0 deletions pkg/api/ingest_grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package api

// IngestGRPCProto holds the configuration of the gRPC ingest stage, which
// receives flows from the Network Observability eBPF Agent. The `doc` tags
// are the user-facing descriptions of each field.
type IngestGRPCProto struct {
// Port is the port number to listen on.
Port int `yaml:"port" doc:"the port number to listen on"`
// BufferLen is the length of the ingest channel buffer, counted in groups
// of flows rather than in individual flows.
BufferLen int `yaml:"buffer_length" doc:"the length of the ingest channel buffer, in groups of flows, containing each group hundreds of flows (default: 100)"`
}
5 changes: 5 additions & 0 deletions pkg/api/write_stdout.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package api

// WriteStdout holds the configuration of the stage that writes flows to the
// standard output.
type WriteStdout struct {
// Format selects how each output line is rendered: printf (default) or json.
Format string `yaml:"format" doc:"the format of each line: printf (default) or json"`
}
6 changes: 4 additions & 2 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Ingest struct {
File File
Collector api.IngestCollector
Kafka api.IngestKafka
GRPC api.IngestGRPCProto
}

type File struct {
Expand Down Expand Up @@ -98,8 +99,9 @@ type Encode struct {
}

type Write struct {
Type string
Loki api.WriteLoki
Type string
Loki api.WriteLoki
Stdout api.WriteStdout
}

// ParseConfig creates the internal unmarshalled representation from the Pipeline and Parameters json
Expand Down
85 changes: 85 additions & 0 deletions pkg/pipeline/decode/decode_protobuf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package decode

import (
"fmt"
"net"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-agent/pkg/pbflow"
"github.com/sirupsen/logrus"
)

// pflog is the logger used by the Protobuf decoder; every entry it emits is
// tagged with component=Protobuf.
var pflog = logrus.WithField("component", "Protobuf")

// Protobuf decodes protobuf flow record definitions, as forwarded by
// ingest.GRPCProtobuf, into Generic Maps that follow the same naming conventions
// as the IPFIX flows from ingest.IngestCollector.
// It carries no state.
type Protobuf struct {
}

// NewProtobuf creates a Protobuf decoder. The returned error is always nil.
func NewProtobuf() (Decoder, error) {
	decoder := new(Protobuf)
	return decoder, nil
}

// Decode converts every *pbflow.Records batch found in the input into a flat
// list of flow entries, one config.GenericMap per record.
//
// Input elements that are not *pbflow.Records are logged and skipped instead
// of being dereferenced, so a malformed input can never cause a nil-pointer
// panic. The returned slice is never nil; it is empty when the input is empty
// or contains no decodable records.
func (p *Protobuf) Decode(in []interface{}) []config.GenericMap {
	if len(in) == 0 {
		pflog.Warn("empty input. Skipping")
		return []config.GenericMap{}
	}
	out := []config.GenericMap{}
	for _, item := range in {
		pb, ok := item.(*pbflow.Records)
		if !ok {
			// Report the type of the offending element, not of the (always
			// *pbflow.Records) assertion target.
			pflog.WithField("type", fmt.Sprintf("%T", item)).
				Warn("expecting input to be *pbflow.Records. Skipping")
			continue
		}
		// The generated getter is nil-safe, so a typed nil batch yields no entries.
		entries := pb.GetEntries()
		if len(out) == 0 && cap(out) < len(entries) {
			// Common case of a single batch: size the output once up front.
			out = make([]config.GenericMap, 0, len(entries))
		}
		for _, entry := range entries {
			out = append(out, pbFlowToMap(entry))
		}
	}
	return out
}

// pbFlowToMap flattens a single protobuf flow record into a GenericMap.
// A nil record yields an empty map. Flow start/end times are stored as whole
// seconds, and TimeReceived is the wall-clock time (in seconds) at which the
// record is converted.
func pbFlowToMap(flow *pbflow.Record) config.GenericMap {
	fields := config.GenericMap{}
	if flow == nil {
		return fields
	}

	// The nested messages may be nil; their generated getters tolerate that.
	network, dataLink, transport := flow.Network, flow.DataLink, flow.Transport

	fields["FlowDirection"] = int(flow.Direction.Number())
	fields["Bytes"] = flow.Bytes
	fields["Packets"] = flow.Packets
	fields["Etype"] = flow.EthProtocol
	fields["Interface"] = flow.Interface

	fields["SrcAddr"] = ipToStr(network.GetSrcAddr())
	fields["DstAddr"] = ipToStr(network.GetDstAddr())

	fields["SrcMac"] = macToStr(dataLink.GetSrcMac())
	fields["DstMac"] = macToStr(dataLink.GetDstMac())

	fields["SrcPort"] = transport.GetSrcPort()
	fields["DstPort"] = transport.GetDstPort()
	fields["Proto"] = transport.GetProtocol()

	fields["TimeFlowStart"] = flow.TimeFlowStart.GetSeconds()
	fields["TimeFlowEnd"] = flow.TimeFlowEnd.GetSeconds()
	fields["TimeReceived"] = time.Now().Unix()

	return fields
}

// ipToStr renders a protobuf IP as text. When the IPv6 bytes are set they are
// formatted through net.IP; otherwise the IPv4 value is printed in
// dotted-decimal form, most significant byte first.
func ipToStr(ip *pbflow.IP) string {
	if v6 := ip.GetIpv6(); v6 != nil {
		return net.IP(v6).String()
	}
	v4 := ip.GetIpv4()
	first, second, third, fourth := byte(v4>>24), byte(v4>>16), byte(v4>>8), byte(v4)
	return fmt.Sprintf("%d.%d.%d.%d", first, second, third, fourth)
}

// macToStr formats the 48 low-order bits of mac as six colon-separated,
// upper-case, zero-padded hexadecimal octets, most significant octet first.
// Any bits above the 48th are ignored.
func macToStr(mac uint64) string {
	var octets [6]uint8
	for i := range octets {
		// Fill from the last octet backwards: octet i counted from the end
		// sits i bytes above the least significant one.
		octets[len(octets)-1-i] = uint8(mac >> (8 * uint(i)))
	}
	return fmt.Sprintf("%02X:%02X:%02X:%02X:%02X:%02X",
		octets[0], octets[1], octets[2], octets[3], octets[4], octets[5])
}
66 changes: 66 additions & 0 deletions pkg/pipeline/decode/decode_protobuf_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package decode

import (
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-agent/pkg/pbflow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestDecodePBFlows(t *testing.T) {
decoder := Protobuf{}

someTime := time.Now()
flow := &pbflow.Record{
Interface: "eth0",
EthProtocol: 2048,
Bytes: 456,
Packets: 123,
Direction: pbflow.Direction_EGRESS,
TimeFlowStart: timestamppb.New(someTime),
TimeFlowEnd: timestamppb.New(someTime),
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
SrcMac: 0x010203040506,
},
Transport: &pbflow.Transport{
Protocol: 1,
SrcPort: 23000,
DstPort: 443,
},
}

out := decoder.Decode([]interface{}{&pbflow.Records{Entries: []*pbflow.Record{flow}}})
require.Len(t, out, 1)
assert.NotZero(t, out[0]["TimeReceived"])
delete(out[0], "TimeReceived")
assert.Equal(t, config.GenericMap{
"FlowDirection": 1,
"Bytes": uint64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"SrcPort": uint32(23000),
"DstPort": uint32(443),
"Etype": uint32(2048),
"Packets": uint64(123),
"Proto": uint32(1),
"TimeFlowStart": someTime.Unix(),
"TimeFlowEnd": someTime.Unix(),
"Interface": "eth0",
}, out[0])

}
49 changes: 49 additions & 0 deletions pkg/pipeline/ingest/ingest_grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ingest

import (
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-agent/pkg/grpc"
"github.com/netobserv/netobserv-agent/pkg/pbflow"
)

// defaultBufferLen is the capacity given to the flow records channel when the
// configuration leaves the buffer length at zero.
const defaultBufferLen = 100

// GRPCProtobuf ingests data from the NetObserv eBPF Agent, using Protocol Buffers over gRPC
type GRPCProtobuf struct {
// collector is the gRPC server started by NewGRPCProtobuf; Close shuts it down.
collector *grpc.CollectorServer
// flowPackets is the buffered channel handed to the collector at start-up and
// drained by Ingest.
flowPackets chan *pbflow.Records
}

// NewGRPCProtobuf starts a gRPC collector listening on the configured port and
// returns an ingester that forwards the record batches it receives.
//
// It returns an error when no port is configured or when the collector cannot
// be started. A buffer length that is zero or negative is replaced by
// defaultBufferLen; a negative value would otherwise make the channel
// allocation panic.
func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) {
	netObserv := params.Ingest.GRPC
	if netObserv.Port == 0 {
		return nil, fmt.Errorf("ingest port not specified")
	}
	bufLen := netObserv.BufferLen
	if bufLen <= 0 {
		bufLen = defaultBufferLen
	}
	flowPackets := make(chan *pbflow.Records, bufLen)
	collector, err := grpc.StartCollector(netObserv.Port, flowPackets)
	if err != nil {
		return nil, fmt.Errorf("starting gRPC collector on port %d: %w", netObserv.Port, err)
	}
	return &GRPCProtobuf{
		collector:   collector,
		flowPackets: flowPackets,
	}, nil
}

// Ingest forwards each received batch of flow records to out, wrapped in a
// single-element slice. It blocks until the internal channel is closed and
// drained.
func (no *GRPCProtobuf) Ingest(out chan<- []interface{}) {
	for {
		records, open := <-no.flowPackets
		if !open {
			return
		}
		out <- []interface{}{records}
	}
}

// Close shuts down the collector and then closes the flow records channel,
// which ends the loop in Ingest. It returns the error reported by the
// collector, if any.
func (no *GRPCProtobuf) Close() error {
// The collector is closed first, before the channel it writes to.
// NOTE(review): this assumes the collector no longer sends on flowPackets
// once its Close returns; a later send would panic — confirm.
err := no.collector.Close()
close(no.flowPackets)
return err
}
6 changes: 5 additions & 1 deletion pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ func getIngester(params config.StageParam) (ingest.Ingester, error) {
ingester, err = ingest.NewIngestCollector(params)
case "kafka":
ingester, err = ingest.NewIngestKafka(params)
case "grpc":
ingester, err = ingest.NewGRPCProtobuf(params)
default:
panic(fmt.Sprintf("`ingest` type %s not defined", params.Ingest.Type))
}
Expand All @@ -287,6 +289,8 @@ func getDecoder(params config.StageParam) (decode.Decoder, error) {
decoder, err = decode.NewDecodeJson()
case "aws":
decoder, err = decode.NewDecodeAws(params)
case "protobuf":
decoder, err = decode.NewProtobuf()
case "none":
decoder, err = decode.NewDecodeNone()
default:
Expand All @@ -300,7 +304,7 @@ func getWriter(params config.StageParam) (write.Writer, error) {
var err error
switch params.Write.Type {
case "stdout":
writer, err = write.NewWriteStdout()
writer, err = write.NewWriteStdout(params)
case "none":
writer, err = write.NewWriteNone()
case "loki":
Expand Down
Loading

0 comments on commit 20dbc8d

Please sign in to comment.