diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index 38b0e57ef2b..7c1d714e7c4 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -38,6 +38,7 @@ - Fix issue of missing log messages from filebeat monitor {pull}23514[23514] - Increase checkin grace period to 30 seconds {pull}23568[23568] - Fix libbeat from reporting back degraded on config update {pull}23537[23537] +- Fix issues with dynamic inputs and conditions {pull}23886[23886] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/application/application.go b/x-pack/elastic-agent/pkg/agent/application/application.go index f87cad3a09c..af50477d5fd 100644 --- a/x-pack/elastic-agent/pkg/agent/application/application.go +++ b/x-pack/elastic-agent/pkg/agent/application/application.go @@ -35,7 +35,7 @@ func New(log *logger.Logger, pathConfigFile string, reexec reexecManager, uc upg // Load configuration from disk to understand in which mode of operation // we must start the elastic-agent, the mode of operation cannot be changed without restarting the // elastic-agent. - rawConfig, err := LoadConfigFromFile(pathConfigFile) + rawConfig, err := config.LoadFile(pathConfigFile) if err != nil { return nil, err } diff --git a/x-pack/elastic-agent/pkg/agent/application/config.go b/x-pack/elastic-agent/pkg/agent/application/config.go index e42f3dcab28..8dfd093e040 100644 --- a/x-pack/elastic-agent/pkg/agent/application/config.go +++ b/x-pack/elastic-agent/pkg/agent/application/config.go @@ -5,16 +5,8 @@ package application import ( - "io/ioutil" - - "github.com/elastic/go-ucfg" - - "gopkg.in/yaml.v2" - - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/kibana" ) @@ -34,53 +26,3 @@ func createFleetConfigFromEnroll(accessAPIKey string, kbn *kibana.Config) (*conf } return cfg, nil } - -// LoadConfigFromFile loads the Agent configuration from a file. -// -// This must be used to load the Agent configuration, so that variables defined in the inputs are not -// parsed by go-ucfg. Variables from the inputs should be parsed by the transpiler. -func LoadConfigFromFile(path string) (*config.Config, error) { - in, err := ioutil.ReadFile(path) - if err != nil { - return nil, err - } - var m map[string]interface{} - if err := yaml.Unmarshal(in, &m); err != nil { - return nil, err - } - return LoadConfig(m) -} - -// LoadConfig loads the Agent configuration from a map. -// -// This must be used to load the Agent configuration, so that variables defined in the inputs are not -// parsed by go-ucfg. Variables from the inputs should be parsed by the transpiler. -func LoadConfig(in map[string]interface{}) (*config.Config, error) { - // make copy of a map so we dont affect a caller - m := common.MapStr(in).Clone() - - inputs, ok := m["inputs"] - if ok { - // remove the inputs - delete(m, "inputs") - } - cfg, err := config.NewConfigFrom(m) - if err != nil { - return nil, err - } - if ok { - inputsOnly := map[string]interface{}{ - "inputs": inputs, - } - // convert to config without variable substitution - inputsCfg, err := config.NewConfigFrom(inputsOnly, ucfg.PathSep("."), ucfg.ResolveNOOP) - if err != nil { - return nil, err - } - err = cfg.Merge(inputsCfg, ucfg.PathSep("."), ucfg.ResolveNOOP) - if err != nil { - return nil, err - } - } - return cfg, err -} diff --git a/x-pack/elastic-agent/pkg/agent/application/config_test.go b/x-pack/elastic-agent/pkg/agent/application/config_test.go index 09acd68dd83..824691295fe 100644 --- a/x-pack/elastic-agent/pkg/agent/application/config_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/config_test.go @@ -6,8 +6,6 @@ package application import ( "io/ioutil" - "os" - "path/filepath" "testing" "time" @@ -20,44 +18,6 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" ) -func TestLoadConfig(t *testing.T) { - contents := map[string]interface{}{ - "outputs": map[string]interface{}{ - "default": map[string]interface{}{ - "type": "elasticsearch", - "hosts": []interface{}{"127.0.0.1:9200"}, - "username": "elastic", - "password": "changeme", - }, - }, - "inputs": []interface{}{ - map[string]interface{}{ - "type": "logfile", - "streams": []interface{}{ - map[string]interface{}{ - "paths": []interface{}{"/var/log/${host.name}"}, - }, - }, - }, - }, - } - - tmp, err := ioutil.TempDir("", "config") - require.NoError(t, err) - defer os.RemoveAll(tmp) - - cfgPath := filepath.Join(tmp, "config.yml") - dumpToYAML(t, cfgPath, contents) - - cfg, err := LoadConfigFromFile(cfgPath) - require.NoError(t, err) - - cfgData, err := cfg.ToMapStr() - require.NoError(t, err) - - assert.Equal(t, contents, cfgData) -} - func TestConfig(t *testing.T) { testMgmtMode(t) testLocalConfig(t) diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go index 30a163a7b5e..b921569a19b 100644 --- a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go @@ -44,7 +44,7 @@ func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetA return fmt.Errorf("invalid type, expected ActionPolicyChange and received %T", a) } - c, err := LoadConfig(action.Policy) + c, err := config.NewConfigFrom(action.Policy) if err != nil { return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig) } diff --git a/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go b/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go index 4ea725898f2..15dccd83a88 100644 --- a/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go +++ b/x-pack/elastic-agent/pkg/agent/application/inspect_config_cmd.go @@ -61,7 +61,7 @@ func (c *InspectConfigCmd) inspectConfig() error { } func loadConfig(configPath string) (*config.Config, error) { - rawConfig, err := LoadConfigFromFile(configPath) + rawConfig, err := config.LoadFile(configPath) if err != nil { return nil, err } diff --git a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go index 2ae011db291..282ea2f4645 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/enroll.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/enroll.go @@ -12,6 +12,7 @@ import ( "time" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/control/client" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/spf13/cobra" @@ -92,7 +93,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command, flags *globalFlags, args } pathConfigFile := flags.Config() - rawConfig, err := application.LoadConfigFromFile(pathConfigFile) + rawConfig, err := config.LoadFile(pathConfigFile) if err != nil { return errors.New(err, fmt.Sprintf("could not read configuration file %s", pathConfigFile), diff --git a/x-pack/elastic-agent/pkg/agent/cmd/run.go b/x-pack/elastic-agent/pkg/agent/cmd/run.go index eb128585539..ecfac4ff740 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/run.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/run.go @@ -79,7 +79,7 @@ func run(flags *globalFlags, streams *cli.IOStreams) error { // Windows: Mark se service.HandleSignals(stopBeat, cancel) pathConfigFile := flags.Config() - rawConfig, err := application.LoadConfigFromFile(pathConfigFile) + rawConfig, err := config.LoadFile(pathConfigFile) if err != nil { return errors.New(err, fmt.Sprintf("could not read configuration file %s", pathConfigFile), diff --git a/x-pack/elastic-agent/pkg/agent/cmd/watch.go b/x-pack/elastic-agent/pkg/agent/cmd/watch.go index 01d73022f99..1124a950893 100644 --- a/x-pack/elastic-agent/pkg/agent/cmd/watch.go +++ b/x-pack/elastic-agent/pkg/agent/cmd/watch.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/cli" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/release" ) @@ -182,7 +183,7 @@ func gracePeriod(marker *upgrade.UpdateMarker) (bool, time.Duration) { func configuredLogger(flags *globalFlags) (*logger.Logger, error) { pathConfigFile := flags.Config() - rawConfig, err := application.LoadConfigFromFile(pathConfigFile) + rawConfig, err := config.LoadFile(pathConfigFile) if err != nil { return nil, errors.New(err, fmt.Sprintf("could not read configuration file %s", pathConfigFile), diff --git a/x-pack/elastic-agent/pkg/config/config.go b/x-pack/elastic-agent/pkg/config/config.go index d2d366e526c..593631f0050 100644 --- a/x-pack/elastic-agent/pkg/config/config.go +++ b/x-pack/elastic-agent/pkg/config/config.go @@ -8,65 +8,116 @@ import ( "fmt" "io" "io/ioutil" + "os" + + "gopkg.in/yaml.v2" "github.com/elastic/go-ucfg" "github.com/elastic/go-ucfg/cfgutil" - "github.com/elastic/go-ucfg/yaml" ) +// options hold the specified options +type options struct { + skipKeys []string +} + +// Option is an option type that modifies how loading configs work +type Option func(*options) + +// VarSkipKeys prevents variable expansion for these keys. +// +// The provided keys only skip if the keys are top-level keys. +func VarSkipKeys(keys ...string) Option { + return func(opts *options) { + opts.skipKeys = keys + } +} + // DefaultOptions defaults options used to read the configuration -var DefaultOptions = []ucfg.Option{ +var DefaultOptions = []interface{}{ ucfg.PathSep("."), ucfg.ResolveEnv, ucfg.VarExp, + VarSkipKeys("inputs"), } // Config custom type over a ucfg.Config to add new methods on the object. type Config ucfg.Config -// LoadYAML takes YAML configuration and return a concrete Config or any errors. -func LoadYAML(path string, opts ...ucfg.Option) (*Config, error) { - if len(opts) == 0 { - opts = DefaultOptions - } - config, err := yaml.NewConfigWithFile(path, opts...) - if err != nil { - return nil, err - } - return newConfigFrom(config), nil -} - // New creates a new empty config. func New() *Config { return newConfigFrom(ucfg.New()) } // NewConfigFrom takes a interface and read the configuration like it was YAML. -func NewConfigFrom(from interface{}, opts ...ucfg.Option) (*Config, error) { +func NewConfigFrom(from interface{}, opts ...interface{}) (*Config, error) { if len(opts) == 0 { opts = DefaultOptions } - - if str, ok := from.(string); ok { - c, err := yaml.NewConfig([]byte(str), opts...) - return newConfigFrom(c), err + var ucfgOpts []ucfg.Option + var localOpts []Option + for _, o := range opts { + switch ot := o.(type) { + case ucfg.Option: + ucfgOpts = append(ucfgOpts, ot) + case Option: + localOpts = append(localOpts, ot) + default: + return nil, fmt.Errorf("unknown option type %T", o) + } + } + local := &options{} + for _, o := range localOpts { + o(local) } - if in, ok := from.(io.Reader); ok { + var data map[string]interface{} + var err error + if bytes, ok := from.([]byte); ok { + err = yaml.Unmarshal(bytes, &data) + if err != nil { + return nil, err + } + } else if str, ok := from.(string); ok { + err = yaml.Unmarshal([]byte(str), &data) + if err != nil { + return nil, err + } + } else if in, ok := from.(io.Reader); ok { if closer, ok := from.(io.Closer); ok { defer closer.Close() } - - content, err := ioutil.ReadAll(in) + fData, err := ioutil.ReadAll(in) + if err != nil { + return nil, err + } + err = yaml.Unmarshal(fData, &data) if err != nil { return nil, err } - c, err := yaml.NewConfig(content, opts...) + } else if contents, ok := from.(map[string]interface{}); ok { + data = contents + } else { + c, err := ucfg.NewFrom(from, ucfgOpts...) return newConfigFrom(c), err } - c, err := ucfg.NewFrom(from, opts...) - return newConfigFrom(c), err + skippedKeys := map[string]interface{}{} + for _, skip := range local.skipKeys { + val, ok := data[skip] + if ok { + skippedKeys[skip] = val + delete(data, skip) + } + } + cfg, err := ucfg.NewFrom(data, ucfgOpts...) + if err != nil { + return nil, err + } + if len(skippedKeys) > 0 { + err = cfg.Merge(skippedKeys, ucfg.ResolveNOOP) + } + return newConfigFrom(cfg), err } // MustNewConfigFrom try to create a configuration based on the type passed as arguments and panic @@ -84,8 +135,12 @@ func newConfigFrom(in *ucfg.Config) *Config { } // Unpack unpacks a struct to Config. -func (c *Config) Unpack(to interface{}) error { - return c.access().Unpack(to, DefaultOptions...) +func (c *Config) Unpack(to interface{}, opts ...interface{}) error { + ucfgOpts, err := getUcfgOptions(opts...) + if err != nil { + return err + } + return c.access().Unpack(to, ucfgOpts...) } func (c *Config) access() *ucfg.Config { @@ -93,11 +148,12 @@ func (c *Config) access() *ucfg.Config { } // Merge merges two configuration together. -func (c *Config) Merge(from interface{}, opts ...ucfg.Option) error { - if len(opts) == 0 { - opts = DefaultOptions +func (c *Config) Merge(from interface{}, opts ...interface{}) error { + ucfgOpts, err := getUcfgOptions(opts...) + if err != nil { + return err } - return c.access().Merge(from, opts...) + return c.access().Merge(from, ucfgOpts...) } // ToMapStr takes the config and transform it into a map[string]interface{} @@ -127,18 +183,16 @@ func (c *Config) Enabled() bool { // LoadFile take a path and load the file and return a new configuration. func LoadFile(path string) (*Config, error) { - c, err := yaml.NewConfigWithFile(path, DefaultOptions...) + fp, err := os.Open(path) if err != nil { return nil, err } - - cfg := newConfigFrom(c) - return cfg, err + return NewConfigFrom(fp) } // LoadFiles takes multiples files, load and merge all of them in a single one. func LoadFiles(paths ...string) (*Config, error) { - merger := cfgutil.NewCollector(nil, DefaultOptions...) + merger := cfgutil.NewCollector(nil) for _, path := range paths { cfg, err := LoadFile(path) if err := merger.Add(cfg.access(), err); err != nil { @@ -147,3 +201,22 @@ func LoadFiles(paths ...string) (*Config, error) { } return newConfigFrom(merger.Config()), nil } + +func getUcfgOptions(opts ...interface{}) ([]ucfg.Option, error) { + if len(opts) == 0 { + opts = DefaultOptions + } + var ucfgOpts []ucfg.Option + for _, o := range opts { + switch ot := o.(type) { + case ucfg.Option: + ucfgOpts = append(ucfgOpts, ot) + case Option: + // ignored during unpack + continue + default: + return nil, fmt.Errorf("unknown option type %T", o) + } + } + return ucfgOpts, nil +} diff --git a/x-pack/elastic-agent/pkg/config/config_test.go b/x-pack/elastic-agent/pkg/config/config_test.go index f8b32e97d8e..105663dacce 100644 --- a/x-pack/elastic-agent/pkg/config/config_test.go +++ b/x-pack/elastic-agent/pkg/config/config_test.go @@ -21,6 +21,44 @@ func TestConfig(t *testing.T) { testLoadFiles(t) } +func TestInputsResolveNOOP(t *testing.T) { + contents := map[string]interface{}{ + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"127.0.0.1:9200"}, + "username": "elastic", + "password": "changeme", + }, + }, + "inputs": []interface{}{ + map[string]interface{}{ + "type": "logfile", + "streams": []interface{}{ + map[string]interface{}{ + "paths": []interface{}{"/var/log/${host.name}"}, + }, + }, + }, + }, + } + + tmp, err := ioutil.TempDir("", "config") + require.NoError(t, err) + defer os.RemoveAll(tmp) + + cfgPath := filepath.Join(tmp, "config.yml") + dumpToYAML(t, cfgPath, contents) + + cfg, err := LoadFile(cfgPath) + require.NoError(t, err) + + cfgData, err := cfg.ToMapStr() + require.NoError(t, err) + + assert.Equal(t, contents, cfgData) +} + func testToMapStr(t *testing.T) { m := map[string]interface{}{ "hello": map[string]interface{}{ diff --git a/x-pack/elastic-agent/pkg/eql/eql_test.go b/x-pack/elastic-agent/pkg/eql/eql_test.go index 5f67c516c81..c594a6c2a20 100644 --- a/x-pack/elastic-agent/pkg/eql/eql_test.go +++ b/x-pack/elastic-agent/pkg/eql/eql_test.go @@ -296,7 +296,7 @@ func TestEql(t *testing.T) { {expression: "stringContains('hello world', 'rol')", result: false}, {expression: "stringContains('hello world', 'o w', 'too many')", err: true}, {expression: "stringContains(0, 'o w', 'too many')", err: true}, - {expression: "stringContains('hello world', 0)", err: true}, + {expression: "stringContains('hello world', 0)", result: false}, // Bad expression and malformed expression {expression: "length('hello')", err: true}, diff --git a/x-pack/elastic-agent/pkg/eql/methods_str.go b/x-pack/elastic-agent/pkg/eql/methods_str.go index 781e193d924..b7c49a61036 100644 --- a/x-pack/elastic-agent/pkg/eql/methods_str.go +++ b/x-pack/elastic-agent/pkg/eql/methods_str.go @@ -25,12 +25,7 @@ func endsWith(args []interface{}) (interface{}, error) { if len(args) != 2 { return nil, fmt.Errorf("endsWith: accepts exactly 2 arguments; recieved %d", len(args)) } - input, iOk := args[0].(string) - suffix, sOk := args[1].(string) - if !iOk || !sOk { - return nil, fmt.Errorf("endsWith: accepts exactly 2 string arguments; recieved %T and %T", args[0], args[1]) - } - return strings.HasSuffix(input, suffix), nil + return strings.HasSuffix(toString(args[0]), toString(args[1])), nil } // indexOf returns the starting index of substring @@ -38,11 +33,8 @@ func indexOf(args []interface{}) (interface{}, error) { if len(args) < 2 || len(args) > 3 { return nil, fmt.Errorf("indexOf: accepts 2-3 arguments; recieved %d", len(args)) } - input, iOk := args[0].(string) - substring, sOk := args[1].(string) - if !iOk || !sOk { - return nil, fmt.Errorf("indexOf: argument 0 and 1 must be a string; recieved %T and %T", args[0], args[1]) - } + input := toString(args[0]) + substring := toString(args[1]) start := 0 if len(args) > 2 { s, sOk := args[2].(int) @@ -59,10 +51,7 @@ func match(args []interface{}) (interface{}, error) { if len(args) < 2 { return nil, fmt.Errorf("match: accepts minimum of 2 arguments; recieved %d", len(args)) } - input, iOk := args[0].(string) - if !iOk { - return nil, fmt.Errorf("match: argument 0 must be a string; recieved %T", args[0]) - } + input := toString(args[0]) for i, reg := range args[1:] { switch r := reg.(type) { case string: @@ -85,10 +74,7 @@ func number(args []interface{}) (interface{}, error) { if len(args) < 1 || len(args) > 2 { return nil, fmt.Errorf("number: accepts between 1-2 arguments; recieved %d", len(args)) } - input, iOk := args[0].(string) - if !iOk { - return nil, fmt.Errorf("number: argument 0 must be a string; recieved %T", args[0]) - } + input := toString(args[0]) base := 10 if len(args) > 1 { switch a := args[1].(type) { @@ -113,12 +99,7 @@ func startsWith(args []interface{}) (interface{}, error) { if len(args) != 2 { return nil, fmt.Errorf("startsWith: accepts exactly 2 arguments; recieved %d", len(args)) } - input, iOk := args[0].(string) - prefix, pOk := args[1].(string) - if !iOk || !pOk { - return nil, fmt.Errorf("startsWith: accepts exactly 2 string arguments; recieved %T and %T", args[0], args[1]) - } - return strings.HasPrefix(input, prefix), nil + return strings.HasPrefix(toString(args[0]), toString(args[1])), nil } // str converts the argument into a string @@ -134,12 +115,7 @@ func stringContains(args []interface{}) (interface{}, error) { if len(args) != 2 { return nil, fmt.Errorf("stringContains: accepts exactly 2 arguments; recieved %d", len(args)) } - input, iOk := args[0].(string) - substr, sOk := args[1].(string) - if !iOk || !sOk { - return nil, fmt.Errorf("stringContains: accepts exactly 2 string arguments; recieved %T and %T", args[0], args[1]) - } - return strings.Contains(input, substr), nil + return strings.Contains(toString(args[0]), toString(args[1])), nil } func toString(arg interface{}) string {