Skip to content

Commit

Permalink
rename config to creator better reflect what it is
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Jun 29, 2021
1 parent 984e710 commit f01f797
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 14 deletions.
4 changes: 2 additions & 2 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/parser"
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
)

Expand Down Expand Up @@ -137,7 +137,7 @@ func (c *config) Validate() error {
return fmt.Errorf("no path is configured")
}

if _, err := parser.NewConfig(parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil {
if _, err := parser.NewCreator(parser.CommonConfig{MaxBytes: c.Reader.MaxBytes, LineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil {
return fmt.Errorf("cannot parse parser configuration: %+v", err)
}

Expand Down
10 changes: 5 additions & 5 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/parser"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/debug"
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
)
Expand All @@ -58,7 +58,7 @@ type filestream struct {
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
closerConfig closerConfig
parserConfig *parser.Config
parsers *parser.Creator
}

// Plugin creates a new filestream input plugin for creating a stateful input.
Expand Down Expand Up @@ -94,7 +94,7 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
return nil, nil, fmt.Errorf("unknown encoding('%v')", config.Reader.Encoding)
}

parsers, err := parser.NewConfig(
parsers, err := parser.NewCreator(
parser.CommonConfig{
MaxBytes: config.Reader.MaxBytes,
LineTerminator: config.Reader.LineTerminator,
Expand All @@ -109,7 +109,7 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
readerConfig: config.Reader,
encodingFactory: encodingFactory,
closerConfig: config.Close,
parserConfig: parsers,
parsers: parsers,
}

return prospector, filestream, nil
Expand Down Expand Up @@ -232,7 +232,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo

r = readfile.NewFilemeta(r, fs.newPath)

r = inp.parserConfig.Create(r)
r = inp.parsers.Create(r)

r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

Expand Down
8 changes: 4 additions & 4 deletions libbeat/parser/parser.go → libbeat/reader/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ type CommonConfig struct {
LineTerminator readfile.LineTerminator
}

type Config struct {
type Creator struct {
pCfg CommonConfig
parsers []common.ConfigNamespace
}

func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) {
func NewCreator(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Creator, error) {
for _, ns := range parsers {
name := ns.Name()
switch name {
Expand Down Expand Up @@ -80,14 +80,14 @@ func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, er
}
}

return &Config{
return &Creator{
pCfg: pCfg,
parsers: parsers,
}, nil

}

func (c *Config) Create(in reader.Reader) Parser {
func (c *Creator) Create(in reader.Reader) Parser {
p := in
for _, ns := range c.parsers {
name := ns.Name()
Expand Down
102 changes: 102 additions & 0 deletions libbeat/reader/parser/parser_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package parser

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
)

type inputParsersConfig struct {
MaxBytes int `config:"max_bytes"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Parsers []common.ConfigNamespace `config:"parsers"`
}

func TestParsersExampleInline(t *testing.T) {
tests := map[string]struct {
lines string
parsers map[string]interface{}
expectedMessages []string
}{
"multiline docker logs parser": {
lines: `{"log":"[log] The following are log messages\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":"[log] This one is\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":" on multiple\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":" lines","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
`,
parsers: map[string]interface{}{
"max_bytes": 1024,
"line_terminator": "auto",
"parsers": []map[string]interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
"keys_under_root": true,
"message_key": "log",
},
},
map[string]interface{}{
"multiline": map[string]interface{}{
"match": "after",
"negate": true,
"pattern": "^\\[log\\]",
},
},
},
},
expectedMessages: []string{
"[log] The following are log messages\n",
"[log] This one is\n\n on multiple\n\n lines",
"[log] In total there should be 3 events\n",
},
},
}

for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
cfg := common.MustNewConfigFrom(test.parsers)
var c inputParsersConfig
err := cfg.Unpack(&c)
require.NoError(t, err)

creator, err := NewCreator(
CommonConfig{
MaxBytes: c.MaxBytes,
LineTerminator: c.LineTerminator,
},
c.Parsers,
)
require.NoError(t, err)
p := creator.Create(testReader(test.lines))

i := 0
msg, err := p.Next()
for err == nil {
require.Equal(t, test.expectedMessages[i], string(msg.Content))
i++
msg, err = p.Next()
}
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestParsersConfigAndReading(t *testing.T) {
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
if test.expectedError == "" {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestJSONParsersWithFields(t *testing.T) {
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
require.NoError(t, err)
p := c.Create(msgReader(test.message))

Expand Down Expand Up @@ -353,7 +353,7 @@ func TestContainerParser(t *testing.T) {
var parsersConfig testParsersConfig
err := cfg.Unpack(&parsersConfig)
require.NoError(t, err)
c, err := NewConfig(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
c, err := NewCreator(CommonConfig{MaxBytes: 1024, LineTerminator: readfile.AutoLineTerminator}, parsersConfig.Parsers)
require.NoError(t, err)
p := c.Create(testReader(test.lines))

Expand Down

0 comments on commit f01f797

Please sign in to comment.