Skip to content

Commit

Permalink
Allow to start FLP directly from the flow logs producer
Browse files Browse the repository at this point in the history
- New "InProcess" ingest stage
- New entry point to start the whole FLP in-process: pipeline.StartFLPInProcess
- Add some tests
  • Loading branch information
jotak committed Nov 16, 2023
1 parent 7c78d63 commit 86379eb
Show file tree
Hide file tree
Showing 10 changed files with 339 additions and 99 deletions.
26 changes: 3 additions & 23 deletions cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -35,7 +34,7 @@ import (
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -182,27 +181,8 @@ func run() {

// Setup (threads) exit manager
utils.SetupElegantExit()

// set up private prometheus registry
if cfg.MetricsSettings.SuppressGoMetrics {
reg := prometheus.NewRegistry()
prometheus.DefaultRegisterer = reg
prometheus.DefaultGatherer = reg
}

// create prometheus server for operational metrics
// if value of address is empty, then by default it will take 0.0.0.0
addr := fmt.Sprintf("%s:%v", cfg.MetricsSettings.Address, cfg.MetricsSettings.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.MetricsSettings.TLS
go utils.StartPromServer(tlsConfig, promServer, !cfg.MetricsSettings.NoPanic, prometheus.DefaultGatherer.(*prometheus.Registry))
prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings)
promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil)

// Create new flows pipeline
mainPipeline, err = pipeline.NewPipeline(&cfg)
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type PipelineBuilderStage struct {
pipeline *pipeline
}

const PresetIngesterStage = "preset-ingester"

// NewPipeline creates a new pipeline from an existing ingest
func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) {
if ingest.Collector != nil {
Expand Down Expand Up @@ -89,6 +91,15 @@ func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage
return PipelineBuilderStage{pipeline: &p, lastStage: name}
}

// NewPresetIngesterPipeline creates a new partial pipeline without ingest stage
func NewPresetIngesterPipeline() PipelineBuilderStage {
p := pipeline{
stages: []Stage{},
config: []StageParam{},
}
return PipelineBuilderStage{pipeline: &p, lastStage: PresetIngesterStage}
}

func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuilderStage {
b.pipeline.stages = append(b.pipeline.stages, Stage{Name: name, Follows: b.lastStage})
b.pipeline.config = append(b.pipeline.config, param)
Expand Down
15 changes: 2 additions & 13 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,15 @@
package encode

import (
"crypto/tls"
"fmt"
"net/http"
"strings"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
putils "github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
promserver "github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/netobserv/flowlogs-pipeline/pkg/utils"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -279,17 +278,7 @@ func NewEncodeProm(opMetrics *operational.Metrics, params config.StageParam) (En
if cfg.PromConnectionInfo != nil {
registry := prometheus.NewRegistry()
registerer = registry
addr := fmt.Sprintf("%s:%v", cfg.PromConnectionInfo.Address, cfg.PromConnectionInfo.Port)
log.Infof("startServer: addr = %s", addr)
promServer := &http.Server{
Addr: addr,
// TLS clients must use TLS 1.2 or higher
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
tlsConfig := cfg.PromConnectionInfo.TLS
go putils.StartPromServer(tlsConfig, promServer, true, registry)
promserver.StartServerAsync(cfg.PromConnectionInfo, nil)
} else {
registerer = prometheus.DefaultRegisterer
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/pipeline/ingest/ingest_inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package ingest

import (
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"

"github.com/netobserv/netobserv-ebpf-agent/pkg/decode"
"github.com/sirupsen/logrus"
)

var ilog = logrus.WithField("component", "ingest.InProcess")

// InProcess ingester, meant to be imported and used from another program via
type InProcess struct {
flowPackets chan *pbflow.Records
}

func NewInProcess(flowPackets chan *pbflow.Records) *InProcess {
return &InProcess{flowPackets: flowPackets}
}

func (d *InProcess) Ingest(out chan<- config.GenericMap) {
go func() {
<-utils.ExitChannel()
d.Close()
}()
for fp := range d.flowPackets {
ilog.Debugf("Ingested %v records", len(fp.Entries))
for _, entry := range fp.Entries {
out <- decode.PBFlowToMap(entry)
}
}
}

func (d *InProcess) Write(record *pbflow.Records) {
d.flowPackets <- record
}

func (d *InProcess) Close() {
close(d.flowPackets)
}
32 changes: 32 additions & 0 deletions pkg/pipeline/inprocess.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pipeline

import (
"context"
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
"github.com/netobserv/flowlogs-pipeline/pkg/prometheus"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
)

// StartFLPInProcess is an entry point to start the whole FLP / pipeline processing from imported code
func StartFLPInProcess(cfg config.ConfigFileStruct) (*ingest.InProcess, error) {
prometheus.SetGlobalMetricsSettings(&cfg.MetricsSettings)
promServer := prometheus.StartServerAsync(&cfg.MetricsSettings.PromConnectionInfo, nil)

// Create new flows pipeline
ingester := ingest.NewInProcess(make(chan *pbflow.Records, 100))
flp, err := newPipelineFromIngester(&cfg, ingester)
if err != nil {
return nil, fmt.Errorf("failed to initialize pipeline %w", err)
}

// Starts the flows pipeline; blocking call
go func() {
flp.Run()
_ = promServer.Shutdown(context.Background())
}()

return ingester, nil
}
125 changes: 125 additions & 0 deletions pkg/pipeline/inprocess_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package pipeline

import (
"bufio"
"encoding/json"
"os"
"testing"
"time"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/netobserv-ebpf-agent/pkg/pbflow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestInProcessFLP(t *testing.T) {
pipeline := config.NewPresetIngesterPipeline()
pipeline = pipeline.WriteStdout("writer", api.WriteStdout{Format: "json"})
cfs := config.ConfigFileStruct{
Pipeline: pipeline.GetStages(),
Parameters: pipeline.GetStageParams(),
}
ingester, err := StartFLPInProcess(cfs)
require.NoError(t, err)
defer ingester.Close()

capturedOut, w, _ := os.Pipe()
old := os.Stdout
os.Stdout = w
defer func() {
os.Stdout = old
}()

// yield thread to allow pipe services correctly start
time.Sleep(10 * time.Millisecond)

startTime := time.Now()
endTime := startTime.Add(7 * time.Second)
someDuration := endTime.Sub(startTime)

ingester.Write(&pbflow.Records{
Entries: []*pbflow.Record{{
Interface: "eth0",
EthProtocol: 2048,
Bytes: 456,
Packets: 123,
Direction: pbflow.Direction_EGRESS,
TimeFlowStart: timestamppb.New(startTime),
TimeFlowEnd: timestamppb.New(endTime),
Network: &pbflow.Network{
SrcAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x01020304},
},
DstAddr: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x05060708},
},
Dscp: 1,
},
DataLink: &pbflow.DataLink{
DstMac: 0x112233445566,
SrcMac: 0x010203040506,
},
Transport: &pbflow.Transport{
Protocol: 17,
SrcPort: 23000,
DstPort: 443,
},
AgentIp: &pbflow.IP{
IpFamily: &pbflow.IP_Ipv4{Ipv4: 0x0a0b0c0d},
},
PktDropBytes: 100,
PktDropPackets: 10,
PktDropLatestFlags: 1,
PktDropLatestState: 1,
PktDropLatestDropCause: 8,
DnsLatency: durationpb.New(someDuration),
DnsId: 1,
DnsFlags: 0x80,
DnsErrno: 0,
TimeFlowRtt: durationpb.New(someDuration),
}},
})

scanner := bufio.NewScanner(capturedOut)
require.True(t, scanner.Scan())
capturedRecord := map[string]interface{}{}
bytes := scanner.Bytes()
require.NoError(t, json.Unmarshal(bytes, &capturedRecord), string(bytes))

assert.NotZero(t, capturedRecord["TimeReceived"])
delete(capturedRecord, "TimeReceived")
assert.EqualValues(t, map[string]interface{}{
"FlowDirection": float64(1),
"Bytes": float64(456),
"SrcAddr": "1.2.3.4",
"DstAddr": "5.6.7.8",
"Dscp": float64(1),
"DstMac": "11:22:33:44:55:66",
"SrcMac": "01:02:03:04:05:06",
"SrcPort": float64(23000),
"DstPort": float64(443),
"Duplicate": false,
"Etype": float64(2048),
"Packets": float64(123),
"Proto": float64(17),
"TimeFlowStartMs": float64(startTime.UnixMilli()),
"TimeFlowEndMs": float64(endTime.UnixMilli()),
"Interface": "eth0",
"AgentIP": "10.11.12.13",
"PktDropBytes": float64(100),
"PktDropPackets": float64(10),
"PktDropLatestFlags": float64(1),
"PktDropLatestState": "TCP_ESTABLISHED",
"PktDropLatestDropCause": "SKB_DROP_REASON_NETFILTER_DROP",
"DnsLatencyMs": float64(someDuration.Milliseconds()),
"DnsId": float64(1),
"DnsFlags": float64(0x80),
"DnsErrno": float64(0),
"DnsFlagsResponseCode": "NoError",
"TimeFlowRttNs": float64(someDuration.Nanoseconds()),
}, capturedRecord)
}
23 changes: 15 additions & 8 deletions pkg/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/operational"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/ingest"
"github.com/netobserv/gopipes/pkg/node"
log "github.com/sirupsen/logrus"
)
Expand All @@ -48,18 +49,24 @@ type Pipeline struct {

// NewPipeline defines the pipeline elements
func NewPipeline(cfg *config.ConfigFileStruct) (*Pipeline, error) {
log.Debugf("entering NewPipeline")
return newPipelineFromIngester(cfg, nil)
}

// newPipelineFromIngester defines the pipeline elements from a preset ingester (e.g. for in-process receiver)
func newPipelineFromIngester(cfg *config.ConfigFileStruct, ing ingest.Ingester) (*Pipeline, error) {
log.Debugf("entering newPipelineFromIngester")

stages := cfg.Pipeline
log.Debugf("stages = %v ", stages)
configParams := cfg.Parameters
log.Debugf("configParams = %v ", configParams)
log.Debugf("stages = %v ", cfg.Pipeline)
log.Debugf("configParams = %v ", cfg.Parameters)

build := newBuilder(cfg)
if err := build.readStages(); err != nil {
builder := newBuilder(cfg)
if ing != nil {
builder.presetIngester(ing)
}
if err := builder.readStages(); err != nil {
return nil, err
}
return build.build()
return builder.build()
}

func (p *Pipeline) Run() {
Expand Down
21 changes: 18 additions & 3 deletions pkg/pipeline/pipeline_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ func newBuilder(cfg *config.ConfigFileStruct) *builder {
}
}

// use a preset ingester
func (b *builder) presetIngester(ing ingest.Ingester) {
name := config.PresetIngesterStage
log.Debugf("stage = %v", name)
b.appendEntry(pipelineEntry{
stageName: name,
stageType: StageIngest,
Ingester: ing,
})
}

// read the configuration stages definition and instantiate the corresponding native Go objects
func (b *builder) readStages() error {
for _, param := range b.configParams {
Expand All @@ -124,14 +135,18 @@ func (b *builder) readStages() error {
if err != nil {
return err
}
b.pipelineEntryMap[param.Name] = &pEntry
b.pipelineStages = append(b.pipelineStages, &pEntry)
log.Debugf("pipeline = %v", b.pipelineStages)
b.appendEntry(pEntry)
}
log.Debugf("pipeline = %v", b.pipelineStages)
return nil
}

func (b *builder) appendEntry(pEntry pipelineEntry) {
b.pipelineEntryMap[pEntry.stageName] = &pEntry
b.pipelineStages = append(b.pipelineStages, &pEntry)
log.Debugf("pipeline = %v", b.pipelineStages)
}

// reads the configured Go stages and connects between them
// readStages must be invoked before this
func (b *builder) build() (*Pipeline, error) {
Expand Down
Loading

0 comments on commit 86379eb

Please sign in to comment.