Skip to content

Commit

Permalink
filebeat: input v2 compat uses random ID for CheckConfig (#41585)
Browse files Browse the repository at this point in the history
The CheckConfig function validates a configuration by creating and immediately discarding an input. However, a potential conflict arises when CheckConfig is used with autodiscover in Kubernetes.

Autodiscover accumulates configuration changes and applies them in batches. This can be problematic if a stop event for a pod is closely followed by a start event for the same pod (e.g., during a pod restart) before the inputs are reloaded. In this scenario, autodiscover might attempt to validate the configuration for the start event while the input for the pod is already running. This would lead to filestream input manager to see two inputs with the same ID, triggering a log warning.

Although this situation generates a warning, it doesn't result in data duplication. As the second input is only created to validate the configuration and later discarded. Also the reload process will ensure only new inputs are created, any input already running won't be duplicated.

(cherry picked from commit 697ede4)
  • Loading branch information
AndersonQ authored and mergify[bot] committed Nov 14, 2024
1 parent 5c6ea75 commit 54f9fc7
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 2 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,24 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Removed deprecated Cylance from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Removed deprecated Bluecoat from Beats. See <<migrate-from-deprecated-module>> for migration options. {pull}38037[38037]
- Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055]
- Update Salesforce module to use new Salesforce input. {pull}37509[37509]
- Tag events that come from a filestream in "take over" mode. {pull}39828[39828]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Enable file ingestion to report detailed status to Elastic Agent {pull}40075[40075]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]
- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268]
- Added support for hyphens in extension keys in `decode_cef` Filebeat processor. {pull}40427[40427]
- Journald: removed configuration options `include_matches.or`, `include_matches.and`, `backoff`, `max_backoff`, `cursor_seek_fallback`. {pull}40061[40061]
- Journald: `include_matches.match` now behaves in the same way as matchers in `journalctl`. Users should carefully update their input configuration. {pull}40061[40061]
- Journald: `seek` and `since` behaviour have been simplified, if there is a cursor (state) `seek` and `since` are ignored and the cursor is used. {pull}40061[40061]
- Redis: Added replication role as a field to submitted slowlogs
- Added `container.image.name` to `journald` Filebeat input's Docker-specific translated fields. {pull}40450[40450]
- Change log.file.path field in awscloudwatch input to nested object. {pull}41099[41099]
- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]


*Heartbeat*
Expand Down
37 changes: 35 additions & 2 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"sync"

"github.com/gofrs/uuid/v5"
"github.com/mitchellh/hashstructure"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand Down Expand Up @@ -73,12 +74,19 @@ func RunnerFactory(
}

