diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml
index 140b9ce302e..f97bb2029a7 100644
--- a/.github/workflows/golangci-lint.yml
+++ b/.github/workflows/golangci-lint.yml
@@ -32,6 +32,9 @@ jobs:
         with:
           go-version-file: .go-version
 
+      - name: Install Apt Package
+        run: sudo apt-get update && sudo apt-get install -y libpcap-dev
+
       - name: golangci-lint
         env:
           GOOS: ${{ matrix.GOOS }}
diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index 48497df4957..0c3cb842acd 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -24,6 +24,8 @@ fields added to events containing the Beats version. {pull}37553[37553]
 
 *Filebeat*
 
+- Convert netflow input to API v2 and disable event normalisation {pull}37901[37901]
+
 *Heartbeat*
 
diff --git a/x-pack/dockerlogbeat/pipelinemock/pipelines.go b/x-pack/dockerlogbeat/pipelinemock/pipelines.go
index d9054cf6eb4..04c7972e8ee 100644
--- a/x-pack/dockerlogbeat/pipelinemock/pipelines.go
+++ b/x-pack/dockerlogbeat/pipelinemock/pipelines.go
@@ -85,3 +85,11 @@ func (pc *MockPipelineConnector) ConnectWith(beat.ClientConfig) (beat.Client, er
 
 	return c, nil
 }
+
+// HasConnectedClients returns true if there are clients connected.
+func (pc *MockPipelineConnector) HasConnectedClients() bool {
+	pc.mtx.Lock()
+	defer pc.mtx.Unlock()
+
+	return len(pc.clients) > 0
+}
diff --git a/x-pack/dockerlogbeat/pipelinemock/reader.go b/x-pack/dockerlogbeat/pipelinemock/reader.go
index 263e1ff2f78..ff51cc5fbb1 100644
--- a/x-pack/dockerlogbeat/pipelinemock/reader.go
+++ b/x-pack/dockerlogbeat/pipelinemock/reader.go
@@ -8,7 +8,6 @@ import (
 	"bytes"
 	"encoding/binary"
 	"io"
-	"io/ioutil"
 	"testing"
 
 	"github.com/docker/docker/api/types/plugins/logdriver"
@@ -33,7 +32,7 @@ func CreateTestInputFromLine(t *testing.T, line string) io.ReadCloser {
 	writer := new(bytes.Buffer)
 	encodeLog(t, writer, exampleStruct)
 
-	return ioutil.NopCloser(writer)
+	return io.NopCloser(writer)
 }
 
 func encodeLog(t *testing.T, out io.Writer, entry *logdriver.LogEntry) {
diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go
index 5b55cecc56e..d0725a23dd9 100644
--- a/x-pack/filebeat/input/default-inputs/inputs_other.go
+++ b/x-pack/filebeat/input/default-inputs/inputs_other.go
@@ -20,6 +20,7 @@ import (
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack"
+	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/shipper"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/websocket"
@@ -41,5 +42,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
 		lumberjack.Plugin(),
 		shipper.Plugin(log, store),
 		websocket.Plugin(log, store),
+		netflow.Plugin(log),
 	}
 }
diff --git a/x-pack/filebeat/input/netflow/case.go b/x-pack/filebeat/input/netflow/case.go
index 62e2a9aeeff..6e2f74f2c4c 100644
--- a/x-pack/filebeat/input/netflow/case.go
+++ b/x-pack/filebeat/input/netflow/case.go
@@ -60,9 +60,8 @@ func CamelCaseToSnakeCase(in string) string {
 	}
 
 	out := make([]rune, 0, len(in)+4)
-	runes := []rune(in)
 	upperCount := 1
-	for _, r := range runes {
+	for _, r := range in {
 		lr := unicode.ToLower(r)
 		isUpper := lr != r
 		if isUpper {
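Two notes on the hunks above. The new CI step installs libpcap-dev, presumably because the netflow package now pulls in gopacket's pcap bindings, which need the libpcap headers at build time. And dropping the `[]rune(in)` conversion in case.go is safe because ranging over a string already decodes its UTF-8 runes in place; the slice was a redundant allocation. A minimal sketch demonstrating the equivalence:

```go
package main

import "fmt"

// Ranging over a string decodes UTF-8 runes in place, so the explicit
// []rune(in) copy removed from CamelCaseToSnakeCase was a redundant
// allocation: both loops below visit the exact same rune sequence.
func main() {
	in := "ÜberCamelCase"
	var viaSlice, viaString []rune
	for _, r := range []rune(in) { // allocates an intermediate slice
		viaSlice = append(viaSlice, r)
	}
	for _, r := range in { // no intermediate allocation
		viaString = append(viaString, r)
	}
	fmt.Println(string(viaSlice) == string(viaString)) // true
}
```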
diff --git a/x-pack/filebeat/input/netflow/definitions.go b/x-pack/filebeat/input/netflow/definitions.go
index 62e6dc3f1b0..bc00ed5d2fc 100644
--- a/x-pack/filebeat/input/netflow/definitions.go
+++ b/x-pack/filebeat/input/netflow/definitions.go
@@ -7,7 +7,7 @@ package netflow
 import (
 	"errors"
 	"fmt"
-	"io/ioutil"
+	"io"
 	"math"
 	"os"
 	"strconv"
@@ -95,7 +95,7 @@ func LoadFieldDefinitionsFromFile(path string) (defs fields.FieldDict, err error
 		return nil, err
 	}
 	defer file.Close()
-	contents, err := ioutil.ReadAll(file)
+	contents, err := io.ReadAll(file)
 	if err != nil {
 		return nil, err
 	}
@@ -169,7 +169,7 @@ func loadFields(def map[interface{}]interface{}, pem uint32, dest fields.FieldDi
 			return fmt.Errorf("bad field ID %d: should have two items (type, name) or one (:skip) (Got %+v)", fieldID, list)
 		}
 		key := fields.Key{
-			EnterpriseID: pem,
+			EnterpriseID: pem,
 			FieldID:      uint16(fieldID),
 		}
 		if _, exists := dest[key]; exists {
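The definitions.go changes are mechanical modernization: `ioutil.ReadAll` has been deprecated since Go 1.16 and is now just a thin wrapper over `io.ReadAll`, and removing `uint32(pem)` drops a no-op conversion, since `pem` is already declared as `uint32` in the loadFields signature. A small sketch of the open-and-read pattern as it now stands (the file name is hypothetical):

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// readAll mirrors the pattern in LoadFieldDefinitionsFromFile:
// os.Open followed by io.ReadAll, the non-deprecated replacement
// for ioutil.ReadAll.
func readAll(path string) ([]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	return io.ReadAll(f)
}

func main() {
	contents, err := readAll("custom-fields.yml") // hypothetical path
	if err != nil {
		fmt.Println("read failed:", err)
		return
	}
	fmt.Printf("read %d bytes\n", len(contents))
}
```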
diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go
index 97f9931f325..addd3d39c25 100644
--- a/x-pack/filebeat/input/netflow/input.go
+++ b/x-pack/filebeat/input/netflow/input.go
@@ -6,23 +6,26 @@ package netflow
 
 import (
 	"bytes"
+	"context"
 	"fmt"
 	"net"
 	"sync"
 	"time"
 
-	"github.com/elastic/beats/v7/filebeat/channel"
-	"github.com/elastic/beats/v7/filebeat/harvester"
-	"github.com/elastic/beats/v7/filebeat/input"
+	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/filebeat/inputsource"
 	"github.com/elastic/beats/v7/filebeat/inputsource/udp"
 	"github.com/elastic/beats/v7/libbeat/beat"
 	"github.com/elastic/beats/v7/libbeat/common/atomic"
+	"github.com/elastic/beats/v7/libbeat/feature"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
+	conf "github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/monitoring"
+	"github.com/elastic/go-concert/ctxtool"
+	"github.com/elastic/go-concert/unison"
 )
 
 const (
@@ -34,21 +37,76 @@ var (
 	numDropped  = monitoring.NewUint(nil, "filebeat.input.netflow.packets.dropped")
 	numFlows    = monitoring.NewUint(nil, "filebeat.input.netflow.flows")
 	aliveInputs atomic.Int
-	logger      *logp.Logger
-	initLogger  sync.Once
 )
 
+func Plugin(log *logp.Logger) v2.Plugin {
+	return v2.Plugin{
+		Name:       inputName,
+		Stability:  feature.Stable,
+		Deprecated: false,
+		Info:       "collect and decode packets of netflow protocol",
+		Manager: &netflowInputManager{
+			log: log.Named(inputName),
+		},
+	}
+}
+
+type netflowInputManager struct {
+	log *logp.Logger
+}
+
+func (im *netflowInputManager) Init(_ unison.Group, _ v2.Mode) error {
+	return nil
+}
+
+func (im *netflowInputManager) Create(cfg *conf.C) (v2.Input, error) {
+	inputCfg := defaultConfig
+	if err := cfg.Unpack(&inputCfg); err != nil {
+		return nil, err
+	}
+
+	customFields := make([]fields.FieldDict, len(inputCfg.CustomDefinitions))
+	for idx, yamlPath := range inputCfg.CustomDefinitions {
+		f, err := LoadFieldDefinitionsFromFile(yamlPath)
+		if err != nil {
+			return nil, fmt.Errorf("failed parsing custom field definitions from file '%s': %w", yamlPath, err)
+		}
+		customFields[idx] = f
+	}
+
+	dec, err := decoder.NewDecoder(decoder.NewConfig().
+		WithProtocols(inputCfg.Protocols...).
+		WithExpiration(inputCfg.ExpirationTimeout).
+		WithLogOutput(&logDebugWrapper{Logger: im.log}).
+		WithCustomFields(customFields...).
+		WithSequenceResetEnabled(inputCfg.DetectSequenceReset).
+		WithSharedTemplates(inputCfg.ShareTemplates))
+	if err != nil {
+		return nil, fmt.Errorf("error initializing netflow decoder: %w", err)
+	}
+
+	input := &netflowInput{
+		decoder:          dec,
+		internalNetworks: inputCfg.InternalNetworks,
+		logger:           im.log,
+		queueSize:        inputCfg.PacketQueueSize,
+	}
+
+	input.udp = udp.New(&inputCfg.Config, input.packetDispatch)
+
+	return input, nil
+}
+
 type packet struct {
 	data   []byte
 	source net.Addr
}
 
 type netflowInput struct {
-	mutex            sync.Mutex
+	mtx              sync.Mutex
 	udp              *udp.Server
 	decoder          *decoder.Decoder
-	outlet           channel.Outleter
-	forwarder        *harvester.Forwarder
+	client           beat.Client
 	internalNetworks []string
 	logger           *logp.Logger
 	queueC           chan packet
@@ -56,11 +114,99 @@ type netflowInput struct {
 	started   bool
 }
 
-func init() {
-	err := input.Register(inputName, NewInput)
+func (n *netflowInput) Name() string {
+	return inputName
+}
+
+func (n *netflowInput) Test(_ v2.TestContext) error {
+	return nil
+}
+
+func (n *netflowInput) packetDispatch(data []byte, metadata inputsource.NetworkMetadata) {
+	select {
+	case n.queueC <- packet{data, metadata.RemoteAddr}:
+		numPackets.Inc()
+	default:
+		numDropped.Inc()
+	}
+}
+
+func (n *netflowInput) Run(context v2.Context, connector beat.PipelineConnector) error {
+	n.mtx.Lock()
+	if n.started {
+		n.mtx.Unlock()
+		return nil
+	}
+
+	n.started = true
+	n.mtx.Unlock()
+
+	n.logger.Info("Starting netflow input")
+
+	n.logger.Info("Connecting to beat event publishing")
+	client, err := connector.ConnectWith(beat.ClientConfig{
+		PublishMode: beat.DefaultGuarantees,
+		Processing: beat.ProcessingConfig{
+			// This input only produces events with basic types so normalization
+			// is not required.
+			EventNormalization: boolPtr(false),
+		},
+		CloseRef:      context.Cancelation,
+		EventListener: nil,
+	})
+	if err != nil {
+		n.logger.Errorw("Failed connecting to beat event publishing", "error", err)
+		n.stop()
+		return err
+	}
+	// Keep a reference on the input so stop() can close the client.
+	n.client = client
+
+	n.logger.Info("Starting netflow decoder")
+	if err := n.decoder.Start(); err != nil {
+		n.logger.Errorw("Failed to start netflow decoder", "error", err)
+		n.stop()
+		return err
+	}
+
+	n.queueC = make(chan packet, n.queueSize)
+
+	n.logger.Info("Starting udp server")
+	err = n.udp.Start()
 	if err != nil {
-		panic(err)
+		n.logger.Errorf("Failed to start udp server: %v", err)
+		n.stop()
+		return err
+	}
+
+	if aliveInputs.Inc() == 1 && n.logger.IsDebug() {
+		go n.statsLoop(ctxtool.FromCanceller(context.Cancelation))
 	}
+	defer aliveInputs.Dec()
+
+	go func() {
+		<-context.Cancelation.Done()
+		n.stop()
+	}()
+
+	for packet := range n.queueC {
+		flows, err := n.decoder.Read(bytes.NewBuffer(packet.data), packet.source)
+		if err != nil {
+			n.logger.Warnf("Error parsing NetFlow packet of length %d from %s: %v", len(packet.data), packet.source, err)
+			continue
+		}
+
+		fLen := len(flows)
+		if fLen == 0 {
+			continue
+		}
+		evs := make([]beat.Event, fLen)
+		numFlows.Add(uint64(fLen))
+		for i, flow := range flows {
+			evs[i] = toBeatEvent(flow, n.internalNetworks)
+		}
+		client.PublishAll(evs)
+	}
+
+	return nil
 }
 
 // An adapter so that logp.Logger can be used as a log.Logger.
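The heart of the conversion is the new `Run` above: instead of registering through `init()` and publishing via a harvester forwarder, the input connects its own pipeline client, and it disables event normalization because netflow events carry only basic types. A trimmed sketch of that connection, under the assumption that `v2.Canceler` satisfies the client's `CloseRef` interface, as the diff's own usage implies:

```go
package netflowsketch // hypothetical package for illustration

import (
	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
	"github.com/elastic/beats/v7/libbeat/beat"
)

// connect sketches the ClientConfig built in Run: normalization is
// skipped because the input emits only basic types, and CloseRef ties
// the client's lifetime to the input's cancellation context.
func connect(connector beat.PipelineConnector, cancel v2.Canceler) (beat.Client, error) {
	norm := false
	return connector.ConnectWith(beat.ClientConfig{
		PublishMode: beat.DefaultGuarantees,
		Processing: beat.ProcessingConfig{
			EventNormalization: &norm, // skip the per-event normalization pass
		},
		CloseRef: cancel,
	})
}
```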
@@ -83,171 +229,72 @@ func (w *logDebugWrapper) Write(p []byte) (n int, err error) {
 	return n, nil
 }
 
-// NewInput creates a new Netflow input
-func NewInput(
-	cfg *conf.C,
-	connector channel.Connector,
-	context input.Context,
-) (input.Input, error) {
-	initLogger.Do(func() {
-		logger = logp.NewLogger(inputName)
-	})
-	out, err := connector.Connect(cfg)
-	if err != nil {
-		return nil, err
-	}
+// stop stops the netflow input
+func (n *netflowInput) stop() {
+	n.mtx.Lock()
+	defer n.mtx.Unlock()
 
-	config := defaultConfig
-	if err = cfg.Unpack(&config); err != nil {
-		out.Close()
-		return nil, err
+	if !n.started {
+		return
 	}
 
-	var customFields []fields.FieldDict
-	for _, yamlPath := range config.CustomDefinitions {
-		f, err := LoadFieldDefinitionsFromFile(yamlPath)
-		if err != nil {
-			return nil, fmt.Errorf("failed parsing custom field definitions from file '%s': %w", yamlPath, err)
-		}
-		customFields = append(customFields, f)
+	if n.udp != nil {
+		n.udp.Stop()
 	}
 
-	decoder, err := decoder.NewDecoder(decoder.NewConfig().
-		WithProtocols(config.Protocols...).
-		WithExpiration(config.ExpirationTimeout).
-		WithLogOutput(&logDebugWrapper{Logger: logger}).
-		WithCustomFields(customFields...).
-		WithSequenceResetEnabled(config.DetectSequenceReset).
-		WithSharedTemplates(config.ShareTemplates))
-	if err != nil {
-		return nil, fmt.Errorf("error initializing netflow decoder: %w", err)
-	}
-
-	input := &netflowInput{
-		outlet:           out,
-		internalNetworks: config.InternalNetworks,
-		forwarder:        harvester.NewForwarder(out),
-		decoder:          decoder,
-		logger:           logger,
-		queueSize:        config.PacketQueueSize,
-	}
-
-	input.udp = udp.New(&config.Config, input.packetDispatch)
-	return input, nil
-}
-
-func (p *netflowInput) Publish(events []beat.Event) error {
-	for _, evt := range events {
-		p.forwarder.Send(evt)
-	}
-	return nil
-}
-
-// Run starts listening for NetFlow events over the network.
-func (p *netflowInput) Run() {
-	p.mutex.Lock()
-	defer p.mutex.Unlock()
-
-	if !p.started {
-		logger.Info("Starting UDP input")
-
-		if err := p.decoder.Start(); err != nil {
-			logger.Errorw("Failed to start netflow decoder", "error", err)
-			p.outlet.Close()
-			return
-		}
-
-		p.queueC = make(chan packet, p.queueSize)
-		err := p.udp.Start()
-		if err != nil {
-			logger.Errorf("Error running harvester: %v", err)
-			p.outlet.Close()
-			p.decoder.Stop()
-			close(p.queueC)
-			return
+	if n.decoder != nil {
+		if err := n.decoder.Stop(); err != nil {
+			n.logger.Errorw("Error stopping decoder", "error", err)
 		}
+	}
 
-		go p.recvRoutine()
-		// Only the first active input launches the stats thread
-		if aliveInputs.Inc() == 1 && logger.IsDebug() {
-			go p.statsLoop()
+	if n.client != nil {
+		if err := n.client.Close(); err != nil {
+			n.logger.Errorw("Error closing beat client", "error", err)
 		}
-		p.started = true
 	}
-}
-
-// Stop stops the UDP input
-func (p *netflowInput) Stop() {
-	p.mutex.Lock()
-	defer p.mutex.Unlock()
-	if p.started {
-		aliveInputs.Dec()
-		defer p.outlet.Close()
-		defer close(p.queueC)
-		logger.Info("Stopping UDP input")
-		p.udp.Stop()
-		p.started = false
-	}
-}
+	close(n.queueC)
 
-// Wait suspends the UDP input
-func (p *netflowInput) Wait() {
-	p.Stop()
+	n.started = false
 }
 
-func (p *netflowInput) statsLoop() {
+func (n *netflowInput) statsLoop(ctx context.Context) {
 	prevPackets := numPackets.Get()
 	prevFlows := numFlows.Get()
 	prevDropped := numDropped.Get()
 	// The stats thread only monitors queue length for the first input
-	prevQueue := len(p.queueC)
+	prevQueue := len(n.queueC)
 	t := time.NewTicker(time.Second)
 	defer t.Stop()
-	for range t.C {
-		packets := numPackets.Get()
-		flows := numFlows.Get()
-		dropped := numDropped.Get()
-		queue := len(p.queueC)
-		if packets > prevPackets || flows > prevFlows || dropped > prevDropped || queue > prevQueue {
-			logger.Debugf("Stats total:[ packets=%d dropped=%d flows=%d queue_len=%d ] delta:[ packets/s=%d dropped/s=%d flows/s=%d queue_len/s=%+d ]",
-				packets, dropped, flows, queue, packets-prevPackets, dropped-prevDropped, flows-prevFlows, queue-prevQueue)
-			prevFlows = flows
-			prevPackets = packets
-			prevQueue = queue
-			prevDropped = dropped
-		} else {
-			p.mutex.Lock()
+	for {
+		select {
+		case <-t.C:
+			packets := numPackets.Get()
+			flows := numFlows.Get()
+			dropped := numDropped.Get()
+			queue := len(n.queueC)
+			if packets > prevPackets || flows > prevFlows || dropped > prevDropped || queue > prevQueue {
+				n.logger.Debugf("Stats total:[ packets=%d dropped=%d flows=%d queue_len=%d ] delta:[ packets/s=%d dropped/s=%d flows/s=%d queue_len/s=%+d ]",
+					packets, dropped, flows, queue, packets-prevPackets, dropped-prevDropped, flows-prevFlows, queue-prevQueue)
+				prevFlows = flows
+				prevPackets = packets
+				prevQueue = queue
+				prevDropped = dropped
+				continue
+			}
+
+			n.mtx.Lock()
 			count := aliveInputs.Load()
-			p.mutex.Unlock()
+			n.mtx.Unlock()
 			if count == 0 {
-				break
+				return
 			}
-		}
-	}
-}
-
-func (p *netflowInput) packetDispatch(data []byte, metadata inputsource.NetworkMetadata) {
-	select {
-	case p.queueC <- packet{data, metadata.RemoteAddr}:
-		numPackets.Inc()
-	default:
-		numDropped.Inc()
-	}
-}
-
-func (p *netflowInput) recvRoutine() {
-	for packet := range p.queueC {
-		flows, err := p.decoder.Read(bytes.NewBuffer(packet.data), packet.source)
-		if err != nil {
-			p.logger.Warnf("Error parsing NetFlow packet of length %d from %s: %v", len(packet.data), packet.source, err)
-		}
-		if n := len(flows); n > 0 {
-			evs := make([]beat.Event, n)
-			numFlows.Add(uint64(n))
-			for i, flow := range flows {
-				evs[i] = toBeatEvent(flow, p.internalNetworks)
-			}
-			p.Publish(evs)
+		case <-ctx.Done():
+			return
 		}
 	}
 }
+
+func boolPtr(b bool) *bool { return &b }
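With the old Stop/Wait lifecycle gone, statsLoop is now driven by a context: the `for range t.C` loop becomes a `select` so the goroutine exits promptly on cancellation rather than waiting for the alive-input counter to drain. The general ticker-plus-context shape, as a runnable sketch:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// tickUntilDone shows the pattern statsLoop now follows: periodic work
// on each tick, immediate return once the context is cancelled.
func tickUntilDone(ctx context.Context, work func()) {
	t := time.NewTicker(time.Second)
	defer t.Stop()
	for {
		select {
		case <-t.C:
			work()
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	tickUntilDone(ctx, func() { fmt.Println("tick") })
}
```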
diff --git a/x-pack/filebeat/input/netflow/input_test.go b/x-pack/filebeat/input/netflow/input_test.go
index 506cf48db16..2f78cf83c7e 100644
--- a/x-pack/filebeat/input/netflow/input_test.go
+++ b/x-pack/filebeat/input/netflow/input_test.go
@@ -9,11 +9,23 @@ package netflow
 import (
 	"testing"
 
-	"github.com/elastic/beats/v7/filebeat/input/inputtest"
+	"github.com/elastic/beats/v7/libbeat/tests/resources"
+
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
 	"github.com/elastic/elastic-agent-libs/mapstr"
+
+	"github.com/stretchr/testify/require"
 )
 
 func TestNewInputDone(t *testing.T) {
-	config := mapstr.M{}
-	inputtest.AssertNotStartedInputCanBeDone(t, NewInput, &config)
+	goroutines := resources.NewGoroutinesChecker()
+	defer goroutines.Check(t)
+
+	config, err := conf.NewConfigFrom(mapstr.M{})
+	require.NoError(t, err)
+
+	_, err = Plugin(logp.NewLogger("netflow_test")).Manager.Create(config)
+	require.NoError(t, err)
 }
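The rewritten unit test swaps the old inputtest helper for libbeat's goroutine checker, which snapshots the running goroutines at setup and fails the test if any new ones are still alive when the deferred check runs. The usage pattern, sketched in isolation:

```go
package netflowsketch // hypothetical package for illustration

import (
	"testing"

	"github.com/elastic/beats/v7/libbeat/tests/resources"
)

// TestNoGoroutineLeaks mirrors the pattern in TestNewInputDone: any
// goroutine started inside the test and not stopped by the end causes
// the deferred Check to fail the test.
func TestNoGoroutineLeaks(t *testing.T) {
	goroutines := resources.NewGoroutinesChecker()
	defer goroutines.Check(t)

	// ... exercise code that must clean up all of its goroutines ...
}
```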
diff --git a/x-pack/filebeat/input/netflow/netflow_test.go b/x-pack/filebeat/input/netflow/netflow_test.go
index b567d67bc43..a7cf3038173 100644
--- a/x-pack/filebeat/input/netflow/netflow_test.go
+++ b/x-pack/filebeat/input/netflow/netflow_test.go
@@ -6,26 +6,33 @@ package netflow
 import (
 	"bytes"
+	"context"
 	"encoding/binary"
 	"encoding/json"
 	"flag"
-	"io/ioutil"
 	"net"
 	"os"
 	"path/filepath"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/pcap"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 	"gopkg.in/yaml.v2"
 
+	v2 "github.com/elastic/beats/v7/filebeat/input/v2"
 	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/x-pack/dockerlogbeat/pipelinemock"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
 	"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/mapstr"
 )
 
 var (
@@ -60,6 +67,108 @@ type TestResult struct {
 	Flows []beat.Event `json:"events,omitempty"`
 }
 
+func newV2Context() (v2.Context, func()) {
+	ctx, cancel := context.WithCancel(context.Background())
+	return v2.Context{
+		Logger:      logp.NewLogger("netflow_test"),
+		ID:          "test_id",
+		Cancelation: ctx,
+	}, cancel
+}
+
+func TestNetFlow(t *testing.T) {
+	pcaps, err := filepath.Glob(filepath.Join(pcapDir, "*.pcap"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, file := range pcaps {
+		testName := strings.TrimSuffix(filepath.Base(file), ".pcap")
+
+		t.Run(testName, func(t *testing.T) {
+			pluginCfg, err := conf.NewConfigFrom(mapstr.M{})
+			require.NoError(t, err)
+
+			netflowPlugin, err := Plugin(logp.NewLogger("netflow_test")).Manager.Create(pluginCfg)
+			require.NoError(t, err)
+
+			mockPipeline := &pipelinemock.MockPipelineConnector{}
+
+			ctx, cancelFn := newV2Context()
+			errChan := make(chan error)
+			go func() {
+				defer close(errChan)
+				errChan <- netflowPlugin.Run(ctx, mockPipeline)
+			}()
+
+			defer cancelFn()
+
+			require.Eventually(t, mockPipeline.HasConnectedClients, 5*time.Second, 100*time.Millisecond,
+				"no client has connected to the pipeline")
+
+			udpAddr, err := net.ResolveUDPAddr("udp", defaultConfig.Config.Host)
+			require.NoError(t, err)
+
+			conn, err := net.DialUDP("udp", nil, udpAddr)
+			require.NoError(t, err)
+
+			f, err := pcap.OpenOffline(file)
+			require.NoError(t, err)
+			defer f.Close()
+
+			goldenData := readGoldenFile(t, filepath.Join(goldenDir, testName+".pcap.golden.json"))
+
+			// Process packets in PCAP and get flow records.
+			var totalBytes, totalPackets int
+			packetSource := gopacket.NewPacketSource(f, f.LinkType())
+			for pkt := range packetSource.Packets() {
+				payloadData := pkt.TransportLayer().LayerPayload()
+
+				n, err := conn.Write(payloadData)
+				require.NoError(t, err)
+				totalBytes += n
+				totalPackets++
+			}
+
+			require.Eventually(t, func() bool {
+				return len(mockPipeline.GetAllEvents()) == len(goldenData.Flows)
+			}, 5*time.Second, 100*time.Millisecond,
+				"got a different number of events than expected")
+
+			for _, event := range goldenData.Flows {
+				// fields that cannot be matched at runtime
+				_ = event.Delete("netflow.exporter.address")
+				_ = event.Delete("event.created")
+				_ = event.Delete("observer.ip")
+			}
+
+			publishedEvents := mockPipeline.GetAllEvents()
+			for _, event := range publishedEvents {
+				// fields that cannot be matched at runtime
+				_ = event.Delete("netflow.exporter.address")
+				_ = event.Delete("event.created")
+				_ = event.Delete("observer.ip")
+			}
+
+			require.EqualValues(t, goldenData, normalize(t, TestResult{
+				Name:  goldenData.Name,
+				Error: "",
+				Flows: publishedEvents,
+			}))
+
+			cancelFn()
+			select {
+			case err := <-errChan:
+				require.NoError(t, err)
+			case <-time.After(5 * time.Second):
+				t.Fatal("netflow plugin did not stop")
+			}
+		})
+	}
+}
+
 func TestPCAPFiles(t *testing.T) {
 	pcaps, err := filepath.Glob(filepath.Join(pcapDir, "*.pcap"))
 	if err != nil {
@@ -83,7 +192,7 @@ func TestPCAPFiles(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		err = ioutil.WriteFile(goldenName, data, 0o644)
+		err = os.WriteFile(goldenName, data, 0o644)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -115,7 +224,7 @@ func TestDatFiles(t *testing.T) {
 			t.Fatal(err)
 		}
 
-		err = ioutil.WriteFile(goldenName, data, 0o644)
+		err = os.WriteFile(goldenName, data, 0o644)
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -141,7 +250,7 @@ func TestDatFiles(t *testing.T) {
 }
 
 func readDatTests(t testing.TB) *DatTests {
-	data, err := ioutil.ReadFile("testdata/dat_tests.yaml")
+	data, err := os.ReadFile("testdata/dat_tests.yaml")
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -179,7 +288,7 @@ func getFlowsFromDat(t testing.TB, name string, testCase TestCase) TestResult {
 	source := test.MakeAddress(t, datSourceIP+":4444")
 	var events []beat.Event
 	for _, f := range testCase.Files {
-		dat, err := ioutil.ReadFile(filepath.Join(datDir, f))
+		dat, err := os.ReadFile(filepath.Join(datDir, f))
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -268,7 +377,7 @@ func normalize(t testing.TB, result TestResult) TestResult {
 }
 
 func readGoldenFile(t testing.TB, file string) TestResult {
-	data, err := ioutil.ReadFile(file)
+	data, err := os.ReadFile(file)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -343,7 +452,7 @@ func TestReverseFlows(t *testing.T) {
 		},
 	}
 
-	var evs []beat.Event
+	evs := make([]beat.Event, 0, len(flows))
 	for _, f := range flows {
 		evs = append(evs, toBeatEvent(f, []string{"private"}))
 	}
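The final hunk's `make([]beat.Event, 0, len(flows))` is a standard preallocation cleanup: appending to a nil slice grows the backing array repeatedly, while sizing the capacity up front allocates once. Illustrated:

```go
package main

import "fmt"

func main() {
	flows := []int{1, 2, 3, 4}

	// Appending to a nil slice may reallocate several times as it grows.
	var a []int
	for _, f := range flows {
		a = append(a, f*f)
	}

	// Preallocating the known capacity sizes the backing array once.
	b := make([]int, 0, len(flows))
	for _, f := range flows {
		b = append(b, f*f)
	}

	fmt.Println(a, b) // same contents either way
}
```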