From 58d79ec09c3397437df8f690ed24e3b39efb944a Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Wed, 9 Nov 2022 11:35:14 -0800 Subject: [PATCH] Merge feature arch v2 (#33630) * Update Metricbeat, Filebeat, libbeat with elastic-agent V2 support (#32673) * basic framework * continued tinkering * move away from ast code, use a struct * get metricbeat working, starting on filebeat * add notice update * add basic config register * move over processors to individual beats * remove comments * start to integrate V2 client changes * finishing touches * lint * cleanup merge * remove V1 controller * stil tinkering with linter * still fixing linter * plz linter * fmt x-pack files * notice update * fix output test * refactor stop functions, refactor tests, some misc cleanup * fix client version string * add devguide * linter * expand filebeat test * cleanup test * fix docs, add tests, debuggin * add signal handler * fix mutex issue in register * Fix osquerybeat configuration for V2 * clean up component registration * spelling * remove workaround for filebeat types * try to fix filebeat tests * add nil checks, fix test, fix unit stop * continue tinkering with nil type checks * add test for missing config datastreams, clean up nil handling * change nil protections, use getter methods * fix config access in output code Co-authored-by: Aleksandr Maus * V2 packetbeat support (#33041) * first attempt at auditbeat support * add license header * initial packetbeat support * fix bad branch * cleanup * typo in comment * clean up, move around files * add new processors to streams * First pass at auditbeat support (#33026) * first attempt at auditbeat support * add license header * cleanup * move files around * Add heartbeat support for V2 (#33157) * add v2 config * fix name * fix doc * fix go.mod * fix unchecked stream_id * fix unchecked stream_id (#33335) * Update elastic-agent-libs for output panic fix (#33336) * Fix errors for non-synth capable instances (#33310) Fixes #32694 by making sure we use the lightweight wrapper code always when monitors cannot be initialized. This also fixes an unrelated bug, where errors attached to non-summary events would not be indexed. * [Automation] Update elastic stack version to 8.6.0-5a8d757d for testing (#33323) Co-authored-by: apmmachine * add pid awareness to file locking (#33169) * add pid awareness to file locking * cleanup, logic for handling restarts with the same PID * add zombie-state awareness * fix file naming * add retry for unlock * was confused by unlock code, fix, cleanup * update notice * fix race with file creation, update deps * clean up tests, spelling * hack for cgo * add lic headers * notice * try to fix windows issues * fix typos * small fixes * use exclusive locks * remove feature to start with a specially named pidfile * clean up some error handling, fix test cleanup * forgot changelog * Fix sample config in log rotation docs (#33306) * Add banner to deprecate functionbeat (#33297) * fix unchecked stream_id * packetbeat/protos/dns: clean up package (#33286) * avoid magic numbers * fix hashableDNSTuple size and offsets * avoid use of String and Error methods in formatted print calls * remove redundant conversions * quieten linter * use plugin-owned logp.Logger * update elastic-agent-libs * Revert "fix unchecked stream_id" This reverts commit 26ef6da081e69c0a7677f7f56bd2a683ecf4446f. * [Automation] Update elastic stack version to 8.6.0-40086bc7 for testing (#33339) Co-authored-by: apmmachine Co-authored-by: Andrew Cholakian Co-authored-by: apmmachine <58790750+apmmachine@users.noreply.github.com> Co-authored-by: apmmachine Co-authored-by: Jaime Soriano Pastor Co-authored-by: DeDe Morton Co-authored-by: Dan Kortschak <90160302+efd6@users.noreply.github.com> * update elastic-agent-client (#33552) Co-authored-by: Aleksandr Maus Co-authored-by: Andrew Cholakian Co-authored-by: apmmachine <58790750+apmmachine@users.noreply.github.com> Co-authored-by: apmmachine Co-authored-by: Jaime Soriano Pastor Co-authored-by: DeDe Morton Co-authored-by: Dan Kortschak <90160302+efd6@users.noreply.github.com> --- NOTICE.txt | 12 +- filebeat/beater/filebeat.go | 3 +- go.mod | 6 +- go.sum | 10 +- heartbeat/beater/heartbeat.go | 4 +- libbeat/cfgfile/cfgfile.go | 48 ++- libbeat/cmd/instance/beat.go | 4 +- libbeat/common/reload/reload.go | 41 +- metricbeat/beater/metricbeat.go | 2 +- packetbeat/beater/packetbeat.go | 2 +- x-pack/auditbeat/cmd/root.go | 33 ++ x-pack/filebeat/cmd/agent.go | 28 ++ x-pack/filebeat/cmd/root.go | 2 + x-pack/heartbeat/cmd/root.go | 23 ++ x-pack/libbeat/management/blacklist.go | 3 +- x-pack/libbeat/management/config.go | 2 +- x-pack/libbeat/management/devguide.asciidoc | 116 ++++++ x-pack/libbeat/management/generate.go | 271 ++++++++++++ x-pack/libbeat/management/generate_test.go | 165 ++++++++ x-pack/libbeat/management/manager.go | 389 ------------------ x-pack/libbeat/management/managerV2.go | 377 +++++++++++++++++ x-pack/libbeat/management/manager_test.go | 75 ---- x-pack/libbeat/management/plugin.go | 10 +- .../tests/fbtest/filebeat_v2_test.go | 115 ++++++ .../management/tests/fbtest/testdata/messages | 62 +++ .../management/tests/fbtest/testdata/secure | 30 ++ x-pack/libbeat/management/tests/init.go | 88 ++++ .../tests/mbtest/metricbeat_v2_test.go | 124 ++++++ .../libbeat/management/tests/mock_server.go | 159 +++++++ .../libbeat/management/tests/output_read.go | 94 +++++ x-pack/metricbeat/cmd/agent.go | 37 ++ x-pack/metricbeat/cmd/root.go | 2 + x-pack/osquerybeat/cmd/root.go | 33 ++ x-pack/osquerybeat/internal/config/watcher.go | 2 +- x-pack/packetbeat/cmd/root.go | 32 ++ 35 files changed, 1885 insertions(+), 519 deletions(-) create mode 100644 x-pack/filebeat/cmd/agent.go create mode 100644 x-pack/libbeat/management/devguide.asciidoc create mode 100644 x-pack/libbeat/management/generate.go create mode 100644 x-pack/libbeat/management/generate_test.go delete mode 100644 x-pack/libbeat/management/manager.go create mode 100644 x-pack/libbeat/management/managerV2.go delete mode 100644 x-pack/libbeat/management/manager_test.go create mode 100644 x-pack/libbeat/management/tests/fbtest/filebeat_v2_test.go create mode 100644 x-pack/libbeat/management/tests/fbtest/testdata/messages create mode 100644 x-pack/libbeat/management/tests/fbtest/testdata/secure create mode 100644 x-pack/libbeat/management/tests/init.go create mode 100644 x-pack/libbeat/management/tests/mbtest/metricbeat_v2_test.go create mode 100644 x-pack/libbeat/management/tests/mock_server.go create mode 100644 x-pack/libbeat/management/tests/output_read.go create mode 100644 x-pack/metricbeat/cmd/agent.go diff --git a/NOTICE.txt b/NOTICE.txt index da464723c323..f326139247ef 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -9867,11 +9867,11 @@ Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-a -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-client/v7 -Version: v7.0.0-20210727140539-f0905d9377f6 +Version: v7.0.0-20221028150015-05e494d37ccd Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.0.0-20210727140539-f0905d9377f6/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-client/v7@v7.0.0-20221028150015-05e494d37ccd/LICENSE.txt: ELASTIC LICENSE AGREEMENT @@ -10100,11 +10100,11 @@ SOFTWARE -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-libs -Version: v0.2.11 +Version: v0.2.13 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.2.11/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.2.13/LICENSE: Apache License Version 2.0, January 2004 @@ -17908,11 +17908,11 @@ THE SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/mitchellh/mapstructure -Version: v1.4.3 +Version: v1.5.0 Licence type (autodetected): MIT -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/mitchellh/mapstructure@v1.4.3/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/mitchellh/mapstructure@v1.5.0/LICENSE: The MIT License (MIT) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 7f65130427df..b2b8027a4337 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -362,10 +362,9 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // Register reloadable list of inputs and modules inputs := cfgfile.NewRunnerList(management.DebugK, inputLoader, fb.pipeline) - reload.Register.MustRegisterList("filebeat.inputs", inputs) + reload.RegisterV2.MustRegisterInput(inputs) modules := cfgfile.NewRunnerList(management.DebugK, moduleLoader, fb.pipeline) - reload.Register.MustRegisterList("filebeat.modules", modules) var adiscover *autodiscover.Autodiscover if fb.config.Autodiscover != nil { diff --git a/go.mod b/go.mod index 9bfa73f35cd6..b2d379d23467 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( github.com/dustin/go-humanize v1.0.0 github.com/eapache/go-resiliency v1.2.0 github.com/eclipse/paho.mqtt.golang v1.3.5 - github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 + github.com/elastic/elastic-agent-client/v7 v7.0.0-20221028150015-05e494d37ccd github.com/elastic/go-concert v0.2.0 github.com/elastic/go-libaudit/v2 v2.3.2 github.com/elastic/go-licenser v0.4.0 @@ -124,7 +124,7 @@ require ( github.com/miekg/dns v1.1.42 github.com/mitchellh/gox v1.0.1 github.com/mitchellh/hashstructure v0.0.0-20170116052023-ab25296c0f51 - github.com/mitchellh/mapstructure v1.4.3 + github.com/mitchellh/mapstructure v1.5.0 github.com/olekukonko/tablewriter v0.0.5 github.com/osquery/osquery-go v0.0.0-20210622151333-99b4efa62ec5 github.com/otiai10/copy v1.2.0 @@ -193,7 +193,7 @@ require ( github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5 github.com/elastic/bayeux v1.0.5 github.com/elastic/elastic-agent-autodiscover v0.4.0 - github.com/elastic/elastic-agent-libs v0.2.11 + github.com/elastic/elastic-agent-libs v0.2.13 github.com/elastic/elastic-agent-shipper-client v0.4.0 github.com/elastic/elastic-agent-system-metrics v0.4.5-0.20220927192933-25a985b07d51 github.com/elastic/go-elasticsearch/v8 v8.2.0 diff --git a/go.sum b/go.sum index 318b8b38857a..cb167106dc15 100644 --- a/go.sum +++ b/go.sum @@ -615,10 +615,11 @@ github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 h1:lnDkqiRFKm0rxdljqr github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3/go.mod h1:aPqzac6AYkipvp4hufTyMj5PDIphF3+At8zr7r51xjY= github.com/elastic/elastic-agent-autodiscover v0.4.0 h1:R1JMLHQpH2KP3GXY8zmgV4dj39uoe1asyPPWGQbGgSk= github.com/elastic/elastic-agent-autodiscover v0.4.0/go.mod h1:p3MSf9813JEnolCTD0GyVAr3+Eptg2zQ9aZVFjl4tJ4= -github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 h1:nFvXHBjYK3e9+xF0WKDeAKK4aOO51uC28s+L9rBmilo= -github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc= -github.com/elastic/elastic-agent-libs v0.2.11 h1:ZeYn35Kxt+IdtMPmE01TaDeaahCg/z7MkGPVWUo6Lp4= +github.com/elastic/elastic-agent-client/v7 v7.0.0-20221028150015-05e494d37ccd h1:IuAuac3vcucBrjAXKPQlTJ22H7mBUsSnNWxa7GZYFEg= +github.com/elastic/elastic-agent-client/v7 v7.0.0-20221028150015-05e494d37ccd/go.mod h1:FEXUbFMfaV62S0CtJgD+FFHGY7+4o4fXkDicyONPSH8= github.com/elastic/elastic-agent-libs v0.2.11/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= +github.com/elastic/elastic-agent-libs v0.2.13 h1:YQzhO8RaLosGlyt7IHtj/ZxigWiwLcXXlv3gS4QY9CA= +github.com/elastic/elastic-agent-libs v0.2.13/go.mod h1:0J9lzJh+BjttIiVjYDLncKYCEWUUHiiqnuI64y6C6ss= github.com/elastic/elastic-agent-shipper-client v0.4.0 h1:nsTJF9oo4RHLl+zxFUZqNHaE86C6Ba5aImfegcEf6Sk= github.com/elastic/elastic-agent-shipper-client v0.4.0/go.mod h1:OyI2W+Mv3JxlkEF3OeT7K0dbuxvwew8ke2Cf4HpLa9Q= github.com/elastic/elastic-agent-system-metrics v0.4.5-0.20220927192933-25a985b07d51 h1:ZFk7hC6eRPJkJNtOSG+GYbRlsgLjSD8rTj4gQq+7rsA= @@ -1366,8 +1367,9 @@ github.com/mitchellh/mapstructure v1.3.2/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/mapstructure v1.3.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= -github.com/mitchellh/mapstructure v1.4.3 h1:OVowDSCllw/YjdLkam3/sm7wEtOy59d8ndGgCcyj8cs= github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/mitchellh/osext v0.0.0-20151018003038-5e2d6d41470f/go.mod h1:OkQIRizQZAeMln+1tSwduZz7+Af5oFlKirV/MSYes2A= github.com/moby/locker v1.0.1/go.mod h1:S7SDdo5zpBK84bzzVlKr2V0hz+7x9hWbYC/kq7oQppc= github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 0453cb846f8f..e91c7acfe3bb 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -222,10 +222,8 @@ func (bt *Heartbeat) RunCentralMgmtMonitors(b *beat.Beat) { return nil }) - mons := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher) - reload.Register.MustRegisterList(b.Info.Beat+".monitors", mons) inputs := cfgfile.NewRunnerList(management.DebugK, bt.monitorFactory, b.Publisher) - reload.Register.MustRegisterList("inputs", inputs) + reload.RegisterV2.MustRegisterInput(inputs) } // RunReloadableMonitors runs the `heartbeat.config.monitors` portion of the yaml config if present. diff --git a/libbeat/cfgfile/cfgfile.go b/libbeat/cfgfile/cfgfile.go index d97107aaffd8..ca19af8cb9f6 100644 --- a/libbeat/cfgfile/cfgfile.go +++ b/libbeat/cfgfile/cfgfile.go @@ -23,6 +23,7 @@ import ( "path/filepath" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/fleetmode" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -101,13 +102,13 @@ func HandleFlags() error { home, err := filepath.Abs(filepath.Dir(os.Args[0])) if err != nil { if *homePath == "" { - return fmt.Errorf("The absolute path to %s could not be obtained. %v", + return fmt.Errorf("The absolute path to %s could not be obtained. %w", os.Args[0], err) } home = *homePath } - defaults.SetString("path.home", -1, home) + _ = defaults.SetString("path.home", -1, home) if len(overwrites.GetFields()) > 0 { common.PrintConfigDebugf(overwrites, "CLI setting overwrites (-E flag):") @@ -133,30 +134,36 @@ func Read(out interface{}, path string) error { // Load reads the configuration from a YAML file structure. If path is empty // this method reads from the configuration file specified by the '-c' command // line flag. +// This function cares about the underlying fleet setting, and if beats is running with +// the management.enabled flag, Load() will bypass reading a config file, and merely merge any overrides. func Load(path string, beatOverrides []ConditionalOverride) (*config.C, error) { var c *config.C var err error cfgpath := GetPathConfig() - if path == "" { - list := []string{} - for _, cfg := range configfiles.List() { - if !filepath.IsAbs(cfg) { - list = append(list, filepath.Join(cfgpath, cfg)) - } else { - list = append(list, cfg) + if !fleetmode.Enabled() { + if path == "" { + list := []string{} + for _, cfg := range configfiles.List() { + if !filepath.IsAbs(cfg) { + list = append(list, filepath.Join(cfgpath, cfg)) + } else { + list = append(list, cfg) + } + } + c, err = common.LoadFiles(list...) + } else { + if !filepath.IsAbs(path) { + path = filepath.Join(cfgpath, path) } + c, err = common.LoadFile(path) } - c, err = common.LoadFiles(list...) - } else { - if !filepath.IsAbs(path) { - path = filepath.Join(cfgpath, path) + if err != nil { + return nil, err } - c, err = common.LoadFile(path) - } - if err != nil { - return nil, err + } else { + c = config.NewConfig() } if beatOverrides != nil { @@ -183,6 +190,9 @@ func Load(path string, beatOverrides []ConditionalOverride) (*config.C, error) { c, overwrites, ) + if err != nil { + return nil, err + } } common.PrintConfigDebugf(c, "Complete configuration loaded:") @@ -194,13 +204,13 @@ func LoadList(file string) ([]*config.C, error) { logp.Debug("cfgfile", "Load config from file: %s", file) rawConfig, err := common.LoadFile(file) if err != nil { - return nil, fmt.Errorf("invalid config: %s", err) + return nil, fmt.Errorf("invalid config: %w", err) } var c []*config.C err = rawConfig.Unpack(&c) if err != nil { - return nil, fmt.Errorf("error reading configuration from file %s: %s", file, err) + return nil, fmt.Errorf("error reading configuration from file %s: %w", file, err) } return c, nil diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 5b38ecc9eb7b..6fb1db02eda4 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -352,7 +352,7 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { return nil, fmt.Errorf("error initializing publisher: %w", err) } - reload.Register.MustRegister("output", b.makeOutputReloader(publisher.OutputReloader())) + reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, // but refine publisher to disconnect clients on stop automatically @@ -721,7 +721,7 @@ func (b *Beat) configure(settings Settings) error { logp.Info("Beat ID: %v", b.Info.ID) // initialize config manager - b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID) + b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.RegisterV2, b.Beat.Info.ID) if err != nil { return err } diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go index e18386192453..c3a57bc027a5 100644 --- a/libbeat/common/reload/reload.go +++ b/libbeat/common/reload/reload.go @@ -25,8 +25,14 @@ import ( "github.com/elastic/elastic-agent-libs/mapstr" ) -// Register holds a registry of reloadable objects -var Register = NewRegistry() +// RegisterV2 is the special registry used for the V2 controller +var RegisterV2 = NewRegistry() + +// InputRegName is the registation name for V2 inputs +const InputRegName = "input" + +// OutputRegName is the registation name for V2 Outputs +const OutputRegName = "output" // ConfigWithMeta holds a pair of config.C and optional metadata for it type ConfigWithMeta struct { @@ -106,13 +112,38 @@ func (r *Registry) MustRegister(name string, obj Reloadable) { } } -// MustRegisterList declares a reloadable object list -func (r *Registry) MustRegisterList(name string, list ReloadableList) { - if err := r.RegisterList(name, list); err != nil { +// MustRegisterOutput is a V2-specific registration function +// That declares a reloadable output +func (r *Registry) MustRegisterOutput(obj Reloadable) { + if err := r.Register(OutputRegName, obj); err != nil { panic(err) } } +// MustRegisterInput is a V2-specific registration function +// that declares a reloadable object list for a beat input +func (r *Registry) MustRegisterInput(list ReloadableList) { + if err := r.RegisterList(InputRegName, list); err != nil { + panic(err) + } +} + +// GetInputList is a V2-specific function +// That returns the reloadable list created for an input +func (r *Registry) GetInputList() ReloadableList { + r.RLock() + defer r.RUnlock() + return r.confsLists[InputRegName] +} + +// GetReloadableOutput is a V2-specific function +// That returns the reloader for the registered output +func (r *Registry) GetReloadableOutput() Reloadable { + r.RLock() + defer r.RUnlock() + return r.confs[OutputRegName] +} + // GetRegisteredNames returns the list of names registered func (r *Registry) GetRegisteredNames() []string { r.RLock() diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 8b204a3cad32..9e2c101a253d 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -218,7 +218,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { // Centrally managed modules factory := module.NewFactory(b.Info, bt.moduleOptions...) modules := cfgfile.NewRunnerList(management.DebugK, factory, b.Publisher) - reload.Register.MustRegisterList(b.Info.Beat+".modules", modules) + reload.RegisterV2.MustRegisterInput(modules) wg.Add(1) go func() { defer wg.Done() diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 4ff9fa94eae1..9830ae4545e1 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -153,7 +153,7 @@ func (pb *packetbeat) runStatic(b *beat.Beat, factory *processorFactory) error { // the runner by starting the beat's manager. It returns on the first fatal error. func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error { runner := newReloader(management.DebugK, factory, b.Publisher) - reload.Register.MustRegisterList("inputs", runner) + reload.RegisterV2.MustRegisterInput(runner) logp.Debug("main", "Waiting for the runner to finish") // Start the manager after all the hooks are registered and terminates when diff --git a/x-pack/auditbeat/cmd/root.go b/x-pack/auditbeat/cmd/root.go index 213a0c3c68a8..5ccd3a231ad6 100644 --- a/x-pack/auditbeat/cmd/root.go +++ b/x-pack/auditbeat/cmd/root.go @@ -5,8 +5,15 @@ package cmd import ( + "fmt" + "strings" + auditbeatcmd "github.com/elastic/beats/v7/auditbeat/cmd" "github.com/elastic/beats/v7/libbeat/cmd" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" // Register Auditbeat x-pack modules. _ "github.com/elastic/beats/v7/x-pack/auditbeat/include" @@ -19,7 +26,33 @@ var Name = auditbeatcmd.Name // RootCmd to handle beats CLI. var RootCmd *cmd.BeatsRootCmd +// auditbeatCfg is a callback registered with central management to perform any needed config transformations +// before agent configs are sent to a beat +func auditbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + modules, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo) + if err != nil { + return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) + } + + // Extract the type field that has "audit/auditd", treat this + // as the module config key + module := strings.Split(rawIn.Type, "/")[1] + + for iter := range modules { + modules[iter]["module"] = module + } + + // Format for the reloadable list needed bythe cm.Reload() method. + configList, err := management.CreateReloadConfigFromInputs(modules) + if err != nil { + return nil, fmt.Errorf("error creating reloader config: %w", err) + } + + return configList, nil +} + func init() { + management.ConfigTransform.SetTransform(auditbeatCfg) settings := auditbeatcmd.AuditbeatSettings() settings.ElasticLicensed = true RootCmd = auditbeatcmd.Initialize(settings) diff --git a/x-pack/filebeat/cmd/agent.go b/x-pack/filebeat/cmd/agent.go new file mode 100644 index 000000000000..d5cb432ce908 --- /dev/null +++ b/x-pack/filebeat/cmd/agent.go @@ -0,0 +1,28 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func filebeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + modules, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo) + if err != nil { + return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) + } + + // format for the reloadable list needed bythe cm.Reload() method + configList, err := management.CreateReloadConfigFromInputs(modules) + if err != nil { + return nil, fmt.Errorf("error creating config for reloader: %w", err) + } + return configList, nil +} diff --git a/x-pack/filebeat/cmd/root.go b/x-pack/filebeat/cmd/root.go index 18bdc321b30b..c6f55a023795 100644 --- a/x-pack/filebeat/cmd/root.go +++ b/x-pack/filebeat/cmd/root.go @@ -7,6 +7,7 @@ package cmd import ( fbcmd "github.com/elastic/beats/v7/filebeat/cmd" cmd "github.com/elastic/beats/v7/libbeat/cmd" + "github.com/elastic/beats/v7/x-pack/libbeat/management" // Register the includes. _ "github.com/elastic/beats/v7/x-pack/filebeat/include" @@ -18,6 +19,7 @@ const Name = fbcmd.Name // Filebeat build the beat root command for executing filebeat and it's subcommands. func Filebeat() *cmd.BeatsRootCmd { + management.ConfigTransform.SetTransform(filebeatCfg) settings := fbcmd.FilebeatSettings() settings.ElasticLicensed = true command := fbcmd.Filebeat(inputs.Init, settings) diff --git a/x-pack/heartbeat/cmd/root.go b/x-pack/heartbeat/cmd/root.go index d777f631e485..1fcd69ed6e4a 100644 --- a/x-pack/heartbeat/cmd/root.go +++ b/x-pack/heartbeat/cmd/root.go @@ -5,16 +5,39 @@ package cmd import ( + "fmt" + heartbeatCmd "github.com/elastic/beats/v7/heartbeat/cmd" "github.com/elastic/beats/v7/libbeat/cmd" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" _ "github.com/elastic/beats/v7/x-pack/libbeat/include" + "github.com/elastic/beats/v7/x-pack/libbeat/management" ) // RootCmd to handle beats cli var RootCmd *cmd.BeatsRootCmd +// heartbeatCfg is a callback registered via SetTransform that returns a Elastic Agent client.Unit +// configuration generated from a raw Elastic Agent config +func heartbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + //grab and properly format the input streams + inputStreams, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo) + if err != nil { + return nil, fmt.Errorf("error generating new stream config: %w", err) + } + + configList, err := management.CreateReloadConfigFromInputs(inputStreams) + if err != nil { + return nil, fmt.Errorf("error creating reloader config: %w", err) + } + return configList, nil +} + func init() { + management.ConfigTransform.SetTransform(heartbeatCfg) settings := heartbeatCmd.HeartbeatSettings() settings.ElasticLicensed = true RootCmd = heartbeatCmd.Initialize(settings) diff --git a/x-pack/libbeat/management/blacklist.go b/x-pack/libbeat/management/blacklist.go index 75d7425e89c4..ca5b07d5a5e2 100644 --- a/x-pack/libbeat/management/blacklist.go +++ b/x-pack/libbeat/management/blacklist.go @@ -9,7 +9,6 @@ import ( "strings" "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/common/match" conf "github.com/elastic/elastic-agent-libs/config" @@ -52,7 +51,7 @@ func NewConfigBlacklist(cfg ConfigBlacklistSettings) (*ConfigBlacklist, error) { for field, pattern := range cfg.Patterns { exp, err := match.Compile(pattern) if err != nil { - return nil, errors.Wrap(err, fmt.Sprintf("Given expression is not a valid regexp: %s", pattern)) + return nil, fmt.Errorf("given expression is not a valid regexp: %s", pattern) } list.patterns[field] = exp diff --git a/x-pack/libbeat/management/config.go b/x-pack/libbeat/management/config.go index a4500941b869..ae19bd08cc48 100644 --- a/x-pack/libbeat/management/config.go +++ b/x-pack/libbeat/management/config.go @@ -29,7 +29,7 @@ type ConfigBlocksWithType struct { // ConfigBlocks holds a list of type + configs objects type ConfigBlocks []ConfigBlocksWithType -func defaultConfig() *Config { +func DefaultConfig() *Config { return &Config{ Blacklist: ConfigBlacklistSettings{ Patterns: map[string]string{ diff --git a/x-pack/libbeat/management/devguide.asciidoc b/x-pack/libbeat/management/devguide.asciidoc new file mode 100644 index 000000000000..a2553a6e5df6 --- /dev/null +++ b/x-pack/libbeat/management/devguide.asciidoc @@ -0,0 +1,116 @@ +# Writing V2 support for a beat + +## What is this document? + +This guide is meant to be a high-level guide for adapting an individual beat to the newer V2 control protocol used by elastic-agent; +this includes component registration, config transformation, and tests. + + +## Introduction: The V2 controller + +The V2 remote management protocol is fundamentally different from V1; whereas V1 required the registration of a number of callbacks from a client to a server, +such as `OnConfig()` and `OnStop()`, V2, is more parallelized and flexible, +but as a trade-off requires more work on behalf of a client to manage the flow of commands from the server. + +From the perspective of an individual beat however, little has changed, as this newer V2 server will still look for a `ReloadableList` callback registered by an +individual beat. However, the V2 server also offloads the work of transforming the "agent-native" config generated by fleet onto clients. +Previously, this config transformation was performed by an AST layer in elastic-agent, +as configured by YML spec files that lived in `elastic-agent/internal/spec`. In V2, this transformation must be performed by an individual beat. + + +## Component Registration + +There are two components, one of which is optional, that must be registered with the management controller for a beat to run under the V2 controller: + +### The Reloader + +The reloader component is not unique to V2, and remains largely unchanged from it's V1 state. Currently, all beats have a line like this (the following example is from metricbeat), in their early startup state: + +``` +reload.RegisterV2.MustRegisterInput(modules) +``` + +The `MustRegisterInput` method takes a single argument: component that satisfies a `ReloadableList` interface. +For most beats, this component will be a wrapper around an array of individual modules/inputs that maps to each individual input in the upstream fleet config. +In this example agent YAML config, each item under the `streams` key will become an individual input config in the array sent to the `ReloadableList` interface: +``` + - id: logfile-system-default-system + name: system-1 + revision: 1 + type: logfile + use_output: default + meta: + package: + name: system + version: 1.17.0 + data_stream: + namespace: default + streams: + - id: logfile-system.auth-default-system + [...] + - id: logfile-system.syslog-default-system + [...] +``` + +### Config Transformation + +Unlike with the V1 agent controller, with the V2 controller, individual beats will be required to make any transformations needed to format their config. +In V1, config transformation was performed by the agent and configured by a YAML file. In V2, this transformation process is fundamentally different, +as beats no longer receive the entire agent config, but individual per-input configs, +and so the logic of the AST transformations present in `elastic-agent/internal/spec` cannot be moved one-to-one over to the V2 client in beats. + +In order for a beat to perform its own config transformations, it must register a callback function, as such: +``` +import "github.com/elastic/beats/v7/x-pack/libbeat/management" +... +management.ConfigTransform.SetTransform(metricbeatCfg) +``` + +The `SetTransform` method takes a function with the following signature: +``` +func(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) +``` + +This registration will usually happen early in the startup process, and for licensing reasons, must happen in `x-pack` code. +For the sake of consistency, both filebeat and metricbeat register this callback in `x-pack/[beatname]/cmd`. If a beat does not register a callback, +the V2 client will perform a basic config transformation that adds metadata `processor` fields and breaks apart the individual inputs under the `streams` key. + +This function callback takes an `AgentInfo` struct that can be used for creating metadata `add_field` processors, as well as a `UnitExpectedConfig` structure that +represents the individual input config demonstrated in the previous section. The return value, `[]*reload.ConfigWithMeta`, should represent the final config that +will be passed to the reloader interface. + +The V2 client library provides a number of convenience functions. `CreateInputsFromStreams` will generate an array of input configs from the `streams` value, +complete with metadata fields. `CreateReloadConfigFromInputs` will turn a list of input configs in the form of `[]map[string]interface{}` into a list of reloader +configs that can be returned and sent to the registered reloader. + + +## Tests + +The V2 client controller contains a test suite under `x-pack/libbeat/management/tests`. These tests are broken down to one beat per folder, +as the tests themselves will try to initialize a number of global variables, requiring the CI process to break them apart to individual executables. +To add a new beat to the test, you must fetch the `BeatsRootCmd` object that starts a beat executible, and pass it to a wrapper that will +modify the beats environment for use in a test wrapper: +``` +import fbroot "github.com/elastic/beats/v7/x-pack/filebeat/cmd" +... +filebeatCmd := fbroot.Filebeat() +tests.InitBeatsForTest(t, filebeatCmd) +``` + +The test suite uses a mock `elastic-agent` server, requiring only a `UnitExpectedConfig{}` with the beat's desired config. After the mock server has been configured, +the `BeatsRootCmd` object can be started as it would in any normal non-test setup: +``` + // Setup the mock server, return the tmpfile where the beat will output metrics, and the server handler + outPath, server := tests.SetupTestEnv(t, expectedBeatsConfig, serverRuntimeInSeconds) + defer server.Srv.Stop() + + // start the beat. This is a blocking command, and beats will shut down after `serverRuntimeInSeconds`. + err := filebeatCmd.Execute() + require.NoError(t, err) + + // Read the reported metrics send to the `file` output by the beat + events := tests.ReadEvents(t, outPath) + t.Logf("Got %d events", len(events)) +``` + +For more in-depth examples, examine the `fbtest` and `mbtest` directories. \ No newline at end of file diff --git a/x-pack/libbeat/management/generate.go b/x-pack/libbeat/management/generate.go new file mode 100644 index 000000000000..29bcf15d84bd --- /dev/null +++ b/x-pack/libbeat/management/generate.go @@ -0,0 +1,271 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "fmt" + + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +var DefaultNamespaceName = "default" +var DefaultDatasetName = "generic" + +// =========== +// Config Transformation Registry +// =========== + +var ConfigTransform = TransformRegister{} + +// TransformRegister is a hack that allows an individual beat to set a transform function +// so the V2 controller can perform beat-specific config transformations. +// This is mostly done this way so we can avoid mixing up code with different licenses, +// as this is entirely xpack/Elastic License code, and the normal beat init process happens in libbeat. +// This is fairly simple, as only one beat will ever register a callback. +type TransformRegister struct { + transformFunc func(*proto.UnitExpectedConfig, *client.AgentInfo) ([]*reload.ConfigWithMeta, error) +} + +// SetTransform sets a transform function callback +func (r *TransformRegister) SetTransform(transform func(*proto.UnitExpectedConfig, *client.AgentInfo) ([]*reload.ConfigWithMeta, error)) { + r.transformFunc = transform +} + +// SetTransform sets a transform function callback +func (r *TransformRegister) Transform(cfg *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + // If no transform is registered, fallback to a basic setup + if r.transformFunc == nil { + streamList, err := CreateInputsFromStreams(cfg, "log", agentInfo) + if err != nil { + return nil, fmt.Errorf("error creating input list from fallback function: %w", err) + } + // format for the reloadable list needed bythe cm.Reload() method + configList, err := CreateReloadConfigFromInputs(streamList) + if err != nil { + return nil, fmt.Errorf("error creating reloader config: %w", err) + } + return configList, nil + } + + return r.transformFunc(cfg, agentInfo) +} + +// =========== +// Stream and Input processors +// =========== + +// CreateInputsFromStreams breaks down the raw Expected config into an array of individual inputs/modules from the Streams values +// that can later be formatted into the reloader's ConfigWithMetaData and sent to an indvidual beat/ +// This also performs the basic task of inserting module-level add_field processors into the inputs/modules. +func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, inputType string, agentInfo *client.AgentInfo) ([]map[string]interface{}, error) { + inputs := make([]map[string]interface{}, len(raw.Streams)) + + for iter, stream := range raw.GetStreams() { + streamSource := raw.GetStreams()[iter].GetSource().AsMap() + + streamSource = injectIndexStream(raw, inputType, stream, streamSource) + streamSource, err := injectStreamProcessors(raw, inputType, stream, streamSource) + if err != nil { + return nil, fmt.Errorf("Error injecting stream processors: %w", err) + } + streamSource, err = injectAgentInfoRule(streamSource, agentInfo) + if err != nil { + return nil, fmt.Errorf("Error injecting agent processors: %w", err) + } + inputs[iter] = streamSource + } + + return inputs, nil +} + +// CreateReloadConfigFromInputs turns a raw input/module list into the ConfigWithMeta type used by the reloader interface +func CreateReloadConfigFromInputs(raw []map[string]interface{}) ([]*reload.ConfigWithMeta, error) { + // format for the reloadable list needed bythe cm.Reload() method + configList := make([]*reload.ConfigWithMeta, len(raw)) + + for iter := range raw { + uconfig, err := conf.NewConfigFrom(raw[iter]) + if err != nil { + return nil, fmt.Errorf("error in conversion to conf.C: %w", err) + } + configList[iter] = &reload.ConfigWithMeta{Config: uconfig} + } + return configList, nil +} + +// Emulates the InjectAgentInfoRule and InjectHeadersRule ast rules +func injectAgentInfoRule(inputs map[string]interface{}, agentInfo *client.AgentInfo) (map[string]interface{}, error) { + // upstream API can sometimes return a nil agent info + if agentInfo == nil { + return inputs, nil + } + var processors []interface{} + + processors = append(processors, generateAddFieldsProcessor( + mapstr.M{"id": agentInfo.ID, "snapshot": agentInfo.Snapshot, "version": agentInfo.Version}, + "elastic_agent")) + processors = append(processors, generateAddFieldsProcessor( + mapstr.M{"id": agentInfo.ID}, + "agent")) + + currentProcs, ok := inputs["processors"] + if !ok { + inputs["processors"] = processors + } else { + currentProcsList, ok := currentProcs.([]interface{}) + if !ok { + return nil, fmt.Errorf("error creating list of existing processors, got: %#v", currentProcs) + } + inputs["processors"] = append(processors, currentProcsList...) + + } + + return inputs, nil +} + +// injectIndexStream is an emulation of the InjectIndexProcessor AST code +func injectIndexStream(expected *proto.UnitExpectedConfig, inputType string, streamExpected *proto.Stream, stream map[string]interface{}) map[string]interface{} { + streamType := expected.GetDataStream().GetType() + if streamType == "" { + streamType = inputType + } + + dataset := DefaultDatasetName + if testDataset := streamExpected.GetDataStream().GetDataset(); testDataset != "" { + dataset = testDataset + } + + namespace := DefaultNamespaceName + if testNamespace := expected.GetDataStream().GetNamespace(); testNamespace != "" { + namespace = testNamespace + } + + index := fmt.Sprintf("%s-%s-%s", streamType, dataset, namespace) + stream["index"] = index + return stream +} + +//injectStreamProcessors is an emulation of the InjectStreamProcessorRule AST code +func injectStreamProcessors(expected *proto.UnitExpectedConfig, inputType string, streamExpected *proto.Stream, stream map[string]interface{}) (map[string]interface{}, error) { + //1. start by "repairing" config to add any missing fields + // logic from datastreamTypeFromInputNode + procInputType := inputType + if testInputType := expected.GetDataStream().GetType(); testInputType != "" { + procInputType = testInputType + } + + procInputNamespace := DefaultNamespaceName + if testInputNamespace := expected.GetDataStream().GetNamespace(); testInputNamespace != "" { + procInputNamespace = testInputNamespace + } + + var processors = []interface{}{} + + // the AST injects input_id at the input level and not the stream level, + // for reasons I can't understand, as it just ends up shuffling it around + // to individual metricsets anyway, at least on metricbeat + if expectedID := expected.GetId(); expectedID != "" { + inputId := generateAddFieldsProcessor(mapstr.M{"input_id": expectedID}, "@metadata") + processors = append(processors, inputId) + } + + procInputDataset := DefaultDatasetName + if testStreamDataset := streamExpected.GetDataStream().GetDataset(); testStreamDataset != "" { + procInputDataset = testStreamDataset + } + + //2. Actually add the processors + // namespace + datastream := generateAddFieldsProcessor(mapstr.M{"dataset": procInputDataset, + "namespace": procInputNamespace, "type": procInputType}, "data_stream") + processors = append(processors, datastream) + + // dataset + event := generateAddFieldsProcessor(mapstr.M{"dataset": procInputDataset}, "event") + processors = append(processors, event) + + // source stream + if streamID := streamExpected.GetId(); streamID != "" { + sourceStream := generateAddFieldsProcessor(mapstr.M{"stream_id": streamID}, "@metadata") + processors = append(processors, sourceStream) + } + + // figure out if we have any existing processors + currentProcs, ok := stream["processors"] + if !ok { + stream["processors"] = processors + } else { + currentProcsList, ok := currentProcs.([]interface{}) + if !ok { + return nil, fmt.Errorf("error creating list of existing processors, got: %#v", currentProcs) + } + stream["processors"] = append(processors, currentProcsList...) + + } + + return stream, nil +} + +// =========== +// Config Processors +// =========== + +func generateAddFieldsProcessor(fields mapstr.M, target string) mapstr.M { + return mapstr.M{ + "add_fields": mapstr.M{ + "fields": fields, + "target": target, + }, + } +} + +// This generates an opaque config blob used by all the beats +// This has to handle both universal config changes and changes specific to the beats +// This is a replacement for the AST code that lived in V1 +func generateBeatConfig(unitRaw *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + // We aren't guaranteed a DataStream field from the config + if unitRaw.GetDataStream() == nil { + unitRaw.DataStream = &proto.DataStream{ + Namespace: DefaultNamespaceName, + Dataset: DefaultDatasetName, + } + } else { + if unitRaw.GetDataStream().GetNamespace() == "" { + unitRaw.DataStream.Namespace = DefaultNamespaceName + } + if unitRaw.GetDataStream().GetDataset() == "" { + unitRaw.DataStream.Dataset = DefaultDatasetName + } + } + + // Generate the config that's unique to a beat + metaConfig, err := ConfigTransform.Transform(unitRaw, agentInfo) + if err != nil { + return nil, fmt.Errorf("error transforming config for beats: %w", err) + } + return metaConfig, nil +} + +// generate the output config, including shuffling around the `type` key +// In V1, this was done by the groupByOutputs function buried in the AST init +func groupByOutputs(outCfg *proto.UnitExpectedConfig) (*reload.ConfigWithMeta, error) { + // We still need to emulate the InjectHeadersRule AST code, + // I don't think we can get the `Headers()` data reported by the AgentInfo() + sourceMap := outCfg.GetSource().AsMap() + outputType := outCfg.GetType() + formattedOut := mapstr.M{ + outputType: sourceMap, + } + uconfig, err := conf.NewConfigFrom(formattedOut) + if err != nil { + return nil, fmt.Errorf("error creating reloader config for output: %w", err) + } + + return &reload.ConfigWithMeta{Config: uconfig}, nil +} diff --git a/x-pack/libbeat/management/generate_test.go b/x-pack/libbeat/management/generate_test.go new file mode 100644 index 000000000000..b81931342a08 --- /dev/null +++ b/x-pack/libbeat/management/generate_test.go @@ -0,0 +1,165 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/structpb" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func TestBareConfig(t *testing.T) { + // config with datastreams, metadata, etc, removed + rawExpected := proto.UnitExpectedConfig{ + Id: "system/metrics-system-default-system", + Type: "system/metrics", + Name: "system-1", + Streams: []*proto.Stream{ + { + Id: "system/metrics-system.filesystem-default-system", + Source: requireNewStruct(t, map[string]interface{}{ + "metricsets": []interface{}{"filesystem"}, + "period": "1m", + }), + }, + }, + } + + // First test: this doesn't panic on nil pointer dereference + reloadCfg, err := generateBeatConfig(&rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true}) + require.NoError(t, err, "error in generateBeatConfig") + cfgMap := mapstr.M{} + err = reloadCfg[0].Config.Unpack(&cfgMap) + require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config) + + // Actual checks + processorFields := map[string]interface{}{ + "add_fields.fields.stream_id": "system/metrics-system.filesystem-default-system", + "add_fields.fields.dataset": "generic", + "add_fields.fields.namespace": "default", + "add_fields.fields.type": "log", + "add_fields.fields.input_id": "system/metrics-system-default-system", + "add_fields.fields.id": "beat-ID", + } + findFieldsInProcessors(t, processorFields, cfgMap) +} + +func TestMBGenerate(t *testing.T) { + sourceStream := requireNewStruct(t, map[string]interface{}{ + "metricsets": []interface{}{"filesystem"}, + "period": "1m", + "processors": []interface{}{ + map[string]interface{}{ + "drop_event.when.regexp": map[string]interface{}{ + "system.filesystem.mount_point": "^/(sys|cgroup|proc|dev|etc|host|lib|snap)($|/)", + }, + }, + }, + }) + + rawExpected := proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Id: "system/metrics-system-default-system", + Type: "system/metrics", + Name: "system-1", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + Streams: []*proto.Stream{ + { + Id: "system/metrics-system.filesystem-default-system", + DataStream: &proto.DataStream{ + Dataset: "system.filesystem", + Type: "metrics", + }, + Source: sourceStream, + }, + }, + } + + reloadCfg, err := generateBeatConfig(&rawExpected, &client.AgentInfo{ID: "beat-ID", Version: "8.0.0", Snapshot: true}) + require.NoError(t, err, "error in generateBeatConfig") + cfgMap := mapstr.M{} + err = reloadCfg[0].Config.Unpack(&cfgMap) + require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config) + + configFields := map[string]interface{}{ + "drop_event": nil, + "add_fields.fields.stream_id": "system/metrics-system.filesystem-default-system", + "add_fields.fields.dataset": "system.filesystem", + "add_fields.fields.input_id": "system/metrics-system-default-system", + "add_fields.fields.id": "beat-ID", + } + findFieldsInProcessors(t, configFields, cfgMap) + +} + +func TestOutputGen(t *testing.T) { + testExpected := proto.UnitExpectedConfig{ + Type: "elasticsearch", + Source: requireNewStruct(t, map[string]interface{}{ + "hosts": []interface{}{"localhost:9200"}, + "username": "elastic", + "password": "changeme", + }), + } + + cfg, err := groupByOutputs(&testExpected) + require.NoError(t, err) + testStruct := mapstr.M{} + err = cfg.Config.Unpack(&testStruct) + require.NoError(t, err) + innerCfg, exists := testStruct["elasticsearch"] + assert.True(t, exists, "elasticsearch key does not exist") + _, pwExists := innerCfg.(map[string]interface{})["password"] + assert.True(t, pwExists, "password config not found") + +} + +func requireNewStruct(t *testing.T, v map[string]interface{}) *structpb.Struct { + str, err := structpb.NewStruct(v) + if err != nil { + require.NoError(t, err) + } + return str +} + +func findFieldsInProcessors(t *testing.T, configFields map[string]interface{}, cfgMap mapstr.M) { + for key, val := range configFields { + gotKey := false + gotVal := false + errStr := "" + for _, proc := range cfgMap["processors"].([]interface{}) { + processor := mapstr.M(proc.(map[string]interface{})) + found, ok := processor.GetValue(key) + if ok == nil { + gotKey = true + if val == nil { + gotVal = true + } else { + if val == found { + gotVal = true + } else { + errStr = found.(string) + } + } + } + } + assert.True(t, gotKey, "did not find key for %s", key) + assert.True(t, gotVal, "got incorrect key for %s, expected %s, got %s", key, val, errStr) + } +} diff --git a/x-pack/libbeat/management/manager.go b/x-pack/libbeat/management/manager.go deleted file mode 100644 index 11b9a294d029..000000000000 --- a/x-pack/libbeat/management/manager.go +++ /dev/null @@ -1,389 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package management - -import ( - "context" - "fmt" - "os" - "sort" - "sync" - - "github.com/gofrs/uuid" - "github.com/hashicorp/go-multierror" - "github.com/pkg/errors" - - "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" - conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/mapstr" - - "github.com/elastic/beats/v7/libbeat/common/cfgwarn" - "github.com/elastic/beats/v7/libbeat/common/reload" - lbmanagement "github.com/elastic/beats/v7/libbeat/management" - "github.com/elastic/elastic-agent-libs/logp" -) - -var notReportedErrors = []error{ - context.Canceled, -} - -// Manager handles internal config updates. By retrieving -// new configs from Kibana and applying them to the Beat. -type Manager struct { - config *Config - logger *logp.Logger - beatUUID uuid.UUID - registry *reload.Registry - blacklist *ConfigBlacklist - client client.Client - lock sync.Mutex - status lbmanagement.Status - msg string - payload map[string]interface{} - - stopFunc func() - isRunning bool -} - -// NewFleetManager returns a X-Pack Beats Fleet Management manager. -func NewFleetManager(config *conf.C, registry *reload.Registry, beatUUID uuid.UUID) (lbmanagement.Manager, error) { - c := defaultConfig() - if config.Enabled() { - if err := config.Unpack(&c); err != nil { - return nil, errors.Wrap(err, "parsing fleet management settings") - } - } - return NewFleetManagerWithConfig(c, registry, beatUUID) -} - -// NewFleetManagerWithConfig returns a X-Pack Beats Fleet Management manager. -func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (lbmanagement.Manager, error) { - log := logp.NewLogger(lbmanagement.DebugK) - - m := &Manager{ - config: c, - logger: log.Named("fleet"), - beatUUID: beatUUID, - registry: registry, - } - - var err error - var blacklist *ConfigBlacklist - var eac client.Client - if c.Enabled { - // Initialize configs blacklist - blacklist, err = NewConfigBlacklist(c.Blacklist) - if err != nil { - return nil, errors.Wrap(err, "wrong settings for configurations blacklist") - } - - // Initialize the client - eac, err = client.NewFromReader(os.Stdin, m) - if err != nil { - return nil, errors.Wrap(err, "failed to create elastic-agent-client") - } - } - - m.blacklist = blacklist - m.client = eac - return m, nil -} - -// Enabled returns true if config management is enabled. -func (cm *Manager) Enabled() bool { - return cm.config.Enabled -} - -// SetStopCallback sets the callback to run when the manager want to shutdown the beats gracefully. -func (cm *Manager) SetStopCallback(stopFunc func()) { - cm.lock.Lock() - defer cm.lock.Unlock() - cm.stopFunc = stopFunc -} - -// Start the config manager. -func (cm *Manager) Start() error { - cm.lock.Lock() - defer cm.lock.Unlock() - - if !cm.Enabled() { - return nil - } - - cfgwarn.Beta("Fleet management is enabled") - cm.logger.Info("Starting fleet management service") - - cm.isRunning = true - err := cm.client.Start(context.Background()) - if err != nil { - cm.logger.Errorf("failed to start elastic-agent-client: %s", err) - return err - } - cm.logger.Info("Ready to receive configuration") - return nil -} - -// Stop stops the current Manager and close the connection to Elastic Agent. -func (cm *Manager) Stop() { - cm.lock.Lock() - defer cm.lock.Unlock() - - if !cm.Enabled() { - return - } - - cm.logger.Info("Stopping fleet management service") - cm.isRunning = false - cm.client.Stop() -} - -// CheckRawConfig check settings are correct to start the beat. This method -// checks there are no collision between the existing configuration and what -// fleet management can configure. -// -// NOTE: This is currently not implemented for fleet. -func (cm *Manager) CheckRawConfig(cfg *conf.C) error { - // TODO implement this method - return nil -} - -// UpdateStatus updates the manager with the current status for the beat. -func (cm *Manager) UpdateStatus(status lbmanagement.Status, msg string) { - cm.lock.Lock() - defer cm.lock.Unlock() - - if cm.status != status || cm.msg != msg { - cm.status = status - cm.msg = msg - cm.client.Status(statusToProtoStatus(status), msg, nil) - cm.logger.Infof("Status change to %s: %s", status, msg) - } -} - -// updateStatusWithError updates the manager with the current status for the beat with error. -func (cm *Manager) updateStatusWithError(err error) { - if err == nil { - return - } - - for _, e := range notReportedErrors { - if errors.Is(err, e) { - return - } - } - - cm.logger.Error(err) - cm.UpdateStatus(lbmanagement.Failed, err.Error()) -} - -func (cm *Manager) OnConfig(s string) { - cm.UpdateStatus(lbmanagement.Configuring, "Updating configuration") - - var configMap mapstr.M - uconfig, err := conf.NewConfigFrom(s) - if err != nil { - err = errors.Wrap(err, "config blocks unsuccessfully generated") - cm.updateStatusWithError(err) - return - } - - err = uconfig.Unpack(&configMap) - if err != nil { - err = errors.Wrap(err, "config blocks unsuccessfully generated") - cm.updateStatusWithError(err) - return - } - - blocks, err := cm.toConfigBlocks(configMap) - if err != nil { - err = errors.Wrap(err, "failed to parse configuration") - cm.updateStatusWithError(err) - return - } - - if err := cm.apply(blocks); err != nil { - // `cm.apply` already logs the errors; currently allow beat to run degraded - cm.updateStatusWithError(err) - cm.logger.Errorf("failed applying config blocks: %v", err) - return - } - - cm.client.Status(proto.StateObserved_HEALTHY, "Running", cm.payload) -} - -func (cm *Manager) RegisterAction(action client.Action) { - cm.client.RegisterAction(action) -} - -func (cm *Manager) UnregisterAction(action client.Action) { - cm.client.UnregisterAction(action) -} - -func (cm *Manager) SetPayload(payload map[string]interface{}) { - cm.lock.Lock() - cm.payload = payload - cm.lock.Unlock() -} - -func (cm *Manager) OnStop() { - cm.lock.Lock() - defer cm.lock.Unlock() - - if cm.stopFunc != nil { - cm.client.Status(proto.StateObserved_STOPPING, "Stopping", nil) - cm.stopFunc() - } -} - -func (cm *Manager) OnError(err error) { - isStopped := false - cm.lock.Lock() - isStopped = !cm.isRunning - cm.lock.Unlock() - - if isStopped && errors.Is(err, context.Canceled) { - // don't report context cancelled on shutdown - return - } - cm.logger.Errorf("elastic-agent-client got error: %s", err) -} - -func (cm *Manager) apply(blocks ConfigBlocks) error { - missing := map[string]bool{} - for _, name := range cm.registry.GetRegisteredNames() { - missing[name] = true - } - - // Detect unwanted configs from the list - if err := cm.blacklist.Detect(blocks); err != nil { - return err - } - - var errors *multierror.Error - // Reload configs - for _, b := range blocks { - if err := cm.reload(b.Type, b.Blocks); err != nil { - errors = multierror.Append(errors, err) - } - missing[b.Type] = false - } - - // Unset missing configs - for name, isMissing := range missing { - if isMissing { - if err := cm.reload(name, []*ConfigBlock{}); err != nil { - errors = multierror.Append(errors, err) - } - } - } - - return errors.ErrorOrNil() -} - -func (cm *Manager) reload(t string, blocks []*ConfigBlock) error { - cm.logger.Infof("Applying settings for %s", t) - if obj := cm.registry.GetReloadable(t); obj != nil { - // Single object - if len(blocks) > 1 { - err := fmt.Errorf("got an invalid number of configs for %s: %d, expected: 1", t, len(blocks)) - cm.logger.Error(err) - return err - } - - var config *reload.ConfigWithMeta - var err error - if len(blocks) == 1 { - config, err = blocks[0].ConfigWithMeta() - if err != nil { - cm.logger.Error(err) - return err - } - } - - if err := obj.Reload(config); err != nil { - cm.logger.Error(err) - return err - } - } else if obj := cm.registry.GetReloadableList(t); obj != nil { - // List - var configs []*reload.ConfigWithMeta - for _, block := range blocks { - config, err := block.ConfigWithMeta() - if err != nil { - cm.logger.Error(err) - return err - } - configs = append(configs, config) - } - - if err := obj.Reload(configs); err != nil { - cm.logger.Error(err) - return err - } - } - - return nil -} - -func (cm *Manager) toConfigBlocks(cfg mapstr.M) (ConfigBlocks, error) { - blocks := map[string][]*ConfigBlock{} - - // Extract all registered values beat can respond to - for _, regName := range cm.registry.GetRegisteredNames() { - iBlock, err := cfg.GetValue(regName) - if err != nil { - cm.logger.Warnf("failed to get '%s' from config: %v. Continuing to next one", regName, err) - continue - } - - if mapBlock, ok := iBlock.(map[string]interface{}); ok { - blocks[regName] = append(blocks[regName], &ConfigBlock{Raw: mapBlock}) - } else if arrayBlock, ok := iBlock.([]interface{}); ok { - for _, item := range arrayBlock { - if mapBlock, ok := item.(map[string]interface{}); ok { - blocks[regName] = append(blocks[regName], &ConfigBlock{Raw: mapBlock}) - } - } - } - } - - // keep the ordering consistent while grouping the items. - keys := make([]string, 0, len(blocks)) - for k := range blocks { - keys = append(keys, k) - } - sort.Strings(keys) - - res := ConfigBlocks{} - for _, t := range keys { - b := blocks[t] - res = append(res, ConfigBlocksWithType{Type: t, Blocks: b}) - } - - return res, nil -} - -func statusToProtoStatus(status lbmanagement.Status) proto.StateObserved_Status { - switch status { - case lbmanagement.Unknown: - // unknown is reported as healthy, as the status is unknown - return proto.StateObserved_HEALTHY - case lbmanagement.Starting: - return proto.StateObserved_STARTING - case lbmanagement.Configuring: - return proto.StateObserved_CONFIGURING - case lbmanagement.Running: - return proto.StateObserved_HEALTHY - case lbmanagement.Degraded: - return proto.StateObserved_DEGRADED - case lbmanagement.Failed: - return proto.StateObserved_FAILED - case lbmanagement.Stopping: - return proto.StateObserved_STOPPING - } - // unknown status, still reported as healthy - return proto.StateObserved_HEALTHY -} diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go new file mode 100644 index 000000000000..060c23161cef --- /dev/null +++ b/x-pack/libbeat/management/managerV2.go @@ -0,0 +1,377 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "context" + "fmt" + "os" + "os/signal" + "sync" + "syscall" + + "github.com/gofrs/uuid" + + "github.com/elastic/beats/v7/libbeat/common/reload" + lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/libbeat/version" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +// BeatV2Manager is the main type for tracing V2-related config updates +type BeatV2Manager struct { + config *Config + registry *reload.Registry + client client.V2 + + logger *logp.Logger + + // Track individual units given to us by the V2 API + unitsMut sync.Mutex + units map[string]*client.Unit + mainUnit string + + // This satisfies the SetPayload() function, and will pass along this value to the UpdateStatus() + // call whenever a config is re-registered + payload map[string]interface{} + + // stop callback must be registered by libbeat, as with the V1 callback + stopFunc func() + stopMut sync.Mutex + beatStop sync.Once + + // sync channel for shutting down the manager after we get a stop from + // either the agent or the beat + stopChan chan struct{} + + isRunning bool +} + +// NewV2AgentManager returns a remote config manager for the agent V2 protocol. +// This is meant to be used by the management plugin system, which will register this as a callback. +func NewV2AgentManager(config *conf.C, registry *reload.Registry, beatUUID uuid.UUID) (lbmanagement.Manager, error) { + c := DefaultConfig() + if config.Enabled() { + if err := config.Unpack(&c); err != nil { + return nil, fmt.Errorf("parsing fleet management settings: %w", err) + } + } + agentClient, _, err := client.NewV2FromReader(os.Stdin, client.VersionInfo{ + Name: "beat-v2-client", + Version: version.GetDefaultVersion(), + Meta: map[string]string{ + "commit": version.Commit(), + "build_time": version.BuildTime().String(), + }, + }) + if err != nil { + return nil, fmt.Errorf("error reading control config from agent: %w", err) + } + + return NewV2AgentManagerWithClient(c, registry, agentClient) +} + +// NewV2AgentManagerWithClient actually creates the manager instance used by the rest of the beats. +func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agentClient client.V2) (lbmanagement.Manager, error) { + log := logp.NewLogger(lbmanagement.DebugK) + m := &BeatV2Manager{ + config: config, + logger: log.Named("V2-manager"), + registry: registry, + units: make(map[string]*client.Unit), + stopChan: make(chan struct{}, 1), + } + + if config.Enabled { + m.client = agentClient + } + return m, nil +} + +// ================================ +// Beats central management interface implementation +// ================================ + +// UpdateStatus updates the manager with the current status for the beat. +func (cm *BeatV2Manager) UpdateStatus(status lbmanagement.Status, msg string) { + updateState := client.UnitState(status) + stateUnit, exists := cm.getMainUnit() + cm.logger.Debugf("Updating beat status: %s", msg) + if exists { + _ = stateUnit.UpdateState(updateState, msg, cm.payload) + } else { + cm.logger.Warnf("Cannot update state to %s, no main unit is set. Msg: %s", status, msg) + } +} + +// Enabled returns true if config management is enabled. +func (cm *BeatV2Manager) Enabled() bool { + return cm.config.Enabled +} + +// SetStopCallback sets the callback to run when the manager want to shutdown the beats gracefully. +func (cm *BeatV2Manager) SetStopCallback(stopFunc func()) { + cm.stopMut.Lock() + defer cm.stopMut.Unlock() + cm.stopFunc = stopFunc +} + +// Start the config manager. +func (cm *BeatV2Manager) Start() error { + if !cm.Enabled() { + return fmt.Errorf("V2 Manager is disabled") + } + err := cm.client.Start(context.Background()) + if err != nil { + return fmt.Errorf("error starting connection to client") + } + + go cm.unitListen() + cm.isRunning = true + return nil +} + +// Stop stops the current Manager and close the connection to Elastic Agent. +func (cm *BeatV2Manager) Stop() { + cm.stopChan <- struct{}{} +} + +// CheckRawConfig is currently not implemented for V1. +func (cm *BeatV2Manager) CheckRawConfig(cfg *conf.C) error { + // This does not do anything on V1 or V2, but here we are + return nil +} + +func (cm *BeatV2Manager) RegisterAction(action client.Action) { + cm.unitsMut.Lock() + defer cm.unitsMut.Unlock() + stateUnit, exists := cm.units[cm.mainUnit] + if exists { + _ = stateUnit.UpdateState(client.UnitStateHealthy, fmt.Sprintf("Registering action %s for main unit with ID %s", cm.mainUnit, action.Name()), nil) + cm.units[cm.mainUnit].RegisterAction(action) + } else { + cm.logger.Warnf("Cannot register action %s, no main unit found", action.Name()) + } +} + +func (cm *BeatV2Manager) UnregisterAction(action client.Action) { + cm.unitsMut.Lock() + defer cm.unitsMut.Unlock() + stateUnit, exists := cm.units[cm.mainUnit] + if exists { + _ = stateUnit.UpdateState(client.UnitStateHealthy, fmt.Sprintf("Unregistering action %s for main unit with ID %s", cm.mainUnit, action.Name()), nil) + cm.units[cm.mainUnit].UnregisterAction(action) + } else { + cm.logger.Warnf("Cannot Unregister action %s, no main unit found", action.Name()) + } +} + +func (cm *BeatV2Manager) SetPayload(payload map[string]interface{}) { + cm.payload = payload +} + +// ================================ +// Unit manager +// ================================ + +func (cm *BeatV2Manager) addUnit(unit *client.Unit) { + cm.unitsMut.Lock() + cm.units[unit.ID()] = unit + cm.unitsMut.Unlock() +} + +func (cm *BeatV2Manager) getMainUnit() (*client.Unit, bool) { + cm.unitsMut.Lock() + defer cm.unitsMut.Unlock() + if cm.mainUnit == "" { + return nil, false + } + return cm.units[cm.mainUnit], true +} + +// We need a "main" unit that we can send updates to for the StatusReporter interface +// the purpose of this is to just grab the first input-type unit we get and set it as the "main" unit +func (cm *BeatV2Manager) setMainUnitValue(unit *client.Unit) { + cm.unitsMut.Lock() + defer cm.unitsMut.Unlock() + if cm.mainUnit == "" { + cm.logger.Debugf("Set main input unit to ID %s", unit.ID) + cm.mainUnit = unit.ID() + } +} + +func (cm *BeatV2Manager) deleteUnit(unit *client.Unit) { + cm.unitsMut.Lock() + delete(cm.units, unit.ID()) + cm.unitsMut.Unlock() +} + +// ================================ +// Private V2 implementation +// ================================ + +func (cm *BeatV2Manager) unitListen() { + + // register signal handler + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + + cm.logger.Debugf("Listening for agent unit changes") + for { + select { + // The stopChan channel comes from the Manager interface Stop() method + case <-cm.stopChan: + cm.stopBeat() + case sig := <-sigc: + // we can't duplicate the same logic used by stopChan here. + // A beat will also watch for sigint and shut down, if we call the stopFunc + // callback, either the V2 client or the beat will get a panic, + // as the stopFunc sent by the beats is usually unsafe. + switch sig { + case syscall.SIGINT, syscall.SIGTERM: + cm.logger.Debug("Received sigterm/sigint, stopping") + case syscall.SIGHUP: + cm.logger.Debug("Received sighup, stopping") + } + cm.isRunning = false + unit, mainExists := cm.getMainUnit() + if mainExists { + _ = unit.UpdateState(client.UnitStateStopping, "stopping beat", nil) + } + cm.client.Stop() + return + case change := <-cm.client.UnitChanges(): + switch change.Type { + // Within the context of how we send config to beats, I'm not sure there is a difference between + // A unit add and a unit change, since either way we can't do much more than call the reloader + case client.UnitChangedAdded: + // At this point we also get a log level, however I'm not sure the beats core logger provides a + // clean way to "just" change the log level, without resetting the whole log config + state, _, _ := change.Unit.Expected() + cm.logger.Debugf("Got unit added: %s, type: %s expected state: %s", change.Unit.ID(), change.Unit.Type(), state.String()) + go cm.handleUnitReload(change.Unit) + + case client.UnitChangedModified: + state, _, _ := change.Unit.Expected() + cm.logger.Debugf("Got unit modified: %s, type: %s expected state: %s", change.Unit.ID(), change.Unit.Type(), state.String()) + // I'm assuming that a state STOPPED just tells us to shut down the entire beat, + // as such we don't really care about updating via a particular unit + if state == client.UnitStateStopped { + cm.stopBeat() + } else { + go cm.handleUnitReload(change.Unit) + } + + case client.UnitChangedRemoved: + cm.logger.Debugf("Got unit removed: %s", change.Unit.ID()) + cm.deleteUnit(change.Unit) + } + } + + } +} + +func (cm *BeatV2Manager) stopBeat() { + if !cm.isRunning { + return + } + // will we ever get a Unit removed for anything other than the main beat? + // Individual reloaders don't have a "stop" function, so the most we can do + // is just shut down a beat, I think. + cm.logger.Debugf("Stopping beat") + // stop the "main" beat runtime + unit, mainExists := cm.getMainUnit() + if mainExists { + _ = unit.UpdateState(client.UnitStateStopping, "stopping beat", nil) + } + + cm.isRunning = false + cm.stopMut.Lock() + defer cm.stopMut.Unlock() + if cm.stopFunc != nil { + // I'm not 100% sure the once here is needed, + // but various beats tend to handle this in a not-quite-safe way + cm.beatStop.Do(cm.stopFunc) + } + cm.client.Stop() + + if mainExists { + _ = unit.UpdateState(client.UnitStateStopped, "stopped beat", nil) + } + +} + +func (cm *BeatV2Manager) handleUnitReload(unit *client.Unit) { + cm.addUnit(unit) + unitType := unit.Type() + + if unitType == client.UnitTypeOutput { + cm.handleOutputReload(unit) + } else if unitType == client.UnitTypeInput { + cm.handleInputReload(unit) + } +} + +// Handle the updated config for an output unit +func (cm *BeatV2Manager) handleOutputReload(unit *client.Unit) { + _, _, rawConfig := unit.Expected() + cm.logger.Debugf("Got Output unit config: %s, ID: %s", rawConfig.Type, rawConfig.Id) + + reloadConfig, err := groupByOutputs(rawConfig) + if err != nil { + errString := fmt.Errorf("Failed to generate config for output: %w", err) + _ = unit.UpdateState(client.UnitStateFailed, errString.Error(), nil) + return + } + // Assuming that the output reloadable isn't a list, see createBeater() in cmd/instance/beat.go + output := cm.registry.GetReloadableOutput() + if output == nil { + _ = unit.UpdateState(client.UnitStateFailed, "failed to find beat reloadable type 'output'", nil) + return + } + + _ = unit.UpdateState(client.UnitStateConfiguring, "reloading output component", nil) + err = output.Reload(reloadConfig) + if err != nil { + errString := fmt.Errorf("Failed to reload component: %w", err) + _ = unit.UpdateState(client.UnitStateFailed, errString.Error(), nil) + return + } + _ = unit.UpdateState(client.UnitStateHealthy, "reloaded output component", nil) +} + +// handle the updated config for an input unit +func (cm *BeatV2Manager) handleInputReload(unit *client.Unit) { + _, _, rawConfig := unit.Expected() + cm.setMainUnitValue(unit) + cm.logger.Debugf("Got Input unit config: %s, ID: %s", rawConfig.Type, rawConfig.Id) + + // Find the V2 inputs we need to reload + // The reloader provides list and non-list types, but all the beats register as lists, + // so just go with that for V2 + obj := cm.registry.GetInputList() + if obj == nil { + _ = unit.UpdateState(client.UnitStateFailed, "failed to find beat reloadable type 'input'", nil) + return + } + _ = unit.UpdateState(client.UnitStateConfiguring, "found reloader for 'input'", nil) + + beatCfg, err := generateBeatConfig(rawConfig, cm.client.AgentInfo()) + if err != nil { + errString := fmt.Errorf("Failed to create Unit config: %w", err) + _ = unit.UpdateState(client.UnitStateFailed, errString.Error(), nil) + return + } + + err = obj.Reload(beatCfg) + if err != nil { + errString := fmt.Errorf("Error reloading input: %w", err) + _ = unit.UpdateState(client.UnitStateFailed, errString.Error(), nil) + return + } + _ = unit.UpdateState(client.UnitStateHealthy, "beat reloaded", nil) +} diff --git a/x-pack/libbeat/management/manager_test.go b/x-pack/libbeat/management/manager_test.go deleted file mode 100644 index b05d70fab2da..000000000000 --- a/x-pack/libbeat/management/manager_test.go +++ /dev/null @@ -1,75 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package management - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/elastic/elastic-agent-client/v7/pkg/proto" - conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/mapstr" - - "github.com/elastic/beats/v7/libbeat/common/reload" - lbmanagement "github.com/elastic/beats/v7/libbeat/management" -) - -func TestConfigBlocks(t *testing.T) { - input := ` -filebeat: - inputs: - - type: log - paths: - - /var/log/hello1.log - - /var/log/hello2.log -output: - elasticsearch: - hosts: - - localhost:9200` - - var cfg mapstr.M - uconfig, err := conf.NewConfigFrom(input) - if err != nil { - t.Fatalf("Config blocks unsuccessfully generated: %+v", err) - } - - err = uconfig.Unpack(&cfg) - if err != nil { - t.Fatalf("Config blocks unsuccessfully generated: %+v", err) - } - - reg := reload.NewRegistry() - reg.Register("output", &dummyReloadable{}) - reg.Register("filebeat.inputs", &dummyReloadable{}) - - cm := &Manager{ - registry: reg, - } - blocks, err := cm.toConfigBlocks(cfg) - if err != nil { - t.Fatalf("Config blocks unsuccessfully generated: %+v", err) - } - - if len(blocks) != 2 { - t.Fatalf("Expected 2 block have %d: %+v", len(blocks), blocks) - } -} - -func TestStatusToProtoStatus(t *testing.T) { - assert.Equal(t, proto.StateObserved_HEALTHY, statusToProtoStatus(lbmanagement.Unknown)) - assert.Equal(t, proto.StateObserved_STARTING, statusToProtoStatus(lbmanagement.Starting)) - assert.Equal(t, proto.StateObserved_CONFIGURING, statusToProtoStatus(lbmanagement.Configuring)) - assert.Equal(t, proto.StateObserved_HEALTHY, statusToProtoStatus(lbmanagement.Running)) - assert.Equal(t, proto.StateObserved_DEGRADED, statusToProtoStatus(lbmanagement.Degraded)) - assert.Equal(t, proto.StateObserved_FAILED, statusToProtoStatus(lbmanagement.Failed)) - assert.Equal(t, proto.StateObserved_STOPPING, statusToProtoStatus(lbmanagement.Stopping)) -} - -type dummyReloadable struct{} - -func (dummyReloadable) Reload(config *reload.ConfigWithMeta) error { - return nil -} diff --git a/x-pack/libbeat/management/plugin.go b/x-pack/libbeat/management/plugin.go index 8568f7029f6d..95286be4d820 100644 --- a/x-pack/libbeat/management/plugin.go +++ b/x-pack/libbeat/management/plugin.go @@ -11,17 +11,17 @@ import ( ) func init() { - lbmanagement.Register("x-pack-fleet", NewFleetManagerPlugin, feature.Beta) + lbmanagement.Register("x-pack-fleet", NewFleetManagerPluginV2, feature.Beta) } -// NewFleetManagerPlugin creates a plugin function returning factory if configuration matches the criteria -func NewFleetManagerPlugin(config *conf.C) lbmanagement.FactoryFunc { - c := defaultConfig() +// NewFleetManagerPluginV2 registers the V2 callback +func NewFleetManagerPluginV2(config *conf.C) lbmanagement.FactoryFunc { + c := DefaultConfig() if config.Enabled() { if err := config.Unpack(&c); err != nil { return nil } - return NewFleetManager + return NewV2AgentManager } return nil diff --git a/x-pack/libbeat/management/tests/fbtest/filebeat_v2_test.go b/x-pack/libbeat/management/tests/fbtest/filebeat_v2_test.go new file mode 100644 index 000000000000..7d8a73fa5c28 --- /dev/null +++ b/x-pack/libbeat/management/tests/fbtest/filebeat_v2_test.go @@ -0,0 +1,115 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package fbtest + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + + fbroot "github.com/elastic/beats/v7/x-pack/filebeat/cmd" + // initialize the plugin system before libbeat does, so we can overwrite it properly + _ "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +var expectedFBStreams = &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Type: "logfile", + Id: "logfile-system-default-system", + Name: "system-1", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, +} + +func TestFilebeat(t *testing.T) { + filebeatCmd := fbroot.Filebeat() + tests.InitBeatsForTest(t, filebeatCmd) + var fbStreams = []*proto.Stream{ + { + Id: "logfile-system.syslog-default-system", + DataStream: &proto.DataStream{ + Dataset: "system.syslog", + Type: "logs", + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "paths": []interface{}{"./testdata/messages"}, + "exclude_files": []interface{}{".gz$"}, + "multiline": map[string]interface{}{ + "pattern": `^\s`, + "match": "after", + }, + }), + }, + { + Id: "logfile-system.auth-default-system", + DataStream: &proto.DataStream{ + Dataset: "system.auth", + Type: "logs", + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "paths": []interface{}{"./testdata/secure*"}, + "exclude_files": []interface{}{".gz$"}, + "multiline": map[string]interface{}{ + "pattern": `^\s`, + "match": "after", + }, + }), + }, + } + + expectedFBStreams.Streams = fbStreams + outPath, server := tests.SetupTestEnv(t, expectedFBStreams, time.Second*6) + defer server.Srv.Stop() + + defer func() { + err := os.RemoveAll(outPath) + require.NoError(t, err) + }() + + t.Logf("Running beats...") + err := filebeatCmd.Execute() + require.NoError(t, err) + + t.Logf("Reading events...") + events := tests.ReadEvents(t, outPath) + t.Logf("Got %d events", len(events)) + // Look for processors + expectedMetaValuesSyslog := map[string]interface{}{ + // Processors created by + "@metadata.input_id": "logfile-system-default-system", + "@metadata.stream_id": "logfile-system.syslog-default-system", + "agent.id": "test-agent", + "data_stream.dataset": "system.syslog", + "data_stream.namespace": "default", + "data_stream.type": "logs", + } + tests.ValuesExist(t, expectedMetaValuesSyslog, events, tests.ONCE) + + expectedMetaValuesAuth := map[string]interface{}{ + // Processors created by + "@metadata.input_id": "logfile-system-default-system", + "@metadata.stream_id": "logfile-system.auth-default-system", + "agent.id": "test-agent", + "data_stream.dataset": "system.auth", + } + tests.ValuesExist(t, expectedMetaValuesAuth, events, tests.ONCE) + + expectedLogValues := map[string]interface{}{ + "log.file.path": nil, + "message": nil, + } + tests.ValuesExist(t, expectedLogValues, events, tests.ONCE) +} diff --git a/x-pack/libbeat/management/tests/fbtest/testdata/messages b/x-pack/libbeat/management/tests/fbtest/testdata/messages new file mode 100644 index 000000000000..6eacd05dada0 --- /dev/null +++ b/x-pack/libbeat/management/tests/fbtest/testdata/messages @@ -0,0 +1,62 @@ +Aug 7 00:00:00 test-server systemd[1]: Starting unbound-anchor.service - update of the root trust anchor for DNSSEC validation in unbound... +Aug 7 00:00:00 test-server audit: BPF prog-id=1328 op=LOAD +Aug 7 00:00:00 test-server systemd[1]: Starting logrotate.service - Rotate log files... +Aug 7 00:00:00 test-server systemd[1]: unbound-anchor.service: Deactivated successfully. +Aug 7 00:00:00 test-server systemd[1]: Finished unbound-anchor.service - update of the root trust anchor for DNSSEC validation in unbound. +Aug 7 00:00:00 test-server audit[1]: SERVICE_START pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=unbound-anchor comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:00:00 test-server audit[1]: SERVICE_STOP pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=unbound-anchor comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:00:00 test-server systemd[1]: rsyslog.service: Sent signal SIGHUP to main process 978 (rsyslogd) on client request. +Aug 7 00:00:00 test-server rsyslogd[978]: [origin software="rsyslogd" swVersion="8.2204.0-2.fc36" x-pid="978" x-info="https://www.rsyslog.com"] rsyslogd was HUPed +Aug 7 00:00:00 test-server systemd[1]: logrotate.service: Deactivated successfully. +Aug 7 00:00:00 test-server systemd[1]: Finished logrotate.service - Rotate log files. +Aug 7 00:00:00 test-server audit[1]: SERVICE_START pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=logrotate comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:00:00 test-server audit[1]: SERVICE_STOP pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=logrotate comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:00:00 test-server audit: BPF prog-id=0 op=UNLOAD +Aug 7 00:08:00 test-server systemd[1]: Starting pmie_daily.service - Process PMIE logs... +Aug 7 00:08:00 test-server systemd[1]: Started pmie_daily.service - Process PMIE logs. +Aug 7 00:08:00 test-server audit[1]: SERVICE_START pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=pmie_daily comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:08:01 test-server systemd[1]: pmie_daily.service: Deactivated successfully. +Aug 7 00:08:01 test-server audit[1]: SERVICE_STOP pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=pmie_daily comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:09:19 test-server systemd-logind[984]: Session 389 logged out. Waiting for processes to exit. +Aug 7 00:09:19 test-server systemd[1]: session-389.scope: Deactivated successfully. +Aug 7 00:09:19 test-server systemd-logind[984]: Removed session 389. +Aug 7 00:09:21 test-server systemd-logind[984]: New session 390 of user alexk. +Aug 7 00:09:21 test-server systemd[1]: Started session-390.scope - Session 390 of User alexk. +Aug 7 00:09:21 test-server audit: BPF prog-id=1329 op=LOAD +Aug 7 00:09:21 test-server audit: BPF prog-id=1330 op=LOAD +Aug 7 00:09:21 test-server systemd[1]: Starting systemd-hostnamed.service - Hostname Service... +Aug 7 00:09:21 test-server systemd[1]: Started systemd-hostnamed.service - Hostname Service. +Aug 7 00:09:21 test-server audit[1]: SERVICE_START pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=systemd-hostnamed comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:09:51 test-server systemd[1]: systemd-hostnamed.service: Deactivated successfully. +Aug 7 00:09:51 test-server audit[1]: SERVICE_STOP pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=systemd-hostnamed comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:09:51 test-server audit: BPF prog-id=0 op=UNLOAD +Aug 7 00:09:51 test-server audit: BPF prog-id=0 op=UNLOAD +Aug 7 00:10:00 test-server systemd[1]: Starting pmlogger_daily.service - Process archive logs... +Aug 7 00:10:00 test-server systemd[1]: Started pmlogger_daily.service - Process archive logs. +Aug 7 00:10:00 test-server audit[1]: SERVICE_START pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=pmlogger_daily comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:10:03 test-server systemd[1]: pmlogger_daily.service: Deactivated successfully. +Aug 7 00:10:03 test-server audit[1]: SERVICE_STOP pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=pmlogger_daily comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:10:03 test-server systemd[1]: pmlogger_daily.service: Consumed 1.905s CPU time. +Aug 7 00:11:11 test-server systemd[1]: run-docker-runtime\x2drunc-moby-c2afbe542c67dd805b6017d6e225bf18d830393752e82d9e97351f6b396c0be2-runc.QxRbu3.mount: Deactivated successfully. +Aug 7 00:11:34 test-server systemd[1]: run-docker-runtime\x2drunc-moby-6435295863b92ffce5f9312196b9fe3153cfdeb385b4cea8a06f9532db8ef457-runc.xOrRCg.mount: Deactivated successfully. +Aug 7 00:17:26 test-server systemd[1]: run-docker-runtime\x2drunc-moby-c2afbe542c67dd805b6017d6e225bf18d830393752e82d9e97351f6b396c0be2-runc.QQ0qVl.mount: Deactivated successfully. +Aug 7 00:19:48 test-server systemd[1]: run-docker-runtime\x2drunc-moby-6435295863b92ffce5f9312196b9fe3153cfdeb385b4cea8a06f9532db8ef457-runc.4vtJU0.mount: Deactivated successfully. +Aug 7 00:21:12 test-server audit[1]: SERVICE_START pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=dnf-makecache comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:21:12 test-server audit[1]: SERVICE_STOP pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=dnf-makecache comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:21:12 test-server systemd[1]: dnf-makecache.service: Consumed 16.261s CPU time. +Aug 7 00:22:22 test-server NetworkManager[1141]: [1659856942.7446] dhcp4 (eno1): state changed new lease, address=192.168.1.96 +Aug 7 00:22:52 test-server systemd[1]: run-docker-runtime\x2drunc-moby-6435295863b92ffce5f9312196b9fe3153cfdeb385b4cea8a06f9532db8ef457-runc.8ba3LQ.mount: Deactivated successfully. +Aug 7 00:22:59 test-server systemd[1]: run-docker-runtime\x2drunc-moby-6435295863b92ffce5f9312196b9fe3153cfdeb385b4cea8a06f9532db8ef457-runc.N1risu.mount: Deactivated successfully. +Aug 7 00:23:29 test-server systemd[1]: run-docker-runtime\x2drunc-moby-56c445f9a6c907b6e46555e448111001f8b8ee4d5e44d94380db44702837789d-runc.VXLgpv.mount: Deactivated successfully. +Aug 7 00:24:21 test-server systemd[1]: run-docker-runtime\x2drunc-moby-c2afbe542c67dd805b6017d6e225bf18d830393752e82d9e97351f6b396c0be2-runc.ia9DaI.mount: Deactivated successfully. +Aug 7 00:25:00 test-server systemd[1]: Starting pmlogger_check.service - Check pmlogger instances are running... +Aug 7 00:25:00 test-server systemd[1]: Started pmlogger_check.service - Check pmlogger instances are running. +Aug 7 00:25:00 test-server audit[1]: SERVICE_START pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=pmlogger_check comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:25:01 test-server systemd[1]: pmlogger_check.service: Deactivated successfully. +Aug 7 00:25:01 test-server audit[1]: SERVICE_STOP pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=pmlogger_check comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:25:10 test-server systemd[1]: Starting pmlogger_farm_check.service - Check and migrate non-primary pmlogger farm instances... +Aug 7 00:25:10 test-server systemd[1]: Started pmlogger_farm_check.service - Check and migrate non-primary pmlogger farm instances. +Aug 7 00:25:10 test-server audit[1]: SERVICE_START pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=pmlogger_farm_check comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:25:10 test-server systemd[1]: pmlogger_farm_check.service: Deactivated successfully. +Aug 7 00:25:10 test-server audit[1]: SERVICE_STOP pid=1 uid=0 auid=4294967295 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='unit=pmlogger_farm_check comm="systemd" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success' +Aug 7 00:26:30 test-server systemd[1]: run-docker-runtime\x2drunc-moby-c2afbe542c67dd805b6017d6e225bf18d830393752e82d9e97351f6b396c0be2-runc.OqtwmI.mount: Deactivated successfully. \ No newline at end of file diff --git a/x-pack/libbeat/management/tests/fbtest/testdata/secure b/x-pack/libbeat/management/tests/fbtest/testdata/secure new file mode 100644 index 000000000000..8701850320c2 --- /dev/null +++ b/x-pack/libbeat/management/tests/fbtest/testdata/secure @@ -0,0 +1,30 @@ +Aug 14 00:06:20 shoebill sshd[2305664]: Received disconnect from 192.168.1.141 port 63067:11: disconnected by user +Aug 14 00:06:20 shoebill sshd[2305664]: Disconnected from user user 192.168.1.141 port 63067 +Aug 14 00:06:20 shoebill sshd[2305661]: pam_unix(sshd:session): session closed for user user +Aug 14 00:06:22 shoebill sshd[2355631]: Accepted publickey for user from 192.168.1.141 port 63134 ssh2: RSA SHA256:4PSoxRMl32R2k+79E7+Fufpm2bjX9Q+d2aI3F8GbNyI +Aug 14 00:06:22 shoebill sshd[2355631]: pam_unix(sshd:session): session opened for user user(uid=1000) by (uid=0) +Aug 14 00:24:21 shoebill sshd[2355634]: Received disconnect from 192.168.1.141 port 63134:11: disconnected by user +Aug 14 00:24:21 shoebill sshd[2355634]: Disconnected from user user 192.168.1.141 port 63134 +Aug 14 00:24:21 shoebill sshd[2355631]: pam_unix(sshd:session): session closed for user user +Aug 14 00:24:23 shoebill sshd[2406593]: Accepted publickey for user from 192.168.1.141 port 63206 ssh2: RSA SHA256:4PSoxRMl32R2k+79E7+Fufpm2bjX9Q+d2aI3F8GbNyI +Aug 14 00:24:23 shoebill sshd[2406593]: pam_unix(sshd:session): session opened for user user(uid=1000) by (uid=0) +Aug 14 00:43:04 shoebill sshd[2406596]: Received disconnect from 192.168.1.141 port 63206:11: disconnected by user +Aug 14 00:43:04 shoebill sshd[2406596]: Disconnected from user user 192.168.1.141 port 63206 +Aug 14 00:43:04 shoebill sshd[2406593]: pam_unix(sshd:session): session closed for user user +Aug 14 00:43:06 shoebill sshd[2459763]: Accepted publickey for user from 192.168.1.141 port 63275 ssh2: RSA SHA256:4PSoxRMl32R2k+79E7+Fufpm2bjX9Q+d2aI3F8GbNyI +Aug 14 00:43:06 shoebill sshd[2459763]: pam_unix(sshd:session): session opened for user user(uid=1000) by (uid=0) +Aug 14 01:00:56 shoebill sshd[2459766]: Received disconnect from 192.168.1.141 port 63275:11: disconnected by user +Aug 14 01:00:56 shoebill sshd[2459766]: Disconnected from user user 192.168.1.141 port 63275 +Aug 14 01:00:56 shoebill sshd[2459763]: pam_unix(sshd:session): session closed for user user +Aug 14 01:00:58 shoebill sshd[2510537]: Accepted publickey for user from 192.168.1.141 port 63343 ssh2: RSA SHA256:4PSoxRMl32R2k+79E7+Fufpm2bjX9Q+d2aI3F8GbNyI +Aug 14 01:00:58 shoebill sshd[2510537]: pam_unix(sshd:session): session opened for user user(uid=1000) by (uid=0) +Aug 14 01:19:28 shoebill sshd[2510571]: Received disconnect from 192.168.1.141 port 63343:11: disconnected by user +Aug 14 01:19:28 shoebill sshd[2510571]: Disconnected from user user 192.168.1.141 port 63343 +Aug 14 01:19:28 shoebill sshd[2510537]: pam_unix(sshd:session): session closed for user user +Aug 14 01:19:30 shoebill sshd[2562541]: Accepted publickey for user from 192.168.1.141 port 63413 ssh2: RSA SHA256:4PSoxRMl32R2k+79E7+Fufpm2bjX9Q+d2aI3F8GbNyI +Aug 14 01:19:30 shoebill sshd[2562541]: pam_unix(sshd:session): session opened for user user(uid=1000) by (uid=0) +Aug 14 01:36:58 shoebill sshd[2562544]: Received disconnect from 192.168.1.141 port 63413:11: disconnected by user +Aug 14 01:36:58 shoebill sshd[2562544]: Disconnected from user user 192.168.1.141 port 63413 +Aug 14 01:36:58 shoebill sshd[2562541]: pam_unix(sshd:session): session closed for user user +Aug 14 01:37:00 shoebill sshd[2612276]: Accepted publickey for user from 192.168.1.141 port 63477 ssh2: RSA SHA256:4PSoxRMl32R2k+79E7+Fufpm2bjX9Q+d2aI3F8GbNyI +Aug 14 01:37:00 shoebill sshd[2612276]: pam_unix(sshd:session): session opened for user user(uid=1000) by (uid=0) \ No newline at end of file diff --git a/x-pack/libbeat/management/tests/init.go b/x-pack/libbeat/management/tests/init.go new file mode 100644 index 000000000000..0344ccc1de16 --- /dev/null +++ b/x-pack/libbeat/management/tests/init.go @@ -0,0 +1,88 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package tests + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/cmd" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/feature" + lbmanagement "github.com/elastic/beats/v7/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + conf "github.com/elastic/elastic-agent-libs/config" +) + +var defaultFleetName = "x-pack-fleet" + +// InitBeatsForTest tinkers with a bunch of global variables so beats will start up properly in a test environment +func InitBeatsForTest(t *testing.T, beatRoot *cmd.BeatsRootCmd) { + // this is a tad hacky, but the go test environment will attempt to insert a bunch of CLI args into the executable, + // which beats's CLI library will then choke on + os.Args = os.Args[:1] + + // Set CLI flags needed to run the tests + t.Logf("Setting flags...") + err := beatRoot.PersistentFlags().Set("e", "true") + require.NoError(t, err) + err = beatRoot.PersistentFlags().Set("E", "management.enabled=true") + require.NoError(t, err) + err = beatRoot.PersistentFlags().Set("d", "centralmgmt.V2-manager") + require.NoError(t, err) +} + +// ResetFleetManager re-registers the global fleet handler, if needed, and replace it with the test one. +func ResetFleetManager(handler MockV2Handler) error { + managers, err := feature.GlobalRegistry().LookupAll(lbmanagement.Namespace) + if err != nil { + return fmt.Errorf("error finding management plugin: %w", err) + } + if managers != nil && managers[0].Name() == defaultFleetName { + _ = feature.GlobalRegistry().Unregister(lbmanagement.Namespace, defaultFleetName) + } + lbmanagement.Register("fleet-test", fleetClientFactory(handler), feature.Beta) + return nil +} + +func fleetClientFactory(srv MockV2Handler) lbmanagement.PluginFunc { + return func(config *conf.C) lbmanagement.FactoryFunc { + c := management.DefaultConfig() + if config.Enabled() { + if err := config.Unpack(&c); err != nil { + return nil + } + return func(_ *conf.C, registry *reload.Registry, beatUUID uuid.UUID) (lbmanagement.Manager, error) { + return management.NewV2AgentManagerWithClient(c, registry, srv.Client) + } + } + return nil + } +} + +// SetupTestEnv is a helper to initialize the common files and handlers for metricbeat. +// This returns a string to the tmpdir location +func SetupTestEnv(t *testing.T, config *proto.UnitExpectedConfig, runtime time.Duration) (string, MockV2Handler) { + tmpdir := os.TempDir() + filename := fmt.Sprintf("test-%d", time.Now().Unix()) + outPath := filepath.Join(tmpdir, filename) + t.Logf("writing output to file %s", outPath) + err := os.Mkdir(outPath, 0775) + require.NoError(t, err) + + server := NewMockServer(t, runtime, config, outPath) + t.Logf("Resetting fleet manager...") + err = ResetFleetManager(server) + require.NoError(t, err) + + return outPath, server +} diff --git a/x-pack/libbeat/management/tests/mbtest/metricbeat_v2_test.go b/x-pack/libbeat/management/tests/mbtest/metricbeat_v2_test.go new file mode 100644 index 000000000000..05d206f84d9e --- /dev/null +++ b/x-pack/libbeat/management/tests/mbtest/metricbeat_v2_test.go @@ -0,0 +1,124 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package mbtest + +import ( + "os" + "testing" + "time" + + "github.com/stretchr/testify/require" + // initialize the plugin system before libbeat does, so we can overwrite it properly + _ "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/beats/v7/x-pack/libbeat/management/tests" + "github.com/elastic/beats/v7/x-pack/metricbeat/cmd" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +var expectedMBStreams = &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Type: "system/metrics", + Id: "system/metrics-system-default-system", + Name: "system-1", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, +} + +func TestSingleMetricbeatMetricsetWithProcessors(t *testing.T) { + tests.InitBeatsForTest(t, cmd.RootCmd) + var mbStreams = []*proto.Stream{ + { + Id: "system/metrics-system.cpu-default-system", + DataStream: &proto.DataStream{ + Dataset: "system.cpu", + Type: "metrics", + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "metricsets": []interface{}{"cpu"}, + "period": "2s", + "processors": []interface{}{ + map[string]interface{}{ + "add_fields": map[string]interface{}{ + "fields": map[string]interface{}{"testfield": true}, + "target": "@metadata", + }, + }, + }, + }), + }, + { + Id: "system/metrics-system.memory-default-system", + DataStream: &proto.DataStream{ + Dataset: "system.memory", + Type: "metrics", + }, + Source: tests.RequireNewStruct(map[string]interface{}{ + "metricsets": []interface{}{"memory"}, + "period": "2s", + }), + }, + } + + expectedMBStreams.Streams = mbStreams + outPath, server := tests.SetupTestEnv(t, expectedMBStreams, time.Second*6) + + defer server.Srv.Stop() + defer func() { + err := os.RemoveAll(outPath) + require.NoError(t, err) + }() + + // After runfor seconds, this should shut down, allowing us to check the output + t.Logf("Running beats...") + err := cmd.RootCmd.Execute() + require.NoError(t, err) + + t.Logf("Reading events...") + events := tests.ReadEvents(t, outPath) + t.Logf("Got %d events", len(events)) + + // Look for processors + expectedCPUMetaValues := map[string]interface{}{ + // Processors created by + "@metadata.input_id": "system/metrics-system-default-system", + "@metadata.stream_id": "system/metrics-system.cpu-default-system", + "agent.id": "test-agent", + "data_stream.dataset": "system.cpu", + "data_stream.namespace": "default", + "data_stream.type": "metrics", + // make sure the V2 shim isn't overwriting any custom processors + "@metadata.testfield": true, + } + tests.ValuesExist(t, expectedCPUMetaValues, events, tests.ONCE) + + expectedMemoryMetaValues := map[string]interface{}{ + "@metadata.stream_id": "system/metrics-system.memory-default-system", + "data_stream.dataset": "system.memory", + } + tests.ValuesExist(t, expectedMemoryMetaValues, events, tests.ONCE) + + // Look for proper CPU/memory config + expectedCPU := map[string]interface{}{ + "system.cpu.cores": nil, + "system.cpu.total": nil, + "system.memory.actual.free": nil, + } + tests.ValuesExist(t, expectedCPU, events, tests.ONCE) + + // If there's a config issue, metricbeat will fallback to default metricsets. Make sure they don't exist. + disabledMetricsets := []string{ + "system.process", + "system.load", + "system.process_summary", + } + tests.ValuesDoNotExist(t, disabledMetricsets, events) +} diff --git a/x-pack/libbeat/management/tests/mock_server.go b/x-pack/libbeat/management/tests/mock_server.go new file mode 100644 index 000000000000..0948704c6406 --- /dev/null +++ b/x-pack/libbeat/management/tests/mock_server.go @@ -0,0 +1,159 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package tests + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/structpb" +) + +// MockV2Handler wraps the basic tooling needed to handle a fake V2 controller +type MockV2Handler struct { + Srv mock.StubServerV2 + Client client.V2 +} + +// NewMockServer returns a mocked elastic-agent V2 controller +func NewMockServer(t *testing.T, runtime time.Duration, inputConfig *proto.UnitExpectedConfig, outPath string) MockV2Handler { + unitOneID := mock.NewID() + unitOutID := mock.NewID() + + token := mock.NewID() + //var gotConfig bool + + var mut sync.Mutex + + var logOutputStream = &proto.UnitExpectedConfig{ + DataStream: &proto.DataStream{ + Namespace: "default", + }, + Type: "file", + Revision: 1, + Meta: &proto.Meta{ + Package: &proto.Package{ + Name: "system", + Version: "1.17.0", + }, + }, + Source: RequireNewStruct(map[string]interface{}{ + "type": "file", + "enabled": true, + "path": outPath, + "filename": "beat-out", + "number_of_files": 7, + }), + } + + start := time.Now() + + var stateIndex uint64 = 1 + srv := mock.StubServerV2{ + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + mut.Lock() + defer mut.Unlock() + if observed.Token == token { + + // initial checkin + if len(observed.Units) == 0 || observed.Units[0].State == proto.State_STARTING { + return sendUnitsWithState(proto.State_HEALTHY, inputConfig, logOutputStream, unitOneID, unitOutID, stateIndex) + } else if checkUnitStateHealthy(observed.Units) { + + if time.Since(start) > runtime { + //remove the units once they've been healthy for a given period of time + return sendUnitsWithState(proto.State_STOPPED, inputConfig, logOutputStream, unitOneID, unitOutID, stateIndex+1) + } + //otherwise, just remove the units + } else if observed.Units[0].State == proto.State_STOPPED { + return &proto.CheckinExpected{ + Units: nil, + } + } else if observed.Units[0].State == proto.State_FAILED { + + return &proto.CheckinExpected{ + Units: nil, + } + } + + } + + return nil + }, + ActionImpl: func(response *proto.ActionResponse) error { + return nil + }, + ActionsChan: make(chan *mock.PerformAction, 100), + } // end of srv declaration + + // The start() needs to happen here, since the client needs the assigned server port + err := srv.Start() + require.NoError(t, err) + + client := client.NewV2(fmt.Sprintf(":%d", srv.Port), token, client.VersionInfo{ + Name: "program", + Version: "v1.0.0", + Meta: map[string]string{ + "key": "value", + }, + }, grpc.WithTransportCredentials(insecure.NewCredentials())) + + return MockV2Handler{Srv: srv, Client: client} +} + +// helper to wrap the CheckinExpected config we need with every refresh of the mock server +func sendUnitsWithState(state proto.State, input, output *proto.UnitExpectedConfig, inId, outId string, stateIndex uint64) *proto.CheckinExpected { + return &proto.CheckinExpected{ + AgentInfo: &proto.CheckinAgentInfo{ + Id: "test-agent", + Version: "8.4.0", + Snapshot: true, + }, + Units: []*proto.UnitExpected{ + { + Id: inId, + Type: proto.UnitType_INPUT, + ConfigStateIdx: stateIndex, + Config: input, + State: state, + LogLevel: proto.UnitLogLevel_DEBUG, + }, + { + Id: outId, + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: stateIndex, + Config: output, + State: state, + }, + }, + } +} + +func checkUnitStateHealthy(units []*proto.UnitObserved) bool { + for _, unit := range units { + if unit.State != proto.State_HEALTHY { + return false + } + } + return true +} + +//RequireNewStruct converts a mapstr to a protobuf struct +func RequireNewStruct(v map[string]interface{}) *structpb.Struct { + str, err := structpb.NewStruct(v) + if err != nil { + panic(err) + } + return str +} diff --git a/x-pack/libbeat/management/tests/output_read.go b/x-pack/libbeat/management/tests/output_read.go new file mode 100644 index 000000000000..e5c7649de6bc --- /dev/null +++ b/x-pack/libbeat/management/tests/output_read.go @@ -0,0 +1,94 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package tests + +import ( + "encoding/json" + "errors" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/mapstr" +) + +type findFieldsMode string + +const ALL findFieldsMode = "all" +const ONCE findFieldsMode = "once" + +// ReadEvents reads the ndjson output we get from the beats file output +func ReadEvents(t *testing.T, path string) []mapstr.M { + files, err := filepath.Glob(filepath.Join(path, "*.ndjson")) + require.NoError(t, err) + + events := []mapstr.M{} + for _, file := range files { + rawFile, err := os.ReadFile(file) + require.NoError(t, err) + lines := strings.Split(string(rawFile), "\n") + for _, line := range lines { + var event = mapstr.M{} + // skip newlines that appear at the end of files + if len(line) < 2 { + continue + } + err = json.Unmarshal([]byte(line), &event) + require.NoError(t, err) + events = append(events, event) + } + } + return events +} + +// ValuesExist verifies that the given fields exist in the events. +// the values map takes keys in the form of keys in the events map, which may be in dot form: "system.cpu.cores", etc. +// The value for the map should be the expected value, or a `nil` if you merely want to check for the presence of a field. +// the mode determines if `ValuesExist` must exist in all events, or just one. +func ValuesExist(t *testing.T, values map[string]interface{}, events []mapstr.M, mode findFieldsMode) { + for searchKey, val := range values { + var foundCount = 0 + for eventIter, event := range events { + evt, err := event.GetValue(searchKey) + if errors.Is(err, mapstr.ErrKeyNotFound) { + continue + } + if val == nil { + foundCount++ + } else { + if val == evt { + foundCount++ + } else if val != evt && mode == ALL { + t.Errorf("Key %s was found in event %d, but value was unexpected. Expected %#v, got %#v", searchKey, eventIter, val, evt) + } + } + } + if mode == ALL { + if foundCount != len(events) { + t.Errorf("Expected to find key %s in all %d events, but key was only found %d times.", searchKey, len(events), foundCount) + } + } + if mode == ONCE { + if foundCount == 0 { + t.Errorf("Did not find key %s in any events", searchKey) + } + } + } +} + +// ValuesDoNotExist checks to make sure that the given keys do not exist in any events. +func ValuesDoNotExist(t *testing.T, values []string, events []mapstr.M) { + for _, key := range values { + for eventIter, event := range events { + evt, _ := event.GetValue(key) + if evt != nil { + t.Errorf("key %s with value %#v was found in event %d in the output", key, evt, eventIter) + } + } + } +} diff --git a/x-pack/metricbeat/cmd/agent.go b/x-pack/metricbeat/cmd/agent.go new file mode 100644 index 000000000000..38a7f51e99e5 --- /dev/null +++ b/x-pack/metricbeat/cmd/agent.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "fmt" + "strings" + + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func metricbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + modules, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo) + if err != nil { + return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) + } + + // Extract the module name from the type, usually in the form system/metric + module := strings.Split(rawIn.Type, "/")[0] + + for iter := range modules { + modules[iter]["module"] = module + } + + // format for the reloadable list needed bythe cm.Reload() method + configList, err := management.CreateReloadConfigFromInputs(modules) + if err != nil { + return nil, fmt.Errorf("error creating reloader config: %w", err) + } + + return configList, nil +} diff --git a/x-pack/metricbeat/cmd/root.go b/x-pack/metricbeat/cmd/root.go index 5f2df6c025eb..99fdd9cf49de 100644 --- a/x-pack/metricbeat/cmd/root.go +++ b/x-pack/metricbeat/cmd/root.go @@ -16,6 +16,7 @@ import ( "github.com/elastic/beats/v7/metricbeat/beater" mbcmd "github.com/elastic/beats/v7/metricbeat/cmd" "github.com/elastic/beats/v7/metricbeat/cmd/test" + "github.com/elastic/beats/v7/x-pack/libbeat/management" "github.com/elastic/elastic-agent-libs/mapstr" // Register the includes. @@ -43,6 +44,7 @@ var withECSVersion = processing.WithFields(mapstr.M{ }) func init() { + management.ConfigTransform.SetTransform(metricbeatCfg) var runFlags = pflag.NewFlagSet(Name, pflag.ExitOnError) runFlags.AddGoFlag(flag.CommandLine.Lookup("system.hostfs")) settings := instance.Settings{ diff --git a/x-pack/osquerybeat/cmd/root.go b/x-pack/osquerybeat/cmd/root.go index 06cb4ab4cb7c..3780ef59efc8 100644 --- a/x-pack/osquerybeat/cmd/root.go +++ b/x-pack/osquerybeat/cmd/root.go @@ -5,11 +5,17 @@ package cmd import ( + "fmt" + cmd "github.com/elastic/beats/v7/libbeat/cmd" "github.com/elastic/beats/v7/libbeat/cmd/instance" "github.com/elastic/beats/v7/libbeat/common/cli" + "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/ecs" "github.com/elastic/beats/v7/libbeat/publisher/processing" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -35,6 +41,7 @@ var withECSVersion = processing.WithFields(mapstr.M{ var RootCmd = Osquerybeat() func Osquerybeat() *cmd.BeatsRootCmd { + management.ConfigTransform.SetTransform(osquerybeatCfg) settings := instance.Settings{ Name: Name, Processing: processing.MakeDefaultSupport(true, withECSVersion, processing.WithHost, processing.WithAgentMeta()), @@ -63,3 +70,29 @@ func genVerifyCmd(_ instance.Settings) *cobra.Command { }), } } + +func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + // Convert to streams, osquerybeat doesn't use streams + streams := make([]*proto.Stream, 1) + streams[0] = &proto.Stream{ + Source: rawIn.GetSource(), + Id: rawIn.GetId(), + DataStream: rawIn.GetDataStream(), + } + rawIn.Streams = streams + + modules, err := management.CreateInputsFromStreams(rawIn, "osquery", agentInfo) + if err != nil { + return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) + } + for iter := range modules { + modules[iter]["type"] = "log" + } + + // format for the reloadable list needed bythe cm.Reload() method + configList, err := management.CreateReloadConfigFromInputs(modules) + if err != nil { + return nil, fmt.Errorf("error creating config for reloader: %w", err) + } + return configList, nil +} diff --git a/x-pack/osquerybeat/internal/config/watcher.go b/x-pack/osquerybeat/internal/config/watcher.go index 1e1fbf856e33..4b4aebdcd316 100644 --- a/x-pack/osquerybeat/internal/config/watcher.go +++ b/x-pack/osquerybeat/internal/config/watcher.go @@ -65,7 +65,7 @@ func WatchInputs(ctx context.Context, log *logp.Logger) <-chan []InputConfig { log: log, ch: ch, } - reload.Register.MustRegisterList("inputs", r) + reload.RegisterV2.MustRegisterInput(r) return ch } diff --git a/x-pack/packetbeat/cmd/root.go b/x-pack/packetbeat/cmd/root.go index 407d24570df4..05c878caaabe 100644 --- a/x-pack/packetbeat/cmd/root.go +++ b/x-pack/packetbeat/cmd/root.go @@ -5,8 +5,15 @@ package cmd import ( + "fmt" + "github.com/elastic/beats/v7/libbeat/cmd" + "github.com/elastic/beats/v7/libbeat/common/reload" packetbeatCmd "github.com/elastic/beats/v7/packetbeat/cmd" + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + conf "github.com/elastic/elastic-agent-libs/config" _ "github.com/elastic/beats/v7/x-pack/libbeat/include" @@ -20,7 +27,32 @@ var Name = packetbeatCmd.Name // RootCmd to handle beats cli var RootCmd *cmd.BeatsRootCmd +// packetbeatCfg is a callback registered via SetTransform that returns a packetbeat Elastic Agent client.Unit +// configuration generated from a raw Elastic Agent config +func packetbeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + //grab and properly format the input streams + inputStreams, err := management.CreateInputsFromStreams(rawIn, "metrics", agentInfo) + if err != nil { + return nil, fmt.Errorf("error generating new stream config: %w", err) + } + + // Packetbeat does its own transformations, + // so update the existing config with our new transformations, + // then send to packetbeat + souceMap := rawIn.Source.AsMap() + souceMap["streams"] = inputStreams + + uconfig, err := conf.NewConfigFrom(souceMap) + if err != nil { + return nil, fmt.Errorf("error in conversion to conf.C: %w", err) + } + return []*reload.ConfigWithMeta{{Config: uconfig}}, nil +} + func init() { + // Register packetbeat with central management to perform any needed config + // transformations before agent configs are sent to the beat during reload. + management.ConfigTransform.SetTransform(packetbeatCfg) settings := packetbeatCmd.PacketbeatSettings() settings.ElasticLicensed = true RootCmd = packetbeatCmd.Initialize(settings)