func (f *factory) CheckConfig(cfg *conf.C) error {
_, err := f.loader.Configure(cfg)
// just check the config, therefore to avoid potential side effects (ID duplication)
// change the ID.
checkCfg, err := f.generateCheckConfig(cfg)
if err != nil {
f.log.Warnw(fmt.Sprintf("input V2 factory.CheckConfig failed to clone config before checking it. Original config will be checked, it might trigger an input duplication warning: %v", err), "original_config", conf.DebugString(cfg, true))
checkCfg = cfg
}
_, err = f.loader.Configure(checkCfg)
if err != nil {
return fmt.Errorf("runner factory could not check config: %w", err)
}

if err = f.loader.Delete(cfg); err != nil {
if err = f.loader.Delete(checkCfg); err != nil {
return fmt.Errorf(
"runner factory failed to delete an input after config check: %w",
err)
Expand Down Expand Up @@ -176,3 +184,28 @@ func configID(config *conf.C) (string, error) {

return fmt.Sprintf("%16X", id), nil
}

func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) {
// copy the config so it's safe to change it
testCfg, err := conf.NewConfigFrom(config)
if err != nil {
return nil, fmt.Errorf("failed to create new config: %w", err)
}

// let's try to override the `id` field, if it fails, give up
inputID, err := testCfg.String("id", -1)
if err != nil {
return nil, fmt.Errorf("failed to get 'id': %w", err)
}

id, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("failed to generate check congig id: %w", err)
}
err = testCfg.SetString("id", -1, inputID+"-"+id.String())
if err != nil {
return nil, fmt.Errorf("failed to set 'id': %w", err)
}

return testCfg, nil
}
113 changes: 113 additions & 0 deletions filebeat/input/v2/compat/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package compat

import (
"errors"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -62,6 +64,72 @@ func TestRunnerFactory_CheckConfig(t *testing.T) {
assert.Equal(t, 0, countRun)
})

t.Run("does not cause input ID duplication", func(t *testing.T) {
log := logp.NewLogger("test")
var countConfigure, countTest, countRun int
var runWG sync.WaitGroup
var ids = map[string]int{}
var idsMu sync.Mutex

// setup
plugins := inputest.SinglePlugin("test", &inputest.MockInputManager{
OnConfigure: func(cfg *conf.C) (v2.Input, error) {
idsMu.Lock()
defer idsMu.Unlock()
id, err := cfg.String("id", -1)
assert.NoError(t, err, "OnConfigure: could not get 'id' fom config")
idsCount := ids[id]
ids[id] = idsCount + 1

countConfigure++
return &inputest.MockInput{
OnTest: func(_ v2.TestContext) error { countTest++; return nil },
OnRun: func(_ v2.Context, _ beat.PipelineConnector) error {
runWG.Done()
countRun++
return nil
},
}, nil
},
})
loader := inputest.MustNewTestLoader(t, plugins, "type", "test")
factory := RunnerFactory(log, beat.Info{}, loader.Loader)

inputID := "filestream-kubernetes-pod-aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4"
inputCfg := fmt.Sprintf(`
id: %s
parsers:
- container: null
paths:
- /var/log/containers/*aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4.log
prospector:
scanner:
symlinks: true
take_over: true
type: test
`, inputID)

runner, err := factory.Create(nil, conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "could not create input")

runWG.Add(1)
runner.Start()
defer runner.Stop()
// wait input to be running
runWG.Wait()

err = factory.CheckConfig(conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "unexpected error when calling CheckConfig")

// validate: configured an input, but do not run test or run
assert.Equal(t, 2, countConfigure, "OnConfigure should be called only 2 times")
assert.Equal(t, 0, countTest, "OnTest should not have been called")
assert.Equal(t, 1, countRun, "OnRun should be called only once")
idsMu.Lock()
assert.Equal(t, 1, ids[inputID])
idsMu.Unlock()
})

t.Run("fail if input type is unknown to loader", func(t *testing.T) {
log := logp.NewLogger("test")
plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil))
Expand Down Expand Up @@ -118,3 +186,48 @@ func TestRunnerFactory_CreateAndRun(t *testing.T) {
assert.Error(t, err)
})
}

func TestGenerateCheckConfig(t *testing.T) {
tcs := []struct {
name string
cfg *conf.C
want *conf.C
wantErr error
assertCfg func(t assert.TestingT, expected interface{}, actual interface{}, msgAndArgs ...interface{}) bool
}{
{
name: "id is present",
cfg: conf.MustNewConfigFrom("id: some-id"),
assertCfg: assert.NotEqual,
},
{
name: "absent id",
cfg: conf.MustNewConfigFrom(""),
wantErr: errors.New("failed to get 'id'"),
assertCfg: func(t assert.TestingT, _ interface{}, got interface{}, msgAndArgs ...interface{}) bool {
return assert.Nil(t, got, msgAndArgs...)
},
},
{
name: "invalid config",
cfg: nil,
wantErr: errors.New("failed to create new config"),
assertCfg: func(t assert.TestingT, _ interface{}, got interface{}, msgAndArgs ...interface{}) bool {
return assert.Nil(t, got, msgAndArgs...)
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
f := factory{}

got, err := f.generateCheckConfig(tc.cfg)
if tc.wantErr != nil {
assert.ErrorContains(t, err, tc.wantErr.Error())
}

tc.assertCfg(t, tc.cfg, got)
})
}
}

0 comments on commit 54f9fc7

Please sign in to comment.