Skip to content

Commit

Permalink
fix unpack
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Jun 29, 2021
1 parent f01f797 commit 8394f29
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 40 deletions.
7 changes: 1 addition & 6 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type readerConfig struct {
MaxBytes int `config:"message_max_bytes" validate:"min=0,nonzero"`
Tail bool `config:"seek_to_tail"`

Parsers []common.ConfigNamespace `config:"parsers"`
Parsers parser.Config `config:",inline"`
}

type backoffConfig struct {
Expand Down Expand Up @@ -128,7 +128,6 @@ func defaultReaderConfig() readerConfig {
LineTerminator: readfile.AutoLineTerminator,
MaxBytes: 10 * humanize.MiByte,
Tail: false,
Parsers: make([]common.ConfigNamespace, 0),
}
}

Expand All @@ -137,9 +136,5 @@ func (c *config) Validate() error {
return fmt.Errorf("no path is configured")
}

if _, err := parser.NewCreator(parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil {
return fmt.Errorf("cannot parse parser configuration: %+v", err)
}

return nil
}
15 changes: 2 additions & 13 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type filestream struct {
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
closerConfig closerConfig
parsers *parser.Creator
parsers parser.Config
}

// Plugin creates a new filestream input plugin for creating a stateful input.
Expand Down Expand Up @@ -94,22 +94,11 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding)
}

parsers, err := parser.NewCreator(
parser.CommonConfig{
MaxBytes: config.Reader.MaxBytes,
LineTerminator: config.Reader.LineTerminator,
},
config.Reader.Parsers,
)
if err != nil {
return nil, nil, fmt.Errorf("cannot create parsers: %v", err)
}

filestream := &filestream{
readerConfig: config.Reader,
encodingFactory: encodingFactory,
closerConfig: config.Close,
parsers: parsers,
parsers: config.Reader.Parsers,
}

return prospector, filestream, nil
Expand Down
39 changes: 33 additions & 6 deletions libbeat/reader/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"fmt"
"io"

"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/multiline"
Expand All @@ -41,16 +43,41 @@ type Parser interface {
}

type CommonConfig struct {
MaxBytes int
LineTerminator readfile.LineTerminator
MaxBytes int `config:"max_bytes"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
}

type Creator struct {
type Config struct {
pCfg CommonConfig
parsers []common.ConfigNamespace
}

func NewCreator(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Creator, error) {
func (c *Config) Unpack(cc *common.Config) error {
tmp := struct {
Common CommonConfig `config:",inline"`
Parsers []common.ConfigNamespace `config:"parsers"`
}{
CommonConfig{
MaxBytes: 10 * humanize.MiByte,
LineTerminator: readfile.AutoLineTerminator,
},
nil,
}
err := cc.Unpack(&tmp)
if err != nil {
return err
}

newC, err := NewConfig(tmp.Common, tmp.Parsers)
if err != nil {
return err
}
*c = *newC

return nil
}

func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) {
for _, ns := range parsers {
name := ns.Name()
switch name {
Expand Down Expand Up @@ -80,14 +107,14 @@ func NewCreator(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Creator,
}
}

return &Creator{
return &Config{
pCfg: pCfg,
parsers: parsers,
}, nil

}

func (c *Creator) Create(in reader.Reader) Parser {
func (c *Config) Create(in reader.Reader) Parser {
p := in
for _, ns := range c.parsers {
name := ns.Name()
Expand Down
16 changes: 4 additions & 12 deletions libbeat/reader/parser/parser_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ import (
)

type inputParsersConfig struct {
MaxBytes int `config:"max_bytes"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Parsers []common.ConfigNamespace `config:"parsers"`
MaxBytes int `config:"max_bytes"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Parsers Config `config:",inline"`
}

func TestParsersExampleInline(t *testing.T) {
Expand Down Expand Up @@ -80,15 +80,7 @@ func TestParsersExampleInline(t *testing.T) {
err := cfg.Unpack(&c)
require.NoError(t, err)

creator, err := NewCreator(
CommonConfig{
MaxBytes: c.MaxBytes,
LineTerminator: c.LineTerminator,
},
c.Parsers,
)
require.NoError(t, err)
p := creator.Create(testReader(test.lines))
p := c.Parsers.Create(testReader(test.lines))

i := 0
msg, err := p.Next()
Expand Down
6 changes: 3 additions & 3 deletions libbeat/reader/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestParsersConfigAndReading(t *testing.T) {
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)
c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
if test.expectedError == "" {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestJSONParsersWithFields(t *testing.T) {
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)
c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
require.NoError(t, err)
p := c.Create(msgReader(test.message))

Expand Down Expand Up @@ -353,7 +353,7 @@ func TestContainerParser(t *testing.T) {
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)
c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
require.NoError(t, err)
p := c.Create(testReader(test.lines))

Expand Down

0 comments on commit 8394f29

Please sign in to comment.