Skip to content

Commit

Permalink
[8.16](backport #41585) filebeat: input v2 compat uses random ID for …
Browse files Browse the repository at this point in the history
…CheckConfig (#41642)

* filebeat: input v2 compat uses random ID for CheckConfig (#41585)

The CheckConfig function validates a configuration by creating and immediately discarding an input. However, a potential conflict arises when CheckConfig is used with autodiscover in Kubernetes.

Autodiscover accumulates configuration changes and applies them in batches. This can be problematic if a stop event for a pod is closely followed by a start event for the same pod (e.g., during a pod restart) before the inputs are reloaded. In this scenario, autodiscover might attempt to validate the configuration for the start event while the input for the pod is already running. This would lead to filestream input manager to see two inputs with the same ID, triggering a log warning.

Although this situation generates a warning, it doesn't result in data duplication. As the second input is only created to validate the configuration and later discarded. Also the reload process will ensure only new inputs are created, any input already running won't be duplicated.

(cherry picked from commit 697ede4)

---------

Co-authored-by: Anderson Queiroz <anderson.queiroz@elastic.co>
  • Loading branch information
mergify[bot] and AndersonQ authored Nov 22, 2024
1 parent f88f887 commit 3242411
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Affecting all Beats*

- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]

*Auditbeat*

Expand Down
37 changes: 35 additions & 2 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"sync"

"github.com/gofrs/uuid/v5"
"github.com/mitchellh/hashstructure"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand Down Expand Up @@ -73,12 +74,19 @@ func RunnerFactory(
}

func (f *factory) CheckConfig(cfg *conf.C) error {
_, err := f.loader.Configure(cfg)
// just check the config, therefore to avoid potential side effects (ID duplication)
// change the ID.
checkCfg, err := f.generateCheckConfig(cfg)
if err != nil {
f.log.Warnw(fmt.Sprintf("input V2 factory.CheckConfig failed to clone config before checking it. Original config will be checked, it might trigger an input duplication warning: %v", err), "original_config", conf.DebugString(cfg, true))
checkCfg = cfg
}
_, err = f.loader.Configure(checkCfg)
if err != nil {
return fmt.Errorf("runner factory could not check config: %w", err)
}

if err = f.loader.Delete(cfg); err != nil {
if err = f.loader.Delete(checkCfg); err != nil {
return fmt.Errorf(
"runner factory failed to delete an input after config check: %w",
err)
Expand Down Expand Up @@ -176,3 +184,28 @@ func configID(config *conf.C) (string, error) {

return fmt.Sprintf("%16X", id), nil
}

func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) {
// copy the config so it's safe to change it
testCfg, err := conf.NewConfigFrom(config)
if err != nil {
return nil, fmt.Errorf("failed to create new config: %w", err)
}

// let's try to override the `id` field, if it fails, give up
inputID, err := testCfg.String("id", -1)
if err != nil {
return nil, fmt.Errorf("failed to get 'id': %w", err)
}

id, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("failed to generate check congig id: %w", err)
}
err = testCfg.SetString("id", -1, inputID+"-"+id.String())
if err != nil {
return nil, fmt.Errorf("failed to set 'id': %w", err)
}

return testCfg, nil
}
113 changes: 113 additions & 0 deletions filebeat/input/v2/compat/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package compat

import (
"errors"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -62,6 +64,72 @@ func TestRunnerFactory_CheckConfig(t *testing.T) {
assert.Equal(t, 0, countRun)
})

t.Run("does not cause input ID duplication", func(t *testing.T) {
log := logp.NewLogger("test")
var countConfigure, countTest, countRun int
var runWG sync.WaitGroup
var ids = map[string]int{}
var idsMu sync.Mutex

// setup
plugins := inputest.SinglePlugin("test", &inputest.MockInputManager{
OnConfigure: func(cfg *conf.C) (v2.Input, error) {
idsMu.Lock()
defer idsMu.Unlock()
id, err := cfg.String("id", -1)
assert.NoError(t, err, "OnConfigure: could not get 'id' fom config")
idsCount := ids[id]
ids[id] = idsCount + 1

countConfigure++
return &inputest.MockInput{
OnTest: func(_ v2.TestContext) error { countTest++; return nil },
OnRun: func(_ v2.Context, _ beat.PipelineConnector) error {
runWG.Done()
countRun++
return nil
},
}, nil
},
})
loader := inputest.MustNewTestLoader(t, plugins, "type", "test")
factory := RunnerFactory(log, beat.Info{}, loader.Loader)

inputID := "filestream-kubernetes-pod-aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4"
inputCfg := fmt.Sprintf(`
id: %s
parsers:
- container: null
paths:
- /var/log/containers/*aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4.log
prospector:
scanner:
symlinks: true
take_over: true
type: test
`, inputID)

runner, err := factory.Create(nil, conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "could not create input")

runWG.Add(1)
runner.Start()
defer runner.Stop()
// wait input to be running
runWG.Wait()

err = factory.CheckConfig(conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "unexpected error when calling CheckConfig")

// validate: configured an input, but do not run test or run
assert.Equal(t, 2, countConfigure, "OnConfigure should be called only 2 times")
assert.Equal(t, 0, countTest, "OnTest should not have been called")
assert.Equal(t, 1, countRun, "OnRun should be called only once")
idsMu.Lock()
assert.Equal(t, 1, ids[inputID])
idsMu.Unlock()
})

t.Run("fail if input type is unknown to loader", func(t *testing.T) {
log := logp.NewLogger("test")
plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil))
Expand Down Expand Up @@ -118,3 +186,48 @@ func TestRunnerFactory_CreateAndRun(t *testing.T) {
assert.Error(t, err)
})
}

func TestGenerateCheckConfig(t *testing.T) {
tcs := []struct {
name string
cfg *conf.C
want *conf.C
wantErr error
assertCfg func(t assert.TestingT, expected interface{}, actual interface{}, msgAndArgs ...interface{}) bool
}{
{
name: "id is present",
cfg: conf.MustNewConfigFrom("id: some-id"),
assertCfg: assert.NotEqual,
},
{
name: "absent id",
cfg: conf.MustNewConfigFrom(""),
wantErr: errors.New("failed to get 'id'"),
assertCfg: func(t assert.TestingT, _ interface{}, got interface{}, msgAndArgs ...interface{}) bool {
return assert.Nil(t, got, msgAndArgs...)
},
},
{
name: "invalid config",
cfg: nil,
wantErr: errors.New("failed to create new config"),
assertCfg: func(t assert.TestingT, _ interface{}, got interface{}, msgAndArgs ...interface{}) bool {
return assert.Nil(t, got, msgAndArgs...)
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
f := factory{}

got, err := f.generateCheckConfig(tc.cfg)
if tc.wantErr != nil {
assert.ErrorContains(t, err, tc.wantErr.Error())
}

tc.assertCfg(t, tc.cfg, got)
})
}
}

0 comments on commit 3242411

Please sign in to comment.