Skip to content

Commit

Permalink
filebeat: input v2 compat uses random ID for CheckConfig
Browse files Browse the repository at this point in the history
The CheckConfig function validates a configuration by creating and immediately discarding an input. However, a potential conflict arises when CheckConfig is used with autodiscover in Kubernetes.

Autodiscover accumulates configuration changes and applies them in batches. This can be problematic if a stop event for a pod is closely followed by a start event for the same pod (e.g., during a pod restart) before the inputs are reloaded. In this scenario, autodiscover might attempt to validate the configuration for the start event while the input for the pod is already running. This would lead to filestream input manager to see two inputs with the same ID, triggering a log warning.

Although this situation generates a warning, it doesn't result in data duplication. As the second input is only created to validate the configuration and later discarded. Also the reload process will ensure only new inputs are created, any input already running won't be duplicated.
  • Loading branch information
AndersonQ committed Nov 11, 2024
1 parent bfde79f commit 32f04e3
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Change log.file.path field in awscloudwatch input to nested object. {pull}41099[41099]
- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover,

- Add kafka compression support for ZSTD.

Expand Down
33 changes: 31 additions & 2 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"context"
"errors"
"fmt"
"math/rand" // using for better performance
"strconv"
"sync"

"github.com/mitchellh/hashstructure"
Expand Down Expand Up @@ -73,12 +75,18 @@ func RunnerFactory(
}

func (f *factory) CheckConfig(cfg *conf.C) error {
_, err := f.loader.Configure(cfg)
// just check the config, therefore to avoid potential side effects (ID duplication)
// change the ID.
checkCfg, err := f.generateCheckConfig(cfg)
if err != nil {
f.log.Warnw(fmt.Sprintf("input V2 factory.CheckConfig failed to clone config before checking it. Original config will be checked, it might trigger an input duplication warning: %v", err), "original_config", conf.DebugString(cfg, true))
}
_, err = f.loader.Configure(checkCfg)
if err != nil {
return fmt.Errorf("runner factory could not check config: %w", err)
}

if err = f.loader.Delete(cfg); err != nil {
if err = f.loader.Delete(checkCfg); err != nil {
return fmt.Errorf(
"runner factory failed to delete an input after config check: %w",
err)
Expand Down Expand Up @@ -176,3 +184,24 @@ func configID(config *conf.C) (string, error) {

return fmt.Sprintf("%16X", id), nil
}

func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) {
testCfg, err := conf.NewConfigFrom(config)
if err != nil {
return config, fmt.Errorf("failed to create new config: %w", err)
}

// let's try to override the `inputID` field, if it fails, give up
inputID, err := testCfg.String("inputID", -1)
if err != nil {
return config, fmt.Errorf("failed to get 'inputID': %w", err)
}

// using math/rand for performance, generate a 0-9 string
err = testCfg.SetString("inputID", -1, inputID+strconv.Itoa(rand.Intn(10)))
if err != nil {
return config, fmt.Errorf("failed to set 'inputID': %w", err)
}

return testCfg, nil
}
109 changes: 109 additions & 0 deletions filebeat/input/v2/compat/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package compat

import (
"errors"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -62,6 +64,72 @@ func TestRunnerFactory_CheckConfig(t *testing.T) {
assert.Equal(t, 0, countRun)
})

t.Run("does not cause input ID duplication", func(t *testing.T) {
log := logp.NewLogger("test")
var countConfigure, countTest, countRun int
var runWG sync.WaitGroup
var ids = map[string]int{}
var idsMu sync.Mutex

// setup
plugins := inputest.SinglePlugin("test", &inputest.MockInputManager{
OnConfigure: func(cfg *conf.C) (v2.Input, error) {
idsMu.Lock()
defer idsMu.Unlock()
id, err := cfg.String("id", -1)
assert.NoError(t, err, "OnConfigure: could not get 'id' fom config")
idsCount := ids[id]
ids[id] = idsCount + 1

countConfigure++
return &inputest.MockInput{
OnTest: func(_ v2.TestContext) error { countTest++; return nil },
OnRun: func(_ v2.Context, _ beat.PipelineConnector) error {
runWG.Done()
countRun++
return nil
},
}, nil
},
})
loader := inputest.MustNewTestLoader(t, plugins, "type", "test")
factory := RunnerFactory(log, beat.Info{}, loader.Loader)

inputID := "filestream-kubernetes-pod-aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4"
inputCfg := fmt.Sprintf(`
id: %s
parsers:
- container: null
paths:
- /var/log/containers/*aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4.log
prospector:
scanner:
symlinks: true
take_over: true
type: test
`, inputID)

runner, err := factory.Create(nil, conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "could not create input")

runWG.Add(1)
runner.Start()
defer runner.Stop()
// wait input to be running
runWG.Wait()

err = factory.CheckConfig(conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "unexpected error when calling CheckConfig")

// validate: configured an input, but do not run test or run
assert.Equal(t, 2, countConfigure, "OnConfigure should be called only 2 times")
assert.Equal(t, 0, countTest, "OnTest should not have been called")
assert.Equal(t, 1, countRun, "OnRun should be called only once")
idsMu.Lock()
assert.Equal(t, 1, ids[inputID])
idsMu.Unlock()
})

t.Run("fail if input type is unknown to loader", func(t *testing.T) {
log := logp.NewLogger("test")
plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil))
Expand Down Expand Up @@ -118,3 +186,44 @@ func TestRunnerFactory_CreateAndRun(t *testing.T) {
assert.Error(t, err)
})
}

func TestGenerateCheckConfig(t *testing.T) {
tcs := []struct {
name string
cfg *conf.C
want *conf.C
wantErr error
assertCfg func(t assert.TestingT, expected interface{}, actual interface{}, msgAndArgs ...interface{}) bool
}{
{
name: "id is present",
cfg: conf.MustNewConfigFrom("inputID: some-id"),
assertCfg: assert.NotEqual,
},
{
name: "absent id",
cfg: conf.MustNewConfigFrom(""),
wantErr: errors.New("failed to get 'inputID'"),
assertCfg: assert.Equal,
},
{
name: "invalid config",
cfg: nil,
wantErr: errors.New("failed to create new config"),
assertCfg: assert.Equal,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
f := factory{}

got, err := f.generateCheckConfig(tc.cfg)
if tc.wantErr != nil {
assert.ErrorContains(t, err, tc.wantErr.Error())
}

tc.assertCfg(t, tc.cfg, got)
})
}
}

0 comments on commit 32f04e3

Please sign in to comment.