diff --git a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl index bfb84102e3c..5daf9d81432 100644 --- a/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl +++ b/x-pack/elastic-agent/_meta/config/common.reference.p2.yml.tmpl @@ -121,7 +121,7 @@ inputs: # port: 6791 # # Allow fleet to reload its configuration locally on disk. -# # Notes: Only specific process configuration will be reloaded. +# # Notes: Only specific process configuration and external input configurations will be reloaded. # agent.reload: # # enabled configure the Elastic Agent to reload or not the local configuration. # # diff --git a/x-pack/elastic-agent/elastic-agent.reference.yml b/x-pack/elastic-agent/elastic-agent.reference.yml index 7770b036dba..7b2d5ea586c 100644 --- a/x-pack/elastic-agent/elastic-agent.reference.yml +++ b/x-pack/elastic-agent/elastic-agent.reference.yml @@ -127,7 +127,7 @@ inputs: # port: 6791 # # Allow fleet to reload its configuration locally on disk. -# # Notes: Only specific process configuration will be reloaded. +# # Notes: Only specific process configuration and external input configurations will be reloaded. # agent.reload: # # enabled configure the Elastic Agent to reload or not the local configuration. # # diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go b/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go index 89cb816fec9..76ff369ef6a 100644 --- a/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_server_bootstrap.go @@ -118,8 +118,9 @@ func newFleetServerBootstrap( return nil, err } + loader := config.NewLoader(log, "") discover := discoverer(pathConfigFile, cfg.Settings.Path) - bootstrapApp.source = newOnce(log, discover, emit) + bootstrapApp.source = newOnce(log, discover, loader, emit) return bootstrapApp, nil } diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index a29977f2e8d..ca8284cdf8e 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -6,6 +6,7 @@ package application import ( "context" + "path/filepath" "go.elastic.co/apm" @@ -117,7 +118,7 @@ func newLocal( return nil, errors.New(err, "failed to initialize composable controller") } - discover := discoverer(pathConfigFile, cfg.Settings.Path) + discover := discoverer(pathConfigFile, cfg.Settings.Path, configuration.ExternalInputsPattern) emit, err := emitter.New( localApplication.bgContext, log, @@ -135,13 +136,15 @@ func newLocal( return nil, err } + loader := config.NewLoader(log, externalConfigsGlob()) + var cfgSource source if !cfg.Settings.Reload.Enabled { log.Debug("Reloading of configuration is off") - cfgSource = newOnce(log, discover, emit) + cfgSource = newOnce(log, discover, loader, emit) } else { log.Debugf("Reloading of configuration is on, frequency is set to %s", cfg.Settings.Reload.Period) - cfgSource = newPeriodic(log, cfg.Settings.Reload.Period, discover, emit) + cfgSource = newPeriodic(log, cfg.Settings.Reload.Period, discover, loader, emit) } localApplication.source = cfgSource @@ -161,6 +164,10 @@ func newLocal( return localApplication, nil } +func externalConfigsGlob() string { + return filepath.Join(paths.Config(), configuration.ExternalInputsPattern) +} + // Routes returns a list of routes handled by agent. func (l *Local) Routes() *sorted.Set { return l.router.Routes() diff --git a/x-pack/elastic-agent/pkg/agent/application/once.go b/x-pack/elastic-agent/pkg/agent/application/once.go index 64ee34e2590..db4af015608 100644 --- a/x-pack/elastic-agent/pkg/agent/application/once.go +++ b/x-pack/elastic-agent/pkg/agent/application/once.go @@ -16,11 +16,12 @@ import ( type once struct { log *logger.Logger discover discoverFunc + loader *config.Loader emitter pipeline.EmitterFunc } -func newOnce(log *logger.Logger, discover discoverFunc, emitter pipeline.EmitterFunc) *once { - return &once{log: log, discover: discover, emitter: emitter} +func newOnce(log *logger.Logger, discover discoverFunc, loader *config.Loader, emitter pipeline.EmitterFunc) *once { + return &once{log: log, discover: discover, loader: loader, emitter: emitter} } func (o *once) Start() error { @@ -33,15 +34,15 @@ func (o *once) Start() error { return ErrNoConfiguration } - return readfiles(context.Background(), files, o.emitter) + return readfiles(context.Background(), files, o.loader, o.emitter) } func (o *once) Stop() error { return nil } -func readfiles(ctx context.Context, files []string, emitter pipeline.EmitterFunc) error { - c, err := config.LoadFiles(files...) +func readfiles(ctx context.Context, files []string, loader *config.Loader, emitter pipeline.EmitterFunc) error { + c, err := loader.Load(files) if err != nil { return errors.New(err, "could not load or merge configuration", errors.TypeConfig) } diff --git a/x-pack/elastic-agent/pkg/agent/application/periodic.go b/x-pack/elastic-agent/pkg/agent/application/periodic.go index f79c09b6e68..119e616bf28 100644 --- a/x-pack/elastic-agent/pkg/agent/application/periodic.go +++ b/x-pack/elastic-agent/pkg/agent/application/periodic.go @@ -11,6 +11,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/pipeline" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/filewatcher" ) @@ -20,6 +21,7 @@ type periodic struct { period time.Duration done chan struct{} watcher *filewatcher.Watch + loader *config.Loader emitter pipeline.EmitterFunc discover discoverFunc } @@ -90,7 +92,7 @@ func (p *periodic) work() error { p.log.Debugf("Unchanged %d files: %s", len(s.Unchanged), strings.Join(s.Updated, ", ")) } - err := readfiles(context.Background(), files, p.emitter) + err := readfiles(context.Background(), files, p.loader, p.emitter) if err != nil { // assume something when really wrong and invalidate any cache // so we get a full new config on next tick. @@ -112,6 +114,7 @@ func newPeriodic( log *logger.Logger, period time.Duration, discover discoverFunc, + loader *config.Loader, emitter pipeline.EmitterFunc, ) *periodic { w, err := filewatcher.New(log, filewatcher.DefaultComparer) @@ -127,6 +130,7 @@ func newPeriodic( done: make(chan struct{}), watcher: w, discover: discover, + loader: loader, emitter: emitter, } } diff --git a/x-pack/elastic-agent/pkg/agent/configuration/settings.go b/x-pack/elastic-agent/pkg/agent/configuration/settings.go index 3081614d995..4f2a17dbede 100644 --- a/x-pack/elastic-agent/pkg/agent/configuration/settings.go +++ b/x-pack/elastic-agent/pkg/agent/configuration/settings.go @@ -5,6 +5,8 @@ package configuration import ( + "path/filepath" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" monitoringCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/monitoring/config" @@ -13,6 +15,9 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" ) +// ExternalInputsPattern is a glob that matches the paths of external configuration files. +var ExternalInputsPattern = filepath.Join("inputs.d", "*.yml") + // SettingsConfig is an collection of agent settings configuration. type SettingsConfig struct { DownloadConfig *artifact.Config `yaml:"download" config:"download" json:"download"` diff --git a/x-pack/elastic-agent/pkg/config/loader.go b/x-pack/elastic-agent/pkg/config/loader.go new file mode 100644 index 00000000000..8827edd6c3c --- /dev/null +++ b/x-pack/elastic-agent/pkg/config/loader.go @@ -0,0 +1,104 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package config + +import ( + "fmt" + "path/filepath" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/go-ucfg" + "github.com/elastic/go-ucfg/cfgutil" +) + +// Loader is used to load configuration from the paths +// including appending multiple input configurations. +type Loader struct { + logger *logger.Logger + inputsFolder string +} + +// NewLoader creates a new Loader instance to load configuration +// files from different paths. +func NewLoader(logger *logger.Logger, inputsFolder string) *Loader { + return &Loader{logger: logger, inputsFolder: inputsFolder} +} + +// Load iterates over the list of files and loads the confguration from them. +// If a configuration file is under the folder set in `agent.config.inputs.path` +// it is appended to a list. If it is a regular config file, it is merged into +// the result config. The list of input configurations is merged into the result +// last. +func (l *Loader) Load(files []string) (*Config, error) { + inputsList := make([]*ucfg.Config, 0) + merger := cfgutil.NewCollector(nil) + for _, f := range files { + cfg, err := LoadFile(f) + if err != nil { + if l.isFileUnderInputsFolder(f) { + return nil, fmt.Errorf("failed to load external configuration file '%s': %w. Are you sure it contains an inputs section?", f, err) + } + return nil, fmt.Errorf("failed to load configuration file '%s': %w", f, err) + } + l.logger.Debugf("Loaded configuration from %s", f) + if l.isFileUnderInputsFolder(f) { + inp, err := getInput(cfg) + if err != nil { + return nil, fmt.Errorf("cannot get configuration from '%s': %w", f, err) + } + inputsList = append(inputsList, inp...) + l.logger.Debugf("Loaded %s input(s) from configuration from %s", len(inp), f) + } else { + if err := merger.Add(cfg.access(), err); err != nil { + return nil, fmt.Errorf("failed to merge configuration file '%s' to existing one: %w", f, err) + } + l.logger.Debugf("Merged configuration from %s into result", f) + } + } + config := merger.Config() + + // if there is no input configuration, return what we have collected. + if len(inputsList) == 0 { + l.logger.Debugf("Merged all configuration files from %v, no external input files", files) + return newConfigFrom(config), nil + } + + // merge inputs sections from the last standalone configuration + // file and all files from the inputs folder + start := 0 + if config.HasField("inputs") { + var err error + start, err = config.CountField("inputs") + if err != nil { + return nil, fmt.Errorf("failed to count the number of inputs in the configuration: %w", err) + } + } + for i, ll := range inputsList { + if err := config.SetChild("inputs", start+i, ll); err != nil { + return nil, fmt.Errorf("failed to add inputs to result configuration: %w", err) + } + } + + l.logger.Debugf("Merged all configuration files from %v, with external input files", files) + return newConfigFrom(config), nil +} + +func getInput(c *Config) ([]*ucfg.Config, error) { + tmpConfig := struct { + Inputs []*ucfg.Config `config:"inputs"` + }{make([]*ucfg.Config, 0)} + + if err := c.Unpack(&tmpConfig); err != nil { + return nil, fmt.Errorf("failed to parse inputs section from configuration: %w", err) + } + return tmpConfig.Inputs, nil +} + +func (l *Loader) isFileUnderInputsFolder(f string) bool { + if matches, err := filepath.Match(l.inputsFolder, f); !matches || err != nil { + return false + } + return true +} diff --git a/x-pack/elastic-agent/pkg/config/loader_test.go b/x-pack/elastic-agent/pkg/config/loader_test.go new file mode 100644 index 00000000000..d1766622663 --- /dev/null +++ b/x-pack/elastic-agent/pkg/config/loader_test.go @@ -0,0 +1,212 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package config + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" +) + +func TestExternalConfigLoading(t *testing.T) { + cases := map[string]struct { + configs []string + inputsFolder string + expectedConfig map[string]interface{} + err bool + }{ + "non-existent config files lead to error": { + configs: []string{"no-such-configuration-file.yml"}, + err: true, + }, + "invalid configuration file in inputs folder lead to error": { + configs: []string{ + filepath.Join("testdata", "inputs", "invalid-inputs.yml"), + }, + inputsFolder: filepath.Join("testdata", "inputs", "*.yml"), + err: true, + }, + "two standalone configs can be merged without inputs": { + configs: []string{ + filepath.Join("testdata", "standalone1.yml"), + filepath.Join("testdata", "standalone2.yml"), + }, + inputsFolder: "", + expectedConfig: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"127.0.0.1:9201"}, + "api-key": "my-secret-key", + }, + }, + "agent": map[string]interface{}{ + "logging": map[string]interface{}{ + "level": "debug", + "metrics": map[string]interface{}{ + "enabled": false, + }, + }, + }, + }, + }, + "one external config, standalone config without inputs section": { + configs: []string{ + filepath.Join("testdata", "standalone1.yml"), + filepath.Join("testdata", "inputs", "log-inputs.yml"), + filepath.Join("testdata", "inputs", "metrics-inputs.yml"), + }, + inputsFolder: filepath.Join("testdata", "inputs", "*.yml"), + expectedConfig: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"127.0.0.1:9201"}, + "api-key": "my-secret-key", + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "data_stream": map[string]interface{}{ + "dataset": "system.auth", + "type": "logs", + }, + "exclude_files": []interface{}{".gz$"}, + "id": "logfile-system.auth-my-id", + "paths": []interface{}{"/var/log/auth.log*", "/var/log/secure*"}, + "use_output": "default", + }, + map[string]interface{}{ + "data_stream": map[string]interface{}{ + "dataset": "system.syslog", + "type": "logs", + }, + "type": "logfile", + "id": "logfile-system.syslog-my-id", + "exclude_files": []interface{}{".gz$"}, + "paths": []interface{}{"/var/log/messages*", "/var/log/syslog*"}, + "use_output": "default", + }, + map[string]interface{}{ + "data_stream": map[string]interface{}{ + "dataset": "system.diskio", + "type": "metrics", + }, + "id": "system/metrics-system.diskio-my-id", + "metricsets": []interface{}{"diskio"}, + "period": "10s", + }, + map[string]interface{}{ + "data_stream": map[string]interface{}{ + "dataset": "system.filesystem", + "type": "metrics", + }, + "id": "system/metrics-system.filesystem-my-id", + "metricsets": []interface{}{"filesystem"}, + "period": "30s", + }, + }, + }, + }, + "inputs sections of all external and standalone configuration are merged to the result": { + configs: []string{ + filepath.Join("testdata", "standalone-with-inputs.yml"), + filepath.Join("testdata", "inputs", "log-inputs.yml"), + filepath.Join("testdata", "inputs", "metrics-inputs.yml"), + }, + inputsFolder: filepath.Join("testdata", "inputs", "*.yml"), + expectedConfig: map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"127.0.0.1:9201"}, + "api-key": "my-secret-key", + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "system/metrics", + "data_stream.namespace": "default", + "use_output": "default", + "streams": []interface{}{ + map[string]interface{}{ + "metricset": "cpu", + "data_stream.dataset": "system.cpu", + }, + }, + }, + map[string]interface{}{ + "data_stream": map[string]interface{}{ + "dataset": "system.auth", + "type": "logs", + }, + "exclude_files": []interface{}{".gz$"}, + "id": "logfile-system.auth-my-id", + "paths": []interface{}{"/var/log/auth.log*", "/var/log/secure*"}, + "use_output": "default", + }, + map[string]interface{}{ + "data_stream": map[string]interface{}{ + "dataset": "system.syslog", + "type": "logs", + }, + "type": "logfile", + "id": "logfile-system.syslog-my-id", + "exclude_files": []interface{}{".gz$"}, + "paths": []interface{}{"/var/log/messages*", "/var/log/syslog*"}, + "use_output": "default", + }, + map[string]interface{}{ + "data_stream": map[string]interface{}{ + "dataset": "system.diskio", + "type": "metrics", + }, + "id": "system/metrics-system.diskio-my-id", + "metricsets": []interface{}{"diskio"}, + "period": "10s", + }, + map[string]interface{}{ + "data_stream": map[string]interface{}{ + "dataset": "system.filesystem", + "type": "metrics", + }, + "id": "system/metrics-system.filesystem-my-id", + "metricsets": []interface{}{"filesystem"}, + "period": "30s", + }, + }, + }, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + test := test + + l := mustNewLoader(test.inputsFolder) + c, err := l.Load(test.configs) + if test.err { + require.NotNil(t, err) + return + } + + require.Nil(t, err) + raw, err := c.ToMapStr() + require.Nil(t, err) + require.Equal(t, test.expectedConfig, raw) + }) + } +} + +func mustNewLoader(inputsFolder string) *Loader { + log, err := logger.New("loader_test", true) + if err != nil { + panic(err) + } + return NewLoader(log, inputsFolder) +} diff --git a/x-pack/elastic-agent/pkg/config/testdata/inputs/invalid-inputs.yml b/x-pack/elastic-agent/pkg/config/testdata/inputs/invalid-inputs.yml new file mode 100644 index 00000000000..b6da68a339e --- /dev/null +++ b/x-pack/elastic-agent/pkg/config/testdata/inputs/invalid-inputs.yml @@ -0,0 +1,5 @@ +# this file is invalid because the inputs section is missing +- data_stream: + dataset: system.auth + type: logs + diff --git a/x-pack/elastic-agent/pkg/config/testdata/inputs/log-inputs.yml b/x-pack/elastic-agent/pkg/config/testdata/inputs/log-inputs.yml new file mode 100644 index 00000000000..cead269b0a3 --- /dev/null +++ b/x-pack/elastic-agent/pkg/config/testdata/inputs/log-inputs.yml @@ -0,0 +1,20 @@ +inputs: +- data_stream: + dataset: system.auth + type: logs + exclude_files: [".gz$"] + id: logfile-system.auth-my-id + paths: + - /var/log/auth.log* + - /var/log/secure* + use_output: default +- data_stream: + dataset: system.syslog + type: logs + type: logfile + id: logfile-system.syslog-my-id + exclude_files: [".gz$"] + paths: + - /var/log/messages* + - /var/log/syslog* + use_output: default diff --git a/x-pack/elastic-agent/pkg/config/testdata/inputs/metrics-inputs.yml b/x-pack/elastic-agent/pkg/config/testdata/inputs/metrics-inputs.yml new file mode 100644 index 00000000000..b44faf5097d --- /dev/null +++ b/x-pack/elastic-agent/pkg/config/testdata/inputs/metrics-inputs.yml @@ -0,0 +1,16 @@ +inputs: +- data_stream: + dataset: system.diskio + type: metrics + id: system/metrics-system.diskio-my-id + metricsets: + - diskio + period: 10s +- data_stream: + dataset: system.filesystem + type: metrics + id: system/metrics-system.filesystem-my-id + metricsets: + - filesystem + period: 30s + diff --git a/x-pack/elastic-agent/pkg/config/testdata/standalone-with-inputs.yml b/x-pack/elastic-agent/pkg/config/testdata/standalone-with-inputs.yml new file mode 100644 index 00000000000..1be5ac9daec --- /dev/null +++ b/x-pack/elastic-agent/pkg/config/testdata/standalone-with-inputs.yml @@ -0,0 +1,13 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9201] + api-key: "my-secret-key" + +inputs: + - type: system/metrics + data_stream.namespace: default + use_output: default + streams: + - metricset: cpu + data_stream.dataset: system.cpu diff --git a/x-pack/elastic-agent/pkg/config/testdata/standalone1.yml b/x-pack/elastic-agent/pkg/config/testdata/standalone1.yml new file mode 100644 index 00000000000..f0ddddf1a2a --- /dev/null +++ b/x-pack/elastic-agent/pkg/config/testdata/standalone1.yml @@ -0,0 +1,6 @@ +outputs: + default: + type: elasticsearch + hosts: [127.0.0.1:9201] + api-key: "my-secret-key" + diff --git a/x-pack/elastic-agent/pkg/config/testdata/standalone2.yml b/x-pack/elastic-agent/pkg/config/testdata/standalone2.yml new file mode 100644 index 00000000000..7183b32b79f --- /dev/null +++ b/x-pack/elastic-agent/pkg/config/testdata/standalone2.yml @@ -0,0 +1,3 @@ +agent.logging.level: debug +agent.logging.metrics.enabled: false +