Skip to content

Commit

Permalink
Dissect: Support trimming captured values and other configuration opt…
Browse files Browse the repository at this point in the history
…ions (elastic#19464) (elastic#19685)

This adds 4 new configuration parameters for the dissect processor:

- ignore_failure, so that it doesn't return an error when the tokenizer doesn't match the input. This is useful for chaining multiple processors together.
- overwrite_keys, so that existing keys are overwritten if they already exist.
- trim_values, to enable trimming blank space on the left (leading), on the right (trailing), or on both sides (all) of captured values.
- trim_chars, the set of characters that are trimmed by the above option (default is the ASCII space character).

The default values for these new flags have been chosen so that the default behavior of the processor is unchanged.

(cherry picked from commit a57e390)
  • Loading branch information
adriansr committed Jul 13, 2020
1 parent e0235ae commit a1e598c
Show file tree
Hide file tree
Showing 9 changed files with 639 additions and 9 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,9 @@ field. You can revert this change by configuring tags for the module and omittin
- Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817]
- Add support for multiple sets of hints on autodiscover {pull}18883[18883]
- Add a configurable delay between retries when an app metadata cannot be retrieved by `add_cloudfoundry_metadata`. {pull}19181[19181]
- Add the `ignore_failure` configuration option to the dissect processor. {pull}19464[19464]
- Add the `overwrite_keys` configuration option to the dissect processor. {pull}19464[19464]
- Add support to trim captured values in the dissect processor. {pull}19464[19464]

*Auditbeat*

Expand Down
43 changes: 40 additions & 3 deletions libbeat/processors/dissect/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,35 @@

package dissect

import (
"strings"

"github.com/pkg/errors"
)

// trimMode selects which side(s) of a captured value get trimmed.
// The left and right modes occupy separate bits so they can be tested
// individually with a bitwise AND and combined with a bitwise OR.
type trimMode byte

const (
	// trimModeNone disables trimming; it is the zero value.
	trimModeNone trimMode = 0
	// trimModeRight trims trailing characters.
	trimModeRight trimMode = 1 << 0
	// trimModeLeft trims leading characters.
	trimModeLeft trimMode = 1 << 1
	// trimModeAll trims both leading and trailing characters.
	trimModeAll trimMode = trimModeLeft | trimModeRight
)

// config holds the settings accepted by the dissect processor.
// NOTE(review): Tokenizer, Field and TargetPrefix are listed twice below. This
// looks like the removed and the re-aligned added lines of a diff shown
// together — confirm against the real file before relying on this listing.
type config struct {
Tokenizer *tokenizer `config:"tokenizer" validate:"required"`
Field string `config:"field"`
TargetPrefix string `config:"target_prefix"`
// Tokenizer is the dissect pattern; the validate tag marks it as required.
Tokenizer *tokenizer `config:"tokenizer" validate:"required"`
// Field is the "field" setting; defaultConfig sets it to "message".
Field string `config:"field"`
// TargetPrefix is the "target_prefix" setting; defaultConfig sets it to "dissect".
TargetPrefix string `config:"target_prefix"`
// IgnoreFailure is the "ignore_failure" setting. Per the commit description,
// when set the processor does not return an error if the tokenizer does not
// match the input.
IgnoreFailure bool `config:"ignore_failure"`
// OverwriteKeys is the "overwrite_keys" setting. Per the commit description,
// when set, keys that already exist in the event are overwritten.
OverwriteKeys bool `config:"overwrite_keys"`
// TrimValues is the "trim_values" setting: which side(s) of captured values
// to trim (none, left, right or all).
TrimValues trimMode `config:"trim_values"`
// TrimChars is the "trim_chars" setting: the set of characters removed when
// trimming is enabled; defaultConfig sets it to a single space.
TrimChars string `config:"trim_chars"`
}

// defaultConfig carries the default settings of the dissect processor.
// The boolean flags are spelled out as false to make explicit that the
// options are opt-in; fields not listed keep their zero value.
var defaultConfig = config{
	TargetPrefix:  "dissect",
	Field:         "message",
	TrimChars:     " ",
	IgnoreFailure: false,
	OverwriteKeys: false,
}

// tokenizer adds validation at the unpack level for this specific field.
Expand All @@ -40,3 +60,20 @@ func (t *tokenizer) Unpack(v string) error {
*t = *d
return nil
}

// Unpack the trim mode from a string.
func (tm *trimMode) Unpack(v string) error {
switch strings.ToLower(v) {
case "", "none":
*tm = trimModeNone
case "left":
*tm = trimModeLeft
case "right":
*tm = trimModeRight
case "all", "both":
*tm = trimModeAll
default:
return errors.Errorf("unsupported value %s. Must be one of [none, left, right, all]", v)
}
return nil
}
36 changes: 36 additions & 0 deletions libbeat/processors/dissect/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestConfig(t *testing.T) {
if !assert.NoError(t, err) {
return
}
assert.Equal(t, trimModeNone, cfg.TrimValues)
})

t.Run("invalid", func(t *testing.T) {
Expand Down Expand Up @@ -100,4 +101,39 @@ func TestConfig(t *testing.T) {
return
}
})

