Skip to content

Commit

Permalink
Add custom suffix to identifiers in filestream input when needed (#26669
Browse files Browse the repository at this point in the history
)

## What does this PR do?

This PR lets you inject suffixes to the state identifiers based on the configuration of the filestream input. For starters, it is needed by the container parser so tracking of different streams (stdout/stderr) can be done separately.

## Why is it important?

Without this, the container input cannot be substituted with filestream input with a container parser.
  • Loading branch information
kvch authored Jul 15, 2021
1 parent 90654db commit 2876cfb
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 6 deletions.
40 changes: 37 additions & 3 deletions filebeat/input/filestream/identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,13 @@ func (f fileSource) Name() string {
}

// newFileIdentifier creates a new state identifier for a log input.
func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) {
func newFileIdentifier(ns *common.ConfigNamespace, suffix string) (fileIdentifier, error) {
if ns == nil {
return newINodeDeviceIdentifier(nil)
i, err := newINodeDeviceIdentifier(nil)
if err != nil {
return nil, err
}
return withSuffix(i, suffix), nil
}

identifierType := ns.Name()
Expand All @@ -87,7 +91,11 @@ func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) {
return nil, fmt.Errorf("no such file_identity generator: %s", identifierType)
}

return f(ns.Config())
i, err := f(ns.Config())
if err != nil {
return nil, err
}
return withSuffix(i, suffix), nil
}

type inodeDeviceIdentifier struct {
Expand Down Expand Up @@ -159,6 +167,32 @@ func (p *pathIdentifier) Supports(f identifierFeature) bool {
return false
}

type suffixIdentifier struct {
i fileIdentifier
suffix string
}

func withSuffix(inner fileIdentifier, suffix string) fileIdentifier {
if suffix == "" {
return inner
}
return &suffixIdentifier{i: inner, suffix: suffix}
}

func (s *suffixIdentifier) GetSource(e loginp.FSEvent) fileSource {
fs := s.i.GetSource(e)
fs.name += "-" + s.suffix
return fs
}

func (s *suffixIdentifier) Name() string {
return s.i.Name()
}

func (s *suffixIdentifier) Supports(f identifierFeature) bool {
return s.i.Supports(f)
}

// mockIdentifier is used for testing
type MockIdentifier struct{}

Expand Down
28 changes: 26 additions & 2 deletions filebeat/input/filestream/identifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type testFileIdentifierConfig struct {

func TestFileIdentifier(t *testing.T) {
t.Run("default file identifier", func(t *testing.T) {
identifier, err := newFileIdentifier(nil)
identifier, err := newFileIdentifier(nil, "")
require.NoError(t, err)
assert.Equal(t, DefaultIdentifierName, identifier.Name())

Expand All @@ -59,6 +59,30 @@ func TestFileIdentifier(t *testing.T) {
assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String(), src.Name())
})

t.Run("default file identifier with suffix", func(t *testing.T) {
identifier, err := newFileIdentifier(nil, "my-suffix")
require.NoError(t, err)
assert.Equal(t, DefaultIdentifierName, identifier.Name())

tmpFile, err := ioutil.TempFile("", "test_file_identifier_native")
if err != nil {
t.Fatalf("cannot create temporary file for test: %v", err)
}
defer os.Remove(tmpFile.Name())

fi, err := tmpFile.Stat()
if err != nil {
t.Fatalf("cannot stat temporary file for test: %v", err)
}

src := identifier.GetSource(loginp.FSEvent{
NewPath: tmpFile.Name(),
Info: fi,
})

assert.Equal(t, identifier.Name()+"::"+file.GetOSState(fi).String()+"-my-suffix", src.Name())
})

t.Run("path identifier", func(t *testing.T) {
c := common.MustNewConfigFrom(map[string]interface{}{
"identifier": map[string]interface{}{
Expand All @@ -69,7 +93,7 @@ func TestFileIdentifier(t *testing.T) {
err := c.Unpack(&cfg)
require.NoError(t, err)

identifier, err := newFileIdentifier(cfg.Identifier)
identifier, err := newFileIdentifier(cfg.Identifier, "")
require.NoError(t, err)
assert.Equal(t, pathName, identifier.Name())

Expand Down
6 changes: 5 additions & 1 deletion filebeat/input/filestream/prospector_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func newProspector(config config) (loginp.Prospector, error) {
return nil, fmt.Errorf("error while creating filewatcher %v", err)
}

identifier, err := newFileIdentifier(config.FileIdentity)
identifier, err := newFileIdentifier(config.FileIdentity, getIdentifierSuffix(config))
if err != nil {
return nil, fmt.Errorf("error while creating file identifier: %v", err)
}
Expand Down Expand Up @@ -104,3 +104,7 @@ func newProspector(config config) (loginp.Prospector, error) {
}
return nil, fmt.Errorf("no such rotation method: %s", rotationMethod)
}

func getIdentifierSuffix(config config) string {
return config.Reader.Parsers.Suffix
}
10 changes: 10 additions & 0 deletions libbeat/reader/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type CommonConfig struct {
}

type Config struct {
Suffix string

pCfg CommonConfig
parsers []common.ConfigNamespace
}
Expand Down Expand Up @@ -79,6 +81,7 @@ func (c *Config) Unpack(cc *common.Config) error {
}

func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) {
var suffix string
for _, ns := range parsers {
name := ns.Name()
switch name {
Expand All @@ -103,12 +106,19 @@ func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, er
if err != nil {
return nil, fmt.Errorf("error while parsing container parser config: %+v", err)
}
if config.Stream != readjson.All {
if suffix != "" {
return nil, fmt.Errorf("only one stream selection is allowed")
}
suffix = config.Stream.String()
}
default:
return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name)
}
}

return &Config{
Suffix: suffix,
pCfg: pCfg,
parsers: parsers,
}, nil
Expand Down
69 changes: 69 additions & 0 deletions libbeat/reader/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,75 @@ import (
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
)

func TestParsersConfigSuffix(t *testing.T) {
tests := map[string]struct {
parsers map[string]interface{}
expectedSuffix string
expectedError string
}{
"parsers with no suffix config": {
parsers: map[string]interface{}{
"parsers": []map[string]interface{}{
map[string]interface{}{
"container": map[string]interface{}{
"stream": "all",
},
},
},
},
},
"parsers with correct suffix config": {
parsers: map[string]interface{}{
"parsers": []map[string]interface{}{
map[string]interface{}{
"container": map[string]interface{}{
"stream": "stdout",
},
},
},
},
expectedSuffix: "stdout",
},
"parsers with multiple suffix config": {
parsers: map[string]interface{}{
"parsers": []map[string]interface{}{
map[string]interface{}{
"container": map[string]interface{}{
"stream": "stdout",
},
},
map[string]interface{}{
"container": map[string]interface{}{
"stream": "stderr",
},
},
},
},
expectedError: "only one stream selection is allowed",
},
}

for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
cfg := common.MustNewConfigFrom(test.parsers)
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)

if test.expectedError == "" {
require.NoError(t, err)
} else {
require.Contains(t, err.Error(), test.expectedError)
return
}
require.Equal(t, c.Suffix, test.expectedSuffix)
})
}

}

func TestParsersConfigAndReading(t *testing.T) {
tests := map[string]struct {
lines string
Expand Down

0 comments on commit 2876cfb

Please sign in to comment.