New multiline mode in Filebeat: while_pattern (elastic#19662)
## What does this PR do?

Adds a new multiline reader type, `while_pattern`. It combines consecutive lines that match the pattern into a single event.

## Why is it important?

The current `pattern` reader combines the lines that match the pattern with one additional line at the beginning or at the end. You therefore need to know where each multiline message starts (or ends), and when logs are collected from different sources there can be many different multiline formats (Go panics, Python exceptions, and so on).

I would like to configure Filebeat once with the following rule: the application normally writes logs in JSON format, but when an exception occurs it writes a multiline message in some other (non-JSON) format, and such multiline messages need to be detected.

## How to test this PR locally

Example Filebeat configuration:
```
filebeat.inputs:
  - type: log
    paths:
      - app.log
    reload.enabled: true
    multiline:
      type: 'while_pattern'
      pattern: '^{'
      negate: true
output.console:
  pretty: true
```

Example `app.log`:
```
{}
{}
panic: some text
 some text 2
 some text 3
{}
{}
```

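Expected result: 5 events. Each `{}` line is its own event, and the three non-JSON lines (`panic: some text` and the two lines that follow it) are combined into one event.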
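
The grouping rule can be sketched in a few lines of Go. This is only an illustration of the rule described above, not the reader added by this PR: `groupWhilePattern` and `jsonStart` are hypothetical names, the standard `regexp` package stands in for Filebeat's matcher, and `max_lines`, `max_bytes` and the timeout are ignored.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// jsonStart matches lines that begin with '{' (the '^{' pattern from the
// configuration above, with the brace escaped). Hypothetical name.
var jsonStart = regexp.MustCompile(`^\{`)

// groupWhilePattern is a hypothetical helper, not Filebeat code. It merges
// each run of consecutive lines for which the (optionally negated) pattern
// matches into one event; every other line becomes its own event.
func groupWhilePattern(lines []string, pattern *regexp.Regexp, negate bool) []string {
	var events []string
	var run []string // lines of the multiline event currently being built

	// flush emits the buffered run, if any, as a single event.
	flush := func() {
		if len(run) > 0 {
			events = append(events, strings.Join(run, "\n"))
			run = nil
		}
	}

	for _, line := range lines {
		matched := pattern.MatchString(line)
		if negate {
			matched = !matched
		}
		if matched {
			run = append(run, line)
			continue
		}
		// A non-matching line ends the current run and is emitted on its own.
		flush()
		events = append(events, line)
	}
	flush()
	return events
}

func main() {
	lines := []string{
		"{}", "{}",
		"panic: some text", " some text 2", " some text 3",
		"{}", "{}",
	}
	for i, e := range groupWhilePattern(lines, jsonStart, true) {
		fmt.Printf("event %d: %q\n", i+1, e)
	}
}
```

With `negate` set to true, every line that does not start with `{` counts as part of a multiline run, so no stack-trace format has to be described explicitly.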


## Use cases

- Aggregate stack traces from many different programming languages without having to describe each stack trace format.
tufitko authored Jul 31, 2020
1 parent 88c19e8 commit 43bbf51
Showing 6 changed files with 290 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -62,6 +62,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Adds Gsuite Drive support. {pull}19704[19704]
- Adds Gsuite Groups support. {pull}19725[19725]
- Move file metrics to dataset endpoint {pull}19977[19977]
- Add `while_pattern` type to multiline reader. {pull}19662[19662]

*Heartbeat*

8 changes: 4 additions & 4 deletions filebeat/docs/multiline.asciidoc
@@ -23,7 +23,7 @@ Also read <<yaml-tips>> and <<regexp-support>> to avoid common mistakes.

You can specify the following options in the +{beatname_lc}.inputs+ section of
the +{beatname_lc}.yml+ config file to control how {beatname_uc} deals with messages
that span multiple lines.
that span multiple lines.

The following example shows how to configure {beatname_uc} to handle a multiline message where the first line of the message begins with a bracket (`[`).

@@ -47,8 +47,8 @@ multiline.match: after
at org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction.checkBlock(TransportDeleteIndexAction.java:75)
-------------------------------------------------------------------------------------

*`multiline.type`*:: Defines which aggregation method to use. The default is `pattern`. The other option
is `count` which lets you aggregate constant number of lines.
*`multiline.type`*:: Defines which aggregation method to use. The default is `pattern`. The other options
are `count` which lets you aggregate constant number of lines and `while_pattern` which aggregate lines by pattern without match option.

*`multiline.pattern`*:: Specifies the regular expression pattern to match. Note that the regexp patterns supported by {beatname_uc}
differ somewhat from the patterns supported by Logstash. See <<regexp-support>> for a list of supported regexp patterns.
@@ -71,7 +71,7 @@ the pattern.
+
NOTE: The `after` setting is equivalent to `previous` in https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html[Logstash], and `before` is equivalent to `next`.

*`multiline.flush_pattern`*:: Specifies a regular expression, in which the current multiline will be flushed from memory, ending the multiline-message.
*`multiline.flush_pattern`*:: Specifies a regular expression, in which the current multiline will be flushed from memory, ending the multiline-message. Work only with `pattern` type.

*`multiline.max_lines`*:: The maximum number of lines that can be combined into one event. If
the multiline message contains more than `max_lines`, any additional
10 changes: 7 additions & 3 deletions libbeat/reader/multiline/multiline.go
@@ -31,10 +31,14 @@ func New(
maxBytes int,
config *Config,
) (reader.Reader, error) {
if config.Type == patternMode {
switch config.Type {
case patternMode:
return newMultilinePatternReader(r, separator, maxBytes, config)
} else if config.Type == countMode {
case countMode:
return newMultilineCountReader(r, separator, maxBytes, config)
case whilePatternMode:
return newMultilineWhilePatternReader(r, separator, maxBytes, config)
default:
return nil, fmt.Errorf("unknown multiline type %d", config.Type)
}
return nil, fmt.Errorf("unknown multiline type %d", config.Type)
}
15 changes: 11 additions & 4 deletions libbeat/reader/multiline/multiline_config.go
@@ -29,15 +29,18 @@ type multilineType uint8
const (
patternMode multilineType = iota
countMode
whilePatternMode

patternStr = "pattern"
countStr = "count"
patternStr = "pattern"
countStr = "count"
whilePatternStr = "while_pattern"
)

var (
multilineTypes = map[string]multilineType{
patternStr: patternMode,
countStr: countMode,
patternStr: patternMode,
countStr: countMode,
whilePatternStr: whilePatternMode,
}
)

@@ -69,6 +72,10 @@ func (c *Config) Validate() error {
if c.LinesCount == 0 {
return fmt.Errorf("multiline.count_lines cannot be zero when count based is selected")
}
} else if c.Type == whilePatternMode {
if c.Pattern == nil {
return fmt.Errorf("multiline.pattern cannot be empty when pattern based matching is selected")
}
}
return nil
}
42 changes: 42 additions & 0 deletions libbeat/reader/multiline/multiline_test.go
@@ -241,6 +241,48 @@ func TestMultilineCount(t *testing.T) {
)
}

func TestMultilineWhilePattern(t *testing.T) {
pattern := match.MustCompile(`^{`)
testMultilineOK(t,
Config{
Type: whilePatternMode,
Pattern: &pattern,
Negate: false,
},
3,
"{line1\n{line1.1\n",
"not matched line\n",
"{line2\n{line2.1\n",
)
// use negated
testMultilineOK(t,
Config{
Type: whilePatternMode,
Pattern: &pattern,
Negate: true,
},
3,
"{line1\n",
"panic:\n~stacktrace~\n",
"{line2\n",
)
// truncated
maxLines := 2
testMultilineTruncated(t,
Config{
Type: whilePatternMode,
Pattern: &pattern,
MaxLines: &maxLines,
},
1,
true,
[]string{
"{line1\n{line1.1\n{line1.2\n"},
[]string{
"{line1\n{line1.1\n"},
)
}

func testMultilineOK(t *testing.T, cfg Config, events int, expected ...string) {
_, buf := createLineBuffer(expected...)
r := createMultilineTestReader(t, buf, cfg)
225 changes: 225 additions & 0 deletions libbeat/reader/multiline/while.go
@@ -0,0 +1,225 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package multiline

import (
"github.com/elastic/beats/v7/libbeat/common/match"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
)

// MultiLine reader combining multiple line events into one multi-line event.
//
// Consecutive lines that satisfy the regular expression will be combined.
//
// The maximum number of bytes and lines to be returned is fully configurable.
// Even if limits are reached subsequent lines are matched, until event is
// fully finished.
//
// Errors will force the multiline reader to return the currently active
// multiline event first and finally return the actual error on next call to Next.
type whilePatternReader struct {
reader reader.Reader
matcher lineMatcherFunc
logger *logp.Logger
msgBuffer *messageBuffer
state func(*whilePatternReader) (reader.Message, error)
}

func newMultilineWhilePatternReader(
r reader.Reader,
separator string,
maxBytes int,
config *Config,
) (reader.Reader, error) {
maxLines := defaultMaxLines
if config.MaxLines != nil {
maxLines = *config.MaxLines
}

tout := defaultMultilineTimeout
if config.Timeout != nil {
tout = *config.Timeout
}

if tout > 0 {
r = readfile.NewTimeoutReader(r, sigMultilineTimeout, tout)
}

matcherFunc := lineMatcher(*config.Pattern)
if config.Negate {
matcherFunc = negatedLineMatcher(matcherFunc)
}

pr := &whilePatternReader{
reader: r,
matcher: matcherFunc,
msgBuffer: newMessageBuffer(maxBytes, maxLines, []byte(separator), config.SkipNewLine),
logger: logp.NewLogger("reader_multiline"),
state: (*whilePatternReader).readFirst,
}
return pr, nil
}

// Next returns next multi-line event.
func (pr *whilePatternReader) Next() (reader.Message, error) {
return pr.state(pr)
}

func (pr *whilePatternReader) readFirst() (reader.Message, error) {
for {
message, err := pr.reader.Next()
if err != nil {
// no lines buffered -> ignore timeout
if err == sigMultilineTimeout {
continue
}

pr.logger.Debug("Multiline event flushed because timeout reached.")

// pass error to caller (next layer) for handling
return message, err
}

if message.Bytes == 0 {
continue
}

// no match, return message
if !pr.matcher(message.Content) {
return message, nil
}

// Start new multiline event
pr.msgBuffer.startNewMessage(message)
pr.setState((*whilePatternReader).readNext)
return pr.readNext()
}
}

func (pr *whilePatternReader) readNext() (reader.Message, error) {
for {
message, err := pr.reader.Next()
if err != nil {
// handle multiline timeout signal
if err == sigMultilineTimeout {
// no lines buffered -> ignore timeout
if pr.msgBuffer.isEmpty() {
continue
}

pr.logger.Debug("Multiline event flushed because timeout reached.")

// return collected multiline event and
// empty buffer for new multiline event
msg := pr.msgBuffer.finalize()
pr.resetState()
return msg, nil
}

// handle error without any bytes returned from reader
if message.Bytes == 0 {
// no lines buffered -> return error
if pr.msgBuffer.isEmpty() {
return reader.Message{}, err
}

// lines buffered, return multiline and error on next read
return pr.collectMessageAfterError(err)
}

// handle error with some content being returned by reader and
// line matching multiline criteria or no multiline started yet
if pr.msgBuffer.isEmptyMessage() || pr.matcher(message.Content) {
pr.msgBuffer.addLine(message)

// return multiline and error on next read
return pr.collectMessageAfterError(err)
}

// no match, return current multiline and return current line on next
// call to readNext
msg := pr.msgBuffer.finalize()
pr.msgBuffer.load(message)
pr.setState((*whilePatternReader).notMatchedMessageLoad)
return msg, nil
}

// no match, return message if buffer is empty, otherwise return current
// multiline and save message to buffer
if !pr.matcher(message.Content) {
if pr.msgBuffer.isEmptyMessage() {
return message, nil
}
msg := pr.msgBuffer.finalize()
pr.msgBuffer.load(message)
pr.setState((*whilePatternReader).notMatchedMessageLoad)
return msg, nil
}

// add line to current multiline event
pr.msgBuffer.addLine(message)
}
}

func (pr *whilePatternReader) collectMessageAfterError(err error) (reader.Message, error) {
msg := pr.msgBuffer.finalize()
pr.msgBuffer.setErr(err)
pr.setState((*whilePatternReader).readFailed)
return msg, nil
}

// readFailed returns empty message and error and resets line reader
func (pr *whilePatternReader) readFailed() (reader.Message, error) {
err := pr.msgBuffer.err
pr.msgBuffer.setErr(nil)
pr.resetState()
return reader.Message{}, err
}

// notMatchedMessageLoad returns not matched message from buffer
func (pr *whilePatternReader) notMatchedMessageLoad() (reader.Message, error) {
msg := pr.msgBuffer.finalize()
pr.resetState()
return msg, nil
}

// resetState sets state of the reader to readFirst
func (pr *whilePatternReader) resetState() {
pr.setState((*whilePatternReader).readFirst)
}

// setState sets state to the given function
func (pr *whilePatternReader) setState(next func(pr *whilePatternReader) (reader.Message, error)) {
pr.state = next
}

type lineMatcherFunc func(content []byte) bool

func lineMatcher(pat match.Matcher) lineMatcherFunc {
return func(content []byte) bool {
return pat.Match(content)
}
}

func negatedLineMatcher(m lineMatcherFunc) lineMatcherFunc {
return func(content []byte) bool {
return !m(content)
}
}
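
The reader in `while.go` keeps its current behaviour in a function-valued `state` field and switches between `readFirst`, `readNext` and `notMatchedMessageLoad`. The following stand-alone sketch shows the same idea in reduced form. It is not the code from this commit: `sketchReader`, `newSketchReader`, `emitPending` and the other names are hypothetical, input comes from a slice instead of a `reader.Reader`, and errors, timeouts and size limits are left out.

```go
package main

import (
	"fmt"
	"strings"
)

// sketchReader is a hypothetical, much-reduced stand-in for whilePatternReader.
type sketchReader struct {
	lines   []string          // input, consumed front to back
	match   func(string) bool // true if a line belongs to a multiline run
	run     []string          // lines buffered for the current run
	pending string            // non-matching line held back while the run is emitted
	state   func(*sketchReader) (string, bool)
}

func newSketchReader(lines []string, match func(string) bool) *sketchReader {
	return &sketchReader{lines: lines, match: match, state: (*sketchReader).readFirst}
}

// Next delegates to whichever state function is currently installed.
func (r *sketchReader) Next() (string, bool) { return r.state(r) }

// pop returns the next input line, or false once the input is exhausted.
func (r *sketchReader) pop() (string, bool) {
	if len(r.lines) == 0 {
		return "", false
	}
	line := r.lines[0]
	r.lines = r.lines[1:]
	return line, true
}

// readFirst passes non-matching lines through and starts a run on a match.
func (r *sketchReader) readFirst() (string, bool) {
	line, ok := r.pop()
	if !ok {
		return "", false
	}
	if !r.match(line) {
		return line, true
	}
	r.run = []string{line}
	r.state = (*sketchReader).readNext
	return r.readNext()
}

// readNext extends the run until a non-matching line or the end of input.
func (r *sketchReader) readNext() (string, bool) {
	for {
		line, ok := r.pop()
		if !ok {
			r.state = (*sketchReader).readFirst
			return r.flush(), true
		}
		if !r.match(line) {
			// Emit the run now; the held-back line is returned on the next call.
			r.pending = line
			r.state = (*sketchReader).emitPending
			return r.flush(), true
		}
		r.run = append(r.run, line)
	}
}

// emitPending returns the held-back line, then switches back to readFirst.
func (r *sketchReader) emitPending() (string, bool) {
	line := r.pending
	r.pending = ""
	r.state = (*sketchReader).readFirst
	return line, true
}

// flush joins the buffered run into one event and clears the buffer.
func (r *sketchReader) flush() string {
	out := strings.Join(r.run, "\n")
	r.run = nil
	return out
}

func main() {
	r := newSketchReader(
		[]string{"{a}", "panic:", " trace", "{b}"},
		func(s string) bool { return !strings.HasPrefix(s, "{") },
	)
	for ev, ok := r.Next(); ok; ev, ok = r.Next() {
		fmt.Printf("%q\n", ev)
	}
}
```

Holding the non-matching line back in its own state is what lets one call to `Next` return the finished multiline event while the line that ended it is still delivered, unchanged, on the following call.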