t.Run("with wrong trim_mode", func(t *testing.T) {
c, err := common.NewConfigFrom(map[string]interface{}{
"tokenizer": "hello %{what}",
"field": "message",
"trim_values": "bananas",
})
if !assert.NoError(t, err) {
return
}

cfg := config{}
err = c.Unpack(&cfg)
if !assert.Error(t, err) {
return
}
})

t.Run("with valid trim_mode", func(t *testing.T) {
c, err := common.NewConfigFrom(map[string]interface{}{
"tokenizer": "hello %{what}",
"field": "message",
"trim_values": "all",
})
if !assert.NoError(t, err) {
return
}

cfg := config{}
err = c.Unpack(&cfg)
if !assert.NoError(t, err) {
return
}
assert.Equal(t, trimModeAll, cfg.TrimValues)
})
}
12 changes: 9 additions & 3 deletions libbeat/processors/dissect/dissect.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ type position struct {
// Dissector is a tokenizer based on the Dissect syntax as defined at:
// https://www.elastic.co/guide/en/logstash/current/plugins-filters-dissect.html
type Dissector struct {
// NOTE(review): raw and parser appear twice in this listing. This looks like
// the removed and the re-aligned added lines of a diff shown together —
// confirm against the real file.
raw string
parser *parser
// raw presumably holds the tokenizer pattern as written — TODO confirm.
raw string
// parser presumably holds the parsed form of the pattern — TODO confirm.
parser *parser
// trimmer is optional. When it is non-nil, Dissect passes every captured
// position through trimmer.Trim before resolving the values.
trimmer trimmer
}

// Dissect takes the raw string and will use the defined tokenizer to return a map with the
Expand All @@ -57,7 +58,12 @@ func (d *Dissector) Dissect(s string) (Map, error) {
if len(positions) == 0 {
return nil, errParsingFailure
}

if d.trimmer != nil {
for idx, pos := range positions {
pos.start, pos.end = d.trimmer.Trim(s, pos.start, pos.end)
positions[idx] = pos
}
}
return d.resolve(s, positions), nil
}

Expand Down
24 changes: 23 additions & 1 deletion libbeat/processors/dissect/docs/dissect.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,29 @@ The `dissect` processor has the following configuration settings:
`target_prefix`:: (Optional) The name of the field where the values will be extracted. When an empty
string is defined, the processor will create the keys at the root of the event. Default is
`dissect`. When the target key already exists in the event, the processor won't replace it and will log
an error; you need to either drop or rename the key before using dissect.
an error; you need to either drop or rename the key before using dissect, or
enable the `overwrite_keys` flag.

`ignore_failure`:: (Optional) Flag to control whether the processor returns an error if the
tokenizer fails to match the message field. If set to true, the processor will silently restore
the original event, allowing execution of subsequent processors (if any). If set to false
(default), the processor will log an error, preventing execution of other processors.

`overwrite_keys`:: (Optional) When set to true, the processor will overwrite
existing keys in the event. The default is false, which causes the processor
to fail when a key already exists.

`trim_values`:: (Optional) Enables the trimming of the extracted values. Useful
to remove leading and/or trailing spaces. Possible values are:
- `none`: (default) no trimming is performed.
- `left`: values are trimmed on the left (leading).
- `right`: values are trimmed on the right (trailing).
- `all`: values are trimmed on both sides (leading and trailing).

`trim_chars`:: (Optional) Set of characters to trim from values, when trimming
is enabled. The default is to trim the space character (`" "`). To trim multiple
characters, simply set it to a string containing all characters to trim. For example,
`trim_chars: " \t"` will trim spaces and/or tabs.

For tokenization to be successful, all keys must be found and extracted; if one of them cannot be
found, an error will be logged and no modification is done on the original event.
Expand Down
14 changes: 12 additions & 2 deletions libbeat/processors/dissect/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ func NewProcessor(c *common.Config) (processors.Processor, error) {
if err != nil {
return nil, err
}
if config.TrimValues != trimModeNone {
config.Tokenizer.trimmer, err = newTrimmer(config.TrimChars,
config.TrimValues&trimModeLeft != 0,
config.TrimValues&trimModeRight != 0)
if err != nil {
return nil, err
}
}
p := &processor{config: config}

return p, nil
Expand All @@ -72,7 +80,9 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) {
); err != nil {
return event, errors.Wrap(err, "cannot add new flag the event")
}

if p.config.IgnoreFailure {
return event, nil
}
return event, err
}

Expand All @@ -94,7 +104,7 @@ func (p *processor) mapper(event *beat.Event, m common.MapStr) (*beat.Event, err
var prefixKey string
for k, v := range m {
prefixKey = prefix + k
if _, err := event.GetValue(prefixKey); err == common.ErrKeyNotFound {
if _, err := event.GetValue(prefixKey); err == common.ErrKeyNotFound || p.config.OverwriteKeys {
event.PutValue(prefixKey, v)
} else {
event.Fields = copy
Expand Down
Loading

0 comments on commit a1e598c

Please sign in to comment.