Skip to content

Commit

Permalink
Move parsers outside of filestream input so others can use them as we…
Browse files Browse the repository at this point in the history
…ll (#26541)

## What does this PR do?

The object has its own `Unpack` function, so it is enough for you to add it as an attribute to your configuration.

```golang
config parser.Config
```

Then create the parser based on the configuration

```golang
p = inp.parserConfig.Create(r)
```

Example configuration accepted by the code above

```yaml
parsers:
  - multiline:
      type: count
      lines_count: 3
```
  • Loading branch information
kvch authored Jun 29, 2021
1 parent c45aba5 commit 2a56cd7
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 67 deletions.
8 changes: 2 additions & 6 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
)

Expand Down Expand Up @@ -71,7 +72,7 @@ type readerConfig struct {
MaxBytes int `config:"message_max_bytes" validate:"min=0,nonzero"`
Tail bool `config:"seek_to_tail"`

Parsers []common.ConfigNamespace `config:"parsers"`
Parsers parser.Config `config:",inline"`
}

type backoffConfig struct {
Expand Down Expand Up @@ -127,7 +128,6 @@ func defaultReaderConfig() readerConfig {
LineTerminator: readfile.AutoLineTerminator,
MaxBytes: 10 * humanize.MiByte,
Tail: false,
Parsers: make([]common.ConfigNamespace, 0),
}
}

Expand All @@ -136,9 +136,5 @@ func (c *config) Validate() error {
return fmt.Errorf("no path is configured")
}

if err := validateParserConfig(parserConfig{maxBytes: c.Reader.MaxBytes, lineTerminator: c.Reader.LineTerminator}, c.Reader.Parsers); err != nil {
return fmt.Errorf("cannot parse parser configuration: %+v", err)
}

return nil
}
9 changes: 4 additions & 5 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/debug"
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
)
Expand All @@ -57,7 +58,7 @@ type filestream struct {
encodingFactory encoding.EncodingFactory
encoding encoding.Encoding
closerConfig closerConfig
parserConfig []common.ConfigNamespace
parsers parser.Config
}

// Plugin creates a new filestream input plugin for creating a stateful input.
Expand Down Expand Up @@ -97,6 +98,7 @@ func configure(cfg *common.Config) (loginp.Prospector, loginp.Harvester, error)
readerConfig: config.Reader,
encodingFactory: encodingFactory,
closerConfig: config.Close,
parsers: config.Reader.Parsers,
}

