diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 48109ba3c0d..6047c71b3d7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -201,3 +201,4 @@ - Redact sensitive information on diagnostics collect command. {issue}[241] {pull}[566] - Fix incorrectly creating a filebeat redis input when a policy contains a packetbeat redis input. {issue}[427] {pull}[700] - Add `lumberjack` input type to the Filebeat spec. {pull}[959] +- Add support for hints' based autodiscovery in kubernetes provider. {pull}[698] diff --git a/NOTICE.txt b/NOTICE.txt index 7b0395cd0e0..79a332f3d45 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -617,11 +617,11 @@ you may not use this file except in compliance with the Elastic License. -------------------------------------------------------------------------------- Dependency : github.com/elastic/elastic-agent-autodiscover -Version: v0.0.0-20220404145827-89887023c1ab +Version: v0.2.1 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.0.0-20220404145827-89887023c1ab/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-autodiscover@v0.2.1/LICENSE: Apache License Version 2.0, January 2004 diff --git a/go.mod b/go.mod index 9f0dfb21464..2557e2109d4 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/coreos/go-systemd/v22 v22.3.2 github.com/docker/go-units v0.4.0 github.com/elastic/e2e-testing v1.99.2-0.20220117192005-d3365c99b9c4 - github.com/elastic/elastic-agent-autodiscover v0.0.0-20220404145827-89887023c1ab + github.com/elastic/elastic-agent-autodiscover v0.2.1 github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 github.com/elastic/elastic-agent-libs v0.2.6 github.com/elastic/elastic-agent-system-metrics v0.3.0 @@ -119,7 +119,6 @@ require ( go.elastic.co/apm/v2 v2.0.0 // indirect go.elastic.co/fastjson v1.1.0 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/goleak v1.1.12 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/mod v0.5.1 // indirect golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect diff --git a/go.sum b/go.sum index 22059e4c0b3..0728fa89909 100644 --- a/go.sum +++ b/go.sum @@ -92,6 +92,7 @@ github.com/Microsoft/hcsshim v0.8.14/go.mod h1:NtVKoYxQuTLx6gEq0L96c9Ju4JbRJ4nY2 github.com/Microsoft/hcsshim v0.8.15/go.mod h1:x38A4YbHbdxJtc0sF6oIz+RG0npwSCAvn69iY6URG00= github.com/Microsoft/hcsshim v0.8.16/go.mod h1:o5/SZqmR7x9JNKsW3pu+nqHm0MF8vbA+VxGOoXdC600= github.com/Microsoft/hcsshim v0.8.21/go.mod h1:+w2gRZ5ReXQhFOrvSQeNfhrYB/dg3oDwTOcER2fw4I4= +github.com/Microsoft/hcsshim v0.8.24/go.mod h1:4zegtUJth7lAvFyc6cH2gGQ5B3OFQim01nnU2M8jKDg= github.com/Microsoft/hcsshim/test v0.0.0-20201218223536-d3e5debf77da/go.mod h1:5hlzMzRKMLyo42nCZ9oml8AdTlq/0cvIaBv6tK1RehU= github.com/Microsoft/hcsshim/test v0.0.0-20210227013316-43a75bb4edd3/go.mod h1:mw7qgWloBUl75W/gVH3cQszUg1+gUITj7D6NY7ywVnY= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= @@ -227,6 +228,7 @@ github.com/containerd/cgroups v0.0.0-20200710171044-318312a37340/go.mod h1:s5q4S github.com/containerd/cgroups v0.0.0-20200824123100-0b889c03f102/go.mod h1:s5q4SojHctfxANBDvMeIaIovkq29IP48TKAxnhYRxvo= github.com/containerd/cgroups v0.0.0-20210114181951-8a68de567b68/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= github.com/containerd/cgroups v1.0.1/go.mod h1:0SJrPIenamHDcZhEcJMNBB85rHcUsw4f25ZfBiPYRkU= +github.com/containerd/cgroups v1.0.3/go.mod h1:/ofk34relqNjSGyqPrmEULrO4Sc8LJhvJmWbUCUKqj8= github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw= github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw= github.com/containerd/console v0.0.0-20191206165004-02ecf6a7291e/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE= @@ -240,6 +242,7 @@ github.com/containerd/containerd v1.3.2/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMX github.com/containerd/containerd v1.4.0-beta.2.0.20200729163537-40b22ef07410/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA= github.com/containerd/containerd v1.4.1/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA= github.com/containerd/containerd v1.4.3/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA= +github.com/containerd/containerd v1.4.9/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA= github.com/containerd/containerd v1.5.0-beta.1/go.mod h1:5HfvG1V2FsKesEGQ17k5/T7V960Tmcumvqn8Mc+pCYQ= github.com/containerd/containerd v1.5.0-beta.3/go.mod h1:/wr9AVtEM7x9c+n0+stptlo/uBBoBORwEx6ardVcmKU= github.com/containerd/containerd v1.5.0-beta.4/go.mod h1:GmdgZd2zA2GYIBZ0w09ZvgqEq8EfBp/m3lcVZIvPHhI= @@ -281,6 +284,7 @@ github.com/containerd/ttrpc v0.0.0-20190828172938-92c8520ef9f8/go.mod h1:PvCDdDG github.com/containerd/ttrpc v0.0.0-20191028202541-4f1b8fe65a5c/go.mod h1:LPm1u0xBw8r8NOKoOdNMeVHSawSsltak+Ihv+etqsE8= github.com/containerd/ttrpc v1.0.1/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= github.com/containerd/ttrpc v1.0.2/go.mod h1:UAxOpgT9ziI0gJrmKvgcZivgxOp8iFPSk8httJEt98Y= +github.com/containerd/ttrpc v1.1.0/go.mod h1:XX4ZTnoOId4HklF4edwc4DcqskFZuvXB1Evzy5KFQpQ= github.com/containerd/typeurl v0.0.0-20180627222232-a93fcdb778cd/go.mod h1:Cm3kwCdlkCfMSHURc+r6fwoGH6/F1hH3S4sg0rLFWPc= github.com/containerd/typeurl v0.0.0-20190911142611-5eb25027c9fd/go.mod h1:GeKYzf2pQcqv7tJ0AoCuuhtnqhva5LNU3U+OyKxxJpk= github.com/containerd/typeurl v1.0.1/go.mod h1:TB1hUtrpaiO88KEK56ijojHS1+NeF0izUACaJW2mdXg= @@ -375,11 +379,11 @@ github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:Htrtb github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/elastic/e2e-testing v1.99.2-0.20220117192005-d3365c99b9c4 h1:uYT+Krd8dsvnhnLK9pe/JHZkYtXEGPfbV4Wt1JPPol0= github.com/elastic/e2e-testing v1.99.2-0.20220117192005-d3365c99b9c4/go.mod h1:UcNuf4pX/qDVNQr0zybm1NL2YoWik+jKBaINZqQCA40= -github.com/elastic/elastic-agent-autodiscover v0.0.0-20220404145827-89887023c1ab h1:Jk6Mfk5BF8gtfE7X0bNCiDGBtwJVxRI79b4wLCAsP+A= -github.com/elastic/elastic-agent-autodiscover v0.0.0-20220404145827-89887023c1ab/go.mod h1:Gg1fsQI+rVms9FJ2DefBSojfPIzgkV8xlyG8fPG0DE8= +github.com/elastic/elastic-agent-autodiscover v0.2.1 h1:Nbeayh3vq2FNm6xaFo34mhUdOu0EVlpj53CqCsbU0E4= +github.com/elastic/elastic-agent-autodiscover v0.2.1/go.mod h1:gPnzzfdYNdgznAb+iG9eyyXaQXBbAMHa+Y6Z8hXfcGY= github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6 h1:nFvXHBjYK3e9+xF0WKDeAKK4aOO51uC28s+L9rBmilo= github.com/elastic/elastic-agent-client/v7 v7.0.0-20210727140539-f0905d9377f6/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc= -github.com/elastic/elastic-agent-libs v0.0.0-20220303160015-5b4e674da3dd/go.mod h1://82M1l73IHx0wDbS2Tzkq6Fx9fkmytS1KgkIyzvNTM= +github.com/elastic/elastic-agent-libs v0.2.5/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= github.com/elastic/elastic-agent-libs v0.2.6 h1:DpcUcCVYZ7lNtHLUlyT1u/GtGAh49wpL15DTH7+8O5o= github.com/elastic/elastic-agent-libs v0.2.6/go.mod h1:chO3rtcLyGlKi9S0iGVZhYCzDfdDsAQYBc+ui588AFE= github.com/elastic/elastic-agent-system-metrics v0.3.0 h1:W8L0E8lWJmdguH+oIR7OzuFgopvw8ucZAE9w6iqVlpE= @@ -390,6 +394,7 @@ github.com/elastic/go-elasticsearch/v8 v8.0.0-20210317102009-a9d74cec0186/go.mod github.com/elastic/go-licenser v0.3.1/go.mod h1:D8eNQk70FOCVBl3smCGQt/lv7meBeQno2eI1S5apiHQ= github.com/elastic/go-licenser v0.4.0 h1:jLq6A5SilDS/Iz1ABRkO6BHy91B9jBora8FwGRsDqUI= github.com/elastic/go-licenser v0.4.0/go.mod h1:V56wHMpmdURfibNBggaSBfqgPxyT1Tldns1i87iTEvU= +github.com/elastic/go-structform v0.0.9/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= github.com/elastic/go-structform v0.0.10 h1:oy08o/Ih2hHTkNcRY/1HhaYvIp5z6t8si8gnCJPDo1w= github.com/elastic/go-structform v0.0.10/go.mod h1:CZWf9aIRYY5SuKSmOhtXScE5uQiLZNqAFnwKR4OrIM4= github.com/elastic/go-sysinfo v1.1.1/go.mod h1:i1ZYdU10oLNfRzq4vq62BEwD2fH8KaWh6eh0ikPT9F0= @@ -946,7 +951,6 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.0.0/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= -github.com/opencontainers/image-spec v1.0.2-0.20190823105129-775207bd45b6/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= github.com/opencontainers/image-spec v1.0.2/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0= github.com/opencontainers/runc v0.0.0-20190115041553-12f6a991201f/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U= @@ -1397,6 +1401,7 @@ golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211020060615-d418f374d309/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= diff --git a/internal/pkg/agent/application/local_mode.go b/internal/pkg/agent/application/local_mode.go index aae202b114f..ecd988bc7a8 100644 --- a/internal/pkg/agent/application/local_mode.go +++ b/internal/pkg/agent/application/local_mode.go @@ -114,7 +114,7 @@ func newLocal( } localApplication.router = router - composableCtrl, err := composable.New(log, rawConfig) + composableCtrl, err := composable.New(log, rawConfig, false) if err != nil { return nil, errors.New(err, "failed to initialize composable controller") } diff --git a/internal/pkg/agent/application/managed_mode.go b/internal/pkg/agent/application/managed_mode.go index 037cf74ad5c..a9903733762 100644 --- a/internal/pkg/agent/application/managed_mode.go +++ b/internal/pkg/agent/application/managed_mode.go @@ -135,7 +135,7 @@ func newManaged( } managedApplication.router = router - composableCtrl, err := composable.New(log, rawConfig) + composableCtrl, err := composable.New(log, rawConfig, true) if err != nil { return nil, errors.New(err, "failed to initialize composable controller") } diff --git a/internal/pkg/agent/application/managed_mode_test.go b/internal/pkg/agent/application/managed_mode_test.go index 7f111eae322..ebba108b7ec 100644 --- a/internal/pkg/agent/application/managed_mode_test.go +++ b/internal/pkg/agent/application/managed_mode_test.go @@ -45,7 +45,7 @@ func TestManagedModeRouting(t *testing.T) { router, _ := router.New(log, streamFn) agentInfo, _ := info.NewAgentInfo(true) nullStore := &storage.NullStore{} - composableCtrl, _ := composable.New(log, nil) + composableCtrl, _ := composable.New(log, nil, true) emit, err := emitter.New(ctx, log, agentInfo, composableCtrl, router, &pipeline.ConfigModifiers{Decorators: []pipeline.DecoratorFunc{modifiers.InjectMonitoring}}, nil) require.NoError(t, err) diff --git a/internal/pkg/agent/cmd/inspect.go b/internal/pkg/agent/cmd/inspect.go index e6284d56487..2faf951cd16 100644 --- a/internal/pkg/agent/cmd/inspect.go +++ b/internal/pkg/agent/cmd/inspect.go @@ -258,7 +258,7 @@ func getProgramsFromConfig(log *logger.Logger, agentInfo *info.AgentInfo, cfg *c router := &inmemRouter{} ctx, cancel := context.WithCancel(context.Background()) defer cancel() - composableCtrl, err := composable.New(log, cfg) + composableCtrl, err := composable.New(log, cfg, false) if err != nil { return nil, err } diff --git a/internal/pkg/agent/install/uninstall.go b/internal/pkg/agent/install/uninstall.go index 598ddaeea8c..c2881302678 100644 --- a/internal/pkg/agent/install/uninstall.go +++ b/internal/pkg/agent/install/uninstall.go @@ -250,7 +250,7 @@ func applyDynamics(ctx context.Context, log *logger.Logger, cfg *config.Config) }) } - ctrl, err := composable.New(log, cfg) + ctrl, err := composable.New(log, cfg, false) if err != nil { return nil, err } diff --git a/internal/pkg/composable/context.go b/internal/pkg/composable/context.go index 1dcb50cf956..d0ad4179e87 100644 --- a/internal/pkg/composable/context.go +++ b/internal/pkg/composable/context.go @@ -14,8 +14,9 @@ import ( ) // ContextProviderBuilder creates a new context provider based on the given config and returns it. -type ContextProviderBuilder func(log *logger.Logger, config *config.Config) (corecomp.ContextProvider, error) +type ContextProviderBuilder func(log *logger.Logger, config *config.Config, managed bool) (corecomp.ContextProvider, error) +//nolint:dupl,goimports,nolintlint // false positive // AddContextProvider adds a new ContextProviderBuilder func (r *providerRegistry) AddContextProvider(name string, builder ContextProviderBuilder) error { r.lock.Lock() @@ -24,11 +25,14 @@ func (r *providerRegistry) AddContextProvider(name string, builder ContextProvid if name == "" { return fmt.Errorf("provider name is required") } + if strings.ToLower(name) != name { return fmt.Errorf("provider name must be lowercase") } + _, contextExists := r.contextProviders[name] _, dynamicExists := r.dynamicProviders[name] + if contextExists || dynamicExists { return fmt.Errorf("provider '%s' is already registered", name) } diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index a14e111194f..d94b9cda7d7 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -40,7 +40,7 @@ type controller struct { } // New creates a new controller. -func New(log *logger.Logger, c *config.Config) (Controller, error) { +func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) { l := log.Named("composable") var providersCfg Config @@ -59,7 +59,7 @@ func New(log *logger.Logger, c *config.Config) (Controller, error) { // explicitly disabled; skipping continue } - provider, err := builder(l, pCfg) + provider, err := builder(l, pCfg, managed) if err != nil { return nil, errors.New(err, fmt.Sprintf("failed to build provider '%s'", name), errors.TypeConfig, errors.M("provider", name)) } @@ -76,7 +76,7 @@ func New(log *logger.Logger, c *config.Config) (Controller, error) { // explicitly disabled; skipping continue } - provider, err := builder(l.Named(strings.Join([]string{"providers", name}, ".")), pCfg) + provider, err := builder(l.Named(strings.Join([]string{"providers", name}, ".")), pCfg, managed) if err != nil { return nil, errors.New(err, fmt.Sprintf("failed to build provider '%s'", name), errors.TypeConfig, errors.M("provider", name)) } diff --git a/internal/pkg/composable/controller_test.go b/internal/pkg/composable/controller_test.go index 09780767928..2ba71f33243 100644 --- a/internal/pkg/composable/controller_test.go +++ b/internal/pkg/composable/controller_test.go @@ -77,7 +77,7 @@ func TestController(t *testing.T) { log, err := logger.New("", false) require.NoError(t, err) - c, err := composable.New(log, cfg) + c, err := composable.New(log, cfg, false) require.NoError(t, err) var wg sync.WaitGroup @@ -99,14 +99,14 @@ func TestController(t *testing.T) { _, envExists := setVars[0].Lookup("env") assert.False(t, envExists) local, _ := setVars[0].Lookup("local") - localMap := local.(map[string]interface{}) + localMap, _ := local.(map[string]interface{}) assert.Equal(t, "value1", localMap["key1"]) local, _ = setVars[1].Lookup("local_dynamic") - localMap = local.(map[string]interface{}) + localMap, _ = local.(map[string]interface{}) assert.Equal(t, "value1", localMap["key1"]) local, _ = setVars[2].Lookup("local_dynamic") - localMap = local.(map[string]interface{}) + localMap, _ = local.(map[string]interface{}) assert.Equal(t, "value2", localMap["key1"]) } diff --git a/internal/pkg/composable/dynamic.go b/internal/pkg/composable/dynamic.go index a0de3543a1c..b8e55249a4d 100644 --- a/internal/pkg/composable/dynamic.go +++ b/internal/pkg/composable/dynamic.go @@ -34,30 +34,31 @@ type DynamicProvider interface { } // DynamicProviderBuilder creates a new dynamic provider based on the given config and returns it. -type DynamicProviderBuilder func(log *logger.Logger, config *config.Config) (DynamicProvider, error) +type DynamicProviderBuilder func(log *logger.Logger, config *config.Config, managed bool) (DynamicProvider, error) +//nolint:dupl,goimports,nolintlint // false positive // AddDynamicProvider adds a new DynamicProviderBuilder -func (r *providerRegistry) AddDynamicProvider(name string, builder DynamicProviderBuilder) error { +func (r *providerRegistry) AddDynamicProvider(providerName string, builder DynamicProviderBuilder) error { r.lock.Lock() defer r.lock.Unlock() - if name == "" { - return fmt.Errorf("provider name is required") + if providerName == "" { + return fmt.Errorf("provider providerName is required") } - if strings.ToLower(name) != name { - return fmt.Errorf("provider name must be lowercase") + if strings.ToLower(providerName) != providerName { + return fmt.Errorf("provider providerName must be lowercase") } - _, contextExists := r.contextProviders[name] - _, dynamicExists := r.dynamicProviders[name] + _, contextExists := r.contextProviders[providerName] + _, dynamicExists := r.dynamicProviders[providerName] if contextExists || dynamicExists { - return fmt.Errorf("provider '%s' is already registered", name) + return fmt.Errorf("provider '%s' is already registered", providerName) } if builder == nil { - return fmt.Errorf("provider '%s' cannot be registered with a nil factory", name) + return fmt.Errorf("provider '%s' cannot be registered with a nil factory", providerName) } - r.dynamicProviders[name] = builder - r.logger.Debugf("Registered provider: %s", name) + r.dynamicProviders[providerName] = builder + r.logger.Debugf("Registered provider: %s", providerName) return nil } diff --git a/internal/pkg/composable/providers/agent/agent.go b/internal/pkg/composable/providers/agent/agent.go index 2b9d0ff3deb..5578dd84d28 100644 --- a/internal/pkg/composable/providers/agent/agent.go +++ b/internal/pkg/composable/providers/agent/agent.go @@ -15,7 +15,7 @@ import ( ) func init() { - composable.Providers.AddContextProvider("agent", ContextProviderBuilder) + _ = composable.Providers.AddContextProvider("agent", ContextProviderBuilder) } type contextProvider struct{} @@ -42,6 +42,6 @@ func (*contextProvider) Run(comm corecomp.ContextProviderComm) error { } // ContextProviderBuilder builds the context provider. -func ContextProviderBuilder(_ *logger.Logger, _ *config.Config) (corecomp.ContextProvider, error) { +func ContextProviderBuilder(_ *logger.Logger, _ *config.Config, managed bool) (corecomp.ContextProvider, error) { return &contextProvider{}, nil } diff --git a/internal/pkg/composable/providers/agent/agent_test.go b/internal/pkg/composable/providers/agent/agent_test.go index f3c6904b05c..cd15e8058ea 100644 --- a/internal/pkg/composable/providers/agent/agent_test.go +++ b/internal/pkg/composable/providers/agent/agent_test.go @@ -20,7 +20,7 @@ func TestContextProvider(t *testing.T) { testutils.InitStorage(t) builder, _ := composable.Providers.GetContextProvider("agent") - provider, err := builder(nil, nil) + provider, err := builder(nil, nil, true) require.NoError(t, err) comm := ctesting.NewContextComm(context.Background()) diff --git a/internal/pkg/composable/providers/docker/docker.go b/internal/pkg/composable/providers/docker/docker.go index 4bdc6d11cfe..b832cbb6c92 100644 --- a/internal/pkg/composable/providers/docker/docker.go +++ b/internal/pkg/composable/providers/docker/docker.go @@ -105,7 +105,7 @@ func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { } // DynamicProviderBuilder builds the dynamic provider. -func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) { +func DynamicProviderBuilder(logger *logger.Logger, c *config.Config, managed bool) (composable.DynamicProvider, error) { var cfg Config if c == nil { c = config.New() diff --git a/internal/pkg/composable/providers/env/env.go b/internal/pkg/composable/providers/env/env.go index 4c6b5911f47..b7b521c85d1 100644 --- a/internal/pkg/composable/providers/env/env.go +++ b/internal/pkg/composable/providers/env/env.go @@ -16,7 +16,7 @@ import ( ) func init() { - composable.Providers.AddContextProvider("env", ContextProviderBuilder) + _ = composable.Providers.AddContextProvider("env", ContextProviderBuilder) } type contextProvider struct{} @@ -31,7 +31,7 @@ func (*contextProvider) Run(comm corecomp.ContextProviderComm) error { } // ContextProviderBuilder builds the context provider. -func ContextProviderBuilder(_ *logger.Logger, _ *config.Config) (corecomp.ContextProvider, error) { +func ContextProviderBuilder(_ *logger.Logger, _ *config.Config, managed bool) (corecomp.ContextProvider, error) { return &contextProvider{}, nil } diff --git a/internal/pkg/composable/providers/env/env_test.go b/internal/pkg/composable/providers/env/env_test.go index f41f6200697..a03f37ee577 100644 --- a/internal/pkg/composable/providers/env/env_test.go +++ b/internal/pkg/composable/providers/env/env_test.go @@ -17,7 +17,7 @@ import ( func TestContextProvider(t *testing.T) { builder, _ := composable.Providers.GetContextProvider("env") - provider, err := builder(nil, nil) + provider, err := builder(nil, nil, true) require.NoError(t, err) comm := ctesting.NewContextComm(context.Background()) diff --git a/internal/pkg/composable/providers/host/host.go b/internal/pkg/composable/providers/host/host.go index 25d53430a2f..41498de79cc 100644 --- a/internal/pkg/composable/providers/host/host.go +++ b/internal/pkg/composable/providers/host/host.go @@ -24,7 +24,7 @@ import ( const DefaultCheckInterval = 5 * time.Minute func init() { - composable.Providers.AddContextProvider("host", ContextProviderBuilder) + _ = composable.Providers.AddContextProvider("host", ContextProviderBuilder) } type infoFetcher func() (map[string]interface{}, error) @@ -81,7 +81,7 @@ func (c *contextProvider) Run(comm corecomp.ContextProviderComm) error { } // ContextProviderBuilder builds the context provider. -func ContextProviderBuilder(log *logger.Logger, c *config.Config) (corecomp.ContextProvider, error) { +func ContextProviderBuilder(log *logger.Logger, c *config.Config, managed bool) (corecomp.ContextProvider, error) { p := &contextProvider{ logger: log, fetcher: getHostInfo, diff --git a/internal/pkg/composable/providers/host/host_test.go b/internal/pkg/composable/providers/host/host_test.go index 8e117fcbeb4..30b9619bfc6 100644 --- a/internal/pkg/composable/providers/host/host_test.go +++ b/internal/pkg/composable/providers/host/host_test.go @@ -33,10 +33,10 @@ func TestContextProvider(t *testing.T) { builder, _ := composable.Providers.GetContextProvider("host") log, err := logger.New("host_test", false) require.NoError(t, err) - provider, err := builder(log, c) + provider, err := builder(log, c, true) require.NoError(t, err) - hostProvider := provider.(*contextProvider) + hostProvider, _ := provider.(*contextProvider) hostProvider.fetcher = returnHostMapping() require.Equal(t, 100*time.Millisecond, hostProvider.CheckInterval) diff --git a/internal/pkg/composable/providers/kubernetes/config.go b/internal/pkg/composable/providers/kubernetes/config.go index 9bec67b66b8..4a97b417c59 100644 --- a/internal/pkg/composable/providers/kubernetes/config.go +++ b/internal/pkg/composable/providers/kubernetes/config.go @@ -7,6 +7,8 @@ package kubernetes import ( "time" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata" "github.com/elastic/elastic-agent-libs/logp" @@ -34,6 +36,9 @@ type Config struct { LabelsDedot bool `config:"labels.dedot"` AnnotationsDedot bool `config:"annotations.dedot"` + + Hints *config.C `config:"hints"` + Prefix string `config:"prefix"` } // Resources config section for resources' config blocks @@ -56,6 +61,7 @@ func (c *Config) InitDefaults() { c.LabelsDedot = true c.AnnotationsDedot = true c.AddResourceMetadata = metadata.GetDefaultResourceMetadataConfig() + c.Prefix = "co.elastic" } // Validate ensures correctness of config diff --git a/internal/pkg/composable/providers/kubernetes/hints.go b/internal/pkg/composable/providers/kubernetes/hints.go new file mode 100644 index 00000000000..1a779e0c2c6 --- /dev/null +++ b/internal/pkg/composable/providers/kubernetes/hints.go @@ -0,0 +1,258 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package kubernetes + +import ( + "fmt" + "regexp" + "strings" + + "github.com/elastic/elastic-agent-autodiscover/utils" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +const ( + integration = "package" + datastreams = "data_streams" + + host = "host" + period = "period" + timeout = "timeout" + metricspath = "metrics_path" + username = "username" + password = "password" + stream = "stream" // this is the container stream: stdout/stderr +) + +type hintsBuilder struct { + Key string + + logger *logp.Logger +} + +func (m *hintsBuilder) getIntegration(hints mapstr.M) string { + return utils.GetHintString(hints, m.Key, integration) +} + +func (m *hintsBuilder) getDataStreams(hints mapstr.M) []string { + ds := utils.GetHintAsList(hints, m.Key, datastreams) + return ds +} + +func (m *hintsBuilder) getHost(hints mapstr.M) string { + return utils.GetHintString(hints, m.Key, host) +} + +func (m *hintsBuilder) getStreamHost(hints mapstr.M, streamName string) string { + key := fmt.Sprintf("%v.%v", streamName, host) + return utils.GetHintString(hints, m.Key, key) +} + +func (m *hintsBuilder) getPeriod(hints mapstr.M) string { + return utils.GetHintString(hints, m.Key, period) +} + +func (m *hintsBuilder) getStreamPeriod(hints mapstr.M, streamName string) string { + key := fmt.Sprintf("%v.%v", streamName, period) + return utils.GetHintString(hints, m.Key, key) +} + +func (m *hintsBuilder) getTimeout(hints mapstr.M) string { + return utils.GetHintString(hints, m.Key, timeout) +} + +func (m *hintsBuilder) getStreamTimeout(hints mapstr.M, streamName string) string { + key := fmt.Sprintf("%v.%v", streamName, timeout) + return utils.GetHintString(hints, m.Key, key) +} + +func (m *hintsBuilder) getMetricspath(hints mapstr.M) string { + return utils.GetHintString(hints, m.Key, metricspath) +} + +func (m *hintsBuilder) getStreamMetricspath(hints mapstr.M, streamName string) string { + key := fmt.Sprintf("%v.%v", streamName, metricspath) + return utils.GetHintString(hints, m.Key, key) +} + +func (m *hintsBuilder) getUsername(hints mapstr.M) string { + return utils.GetHintString(hints, m.Key, username) +} + +func (m *hintsBuilder) getStreamUsername(hints mapstr.M, streamName string) string { + key := fmt.Sprintf("%v.%v", streamName, username) + return utils.GetHintString(hints, m.Key, key) +} + +func (m *hintsBuilder) getPassword(hints mapstr.M) string { + return utils.GetHintString(hints, m.Key, password) +} + +func (m *hintsBuilder) getStreamPassword(hints mapstr.M, streamName string) string { + key := fmt.Sprintf("%v.%v", streamName, password) + return utils.GetHintString(hints, m.Key, key) +} + +func (m *hintsBuilder) getContainerStream(hints mapstr.M) string { + return utils.GetHintString(hints, m.Key, stream) +} + +func (m *hintsBuilder) getStreamContainerStream(hints mapstr.M, streamName string) string { + key := fmt.Sprintf("%v.%v", streamName, stream) + return utils.GetHintString(hints, m.Key, key) +} + +// Replace hints like `'${kubernetes.pod.ip}:6379'` with the actual values from the resource metadata. +// So if you replace the `${kubernetes.pod.ip}` part with the value from the Pod's metadata +// you end up with sth like `10.28.90.345:6379` +func (m *hintsBuilder) getFromMeta(value string, kubeMeta mapstr.M) string { + if value == "" { + return "" + } + r := regexp.MustCompile(`\${([^{}]+)}`) + matches := r.FindAllString(value, -1) + for _, match := range matches { + key := strings.TrimSuffix(strings.TrimPrefix(match, "${kubernetes."), "}") + val, err := kubeMeta.GetValue(key) + if err != nil { + m.logger.Debugf("cannot retrieve key from k8smeta: %v", key) + return "" + } + hintVal, ok := val.(string) + if !ok { + m.logger.Debugf("cannot convert value into string: %v", val) + return "" + } + value = strings.Replace(value, match, hintVal, -1) + } + return value +} + +// GenerateHintsMapping gets a hint's map extracted from the annotations and constructs the final +// hints' mapping to be emitted. +func GenerateHintsMapping(hints mapstr.M, kubeMeta mapstr.M, logger *logp.Logger, containerID string) mapstr.M { + builder := hintsBuilder{ + Key: "hints", // consider doing it a configurable, + logger: logger, + } + + hintsMapping := mapstr.M{} + integration := builder.getIntegration(hints) + if integration == "" { + return hintsMapping + } + integrationHints := mapstr.M{ + "enabled": true, + } + + if containerID != "" { + _, _ = hintsMapping.Put("container_id", containerID) + // Add the default container log fallback to enable any template which defines + // a log input with a `"${kubernetes.hints.container_logs.enabled} == true"` condition + _, _ = integrationHints.Put("container_logs.enabled", true) + } + + // TODO: add support for processors + // Processors should be data_stream specific. + // Add a basic processor as a base like: + //- add_fields: + // target: kubernetes + // fields: + // hints: true + // Blocked by https://github.com/elastic/elastic-agent/issues/735 + + integrationHost := builder.getFromMeta(builder.getHost(hints), kubeMeta) + if integrationHost != "" { + _, _ = integrationHints.Put(host, integrationHost) + } + integrationPeriod := builder.getFromMeta(builder.getPeriod(hints), kubeMeta) + if integrationPeriod != "" { + _, _ = integrationHints.Put(period, integrationPeriod) + } + integrationTimeout := builder.getFromMeta(builder.getTimeout(hints), kubeMeta) + if integrationTimeout != "" { + _, _ = integrationHints.Put(timeout, integrationTimeout) + } + integrationMetricsPath := builder.getFromMeta(builder.getMetricspath(hints), kubeMeta) + if integrationMetricsPath != "" { + _, _ = integrationHints.Put(metricspath, integrationMetricsPath) + } + integrationUsername := builder.getFromMeta(builder.getUsername(hints), kubeMeta) + if integrationUsername != "" { + _, _ = integrationHints.Put(username, integrationUsername) + } + integrationPassword := builder.getFromMeta(builder.getPassword(hints), kubeMeta) + if integrationPassword != "" { + _, _ = integrationHints.Put(password, integrationPassword) + } + integrationContainerStream := builder.getFromMeta(builder.getContainerStream(hints), kubeMeta) + if integrationContainerStream != "" { + _, _ = integrationHints.Put(stream, integrationContainerStream) + } + + dataStreams := builder.getDataStreams(hints) + for _, dataStream := range dataStreams { + streamHints := mapstr.M{ + "enabled": true, + } + if integrationPeriod != "" { + _, _ = streamHints.Put(period, integrationPeriod) + } + if integrationHost != "" { + _, _ = streamHints.Put(host, integrationHost) + } + if integrationTimeout != "" { + _, _ = streamHints.Put(timeout, integrationTimeout) + } + if integrationMetricsPath != "" { + _, _ = streamHints.Put(metricspath, integrationMetricsPath) + } + if integrationUsername != "" { + _, _ = streamHints.Put(username, integrationUsername) + } + if integrationPassword != "" { + _, _ = streamHints.Put(password, integrationPassword) + } + if integrationContainerStream != "" { + _, _ = streamHints.Put(stream, integrationContainerStream) + } + + streamPeriod := builder.getFromMeta(builder.getStreamPeriod(hints, dataStream), kubeMeta) + if streamPeriod != "" { + _, _ = streamHints.Put(period, streamPeriod) + } + streamHost := builder.getFromMeta(builder.getStreamHost(hints, dataStream), kubeMeta) + if streamHost != "" { + _, _ = streamHints.Put(host, streamHost) + } + streamTimeout := builder.getFromMeta(builder.getStreamTimeout(hints, dataStream), kubeMeta) + if streamTimeout != "" { + _, _ = streamHints.Put(timeout, streamTimeout) + } + streamMetricsPath := builder.getFromMeta(builder.getStreamMetricspath(hints, dataStream), kubeMeta) + if streamMetricsPath != "" { + _, _ = streamHints.Put(metricspath, streamMetricsPath) + } + streamUsername := builder.getFromMeta(builder.getStreamUsername(hints, dataStream), kubeMeta) + if streamUsername != "" { + _, _ = streamHints.Put(username, streamUsername) + } + streamPassword := builder.getFromMeta(builder.getStreamPassword(hints, dataStream), kubeMeta) + if streamPassword != "" { + _, _ = streamHints.Put(password, streamPassword) + } + streamContainerStream := builder.getFromMeta(builder.getStreamContainerStream(hints, dataStream), kubeMeta) + if streamContainerStream != "" { + _, _ = streamHints.Put(stream, streamContainerStream) + } + _, _ = integrationHints.Put(dataStream, streamHints) + + } + + _, _ = hintsMapping.Put(integration, integrationHints) + + return hintsMapping +} diff --git a/internal/pkg/composable/providers/kubernetes/hints_test.go b/internal/pkg/composable/providers/kubernetes/hints_test.go new file mode 100644 index 00000000000..e23296d09a7 --- /dev/null +++ b/internal/pkg/composable/providers/kubernetes/hints_test.go @@ -0,0 +1,301 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package kubernetes + +import ( + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-autodiscover/kubernetes" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func TestGenerateHintsMapping(t *testing.T) { + logger := getLogger() + pod := &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + UID: types.UID(uid), + Namespace: "testns", + Labels: map[string]string{ + "foo": "bar", + "with-dash": "dash-value", + "with/slash": "some/path", + }, + Annotations: map[string]string{ + "app": "production", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: kubernetes.PodSpec{ + NodeName: "testnode", + }, + Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, + } + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": mapstr.M{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "ip": pod.Status.PodIP, + }, + "namespace_annotations": mapstr.M{ + "nsa": "nsb", + }, + "labels": mapstr.M{ + "foo": "bar", + "with-dash": "dash-value", + "with/slash": "some/path", + }, + "annotations": mapstr.M{ + "app": "production", + }, + } + hints := mapstr.M{ + "hints": mapstr.M{ + "data_streams": "info, key, keyspace", + "host": "${kubernetes.pod.ip}:6379", + "info": mapstr.M{"period": "1m", "timeout": "41s"}, + "key": mapstr.M{"period": "10m"}, + "package": "redis", + "password": "password", + "username": "username", + "metrics_path": "/metrics", + "timeout": "42s", + "period": "42s", + }, + } + + expected := mapstr.M{ + "redis": mapstr.M{ + "enabled": true, + "host": "127.0.0.5:6379", + "metrics_path": "/metrics", + "username": "username", + "password": "password", + "timeout": "42s", + "period": "42s", + "info": mapstr.M{ + "enabled": true, + "host": "127.0.0.5:6379", + "period": "1m", + "metrics_path": "/metrics", + "username": "username", + "password": "password", + "timeout": "41s", + }, "key": mapstr.M{ + "enabled": true, + "host": "127.0.0.5:6379", + "period": "10m", + "metrics_path": "/metrics", + "username": "username", + "password": "password", + "timeout": "42s", + }, "keyspace": mapstr.M{ + "enabled": true, + "host": "127.0.0.5:6379", + "period": "42s", + "metrics_path": "/metrics", + "username": "username", + "password": "password", + "timeout": "42s", + }, + }, + } + + hintsMapping := GenerateHintsMapping(hints, mapping, logger, "") + + assert.Equal(t, expected, hintsMapping) +} + +func TestGenerateHintsMappingWithContainerID(t *testing.T) { + logger := getLogger() + pod := &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + UID: types.UID(uid), + Namespace: "testns", + Labels: map[string]string{ + "foo": "bar", + "with-dash": "dash-value", + "with/slash": "some/path", + }, + Annotations: map[string]string{ + "app": "production", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: kubernetes.PodSpec{ + NodeName: "testnode", + }, + Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, + } + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": mapstr.M{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "ip": pod.Status.PodIP, + }, + "namespace_annotations": mapstr.M{ + "nsa": "nsb", + }, + "labels": mapstr.M{ + "foo": "bar", + "with-dash": "dash-value", + "with/slash": "some/path", + }, + "annotations": mapstr.M{ + "app": "production", + }, + } + hints := mapstr.M{ + "hints": mapstr.M{ + "data_streams": "info, key, keyspace", + "host": "${kubernetes.pod.ip}:6379", + "info": mapstr.M{"period": "1m", "timeout": "41s"}, + "key": mapstr.M{"period": "10m"}, + "package": "redis", + "password": "password", + "username": "username", + "metrics_path": "/metrics", + "timeout": "42s", + "period": "42s", + }, + } + + expected := mapstr.M{ + "container_id": "asdfghjklqwerty", + "redis": mapstr.M{ + "container_logs": mapstr.M{ + "enabled": true, + }, + "enabled": true, + "host": "127.0.0.5:6379", + "metrics_path": "/metrics", + "username": "username", + "password": "password", + "timeout": "42s", + "period": "42s", + "info": mapstr.M{ + "enabled": true, + "host": "127.0.0.5:6379", + "period": "1m", + "metrics_path": "/metrics", + "username": "username", + "password": "password", + "timeout": "41s", + }, "key": mapstr.M{ + "enabled": true, + "host": "127.0.0.5:6379", + "period": "10m", + "metrics_path": "/metrics", + "username": "username", + "password": "password", + "timeout": "42s", + }, "keyspace": mapstr.M{ + "enabled": true, + "host": "127.0.0.5:6379", + "period": "42s", + "metrics_path": "/metrics", + "username": "username", + "password": "password", + "timeout": "42s", + }, + }, + } + + hintsMapping := GenerateHintsMapping(hints, mapping, logger, "asdfghjklqwerty") + + assert.Equal(t, expected, hintsMapping) +} + +func TestGenerateHintsMappingWithLogStream(t *testing.T) { + logger := getLogger() + pod := &kubernetes.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + UID: types.UID(uid), + Namespace: "testns", + Labels: map[string]string{ + "foo": "bar", + "with-dash": "dash-value", + "with/slash": "some/path", + }, + Annotations: map[string]string{ + "app": "production", + }, + }, + TypeMeta: metav1.TypeMeta{ + Kind: "Pod", + APIVersion: "v1", + }, + Spec: kubernetes.PodSpec{ + NodeName: "testnode", + }, + Status: kubernetes.PodStatus{PodIP: "127.0.0.5"}, + } + + mapping := map[string]interface{}{ + "namespace": pod.GetNamespace(), + "pod": mapstr.M{ + "uid": string(pod.GetUID()), + "name": pod.GetName(), + "ip": pod.Status.PodIP, + }, + "namespace_annotations": mapstr.M{ + "nsa": "nsb", + }, + "labels": mapstr.M{ + "foo": "bar", + "with-dash": "dash-value", + "with/slash": "some/path", + }, + "annotations": mapstr.M{ + "app": "production", + }, + } + hints := mapstr.M{ + "hints": mapstr.M{ + "data_streams": "access, error", + "access": mapstr.M{"stream": "stdout"}, + "error": mapstr.M{"stream": "stderr"}, + "package": "apache", + }, + } + + expected := mapstr.M{ + "container_id": "asdfghjkl", + "apache": mapstr.M{ + "enabled": true, + "container_logs": mapstr.M{ + "enabled": true, + }, + "access": mapstr.M{ + "enabled": true, + "stream": "stdout", + }, "error": mapstr.M{ + "enabled": true, + "stream": "stderr", + }, + }, + } + + hintsMapping := GenerateHintsMapping(hints, mapping, logger, "asdfghjkl") + + assert.Equal(t, expected, hintsMapping) +} diff --git a/internal/pkg/composable/providers/kubernetes/kubernetes.go b/internal/pkg/composable/providers/kubernetes/kubernetes.go index 91367c5252f..ab4a14d6a61 100644 --- a/internal/pkg/composable/providers/kubernetes/kubernetes.go +++ b/internal/pkg/composable/providers/kubernetes/kubernetes.go @@ -7,9 +7,11 @@ package kubernetes import ( "fmt" + "github.com/elastic/elastic-agent-autodiscover/kubernetes" + "github.com/elastic/elastic-agent-libs/logp" + k8s "k8s.io/client-go/kubernetes" - "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent/internal/pkg/agent/errors" "github.com/elastic/elastic-agent/internal/pkg/composable" "github.com/elastic/elastic-agent/internal/pkg/config" @@ -34,12 +36,13 @@ func init() { } type dynamicProvider struct { - logger *logger.Logger - config *Config + logger *logger.Logger + config *Config + managed bool } // DynamicProviderBuilder builds the dynamic provider. -func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable.DynamicProvider, error) { +func DynamicProviderBuilder(logger *logger.Logger, c *config.Config, managed bool) (composable.DynamicProvider, error) { var cfg Config if c == nil { c = config.New() @@ -49,11 +52,15 @@ func DynamicProviderBuilder(logger *logger.Logger, c *config.Config) (composable return nil, errors.New(err, "failed to unpack configuration") } - return &dynamicProvider{logger, &cfg}, nil + return &dynamicProvider{logger, &cfg, managed}, nil } // Run runs the kubernetes context provider. func (p *dynamicProvider) Run(comm composable.DynamicProviderComm) error { + if p.config.Hints.Enabled() { + betalogger := logp.NewLogger("cfgwarn") + betalogger.Warnf("BETA: Hints' feature is beta.") + } if p.config.Resources.Pod.Enabled { err := p.watchResource(comm, "pod") if err != nil { @@ -139,19 +146,19 @@ func (p *dynamicProvider) newEventer( client k8s.Interface) (Eventer, error) { switch resourceType { case "pod": - eventer, err := NewPodEventer(comm, p.config, p.logger, client, p.config.Scope) + eventer, err := NewPodEventer(comm, p.config, p.logger, client, p.config.Scope, p.managed) if err != nil { return nil, err } return eventer, nil case nodeScope: - eventer, err := NewNodeEventer(comm, p.config, p.logger, client, p.config.Scope) + eventer, err := NewNodeEventer(comm, p.config, p.logger, client, p.config.Scope, p.managed) if err != nil { return nil, err } return eventer, nil case "service": - eventer, err := NewServiceEventer(comm, p.config, p.logger, client, p.config.Scope) + eventer, err := NewServiceEventer(comm, p.config, p.logger, client, p.config.Scope, p.managed) if err != nil { return nil, err } diff --git a/internal/pkg/composable/providers/kubernetes/node.go b/internal/pkg/composable/providers/kubernetes/node.go index a1539afb9c1..0e5aebc8931 100644 --- a/internal/pkg/composable/providers/kubernetes/node.go +++ b/internal/pkg/composable/providers/kubernetes/node.go @@ -43,7 +43,8 @@ func NewNodeEventer( cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (Eventer, error) { + scope string, + managed bool) (Eventer, error) { watcher, err := kubernetes.NewNamedWatcher("agent-node", client, &kubernetes.Node{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, diff --git a/internal/pkg/composable/providers/kubernetes/node_test.go b/internal/pkg/composable/providers/kubernetes/node_test.go index ab19e7d2ce2..8415304b00b 100644 --- a/internal/pkg/composable/providers/kubernetes/node_test.go +++ b/internal/pkg/composable/providers/kubernetes/node_test.go @@ -93,11 +93,6 @@ func TestGenerateNodeData(t *testing.T) { type nodeMeta struct{} // Generate generates node metadata from a resource object -// Metadata map is in the following form: -// { -// "kubernetes": {}, -// "some.ecs.field": "asdf" -// } // All Kubernetes fields that need to be stored under kubernetes. prefix are populated by // GenerateK8s method while fields that are part of ECS are generated by GenerateECS method func (n *nodeMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) mapstr.M { diff --git a/internal/pkg/composable/providers/kubernetes/pod.go b/internal/pkg/composable/providers/kubernetes/pod.go index 034df3c7a72..53814a182fa 100644 --- a/internal/pkg/composable/providers/kubernetes/pod.go +++ b/internal/pkg/composable/providers/kubernetes/pod.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/elastic/elastic-agent-autodiscover/utils" + "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata" c "github.com/elastic/elastic-agent-libs/config" @@ -31,6 +33,7 @@ type pod struct { config *Config logger *logp.Logger scope string + managed bool cleanupTimeout time.Duration // Mutex used by configuration updates not triggered by the main watcher, @@ -51,7 +54,8 @@ func NewPodEventer( cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (Eventer, error) { + scope string, + managed bool) (Eventer, error) { watcher, err := kubernetes.NewNamedWatcher("agent-pod", client, &kubernetes.Pod{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, @@ -95,6 +99,7 @@ func NewPodEventer( watcher: watcher, nodeWatcher: nodeWatcher, namespaceWatcher: namespaceWatcher, + managed: managed, } watcher.AddEventHandler(p) @@ -149,10 +154,32 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { data := generatePodData(pod, p.metagen, namespaceAnnotations) data.mapping["scope"] = p.scope - // Emit the pod - // We emit Pod + containers to ensure that configs matching Pod only - // get Pod metadata (not specific to any container) - _ = p.comm.AddOrUpdate(data.uid, PodPriority, data.mapping, data.processors) + + if p.config.Hints.Enabled() { // This is "hints based autodiscovery flow" + if !p.managed { + if ann, ok := data.mapping["annotations"]; ok { + annotations, _ := ann.(mapstr.M) + hints := utils.GenerateHints(annotations, "", p.config.Prefix) + if len(hints) > 0 { + p.logger.Errorf("Extracted hints are :%v", hints) + hintsMapping := GenerateHintsMapping(hints, data.mapping, p.logger, "") + p.logger.Errorf("Generated hints mappings are :%v", hintsMapping) + _ = p.comm.AddOrUpdate( + data.uid, + PodPriority, + map[string]interface{}{"hints": hintsMapping}, + data.processors, + ) + } + } + } + } else { // This is the "template-based autodiscovery" flow + // emit normal mapping to be used in dynamic variable resolution + // Emit the pod + // We emit Pod + containers to ensure that configs matching Pod only + // get Pod metadata (not specific to any container) + _ = p.comm.AddOrUpdate(data.uid, PodPriority, data.mapping, data.processors) + } // Emit all containers in the pod // We should deal with init containers stopping after initialization @@ -160,7 +187,7 @@ func (p *pod) emitRunning(pod *kubernetes.Pod) { } func (p *pod) emitContainers(pod *kubernetes.Pod, namespaceAnnotations mapstr.M) { - generateContainerData(p.comm, pod, p.metagen, namespaceAnnotations) + generateContainerData(p.comm, pod, p.metagen, namespaceAnnotations, p.logger, p.managed, p.config) } func (p *pod) emitStopped(pod *kubernetes.Pod) { @@ -265,7 +292,10 @@ func generateContainerData( comm composable.DynamicProviderComm, pod *kubernetes.Pod, kubeMetaGen metadata.MetaGen, - namespaceAnnotations mapstr.M) { + namespaceAnnotations mapstr.M, + logger *logp.Logger, + managed bool, + config *Config) { containers := kubernetes.GetContainersInPod(pod) @@ -344,7 +374,28 @@ func generateContainerData( _, _ = containerMeta.Put("port", fmt.Sprintf("%v", port.ContainerPort)) _, _ = containerMeta.Put("port_name", port.Name) k8sMapping["container"] = containerMeta - _ = comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) + + if config.Hints.Enabled() { // This is "hints based autodiscovery flow" + if !managed { + if ann, ok := k8sMapping["annotations"]; ok { + annotations, _ := ann.(mapstr.M) + hints := utils.GenerateHints(annotations, "", config.Prefix) + if len(hints) > 0 { + logger.Debugf("Extracted hints are :%v", hints) + hintsMapping := GenerateHintsMapping(hints, k8sMapping, logger, c.ID) + logger.Debugf("Generated hints mappings are :%v", hintsMapping) + _ = comm.AddOrUpdate( + eventID, + PodPriority, + map[string]interface{}{"hints": hintsMapping}, + processors, + ) + } + } + } + } else { // This is the "template-based autodiscovery" flow + _ = comm.AddOrUpdate(eventID, ContainerPriority, k8sMapping, processors) + } } } else { k8sMapping["container"] = containerMeta diff --git a/internal/pkg/composable/providers/kubernetes/pod_test.go b/internal/pkg/composable/providers/kubernetes/pod_test.go index 95361fd2ce0..7409ad1a3ea 100644 --- a/internal/pkg/composable/providers/kubernetes/pod_test.go +++ b/internal/pkg/composable/providers/kubernetes/pod_test.go @@ -9,17 +9,28 @@ import ( "fmt" "testing" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "github.com/stretchr/testify/assert" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent/pkg/core/logger" + "github.com/elastic/elastic-agent-autodiscover/kubernetes" "github.com/elastic/elastic-agent-autodiscover/kubernetes/metadata" "github.com/elastic/elastic-agent-libs/mapstr" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + "github.com/elastic/elastic-agent/internal/pkg/config" ) +func getLogger() *logger.Logger { + loggerCfg := logger.DefaultLoggingConfig() + loggerCfg.Level = logp.ErrorLevel + l, _ := logger.NewFromConfig("", loggerCfg, false) + return l +} + func TestGeneratePodData(t *testing.T) { pod := &kubernetes.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -153,13 +164,21 @@ func TestGenerateContainerPodData(t *testing.T) { context.TODO(), providerDataChan, } + logger := getLogger() + var cfg Config + c := config.New() + _ = c.Unpack(&cfg) generateContainerData( &comm, pod, &podMeta{}, mapstr.M{ "nsa": "nsb", - }) + }, + logger, + true, + &cfg, + ) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), @@ -274,13 +293,21 @@ func TestEphemeralContainers(t *testing.T) { context.TODO(), providerDataChan, } + + logger := getLogger() + var cfg Config + c := config.New() + _ = c.Unpack(&cfg) generateContainerData( &comm, pod, &podMeta{}, mapstr.M{ "nsa": "nsb", - }) + }, + logger, + true, + &cfg) mapping := map[string]interface{}{ "namespace": pod.GetNamespace(), @@ -366,11 +393,6 @@ func (t *MockDynamicComm) Remove(id string) { type podMeta struct{} // Generate generates pod metadata from a resource object -// Metadata map is in the following form: -// { -// "kubernetes": {}, -// "some.ecs.field": "asdf" -// } // All Kubernetes fields that need to be stored under kubernetes. prefix are populated by // GenerateK8s method while fields that are part of ECS are generated by GenerateECS method func (p *podMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) mapstr.M { diff --git a/internal/pkg/composable/providers/kubernetes/service.go b/internal/pkg/composable/providers/kubernetes/service.go index 49c20627734..4060c12e646 100644 --- a/internal/pkg/composable/providers/kubernetes/service.go +++ b/internal/pkg/composable/providers/kubernetes/service.go @@ -43,7 +43,8 @@ func NewServiceEventer( cfg *Config, logger *logp.Logger, client k8s.Interface, - scope string) (Eventer, error) { + scope string, + managed bool) (Eventer, error) { watcher, err := kubernetes.NewNamedWatcher("agent-service", client, &kubernetes.Service{}, kubernetes.WatchOptions{ SyncTimeout: cfg.SyncPeriod, Node: cfg.Node, diff --git a/internal/pkg/composable/providers/kubernetes/service_test.go b/internal/pkg/composable/providers/kubernetes/service_test.go index 69e945ee1cd..1943e3cfcdb 100644 --- a/internal/pkg/composable/providers/kubernetes/service_test.go +++ b/internal/pkg/composable/providers/kubernetes/service_test.go @@ -107,11 +107,6 @@ func TestGenerateServiceData(t *testing.T) { type svcMeta struct{} // Generate generates svc metadata from a resource object -// Metadata map is in the following form: -// { -// "kubernetes": {}, -// "some.ecs.field": "asdf" -// } // All Kubernetes fields that need to be stored under kubernetes. prefix are populated by // GenerateK8s method while fields that are part of ECS are generated by GenerateECS method func (s *svcMeta) Generate(obj kubernetes.Resource, opts ...metadata.FieldOptions) mapstr.M { diff --git a/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go b/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go index 410e13ec77d..0276a4a6e0c 100644 --- a/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go +++ b/internal/pkg/composable/providers/kubernetesleaderelection/kubernetes_leaderelection.go @@ -35,7 +35,7 @@ type contextProvider struct { } // ContextProviderBuilder builds the provider. -func ContextProviderBuilder(logger *logger.Logger, c *config.Config) (corecomp.ContextProvider, error) { +func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed bool) (corecomp.ContextProvider, error) { var cfg Config if c == nil { c = config.New() diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 0bc560295ed..e560b08a599 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -34,7 +34,7 @@ type contextProviderK8sSecrets struct { } // ContextProviderBuilder builds the context provider. -func ContextProviderBuilder(logger *logger.Logger, c *config.Config) (corecomp.ContextProvider, error) { +func ContextProviderBuilder(logger *logger.Logger, c *config.Config, managed bool) (corecomp.ContextProvider, error) { var cfg Config if c == nil { c = config.New() diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index 4c80800a59b..079d7b4becc 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -49,7 +49,7 @@ func Test_K8sSecretsProvider_Fetch(t *testing.T) { cfg, err := config.NewConfigFrom(map[string]string{"a": "b"}) require.NoError(t, err) - p, err := ContextProviderBuilder(logger, cfg) + p, err := ContextProviderBuilder(logger, cfg, true) require.NoError(t, err) fp, _ := p.(corecomp.FetchContextProvider) @@ -86,7 +86,7 @@ func Test_K8sSecretsProvider_FetchWrongSecret(t *testing.T) { cfg, err := config.NewConfigFrom(map[string]string{"a": "b"}) require.NoError(t, err) - p, err := ContextProviderBuilder(logger, cfg) + p, err := ContextProviderBuilder(logger, cfg, true) require.NoError(t, err) fp, _ := p.(corecomp.FetchContextProvider) diff --git a/internal/pkg/composable/providers/local/local.go b/internal/pkg/composable/providers/local/local.go index 9c611ecbd13..2078dcf40ed 100644 --- a/internal/pkg/composable/providers/local/local.go +++ b/internal/pkg/composable/providers/local/local.go @@ -15,7 +15,7 @@ import ( ) func init() { - composable.Providers.AddContextProvider("local", ContextProviderBuilder) + _ = composable.Providers.AddContextProvider("local", ContextProviderBuilder) } type contextProvider struct { @@ -32,7 +32,7 @@ func (c *contextProvider) Run(comm corecomp.ContextProviderComm) error { } // ContextProviderBuilder builds the context provider. -func ContextProviderBuilder(_ *logger.Logger, c *config.Config) (corecomp.ContextProvider, error) { +func ContextProviderBuilder(_ *logger.Logger, c *config.Config, managed bool) (corecomp.ContextProvider, error) { p := &contextProvider{} if c != nil { err := c.Unpack(p) diff --git a/internal/pkg/composable/providers/local/local_test.go b/internal/pkg/composable/providers/local/local_test.go index 6afe29251d5..dfec629b88a 100644 --- a/internal/pkg/composable/providers/local/local_test.go +++ b/internal/pkg/composable/providers/local/local_test.go @@ -26,7 +26,7 @@ func TestContextProvider(t *testing.T) { }) require.NoError(t, err) builder, _ := composable.Providers.GetContextProvider("local") - provider, err := builder(nil, cfg) + provider, err := builder(nil, cfg, true) require.NoError(t, err) comm := ctesting.NewContextComm(context.Background()) diff --git a/internal/pkg/composable/providers/localdynamic/localdynamic.go b/internal/pkg/composable/providers/localdynamic/localdynamic.go index f4f99ca4030..39a233d72da 100644 --- a/internal/pkg/composable/providers/localdynamic/localdynamic.go +++ b/internal/pkg/composable/providers/localdynamic/localdynamic.go @@ -18,7 +18,7 @@ import ( const ItemPriority = 0 func init() { - composable.Providers.AddDynamicProvider("local_dynamic", DynamicProviderBuilder) + _ = composable.Providers.AddDynamicProvider("local_dynamic", DynamicProviderBuilder) } type dynamicItem struct { @@ -41,7 +41,7 @@ func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error { } // DynamicProviderBuilder builds the dynamic provider. -func DynamicProviderBuilder(_ *logger.Logger, c *config.Config) (composable.DynamicProvider, error) { +func DynamicProviderBuilder(_ *logger.Logger, c *config.Config, managed bool) (composable.DynamicProvider, error) { p := &dynamicProvider{} if c != nil { err := c.Unpack(p) diff --git a/internal/pkg/composable/providers/localdynamic/localdynamic_test.go b/internal/pkg/composable/providers/localdynamic/localdynamic_test.go index a20b37852d9..8cc0a44ccd7 100644 --- a/internal/pkg/composable/providers/localdynamic/localdynamic_test.go +++ b/internal/pkg/composable/providers/localdynamic/localdynamic_test.go @@ -60,7 +60,7 @@ func TestContextProvider(t *testing.T) { }) require.NoError(t, err) builder, _ := composable.Providers.GetDynamicProvider("local_dynamic") - provider, err := builder(nil, cfg) + provider, err := builder(nil, cfg, true) require.NoError(t, err) comm := ctesting.NewDynamicComm(context.Background()) diff --git a/internal/pkg/composable/providers/path/path.go b/internal/pkg/composable/providers/path/path.go index 455f46d2b28..f0062d19b57 100644 --- a/internal/pkg/composable/providers/path/path.go +++ b/internal/pkg/composable/providers/path/path.go @@ -14,7 +14,7 @@ import ( ) func init() { - composable.Providers.AddContextProvider("path", ContextProviderBuilder) + _ = composable.Providers.AddContextProvider("path", ContextProviderBuilder) } type contextProvider struct{} @@ -34,6 +34,6 @@ func (*contextProvider) Run(comm corecomp.ContextProviderComm) error { } // ContextProviderBuilder builds the context provider. -func ContextProviderBuilder(_ *logger.Logger, _ *config.Config) (corecomp.ContextProvider, error) { +func ContextProviderBuilder(_ *logger.Logger, _ *config.Config, managed bool) (corecomp.ContextProvider, error) { return &contextProvider{}, nil } diff --git a/internal/pkg/composable/providers/path/path_test.go b/internal/pkg/composable/providers/path/path_test.go index 14f263e56db..094865d3fbd 100644 --- a/internal/pkg/composable/providers/path/path_test.go +++ b/internal/pkg/composable/providers/path/path_test.go @@ -18,7 +18,7 @@ import ( func TestContextProvider(t *testing.T) { builder, _ := composable.Providers.GetContextProvider("path") - provider, err := builder(nil, nil) + provider, err := builder(nil, nil, true) require.NoError(t, err) comm := ctesting.NewContextComm(context.Background()) diff --git a/internal/pkg/composable/testing/dynamic.go b/internal/pkg/composable/testing/dynamic.go index bfa48dff57d..99b499835cd 100644 --- a/internal/pkg/composable/testing/dynamic.go +++ b/internal/pkg/composable/testing/dynamic.go @@ -81,6 +81,7 @@ func (t *DynamicComm) Previous(id string) (DynamicState, bool) { return prev, ok } +//nolint:prealloc,goimports,nolintlint // false positive // PreviousIDs returns the previous set mapping ID. func (t *DynamicComm) PreviousIDs() []string { t.lock.Lock() @@ -100,6 +101,7 @@ func (t *DynamicComm) Current(id string) (DynamicState, bool) { return curr, ok } +//nolint:prealloc,goimports,nolintlint // false positive // CurrentIDs returns the current set mapping ID. func (t *DynamicComm) CurrentIDs() []string { t.lock.Lock()