From 00ceb285438d794187ed3f5135ca3e714d07e71d Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 7 Feb 2024 13:29:27 +0200 Subject: [PATCH 01/12] feat: migrate netflow input to v2.Plugin --- .../dockerlogbeat/pipelinemock/pipelines.go | 7 + .../input/default-inputs/inputs_other.go | 2 + x-pack/filebeat/input/netflow/input.go | 351 ++++++++++-------- x-pack/filebeat/input/netflow/input_test.go | 19 +- x-pack/filebeat/input/netflow/netflow_test.go | 133 +++++++ 5 files changed, 356 insertions(+), 156 deletions(-) diff --git a/x-pack/dockerlogbeat/pipelinemock/pipelines.go b/x-pack/dockerlogbeat/pipelinemock/pipelines.go index d9054cf6eb4..4babe94409a 100644 --- a/x-pack/dockerlogbeat/pipelinemock/pipelines.go +++ b/x-pack/dockerlogbeat/pipelinemock/pipelines.go @@ -85,3 +85,10 @@ func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, er return c, nil } + +func (pc *MockPipelineConnector) GetConnectedClients() []*MockBeatClient { + pc.mtx.Lock() + defer pc.mtx.Unlock() + + return pc.clients +} diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index d396d4635a1..d0b2b71c6bd 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -20,6 +20,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" "github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson" "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" + "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" "github.com/elastic/beats/v7/x-pack/filebeat/input/shipper" "github.com/elastic/elastic-agent-libs/logp" @@ -39,5 +40,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 awscloudwatch.Plugin(), lumberjack.Plugin(), shipper.Plugin(log, store), + netflow.Plugin(log), } } diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index 97f9931f325..f34e090889e 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -11,18 +11,19 @@ import ( "sync" "time" - "github.com/elastic/beats/v7/filebeat/channel" - "github.com/elastic/beats/v7/filebeat/harvester" - "github.com/elastic/beats/v7/filebeat/input" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/filebeat/inputsource" "github.com/elastic/beats/v7/filebeat/inputsource/udp" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/go-concert/unison" ) const ( @@ -34,21 +35,76 @@ var ( numDropped = monitoring.NewUint(nil, "filebeat.input.netflow.packets.dropped") numFlows = monitoring.NewUint(nil, "filebeat.input.netflow.flows") aliveInputs atomic.Int - logger *logp.Logger - initLogger sync.Once ) +func Plugin(log *logp.Logger) v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Stable, + Deprecated: false, + Info: "collect and decode packets of netflow protocol", + Manager: &netflowInputManager{ + log: log.Named(inputName), + }, + } +} + +type netflowInputManager struct { + log *logp.Logger +} + +func (im *netflowInputManager) Init(_ unison.Group, _ v2.Mode) error { + return nil +} + +func (im *netflowInputManager) Create(cfg *conf.C) (v2.Input, error) { + inputCfg := defaultConfig + if err := cfg.Unpack(&inputCfg); err != nil { + return nil, err + } + + customFields := make([]fields.FieldDict, len(inputCfg.CustomDefinitions)) + for idx, yamlPath := range inputCfg.CustomDefinitions { + f, err := LoadFieldDefinitionsFromFile(yamlPath) + if err != nil { + return nil, fmt.Errorf("failed parsing custom field definitions from file '%s': %w", yamlPath, err) + } + customFields[idx] = f + } + + dec, err := decoder.NewDecoder(decoder.NewConfig(). + WithProtocols(inputCfg.Protocols...). + WithExpiration(inputCfg.ExpirationTimeout). + WithLogOutput(&logDebugWrapper{Logger: im.log}). + WithCustomFields(customFields...). + WithSequenceResetEnabled(inputCfg.DetectSequenceReset). + WithSharedTemplates(inputCfg.ShareTemplates)) + if err != nil { + return nil, fmt.Errorf("error initializing netflow decoder: %w", err) + } + + input := &netflowInput{ + decoder: dec, + internalNetworks: inputCfg.InternalNetworks, + logger: im.log, + queueSize: inputCfg.PacketQueueSize, + } + + input.udp = udp.New(&inputCfg.Config, input.packetDispatch) + + return input, nil +} + type packet struct { data []byte source net.Addr } type netflowInput struct { - mutex sync.Mutex + mtx sync.Mutex udp *udp.Server decoder *decoder.Decoder - outlet channel.Outleter - forwarder *harvester.Forwarder + client beat.Client internalNetworks []string logger *logp.Logger queueC chan packet @@ -56,11 +112,99 @@ type netflowInput struct { started bool } -func init() { - err := input.Register(inputName, NewInput) +func (n *netflowInput) Name() string { + return inputName +} + +func (n *netflowInput) Test(_ v2.TestContext) error { + return nil +} + +func (n *netflowInput) packetDispatch(data []byte, metadata inputsource.NetworkMetadata) { + select { + case n.queueC <- packet{data, metadata.RemoteAddr}: + numPackets.Inc() + default: + numDropped.Inc() + } +} + +func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) error { + n.mtx.Lock() + if n.started { + n.mtx.Unlock() + return nil + } + + n.started = true + n.mtx.Unlock() + + n.logger.Info("Starting netflow input") + + n.logger.Info("Connecting to beat event publishing") + client, err := connector.ConnectWith(beat.ClientConfig{ + PublishMode: beat.DefaultGuarantees, + Processing: beat.ProcessingConfig{ + // This input only produces events with basic types so normalization + // is not required. + EventNormalization: boolPtr(false), + }, + CloseRef: context.Cancelation, + EventListener: nil, + }) + if err != nil { + n.logger.Errorw("Failed connecting to beat event publishing", "error", err) + n.stop() + return err + } + + n.logger.Info("Starting netflow decoder") + if err := n.decoder.Start(); err != nil { + n.logger.Errorw("Failed to start netflow decoder", "error", err) + n.stop() + return err + } + + n.queueC = make(chan packet, n.queueSize) + + n.logger.Info("Starting udp") + err = n.udp.Start() if err != nil { - panic(err) + n.logger.Errorf("Failed to start udp: %v", err) + n.stop() + return err + } + + if aliveInputs.Inc() == 1 && n.logger.IsDebug() { + go n.statsLoop(context.Cancelation.Done()) } + defer aliveInputs.Dec() + + go func() { + <-context.Cancelation.Done() + n.stop() + }() + + for packet := range n.queueC { + flows, err := n.decoder.Read(bytes.NewBuffer(packet.data), packet.source) + if err != nil { + n.logger.Warnf("Error parsing NetFlow packet of length %d from %s: %v", len(packet.data), packet.source, err) + continue + } + + fLen := len(flows) + if fLen == 0 { + continue + } + evs := make([]beat.Event, fLen) + numFlows.Add(uint64(fLen)) + for i, flow := range flows { + evs[i] = toBeatEvent(flow, n.internalNetworks) + } + client.PublishAll(evs) + } + + return nil } // An adapter so that logp.Logger can be used as a log.Logger. @@ -83,171 +227,72 @@ func (w *logDebugWrapper) Write(p []byte) (n int, err error) { return n, nil } -// NewInput creates a new Netflow input -func NewInput( - cfg *conf.C, - connector channel.Connector, - context input.Context, -) (input.Input, error) { - initLogger.Do(func() { - logger = logp.NewLogger(inputName) - }) - out, err := connector.Connect(cfg) - if err != nil { - return nil, err - } +// stop stops the netflow input +func (n *netflowInput) stop() { + n.mtx.Lock() + defer n.mtx.Unlock() - config := defaultConfig - if err = cfg.Unpack(&config); err != nil { - out.Close() - return nil, err + if !n.started { + return } - var customFields []fields.FieldDict - for _, yamlPath := range config.CustomDefinitions { - f, err := LoadFieldDefinitionsFromFile(yamlPath) - if err != nil { - return nil, fmt.Errorf("failed parsing custom field definitions from file '%s': %w", yamlPath, err) - } - customFields = append(customFields, f) + if n.udp != nil { + n.udp.Stop() } - decoder, err := decoder.NewDecoder(decoder.NewConfig(). - WithProtocols(config.Protocols...). - WithExpiration(config.ExpirationTimeout). - WithLogOutput(&logDebugWrapper{Logger: logger}). - WithCustomFields(customFields...). - WithSequenceResetEnabled(config.DetectSequenceReset). - WithSharedTemplates(config.ShareTemplates)) - if err != nil { - return nil, fmt.Errorf("error initializing netflow decoder: %w", err) - } - - input := &netflowInput{ - outlet: out, - internalNetworks: config.InternalNetworks, - forwarder: harvester.NewForwarder(out), - decoder: decoder, - logger: logger, - queueSize: config.PacketQueueSize, - } - - input.udp = udp.New(&config.Config, input.packetDispatch) - return input, nil -} - -func (p *netflowInput) Publish(events []beat.Event) error { - for _, evt := range events { - p.forwarder.Send(evt) - } - return nil -} -// Run starts listening for NetFlow events over the network. -func (p *netflowInput) Run() { - p.mutex.Lock() - defer p.mutex.Unlock() - - if !p.started { - logger.Info("Starting UDP input") - - if err := p.decoder.Start(); err != nil { - logger.Errorw("Failed to start netflow decoder", "error", err) - p.outlet.Close() - return - } - - p.queueC = make(chan packet, p.queueSize) - err := p.udp.Start() - if err != nil { - logger.Errorf("Error running harvester: %v", err) - p.outlet.Close() - p.decoder.Stop() - close(p.queueC) - return + if n.decoder != nil { + if err := n.decoder.Stop(); err != nil { + n.logger.Info("Error stopping decoder", "error", err) } + } - go p.recvRoutine() - // Only the first active input launches the stats thread - if aliveInputs.Inc() == 1 && logger.IsDebug() { - go p.statsLoop() + if n.client != nil { + if err := n.client.Close(); err != nil { + n.logger.Info("Error closing beat client", "error", err) } - p.started = true } -} - -// Stop stops the UDP input -func (p *netflowInput) Stop() { - p.mutex.Lock() - defer p.mutex.Unlock() - if p.started { - aliveInputs.Dec() - defer p.outlet.Close() - defer close(p.queueC) - logger.Info("Stopping UDP input") - p.udp.Stop() - p.started = false - } -} + close(n.queueC) -// Wait suspends the UDP input -func (p *netflowInput) Wait() { - p.Stop() + n.started = false } -func (p *netflowInput) statsLoop() { +func (n *netflowInput) statsLoop(done <-chan struct{}) { prevPackets := numPackets.Get() prevFlows := numFlows.Get() prevDropped := numDropped.Get() // The stats thread only monitors queue length for the first input - prevQueue := len(p.queueC) + prevQueue := len(n.queueC) t := time.NewTicker(time.Second) defer t.Stop() - for range t.C { - packets := numPackets.Get() - flows := numFlows.Get() - dropped := numDropped.Get() - queue := len(p.queueC) - if packets > prevPackets || flows > prevFlows || dropped > prevDropped || queue > prevQueue { - logger.Debugf("Stats total:[ packets=%d dropped=%d flows=%d queue_len=%d ] delta:[ packets/s=%d dropped/s=%d flows/s=%d queue_len/s=%+d ]", - packets, dropped, flows, queue, packets-prevPackets, dropped-prevDropped, flows-prevFlows, queue-prevQueue) - prevFlows = flows - prevPackets = packets - prevQueue = queue - prevDropped = dropped - } else { - p.mutex.Lock() + for { + select { + case <-t.C: + packets := numPackets.Get() + flows := numFlows.Get() + dropped := numDropped.Get() + queue := len(n.queueC) + if packets > prevPackets || flows > prevFlows || dropped > prevDropped || queue > prevQueue { + n.logger.Debugf("Stats total:[ packets=%d dropped=%d flows=%d queue_len=%d ] delta:[ packets/s=%d dropped/s=%d flows/s=%d queue_len/s=%+d ]", + packets, dropped, flows, queue, packets-prevPackets, dropped-prevDropped, flows-prevFlows, queue-prevQueue) + prevFlows = flows + prevPackets = packets + prevQueue = queue + prevDropped = dropped + continue + } + + n.mtx.Lock() count := aliveInputs.Load() - p.mutex.Unlock() + n.mtx.Unlock() if count == 0 { - break + return } - } - } -} - -func (p *netflowInput) packetDispatch(data []byte, metadata inputsource.NetworkMetadata) { - select { - case p.queueC <- packet{data, metadata.RemoteAddr}: - numPackets.Inc() - default: - numDropped.Inc() - } -} -func (p *netflowInput) recvRoutine() { - for packet := range p.queueC { - flows, err := p.decoder.Read(bytes.NewBuffer(packet.data), packet.source) - if err != nil { - p.logger.Warnf("Error parsing NetFlow packet of length %d from %s: %v", len(packet.data), packet.source, err) - } - if n := len(flows); n > 0 { - evs := make([]beat.Event, n) - numFlows.Add(uint64(n)) - for i, flow := range flows { - evs[i] = toBeatEvent(flow, p.internalNetworks) - } - p.Publish(evs) + case <-done: + return } } } + +func boolPtr(b bool) *bool { return &b } diff --git a/x-pack/filebeat/input/netflow/input_test.go b/x-pack/filebeat/input/netflow/input_test.go index 506cf48db16..b988b5db87a 100644 --- a/x-pack/filebeat/input/netflow/input_test.go +++ b/x-pack/filebeat/input/netflow/input_test.go @@ -9,11 +9,24 @@ package netflow import ( "testing" - "github.com/elastic/beats/v7/filebeat/input/inputtest" + "github.com/elastic/beats/v7/libbeat/tests/resources" + + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/stretchr/testify/require" ) func TestNewInputDone(t *testing.T) { - config := mapstr.M{} - inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config) + configMap := conf.MustNewConfigFrom(mapstr.M{}) + + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + config, err := conf.NewConfigFrom(configMap) + require.NoError(t, err) + + _, err = Plugin(logp.NewLogger("netflow_test")).Manager.Create(config) + require.NoError(t, err) } diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go index b567d67bc43..b69e085d788 100644 --- a/x-pack/filebeat/input/netflow/netflow_test.go +++ b/x-pack/filebeat/input/netflow/netflow_test.go @@ -6,6 +6,7 @@ package netflow import ( "bytes" + "context" "encoding/binary" "encoding/json" "flag" @@ -15,17 +16,24 @@ import ( "path/filepath" "strings" "testing" + "time" "github.com/google/gopacket" "github.com/google/gopacket/pcap" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "gopkg.in/yaml.v2" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record" "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" ) var ( @@ -60,6 +68,131 @@ type TestResult struct { Flows []beat.Event `json:"events,omitempty"` } +func newV2Context() (v2.Context, func()) { + ctx, cancel := context.WithCancel(context.Background()) + return v2.Context{ + Logger: logp.NewLogger("netflow_test"), + ID: "test_id", + Cancelation: ctx, + }, cancel +} + +func TestNetFlow(t *testing.T) { + pcaps, err := filepath.Glob(filepath.Join(pcapDir, "*.pcap")) + if err != nil { + t.Fatal(err) + } + + for _, file := range pcaps { + testName := strings.TrimSuffix(filepath.Base(file), ".pcap") + + t.Run(testName, func(t *testing.T) { + + configMap := conf.MustNewConfigFrom(mapstr.M{}) + pluginCfg, err := conf.NewConfigFrom(configMap) + require.NoError(t, err) + + netflowPlugin, err := Plugin(logp.NewLogger("netflow_test")).Manager.Create(pluginCfg) + require.NoError(t, err) + + mockPipeline := &pipelinemock.MockPipelineConnector{} + + ctx, cancelFn := newV2Context() + errChan := make(chan error) + go func() { + defer close(errChan) + errChan <- netflowPlugin.Run(ctx, mockPipeline) + }() + + defer cancelFn() + + clientSeenTries := 0 + for clientSeenTries < 5 { + if len(mockPipeline.GetConnectedClients()) > 0 { + break + } + time.Sleep(1 * time.Second) + clientSeenTries++ + continue + } + + if clientSeenTries == 5 { + t.Fatal("client did not connect to pipeline") + return + } + + udpAddr, err := net.ResolveUDPAddr("udp", defaultConfig.Config.Host) + require.NoError(t, err) + + conn, err := net.DialUDP("udp", nil, udpAddr) + require.NoError(t, err) + + f, err := pcap.OpenOffline(file) + require.NoError(t, err) + defer f.Close() + + goldenData := readGoldenFile(t, filepath.Join(goldenDir, testName+".pcap.golden.json")) + + // Process packets in PCAP and get flow records. + var totalBytes, totalPackets int + packetSource := gopacket.NewPacketSource(f, f.LinkType()) + for pkt := range packetSource.Packets() { + payloadData := pkt.TransportLayer().LayerPayload() + + n, err := conn.Write(payloadData) + require.NoError(t, err) + totalBytes += n + totalPackets++ + } + + publishedEventsTries := 0 + for publishedEventsTries < 10 { + if len(mockPipeline.GetAllEvents()) == len(goldenData.Flows) { + break + } + time.Sleep(1 * time.Second) + publishedEventsTries++ + continue + } + + if publishedEventsTries == 10 { + t.Fatal("did not see expected events published in pipeline") + return + } + + for _, event := range goldenData.Flows { + // fields that cannot be matched at runtime + _ = event.Delete("netflow.exporter.address") + _ = event.Delete("event.created") + _ = event.Delete("observer.ip") + } + + publishedEvents := mockPipeline.GetAllEvents() + for _, event := range publishedEvents { + // fields that cannot be matched at runtime + _ = event.Delete("netflow.exporter.address") + _ = event.Delete("event.created") + _ = event.Delete("observer.ip") + } + + require.EqualValues(t, goldenData, normalize(t, TestResult{ + Name: goldenData.Name, + Error: "", + Flows: publishedEvents, + })) + + cancelFn() + select { + case err := <-errChan: + require.NoError(t, err) + case <-time.After(5 * time.Second): + t.Fatal("netflow plugin did not stop") + } + + }) + } +} + func TestPCAPFiles(t *testing.T) { pcaps, err := filepath.Glob(filepath.Join(pcapDir, "*.pcap")) if err != nil { From eb59d6519e62f6edd5b9f61f976d46b732d2492c Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 7 Feb 2024 13:29:27 +0200 Subject: [PATCH 02/12] fix: replace deprecated ioutil usage --- x-pack/dockerlogbeat/pipelinemock/reader.go | 3 +-- x-pack/filebeat/input/netflow/definitions.go | 4 ++-- x-pack/filebeat/input/netflow/netflow_test.go | 13 ++++++------- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/x-pack/dockerlogbeat/pipelinemock/reader.go b/x-pack/dockerlogbeat/pipelinemock/reader.go index 263e1ff2f78..ff51cc5fbb1 100644 --- a/x-pack/dockerlogbeat/pipelinemock/reader.go +++ b/x-pack/dockerlogbeat/pipelinemock/reader.go @@ -8,7 +8,6 @@ import ( "bytes" "encoding/binary" "io" - "io/ioutil" "testing" "github.com/docker/docker/api/types/plugins/logdriver" @@ -33,7 +32,7 @@ func CreateTestInputFromLine(t *testing.T, line string) io.ReadCloser { writer := new(bytes.Buffer) encodeLog(t, writer, exampleStruct) - return ioutil.NopCloser(writer) + return io.NopCloser(writer) } func encodeLog(t *testing.T, out io.Writer, entry *logdriver.LogEntry) { diff --git a/x-pack/filebeat/input/netflow/definitions.go b/x-pack/filebeat/input/netflow/definitions.go index 62e6dc3f1b0..a5ed094d8ff 100644 --- a/x-pack/filebeat/input/netflow/definitions.go +++ b/x-pack/filebeat/input/netflow/definitions.go @@ -7,7 +7,7 @@ package netflow import ( "errors" "fmt" - "io/ioutil" + "io" "math" "os" "strconv" @@ -95,7 +95,7 @@ func LoadFieldDefinitionsFromFile(path string) (defs fields.FieldDict, err error return nil, err } defer file.Close() - contents, err := ioutil.ReadAll(file) + contents, err := io.ReadAll(file) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go index b69e085d788..f14cce33e55 100644 --- a/x-pack/filebeat/input/netflow/netflow_test.go +++ b/x-pack/filebeat/input/netflow/netflow_test.go @@ -10,7 +10,6 @@ import ( "encoding/binary" "encoding/json" "flag" - "io/ioutil" "net" "os" "path/filepath" @@ -216,7 +215,7 @@ func TestPCAPFiles(t *testing.T) { t.Fatal(err) } - err = ioutil.WriteFile(goldenName, data, 0o644) + err = os.WriteFile(goldenName, data, 0o644) if err != nil { t.Fatal(err) } @@ -248,7 +247,7 @@ func TestDatFiles(t *testing.T) { t.Fatal(err) } - err = ioutil.WriteFile(goldenName, data, 0o644) + err = os.WriteFile(goldenName, data, 0o644) if err != nil { t.Fatal(err) } @@ -274,7 +273,7 @@ func TestDatFiles(t *testing.T) { } func readDatTests(t testing.TB) *DatTests { - data, err := ioutil.ReadFile("testdata/dat_tests.yaml") + data, err := os.ReadFile("testdata/dat_tests.yaml") if err != nil { t.Fatal(err) } @@ -312,7 +311,7 @@ func getFlowsFromDat(t testing.TB, name string, testCase TestCase) TestResult { source := test.MakeAddress(t, datSourceIP+":4444") var events []beat.Event for _, f := range testCase.Files { - dat, err := ioutil.ReadFile(filepath.Join(datDir, f)) + dat, err := os.ReadFile(filepath.Join(datDir, f)) if err != nil { t.Fatal(err) } @@ -401,7 +400,7 @@ func normalize(t testing.TB, result TestResult) TestResult { } func readGoldenFile(t testing.TB, file string) TestResult { - data, err := ioutil.ReadFile(file) + data, err := os.ReadFile(file) if err != nil { t.Fatal(err) } @@ -476,7 +475,7 @@ func TestReverseFlows(t *testing.T) { }, } - var evs []beat.Event + evs := make([]beat.Event, 0, len(flows)) for _, f := range flows { evs = append(evs, toBeatEvent(f, []string{"private"})) } From c51ba25781d935d437a3f767f7f8a92ed790f264 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 7 Feb 2024 13:29:27 +0200 Subject: [PATCH 03/12] fix: remove unnecessary type conversions --- x-pack/filebeat/input/netflow/case.go | 3 +-- x-pack/filebeat/input/netflow/definitions.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/netflow/case.go b/x-pack/filebeat/input/netflow/case.go index 62e2a9aeeff..6e2f74f2c4c 100644 --- a/x-pack/filebeat/input/netflow/case.go +++ b/x-pack/filebeat/input/netflow/case.go @@ -60,9 +60,8 @@ func CamelCaseToSnakeCase(in string) string { } out := make([]rune, 0, len(in)+4) - runes := []rune(in) upperCount := 1 - for _, r := range runes { + for _, r := range in { lr := unicode.ToLower(r) isUpper := lr != r if isUpper { diff --git a/x-pack/filebeat/input/netflow/definitions.go b/x-pack/filebeat/input/netflow/definitions.go index a5ed094d8ff..bc00ed5d2fc 100644 --- a/x-pack/filebeat/input/netflow/definitions.go +++ b/x-pack/filebeat/input/netflow/definitions.go @@ -169,7 +169,7 @@ func loadFields(def map[interface{}]interface{}, pem uint32, dest fields.FieldDi return fmt.Errorf("bad field ID %d: should have two items (type, name) or one (:skip) (Got %+v)", fieldID, list) } key := fields.Key{ - EnterpriseID: uint32(pem), + EnterpriseID: pem, FieldID: uint16(fieldID), } if _, exists := dest[key]; exists { From 59a57f90dc18880224a4fc96c1fb60af50b166f6 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Wed, 7 Feb 2024 13:46:06 +0200 Subject: [PATCH 04/12] doc: update CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5389ca6551b..eb570a0d4e6 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -24,6 +24,9 @@ fields added to events containing the Beats version. {pull}37553[37553] *Filebeat* +- Convert netflow input to API v2 and disable event normalisation {pull}37901[37901] + + *Heartbeat* From 9b64dca467372e2b71ef562bce41940b0d016202 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 9 Feb 2024 14:58:35 +0200 Subject: [PATCH 05/12] ci: install missing libpcap-dev for linux lint step --- .github/workflows/golangci-lint.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 140b9ce302e..f97bb2029a7 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -32,6 +32,9 @@ jobs: with: go-version-file: .go-version + - name: Install Apt Package + run: sudo apt-get update && sudo apt-get install -y libpcap-dev + - name: golangci-lint env: GOOS: ${{ matrix.GOOS }} From 86f40dc545bd326dbb4e056cdaecb824517027bc Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 9 Feb 2024 17:47:31 +0200 Subject: [PATCH 06/12] fix: utilise require.Eventually to wait for conditions to become true in netflow_test.go --- .../dockerlogbeat/pipelinemock/pipelines.go | 5 +-- x-pack/filebeat/input/netflow/netflow_test.go | 34 ++++--------------- 2 files changed, 9 insertions(+), 30 deletions(-) diff --git a/x-pack/dockerlogbeat/pipelinemock/pipelines.go b/x-pack/dockerlogbeat/pipelinemock/pipelines.go index 4babe94409a..04c7972e8ee 100644 --- a/x-pack/dockerlogbeat/pipelinemock/pipelines.go +++ b/x-pack/dockerlogbeat/pipelinemock/pipelines.go @@ -86,9 +86,10 @@ func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, er return c, nil } -func (pc *MockPipelineConnector) GetConnectedClients() []*MockBeatClient { +// HasConnectedClients returns true if there are clients connected. +func (pc *MockPipelineConnector) HasConnectedClients() bool { pc.mtx.Lock() defer pc.mtx.Unlock() - return pc.clients + return len(pc.clients) > 0 } diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go index f14cce33e55..12d10707e1b 100644 --- a/x-pack/filebeat/input/netflow/netflow_test.go +++ b/x-pack/filebeat/input/netflow/netflow_test.go @@ -105,20 +105,8 @@ func TestNetFlow(t *testing.T) { defer cancelFn() - clientSeenTries := 0 - for clientSeenTries < 5 { - if len(mockPipeline.GetConnectedClients()) > 0 { - break - } - time.Sleep(1 * time.Second) - clientSeenTries++ - continue - } - - if clientSeenTries == 5 { - t.Fatal("client did not connect to pipeline") - return - } + require.Eventually(t, mockPipeline.HasConnectedClients, 5*time.Second, 100*time.Millisecond, + "client connects to pipeline") udpAddr, err := net.ResolveUDPAddr("udp", defaultConfig.Config.Host) require.NoError(t, err) @@ -144,20 +132,10 @@ func TestNetFlow(t *testing.T) { totalPackets++ } - publishedEventsTries := 0 - for publishedEventsTries < 10 { - if len(mockPipeline.GetAllEvents()) == len(goldenData.Flows) { - break - } - time.Sleep(1 * time.Second) - publishedEventsTries++ - continue - } - - if publishedEventsTries == 10 { - t.Fatal("did not see expected events published in pipeline") - return - } + require.Eventually(t, func() bool { + return len(mockPipeline.GetAllEvents()) == len(goldenData.Flows) + }, 5*time.Second, 100*time.Millisecond, + "see all expected events in pipeline") for _, event := range goldenData.Flows { // fields that cannot be matched at runtime From 63be27468899783c81bfa120d0488990a2654228 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 9 Feb 2024 17:48:40 +0200 Subject: [PATCH 07/12] fix: proper log verbosity for errors during stopping netflow input --- x-pack/filebeat/input/netflow/input.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index f34e090889e..e1885b24fd6 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -242,13 +242,13 @@ func (n *netflowInput) stop() { if n.decoder != nil { if err := n.decoder.Stop(); err != nil { - n.logger.Info("Error stopping decoder", "error", err) + n.logger.Errorw("Error stopping decoder", "error", err) } } if n.client != nil { if err := n.client.Close(); err != nil { - n.logger.Info("Error closing beat client", "error", err) + n.logger.Errorw("Error closing beat client", "error", err) } } From 9449fb141b9b99a84882aa846c844262b2da222b Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 9 Feb 2024 17:49:49 +0200 Subject: [PATCH 08/12] fix: remove redundant config creation --- x-pack/filebeat/input/netflow/input_test.go | 3 +-- x-pack/filebeat/input/netflow/netflow_test.go | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/x-pack/filebeat/input/netflow/input_test.go b/x-pack/filebeat/input/netflow/input_test.go index b988b5db87a..2f78cf83c7e 100644 --- a/x-pack/filebeat/input/netflow/input_test.go +++ b/x-pack/filebeat/input/netflow/input_test.go @@ -19,12 +19,11 @@ import ( ) func TestNewInputDone(t *testing.T) { - configMap := conf.MustNewConfigFrom(mapstr.M{}) goroutines := resources.NewGoroutinesChecker() defer goroutines.Check(t) - config, err := conf.NewConfigFrom(configMap) + config, err := conf.NewConfigFrom(mapstr.M{}) require.NoError(t, err) _, err = Plugin(logp.NewLogger("netflow_test")).Manager.Create(config) diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go index 12d10707e1b..6a8ff5b1324 100644 --- a/x-pack/filebeat/input/netflow/netflow_test.go +++ b/x-pack/filebeat/input/netflow/netflow_test.go @@ -87,8 +87,7 @@ func TestNetFlow(t *testing.T) { t.Run(testName, func(t *testing.T) { - configMap := conf.MustNewConfigFrom(mapstr.M{}) - pluginCfg, err := conf.NewConfigFrom(configMap) + pluginCfg, err := conf.NewConfigFrom(mapstr.M{}) require.NoError(t, err) netflowPlugin, err := Plugin(logp.NewLogger("netflow_test")).Manager.Create(pluginCfg) From d34cac25e4620a847d407b80b68381f6c28204b3 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 9 Feb 2024 17:53:30 +0200 Subject: [PATCH 09/12] fix: utilise a std context in statsLoop --- x-pack/filebeat/input/netflow/input.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index e1885b24fd6..763c6870c33 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -6,6 +6,7 @@ package netflow import ( "bytes" + "context" "fmt" "net" "sync" @@ -23,6 +24,7 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/go-concert/ctxtool" "github.com/elastic/go-concert/unison" ) @@ -176,7 +178,7 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) } if aliveInputs.Inc() == 1 && n.logger.IsDebug() { - go n.statsLoop(context.Cancelation.Done()) + go n.statsLoop(ctxtool.FromCanceller(context.Cancelation)) } defer aliveInputs.Dec() @@ -257,7 +259,7 @@ func (n *netflowInput) stop() { n.started = false } -func (n *netflowInput) statsLoop(done <-chan struct{}) { +func (n *netflowInput) statsLoop(ctx context.Context) { prevPackets := numPackets.Get() prevFlows := numFlows.Get() prevDropped := numDropped.Get() @@ -289,7 +291,7 @@ func (n *netflowInput) statsLoop(done <-chan struct{}) { return } - case <-done: + case <-ctx.Done(): return } } From f64f6974d531865b4b82cb11199127f76b030b88 Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 9 Feb 2024 17:56:10 +0200 Subject: [PATCH 10/12] fix: add server in udp regarding log messages --- x-pack/filebeat/input/netflow/input.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index 763c6870c33..addd3d39c25 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -169,10 +169,10 @@ func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) n.queueC = make(chan packet, n.queueSize) - n.logger.Info("Starting udp") + n.logger.Info("Starting udp server") err = n.udp.Start() if err != nil { - n.logger.Errorf("Failed to start udp: %v", err) + n.logger.Errorf("Failed to start udp server: %v", err) n.stop() return err } From a06c8c726ae67cfe3de9799a2d8aee9b68f61f5a Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 9 Feb 2024 17:57:47 +0200 Subject: [PATCH 11/12] fix: remove redundant empty line in CHANGELOG.next.asciidoc --- CHANGELOG.next.asciidoc | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index eb570a0d4e6..ace4835d288 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -27,7 +27,6 @@ fields added to events containing the Beats version. {pull}37553[37553] - Convert netflow input to API v2 and disable event normalisation {pull}37901[37901] - *Heartbeat* *Metricbeat* From 1e5e4d11cbd2c5ee687b353edea5a64ba9c5f22d Mon Sep 17 00:00:00 2001 From: Panos Koutsovasilis Date: Fri, 9 Feb 2024 21:14:43 +0200 Subject: [PATCH 12/12] fix: enrich more the messages of eventual condition in netflow tests --- x-pack/filebeat/input/netflow/netflow_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go index 6a8ff5b1324..a7cf3038173 100644 --- a/x-pack/filebeat/input/netflow/netflow_test.go +++ b/x-pack/filebeat/input/netflow/netflow_test.go @@ -105,7 +105,7 @@ func TestNetFlow(t *testing.T) { defer cancelFn() require.Eventually(t, mockPipeline.HasConnectedClients, 5*time.Second, 100*time.Millisecond, - "client connects to pipeline") + "no client has connected to the pipeline") udpAddr, err := net.ResolveUDPAddr("udp", defaultConfig.Config.Host) require.NoError(t, err) @@ -134,7 +134,7 @@ func TestNetFlow(t *testing.T) { require.Eventually(t, func() bool { return len(mockPipeline.GetAllEvents()) == len(goldenData.Flows) }, 5*time.Second, 100*time.Millisecond, - "see all expected events in pipeline") + "got a different number of events than expected") for _, event := range goldenData.Flows { // fields that cannot be matched at runtime