return prospector, filestream, nil
Expand Down Expand Up @@ -219,10 +221,7 @@ func (inp *filestream) open(log *logp.Logger, canceler input.Canceler, fs fileSo

r = readfile.NewFilemeta(r, fs.newPath)

r, err = newParsers(r, parserConfig{maxBytes: inp.readerConfig.MaxBytes, lineTerminator: inp.readerConfig.LineTerminator}, inp.readerConfig.Parsers)
if err != nil {
return nil, err
}
r = inp.parsers.Create(r)

r = readfile.NewLimitReader(r, inp.readerConfig.MaxBytes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
// specific language governing permissions and limitations
// under the License.

package filestream
package parser

import (
"errors"
"fmt"
"io"

"github.com/dustin/go-humanize"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/multiline"
Expand All @@ -35,20 +37,48 @@ var (

// parser transforms or translates the Content attribute of a Message.
// They are able to aggregate two or more Messages into a single one.
type parser interface {
type Parser interface {
io.Closer
Next() (reader.Message, error)
}

type parserConfig struct {
maxBytes int
lineTerminator readfile.LineTerminator
type CommonConfig struct {
MaxBytes int `config:"max_bytes"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
}

func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace) (parser, error) {
p := in
type Config struct {
pCfg CommonConfig
parsers []common.ConfigNamespace
}

func (c *Config) Unpack(cc *common.Config) error {
tmp := struct {
Common CommonConfig `config:",inline"`
Parsers []common.ConfigNamespace `config:"parsers"`
}{
CommonConfig{
MaxBytes: 10 * humanize.MiByte,
LineTerminator: readfile.AutoLineTerminator,
},
nil,
}
err := cc.Unpack(&tmp)
if err != nil {
return err
}

for _, ns := range c {
newC, err := NewConfig(tmp.Common, tmp.Parsers)
if err != nil {
return err
}
*c = *newC

return nil
}

func NewConfig(pCfg CommonConfig, parsers []common.ConfigNamespace) (*Config, error) {
for _, ns := range parsers {
name := ns.Name()
switch name {
case "multiline":
Expand All @@ -58,63 +88,68 @@ func newParsers(in reader.Reader, pCfg parserConfig, c []common.ConfigNamespace)
if err != nil {
return nil, fmt.Errorf("error while parsing multiline parser config: %+v", err)
}
p, err = multiline.New(p, "\n", pCfg.maxBytes, &config)
if err != nil {
return nil, fmt.Errorf("error while creating multiline parser: %+v", err)
}
case "ndjson":
var config readjson.ParserConfig
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("error while parsing ndjson parser config: %+v", err)
}
p = readjson.NewJSONParser(p, &config)
case "container":
config := readjson.DefaultContainerConfig()
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return nil, fmt.Errorf("error while parsing container parser config: %+v", err)
}
p = readjson.NewContainerParser(p, &config)
default:
return nil, fmt.Errorf("%s: %s", ErrNoSuchParser, name)
}
}

return p, nil
return &Config{
pCfg: pCfg,
parsers: parsers,
}, nil

}

func validateParserConfig(pCfg parserConfig, c []common.ConfigNamespace) error {
for _, ns := range c {
func (c *Config) Create(in reader.Reader) Parser {
p := in
for _, ns := range c.parsers {
name := ns.Name()
switch name {
case "multiline":
var config multiline.Config
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return fmt.Errorf("error while parsing multiline parser config: %+v", err)
return p
}
p, err = multiline.New(p, "\n", c.pCfg.MaxBytes, &config)
if err != nil {
return p
}
case "ndjson":
var config readjson.Config
var config readjson.ParserConfig
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return fmt.Errorf("error while parsing ndjson parser config: %+v", err)
return p
}
p = readjson.NewJSONParser(p, &config)
case "container":
config := readjson.DefaultContainerConfig()
cfg := ns.Config()
err := cfg.Unpack(&config)
if err != nil {
return fmt.Errorf("error while parsing container parser config: %+v", err)
return p
}
p = readjson.NewContainerParser(p, &config)
default:
return fmt.Errorf("%s: %s", ErrNoSuchParser, name)
return p
}
}

return nil
return p
}
94 changes: 94 additions & 0 deletions libbeat/reader/parser/parser_example_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package parser

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
)

type inputParsersConfig struct {
MaxBytes int `config:"max_bytes"`
LineTerminator readfile.LineTerminator `config:"line_terminator"`
Parsers Config `config:",inline"`
}

func TestParsersExampleInline(t *testing.T) {
tests := map[string]struct {
lines string
parsers map[string]interface{}
expectedMessages []string
}{
"multiline docker logs parser": {
lines: `{"log":"[log] The following are log messages\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":"[log] This one is\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":" on multiple\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":" lines","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
{"log":"[log] In total there should be 3 events\n","stream":"stdout","time":"2016-03-02T22:58:51.338462311Z"}
`,
parsers: map[string]interface{}{
"max_bytes": 1024,
"line_terminator": "auto",
"parsers": []map[string]interface{}{
map[string]interface{}{
"ndjson": map[string]interface{}{
"keys_under_root": true,
"message_key": "log",
},
},
map[string]interface{}{
"multiline": map[string]interface{}{
"match": "after",
"negate": true,
"pattern": "^\\[log\\]",
},
},
},
},
expectedMessages: []string{
"[log] The following are log messages\n",
"[log] This one is\n\n on multiple\n\n lines",
"[log] In total there should be 3 events\n",
},
},
}

for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
cfg := common.MustNewConfigFrom(test.parsers)
var c inputParsersConfig
err := cfg.Unpack(&c)
require.NoError(t, err)

p := c.Parsers.Create(testReader(test.lines))

i := 0
msg, err := p.Next()
for err == nil {
require.Equal(t, test.expectedMessages[i], string(msg.Content))
i++
msg, err = p.Next()
}
})
}
}
Loading

0 comments on commit 2a56cd7

Please sign in to comment.