From b045e56221f502281cd1c1d7e2b58b546ed011cc Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Tue, 12 May 2020 11:29:55 -0700 Subject: [PATCH 01/21] Dissect processor - Convert strings to data types specified in tokenizer --- libbeat/processors/dissect/config_test.go | 48 ++++++++++++++ libbeat/processors/dissect/dissect.go | 2 +- libbeat/processors/dissect/dissect_test.go | 77 ++++++++++++++++++++++ libbeat/processors/dissect/field.go | 55 ++++++++++++++-- 4 files changed, 177 insertions(+), 5 deletions(-) diff --git a/libbeat/processors/dissect/config_test.go b/libbeat/processors/dissect/config_test.go index 09d4da5a180..517cfc6fd3c 100644 --- a/libbeat/processors/dissect/config_test.go +++ b/libbeat/processors/dissect/config_test.go @@ -101,3 +101,51 @@ func TestConfig(t *testing.T) { } }) } + +func TestConfigForDataType(t *testing.T) { + t.Run("valid data type", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "%{value1|integer} %{value2|float} %{value3|boolean} %{value4|long} %{value5|double}", + "field": "message", + }) + if !assert.NoError(t, err) { + return + } + + cfg := config{} + err = c.Unpack(&cfg) + if !assert.NoError(t, err) { + return + } + }) + t.Run("invalid data type", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "%{value1|int} %{value2|short} %{value3|char} %{value4|void} %{value5|unsigned}", + "field": "message", + }) + if !assert.NoError(t, err) { + return + } + + cfg := config{} + err = c.Unpack(&cfg) + if !assert.NoError(t, err) { + return + } + }) + t.Run("missing data type", func(t *testing.T) { + c, err := common.NewConfigFrom(map[string]interface{}{ + "tokenizer": "%{value1|} %{value2|} %{value3|} %{value4|} %{value5|}", + "field": "message", + }) + if !assert.NoError(t, err) { + return + } + + cfg := config{} + err = c.Unpack(&cfg) + if !assert.NoError(t, err) { + return + } + }) +} diff --git a/libbeat/processors/dissect/dissect.go b/libbeat/processors/dissect/dissect.go index c9093c476f8..f062fddde5c 100644 --- a/libbeat/processors/dissect/dissect.go +++ b/libbeat/processors/dissect/dissect.go @@ -20,7 +20,7 @@ package dissect import "fmt" // Map represents the keys and their values extracted with the defined tokenizer. -type Map = map[string]string +type Map = map[string]interface{} // positions represents the start and end position of the keys found in the string. type positions []position diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index c97e020dd16..bb57aa7dda4 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -33,6 +33,83 @@ func TestNoToken(t *testing.T) { assert.Equal(t, errInvalidTokenizer, err) } +func TestDissectConversion(t *testing.T) { + tests := []struct { + Name string + Tok string + Msg string + Expected map[string]interface{} + Fail bool + }{ + { + Name: "Convert 1 value", + Tok: "id=%{id|integer} msg=\"%{message}\"", + Msg: "id=7736 msg=\"Single value OK\"}", + Expected: map[string]interface{}{ + "id": int32(7736), + "message": "Single value OK", + }, + Fail: false, + }, + { + Name: "Convert multiple values values", + Tok: "id=%{id|integer} status=%{status|integer} duration=%{duration|float} uptime=%{uptime|long} success=%{success|boolean} msg=\"%{message}\"", + Msg: "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", + Expected: map[string]interface{}{ + "id": int32(7736), + "status": int32(202), + "duration": float32(0.975), + "uptime": int64(1588975628), + "success": true, + "message": "Request accepted", + }, + Fail: false, + }, + { + Name: "Missing data type, drop fields", + Tok: "id=%{id|} status=%{status|} msg=\"%{message}\"", + Msg: "id=1857 status=404 msg=\"File not found\"}", + Expected: map[string]interface{}{ + "id": "1857", + "message": "File not found", + "status": "404", + }, + Fail: false, + }, + { + Name: "Invalid data type, drop fields", + Tok: "id=%{id|xyz} status=%{status|abc} msg=\"%{message}\"", + Msg: "id=1945 status=500 msg=\"Internal server error\"}", + Expected: map[string]interface{}{ + "message": "Internal server error", + }, + Fail: false, + }, + } + + for _, test := range tests { + t.Run(test.Name, func(t *testing.T) { + d, err := New(test.Tok) + if !assert.NoError(t, err) { + return + } + + if test.Fail { + _, err := d.Dissect(test.Msg) + assert.Error(t, err) + return + } + + r, err := d.Dissect(test.Msg) + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, test.Expected, r) + }) + } +} + func TestEmptyString(t *testing.T) { d, err := New("%{hello}") _, err = d.Dissect("") diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go index eae6ba7cdf7..8d487ba40f2 100644 --- a/libbeat/processors/dissect/field.go +++ b/libbeat/processors/dissect/field.go @@ -19,6 +19,9 @@ package dissect import ( "fmt" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/processors/convert" "strconv" "strings" ) @@ -28,6 +31,7 @@ type field interface { IsGreedy() bool Ordinal() int Key() string + DataType() string ID() int Apply(b string, m Map) String() string @@ -39,6 +43,7 @@ type baseField struct { key string ordinal int greedy bool + dataType string } func (f baseField) IsGreedy() bool { @@ -56,6 +61,9 @@ func (f baseField) Ordinal() int { func (f baseField) Key() string { return f.key } +func (f baseField) DataType() string { + return f.dataType +} func (f baseField) ID() int { return f.id @@ -66,7 +74,7 @@ func (f baseField) IsSaveable() bool { } func (f baseField) String() string { - return fmt.Sprintf("field: %s, ordinal: %d, greedy: %v", f.key, f.ordinal, f.IsGreedy()) + return fmt.Sprintf("field: %s, ordinal: %d, greedy: %v, dataType: %s", f.key, f.ordinal, f.IsGreedy(), f.DataType()) } // normalField is a simple key reference like this: `%{key}` @@ -80,7 +88,32 @@ type normalField struct { } func (f normalField) Apply(b string, m Map) { - m[f.Key()] = b + if len(f.dataType) == 0 { + m[f.Key()] = b + } else { + config:= common.MapStr{ + "fields": []common.MapStr{ + {"from": f.Key(), "to": f.Key(), "type": f.DataType()}, + }, + } + processor, err := convert.New(common.MustNewConfigFrom(config)) + if err != nil { + fmt.Errorf("%s\n", err) + } else { + input := beat.Event{ + Fields: common.MapStr{ + f.Key(): b, + }, + } + result, err := processor.Run(&input) + if err == nil { + v, err := result.GetValue(f.Key()) + if err == nil { + m[f.Key()] = v + } + } + } + } } // skipField is an skip field without a name like this: `%{}`, this is often used to @@ -150,7 +183,7 @@ type indirectField struct { func (f indirectField) Apply(b string, m Map) { v, ok := m[f.Key()] if ok { - m[v] = b + m[v.(string)] = b return } } @@ -175,7 +208,7 @@ type appendField struct { func (f appendField) Apply(b string, m Map) { v, ok := m[f.Key()] if ok { - m[f.Key()] = v + f.JoinString() + b + m[f.Key()] = v.(string) + f.JoinString() + b return } m[f.Key()] = b @@ -261,6 +294,20 @@ func newIndirectField(id int, key string) indirectField { } func newNormalField(id int, key string, ordinal int, greedy bool) normalField { + parts := strings.Split(key, "|") + if len(parts) > 1 { + return normalField{ + baseField{ + id: id, + key: parts[0], + ordinal: ordinal, + greedy: greedy, + dataType: parts[1], + }, + } + } else { + key = parts[0] + } return normalField{ baseField{ id: id, From 2fac969861975b25419f9f2d1eb93cf053e34d17 Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Thu, 14 May 2020 14:29:06 -0700 Subject: [PATCH 02/21] Add benchmark test cases --- libbeat/processors/dissect/dissect_test.go | 48 ++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index bb57aa7dda4..35eb77c1f88 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -256,3 +256,51 @@ func BenchmarkDissect(b *testing.B) { } }) } + +func dissectConversion(tok, msg string, b *testing.B) { + d, err := New(tok) + assert.NoError(b, err) + + _, err = d.Dissect(msg) + assert.NoError(b, err) +} + +func benchmarkConversion(tok, msg string, b *testing.B) { + for n := 0; n < b.N; n++ { + dissectConversion(tok, msg, b) + } +} +func BenchmarkDissectNoConversionOneValue(b *testing.B) { + b.ReportAllocs() + benchmarkConversion("id=%{id} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) +} +func BenchmarkDissectWithConversionOneValue(b *testing.B) { + b.ReportAllocs() + benchmarkConversion("id=%{id|integer} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) +} +func BenchmarkDissectNoConversionMultipleValues(b *testing.B) { + b.ReportAllocs() + benchmarkConversion("id=%{id} status=%{status} duration=%{duration} uptime=%{uptime} success=%{success} msg=\"%{message}\"", + "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) +} +func BenchmarkDissectWithConversionMultipleValues(b *testing.B) { + b.ReportAllocs() + benchmarkConversion("id=%{id|integer} status=%{status|integer} duration=%{duration|float} uptime=%{uptime|long} success=%{success|boolean} msg=\"%{message}\"", + "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) +} +func BenchmarkConversionNoConversionMissingType(b *testing.B) { + benchmarkConversion("id=%{id} status=%{status} msg=\"%{message}\"", + "id=1857 status=404 msg=\"File not found\"}", b) +} +func BenchmarkConversionWithConversionMissingType(b *testing.B) { + benchmarkConversion("id=%{id|} status=%{status|} msg=\"%{message}\"", + "id=1857 status=404 msg=\"File not found\"}", b) +} +func BenchmarkConversionNoConversionInvalidType(b *testing.B) { + benchmarkConversion("id=%{id} status=%{status} msg=\"%{message}\"", + "id=1945 status=500 msg=\"Internal server error\"}", b) +} +func BenchmarkConversionWithConversionInvalidType(b *testing.B) { + benchmarkConversion("id=%{id|xyz} status=%{status|abc} msg=\"%{message}\"", + "id=1945 status=500 msg=\"Internal server error\"}", b) +} From 2e6230cbdd0928b6eac6710ef11c485618a74fce Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Sat, 16 May 2020 08:18:27 -0700 Subject: [PATCH 03/21] Update benchmark tests --- libbeat/processors/dissect/dissect_test.go | 20 ++-- libbeat/processors/dissect/field.go | 112 +++++++++++++++------ 2 files changed, 92 insertions(+), 40 deletions(-) diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index 35eb77c1f88..8e24eaaa12d 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -270,11 +270,11 @@ func benchmarkConversion(tok, msg string, b *testing.B) { dissectConversion(tok, msg, b) } } -func BenchmarkDissectNoConversionOneValue(b *testing.B) { +func BenchmarkDissectNoConversionOneValue(b *testing.B) { b.ReportAllocs() benchmarkConversion("id=%{id} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) } -func BenchmarkDissectWithConversionOneValue(b *testing.B) { +func BenchmarkDissectWithConversionOneValue(b *testing.B) { b.ReportAllocs() benchmarkConversion("id=%{id|integer} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) } @@ -288,19 +288,23 @@ func BenchmarkDissectWithConversionMultipleValues(b *testing.B) { benchmarkConversion("id=%{id|integer} status=%{status|integer} duration=%{duration|float} uptime=%{uptime|long} success=%{success|boolean} msg=\"%{message}\"", "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) } -func BenchmarkConversionNoConversionMissingType(b *testing.B) { +func BenchmarkDissectNoConversionMissingType(b *testing.B) { + b.ReportAllocs() benchmarkConversion("id=%{id} status=%{status} msg=\"%{message}\"", - "id=1857 status=404 msg=\"File not found\"}", b) + "id=1857 status=404 msg=\"File not found\"}", b) } -func BenchmarkConversionWithConversionMissingType(b *testing.B) { +func BenchmarkDissectWithConversionMissingType(b *testing.B) { + b.ReportAllocs() benchmarkConversion("id=%{id|} status=%{status|} msg=\"%{message}\"", "id=1857 status=404 msg=\"File not found\"}", b) } -func BenchmarkConversionNoConversionInvalidType(b *testing.B) { +func BenchmarkDissectNoConversionInvalidType(b *testing.B) { + b.ReportAllocs() benchmarkConversion("id=%{id} status=%{status} msg=\"%{message}\"", - "id=1945 status=500 msg=\"Internal server error\"}", b) + "id=1945 status=500 msg=\"Internal server error\"}", b) } -func BenchmarkConversionWithConversionInvalidType(b *testing.B) { +func BenchmarkDissectWithConversionInvalidType(b *testing.B) { + b.ReportAllocs() benchmarkConversion("id=%{id|xyz} status=%{status|abc} msg=\"%{message}\"", "id=1945 status=500 msg=\"Internal server error\"}", b) } diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go index 73b7a077b58..9009dfeaed2 100644 --- a/libbeat/processors/dissect/field.go +++ b/libbeat/processors/dissect/field.go @@ -19,11 +19,11 @@ package dissect import ( "fmt" - "github.com/elastic/beats/v7/libbeat/beat" - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/processors/convert" + "net" "strconv" "strings" + + "github.com/pkg/errors" ) type field interface { @@ -41,14 +41,39 @@ type field interface { } type baseField struct { - id int - key string - ordinal int - length int - greedy bool + id int + key string + ordinal int + length int + greedy bool dataType string } +type dataType uint8 + +// List of dataTypes. +const ( + unset dataType = iota + Integer + Long + Float + Double + String + Boolean + IP +) + +var dataTypeNames = map[string]dataType{ + "[unset]": unset, + "integer": Integer, + "long": Long, + "float": Float, + "double": Double, + "string": String, + "boolean": Boolean, + "ip": IP, +} + func (f baseField) IsGreedy() bool { return f.greedy } @@ -98,30 +123,53 @@ type normalField struct { baseField } +// strToInt is a helper to interpret a string as either base 10 or base 16. +func strToInt(s string, bitSize int) (int64, error) { + base := 10 + if strings.HasPrefix(s, "0x") || strings.HasPrefix(s, "0X") { + // strconv.ParseInt will accept the '0x' or '0X` prefix only when base is 0. + base = 0 + } + return strconv.ParseInt(s, base, bitSize) +} + +func transformType(typ dataType, value string) (interface{}, error) { + switch typ { + case String: + return fmt.Sprintf("%v", value), nil + case Long: + return strToInt(value, 64) + case Integer: + i, err := strToInt(value, 32) + return int32(i), err + case Float: + f, err := strconv.ParseFloat(value, 32) + return float32(f), err + case Double: + d, err := strconv.ParseFloat(value, 64) + return float64(d), err + case Boolean: + return strconv.ParseBool(value) + case IP: + if net.ParseIP(value) != nil { + return value, nil + } + return "", errors.New("value is not a valid IP address") + default: + return value, nil + } +} + func (f normalField) Apply(b string, m Map) { if len(f.dataType) == 0 { m[f.Key()] = b } else { - config:= common.MapStr{ - "fields": []common.MapStr{ - {"from": f.Key(), "to": f.Key(), "type": f.DataType()}, - }, - } - processor, err := convert.New(common.MustNewConfigFrom(config)) - if err != nil { - fmt.Errorf("%s\n", err) - } else { - input := beat.Event{ - Fields: common.MapStr{ - f.Key(): b, - }, - } - result, err := processor.Run(&input) + if dt, ok := dataTypeNames[f.dataType]; ok { + value, err := transformType(dt, b) if err == nil { - v, err := result.GetValue(f.Key()) - if err == nil { - m[f.Key()] = v - } + m[f.Key()] = value + } else { + fmt.Errorf("%s\n", err) } } } @@ -310,11 +358,11 @@ func newNormalField(id int, key string, ordinal int, length int, greedy bool) no if len(parts) > 1 { return normalField{ baseField{ - id: id, - key: parts[0], - ordinal: ordinal, - length: length, - greedy: greedy, + id: id, + key: parts[0], + ordinal: ordinal, + length: length, + greedy: greedy, dataType: parts[1], }, } From 0260ec790aeeb5d70e8b1540d8ee647f68aeebde Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Sat, 16 May 2020 11:02:07 -0700 Subject: [PATCH 04/21] Updated benchmark tests to one value and multiple value conversions --- libbeat/processors/dissect/dissect_test.go | 32 +++++++++++----------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index 8e24eaaa12d..9ab678a30d8 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -270,6 +270,7 @@ func benchmarkConversion(tok, msg string, b *testing.B) { dissectConversion(tok, msg, b) } } + func BenchmarkDissectNoConversionOneValue(b *testing.B) { b.ReportAllocs() benchmarkConversion("id=%{id} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) @@ -278,6 +279,15 @@ func BenchmarkDissectWithConversionOneValue(b *testing.B) { b.ReportAllocs() benchmarkConversion("id=%{id|integer} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) } +func BenchmarkDissectWithConversionOneValueMissingType(b *testing.B) { + b.ReportAllocs() + benchmarkConversion("id=%{id|} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) +} +func BenchmarkDissectWithConversionOneValueInvalidType(b *testing.B) { + b.ReportAllocs() + benchmarkConversion("id=%{id|xyz} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) +} + func BenchmarkDissectNoConversionMultipleValues(b *testing.B) { b.ReportAllocs() benchmarkConversion("id=%{id} status=%{status} duration=%{duration} uptime=%{uptime} success=%{success} msg=\"%{message}\"", @@ -288,23 +298,13 @@ func BenchmarkDissectWithConversionMultipleValues(b *testing.B) { benchmarkConversion("id=%{id|integer} status=%{status|integer} duration=%{duration|float} uptime=%{uptime|long} success=%{success|boolean} msg=\"%{message}\"", "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) } -func BenchmarkDissectNoConversionMissingType(b *testing.B) { +func BenchmarkDissectWithConversionMultipleValuesMissingType(b *testing.B) { b.ReportAllocs() - benchmarkConversion("id=%{id} status=%{status} msg=\"%{message}\"", - "id=1857 status=404 msg=\"File not found\"}", b) -} -func BenchmarkDissectWithConversionMissingType(b *testing.B) { - b.ReportAllocs() - benchmarkConversion("id=%{id|} status=%{status|} msg=\"%{message}\"", - "id=1857 status=404 msg=\"File not found\"}", b) -} -func BenchmarkDissectNoConversionInvalidType(b *testing.B) { - b.ReportAllocs() - benchmarkConversion("id=%{id} status=%{status} msg=\"%{message}\"", - "id=1945 status=500 msg=\"Internal server error\"}", b) + benchmarkConversion("id=%{id|} status=%{status|} duration=%{duration|} uptime=%{uptime|} success=%{success|} msg=\"%{message}\"", + "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) } -func BenchmarkDissectWithConversionInvalidType(b *testing.B) { +func BenchmarkDissectWithConversionMultipleValuesInvalidType(b *testing.B) { b.ReportAllocs() - benchmarkConversion("id=%{id|xyz} status=%{status|abc} msg=\"%{message}\"", - "id=1945 status=500 msg=\"Internal server error\"}", b) + benchmarkConversion("id=%{id|abc} status=%{status|def} duration=%{duration|jkl} uptime=%{uptime|tux} success=%{success|xyz} msg=\"%{message}\"", + "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) } From 1f5ee5e816ee69c487753014532048dc6dfb719c Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Sat, 16 May 2020 13:44:16 -0700 Subject: [PATCH 05/21] convert to .string, output error --- dev-tools/mage/check.go | 2 +- libbeat/processors/dissect/field.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dev-tools/mage/check.go b/dev-tools/mage/check.go index 11767ae7d3c..6c689432924 100644 --- a/dev-tools/mage/check.go +++ b/dev-tools/mage/check.go @@ -99,7 +99,7 @@ func GitDiffIndex() ([]string, error) { return nil, errors.Wrap(err, "failed to dissect git diff-index output") } - paths := strings.Split(m["paths"], "\t") + paths := strings.Split(m["paths"].(string), "\t") if len(paths) > 1 { modified = append(modified, paths[1]) } else { diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go index 9009dfeaed2..b06dd964347 100644 --- a/libbeat/processors/dissect/field.go +++ b/libbeat/processors/dissect/field.go @@ -169,7 +169,7 @@ func (f normalField) Apply(b string, m Map) { if err == nil { m[f.Key()] = value } else { - fmt.Errorf("%s\n", err) + errors.Errorf("%s\n", err) } } } From 5e89636e33fcf60a9a616dc93600cacc0c9c59d2 Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Wed, 20 May 2020 23:00:16 -0700 Subject: [PATCH 06/21] Add benchmark to demonstrate degradation --- libbeat/processors/dissect/dissect_test.go | 129 +++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index 9ab678a30d8..0afe98a364a 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -308,3 +308,132 @@ func BenchmarkDissectWithConversionMultipleValuesInvalidType(b *testing.B) { benchmarkConversion("id=%{id|abc} status=%{status|def} duration=%{duration|jkl} uptime=%{uptime|tux} success=%{success|xyz} msg=\"%{message}\"", "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) } + +func BenchmarkDissectComplexStackTraceDegradation(b *testing.B) { + message := `18-Apr-2018 06:53:20.411 INFO [http-nio-8080-exec-1] org.apache.coyote.http11.Http11Processor.service Error parsing HTTP request header + Note: further occurrences of HTTP header parsing errors will be logged at DEBUG level. + java.lang.IllegalArgumentException: Invalid character found in method name. HTTP method names must be tokens + at org.apache.coyote.http11.Http11InputBuffer.parseRequestLine(Http11InputBuffer.java:426) + at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:687) + at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) + at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) + at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) + at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) + at java.lang.Thread.run(Thread.java:748) MACHINE[hello]` + + b.Run("ComplexStackTrace1", func(b *testing.B) { + tok := "%{origin} %{message}" + msg := message + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + assert.NoError(b, err) + results = r + } + }) + b.Run("ComplexStackTrace2", func(b *testing.B) { + tok := "%{day}-%{origin} %{message}" + msg := message + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + assert.NoError(b, err) + results = r + } + }) + b.Run("ComplexStackTrace3", func(b *testing.B) { + tok := "%{day}-%{month}-%{origin} %{message}" + msg := message + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + assert.NoError(b, err) + results = r + } + }) + b.Run("ComplexStackTrace4", func(b *testing.B) { + tok := "%{day}-%{month}-%{year} %{origin} %{message}" + msg := message + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + assert.NoError(b, err) + results = r + } + }) + b.Run("ComplexStackTrace5", func(b *testing.B) { + tok := "%{day}-%{month}-%{year} %{hour} %{origin} %{message}" + msg := message + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + assert.NoError(b, err) + results = r + } + }) + b.Run("ComplexStackTrace6", func(b *testing.B) { + tok := "%{day}-%{month}-%{year} %{hour} %{severity} %{origin} %{message}" + msg := message + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + assert.NoError(b, err) + results = r + } + }) + b.Run("ComplexStackTrace7", func(b *testing.B) { + tok := "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{message}" + msg := message + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + assert.NoError(b, err) + results = r + } + }) + b.Run("ComplexStackTrace8", func(b *testing.B) { + tok := "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{first_line} %{message}" + msg := message + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + assert.NoError(b, err) + results = r + } + }) +} From f974066d479cebcd26b012fcaa4a2507a04d475b Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Wed, 20 May 2020 23:19:46 -0700 Subject: [PATCH 07/21] Add documentation for data conversion --- libbeat/processors/dissect/docs/dissect.asciidoc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libbeat/processors/dissect/docs/dissect.asciidoc b/libbeat/processors/dissect/docs/dissect.asciidoc index e11d8ed50b9..c62ab07d8ae 100644 --- a/libbeat/processors/dissect/docs/dissect.asciidoc +++ b/libbeat/processors/dissect/docs/dissect.asciidoc @@ -11,7 +11,7 @@ The `dissect` processor tokenizes incoming strings using defined patterns. ------- processors: - dissect: - tokenizer: "%{key1} %{key2}" + tokenizer: "%{key1} %{key2} %{key3|convert_datatype}" field: "message" target_prefix: "dissect" ------- @@ -19,6 +19,8 @@ processors: The `dissect` processor has the following configuration settings: `tokenizer`:: The field used to define the *dissection* pattern. + Optional convert datatype can be provided after the key using `|` as separator + to convert the value from string to integer, long, float, double, boolean or ip. `field`:: (Optional) The event field to tokenize. Default is `message`. From ac44544fa14647887296b632f323ebc71aed6dfb Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Wed, 20 May 2020 23:29:29 -0700 Subject: [PATCH 08/21] Update example to include data conversion --- libbeat/processors/dissect/docs/dissect.asciidoc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/libbeat/processors/dissect/docs/dissect.asciidoc b/libbeat/processors/dissect/docs/dissect.asciidoc index c62ab07d8ae..b655838d561 100644 --- a/libbeat/processors/dissect/docs/dissect.asciidoc +++ b/libbeat/processors/dissect/docs/dissect.asciidoc @@ -44,12 +44,12 @@ For this example, imagine that an application generates the following messages: [source,sh] ---- -"App01 - WebServer is starting" -"App01 - WebServer is up and running" -"App01 - WebServer is scaling 2 pods" -"App02 - Database is will be restarted in 5 minutes" -"App02 - Database is up and running" -"App02 - Database is refreshing tables" +"321 - App01 - WebServer is starting" +"321 - App01 - WebServer is up and running" +"321 - App01 - WebServer is scaling 2 pods" +"789 - App02 - Database is will be restarted in 5 minutes" +"789 - App02 - Database is up and running" +"789 - App02 - Database is refreshing tables" ---- Use the `dissect` processor to split each message into two fields, for example, @@ -59,7 +59,7 @@ Use the `dissect` processor to split each message into two fields, for example, ---- processors: - dissect: - tokenizer: '"%{service.name} - %{service.status}"' + tokenizer: '"%{pid|integer} - %{service.name} - %{service.status}"' field: "message" target_prefix: "" ---- @@ -69,6 +69,7 @@ This configuration produces fields like: [source,json] ---- "service": { + "pid": 321, "name": "App01", "status": "WebServer is up and running" }, From d6ce235867ff75c1240ccc1401ef447961bf030d Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Thu, 21 May 2020 00:31:15 -0700 Subject: [PATCH 09/21] Update test case --- libbeat/processors/dissect/dissect_test.go | 166 +++++++-------------- 1 file changed, 54 insertions(+), 112 deletions(-) diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index 0afe98a364a..9aeb8145177 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -324,116 +324,58 @@ func BenchmarkDissectComplexStackTraceDegradation(b *testing.B) { at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) at java.lang.Thread.run(Thread.java:748) MACHINE[hello]` - b.Run("ComplexStackTrace1", func(b *testing.B) { - tok := "%{origin} %{message}" - msg := message - d, err := New(tok) - if !assert.NoError(b, err) { - return - } - b.ReportAllocs() - for n := 0; n < b.N; n++ { - r, err := d.Dissect(msg) - assert.NoError(b, err) - results = r - } - }) - b.Run("ComplexStackTrace2", func(b *testing.B) { - tok := "%{day}-%{origin} %{message}" - msg := message - d, err := New(tok) - if !assert.NoError(b, err) { - return - } - b.ReportAllocs() - for n := 0; n < b.N; n++ { - r, err := d.Dissect(msg) - assert.NoError(b, err) - results = r - } - }) - b.Run("ComplexStackTrace3", func(b *testing.B) { - tok := "%{day}-%{month}-%{origin} %{message}" - msg := message - d, err := New(tok) - if !assert.NoError(b, err) { - return - } - b.ReportAllocs() - for n := 0; n < b.N; n++ { - r, err := d.Dissect(msg) - assert.NoError(b, err) - results = r - } - }) - b.Run("ComplexStackTrace4", func(b *testing.B) { - tok := "%{day}-%{month}-%{year} %{origin} %{message}" - msg := message - d, err := New(tok) - if !assert.NoError(b, err) { - return - } - b.ReportAllocs() - for n := 0; n < b.N; n++ { - r, err := d.Dissect(msg) - assert.NoError(b, err) - results = r - } - }) - b.Run("ComplexStackTrace5", func(b *testing.B) { - tok := "%{day}-%{month}-%{year} %{hour} %{origin} %{message}" - msg := message - d, err := New(tok) - if !assert.NoError(b, err) { - return - } - b.ReportAllocs() - for n := 0; n < b.N; n++ { - r, err := d.Dissect(msg) - assert.NoError(b, err) - results = r - } - }) - b.Run("ComplexStackTrace6", func(b *testing.B) { - tok := "%{day}-%{month}-%{year} %{hour} %{severity} %{origin} %{message}" - msg := message - d, err := New(tok) - if !assert.NoError(b, err) { - return - } - b.ReportAllocs() - for n := 0; n < b.N; n++ { - r, err := d.Dissect(msg) - assert.NoError(b, err) - results = r - } - }) - b.Run("ComplexStackTrace7", func(b *testing.B) { - tok := "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{message}" - msg := message - d, err := New(tok) - if !assert.NoError(b, err) { - return - } - b.ReportAllocs() - for n := 0; n < b.N; n++ { - r, err := d.Dissect(msg) - assert.NoError(b, err) - results = r - } - }) - b.Run("ComplexStackTrace8", func(b *testing.B) { - tok := "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{first_line} %{message}" - msg := message - d, err := New(tok) - if !assert.NoError(b, err) { - return - } - b.ReportAllocs() - for n := 0; n < b.N; n++ { - r, err := d.Dissect(msg) - assert.NoError(b, err) - results = r - } - }) + tests := []struct { + Name string + Tok string + }{ + { + Name:"ComplexStackTrace-1", + Tok: "%{origin} %{message}", + }, + { + Name:"ComplexStackTrace-2", + Tok: "%{day} %{origin} %{message}", + }, + { + Name:"ComplexStackTrace-3", + Tok: "%{day}-%{month} %{origin} %{message}", + }, + { + Name:"ComplexStackTrace-4", + Tok: "%{day}-%{month}-%{year} %{origin} %{message}", + }, + { + Name:"ComplexStackTrace-5", + Tok: "%{day}-%{month}-%{year} %{hour} %{origin} %{message}", + }, + { + Name:"ComplexStackTrace-6", + Tok: "%{day}-%{month}-%{year} %{hour} %{severity} %{origin} %{message}", + }, + { + Name:"ComplexStackTrace-7", + Tok: "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{message}", + }, + { + Name:"ComplexStackTrace-8", + Tok: "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{first_line} %{message}", + }, + } + + for _, test := range tests { + b.Run(test.Name, func(b *testing.B) { + tok := test.Tok + msg := message + d, err := New(tok) + if !assert.NoError(b, err) { + return + } + b.ReportAllocs() + for n := 0; n < b.N; n++ { + r, err := d.Dissect(msg) + assert.NoError(b, err) + results = r + } + }) + } } From 44c62411c46c0b4749547ca079093bc0b3161964 Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Thu, 21 May 2020 00:38:09 -0700 Subject: [PATCH 10/21] Update change log to include this PR 18683 --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 86ddfdd632d..326c2e06c3a 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -248,6 +248,7 @@ field. You can revert this change by configuring tags for the module and omittin - Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808] - Add TLS support to Kerberos authentication in Elasticsearch. {pull}18607[18607] - Change ownership of files in docker images so they can be used in secured environments. {pull}12905[12905] +- Add data type conversion in `dissect` processor for converting string values to other basic data types. {pull}18683[18683] *Auditbeat* From 7fc0caf00f266970c4cd2929e6664fc38ec8b554 Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Thu, 21 May 2020 15:04:24 -0700 Subject: [PATCH 11/21] formatting issue fixed --- libbeat/processors/dissect/dissect_test.go | 36 +++++++++++----------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index 9aeb8145177..5c06272cabc 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -325,40 +325,40 @@ func BenchmarkDissectComplexStackTraceDegradation(b *testing.B) { at java.lang.Thread.run(Thread.java:748) MACHINE[hello]` tests := []struct { - Name string - Tok string + Name string + Tok string }{ { - Name:"ComplexStackTrace-1", - Tok: "%{origin} %{message}", + Name: "ComplexStackTrace-1", + Tok: "%{origin} %{message}", }, { - Name:"ComplexStackTrace-2", - Tok: "%{day} %{origin} %{message}", + Name: "ComplexStackTrace-2", + Tok: "%{day} %{origin} %{message}", }, { - Name:"ComplexStackTrace-3", - Tok: "%{day}-%{month} %{origin} %{message}", + Name: "ComplexStackTrace-3", + Tok: "%{day}-%{month} %{origin} %{message}", }, { - Name:"ComplexStackTrace-4", - Tok: "%{day}-%{month}-%{year} %{origin} %{message}", + Name: "ComplexStackTrace-4", + Tok: "%{day}-%{month}-%{year} %{origin} %{message}", }, { - Name:"ComplexStackTrace-5", - Tok: "%{day}-%{month}-%{year} %{hour} %{origin} %{message}", + Name: "ComplexStackTrace-5", + Tok: "%{day}-%{month}-%{year} %{hour} %{origin} %{message}", }, { - Name:"ComplexStackTrace-6", - Tok: "%{day}-%{month}-%{year} %{hour} %{severity} %{origin} %{message}", + Name: "ComplexStackTrace-6", + Tok: "%{day}-%{month}-%{year} %{hour} %{severity} %{origin} %{message}", }, { - Name:"ComplexStackTrace-7", - Tok: "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{message}", + Name: "ComplexStackTrace-7", + Tok: "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{message}", }, { - Name:"ComplexStackTrace-8", - Tok: "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{first_line} %{message}", + Name: "ComplexStackTrace-8", + Tok: "%{day}-%{month}-%{year} %{hour} %{severity} [%{thread_id}] %{origin} %{first_line} %{message}", }, } From 790332823864e3292edf08227d12ccd5c955ce49 Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Sun, 7 Jun 2020 22:37:31 -0700 Subject: [PATCH 12/21] Remove line from CHANGELOG.next.asciidoc, update config test for invalid and missing data type, update dissect test for padding ->, move data type extraction to extractKeyParts, cleanup newNormalField. --- CHANGELOG.next.asciidoc | 1 - libbeat/processors/dissect/config_test.go | 6 +- libbeat/processors/dissect/const.go | 2 + libbeat/processors/dissect/dissect_test.go | 22 ++---- libbeat/processors/dissect/field.go | 76 +++++++++++++-------- libbeat/processors/dissect/validate_test.go | 4 +- 6 files changed, 60 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0aa0bb4ea3d..56b050fe8c3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -267,7 +267,6 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958] - Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808] - Add TLS support to Kerberos authentication in Elasticsearch. {pull}18607[18607] -- Change ownership of files in docker images so they can be used in secured environments. {pull}12905[12905] - Add data type conversion in `dissect` processor for converting string values to other basic data types. {pull}18683[18683] - Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817] diff --git a/libbeat/processors/dissect/config_test.go b/libbeat/processors/dissect/config_test.go index 517cfc6fd3c..9fd4fcac2be 100644 --- a/libbeat/processors/dissect/config_test.go +++ b/libbeat/processors/dissect/config_test.go @@ -120,7 +120,7 @@ func TestConfigForDataType(t *testing.T) { }) t.Run("invalid data type", func(t *testing.T) { c, err := common.NewConfigFrom(map[string]interface{}{ - "tokenizer": "%{value1|int} %{value2|short} %{value3|char} %{value4|void} %{value5|unsigned}", + "tokenizer": "%{value1|int} %{value2|short} %{value3|char} %{value4|void} %{value5|unsigned} id=%{id|xyz} status=%{status|abc} msg=\"%{message}\"", "field": "message", }) if !assert.NoError(t, err) { @@ -129,7 +129,7 @@ func TestConfigForDataType(t *testing.T) { cfg := config{} err = c.Unpack(&cfg) - if !assert.NoError(t, err) { + if !assert.Error(t, err) { return } }) @@ -144,7 +144,7 @@ func TestConfigForDataType(t *testing.T) { cfg := config{} err = c.Unpack(&cfg) - if !assert.NoError(t, err) { + if !assert.Error(t, err) { return } }) diff --git a/libbeat/processors/dissect/const.go b/libbeat/processors/dissect/const.go index aa0349cf82d..ba607ba17c8 100644 --- a/libbeat/processors/dissect/const.go +++ b/libbeat/processors/dissect/const.go @@ -55,4 +55,6 @@ var ( errMixedPrefixIndirectAppend = errors.New("mixed prefix `&+`") errMixedPrefixAppendIndirect = errors.New("mixed prefix `&+`") errEmptyKey = errors.New("empty key") + errInvalidDatatype = errors.New("invalid data type") + errMissingDatatype = errors.New("missing data type") ) diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index 5c06272cabc..9a71adc1ed7 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -66,22 +66,14 @@ func TestDissectConversion(t *testing.T) { Fail: false, }, { - Name: "Missing data type, drop fields", - Tok: "id=%{id|} status=%{status|} msg=\"%{message}\"", - Msg: "id=1857 status=404 msg=\"File not found\"}", + Name: "Greedy padding skip test ->", + Tok: "id=%{id|integer->} padding_removed=%{padding_removed|boolean->} length=%{length|long->} msg=\"%{message}\"", + Msg: "id=1945 padding_removed=true length=123456789 msg=\"Testing for padding\"}", Expected: map[string]interface{}{ - "id": "1857", - "message": "File not found", - "status": "404", - }, - Fail: false, - }, - { - Name: "Invalid data type, drop fields", - Tok: "id=%{id|xyz} status=%{status|abc} msg=\"%{message}\"", - Msg: "id=1945 status=500 msg=\"Internal server error\"}", - Expected: map[string]interface{}{ - "message": "Internal server error", + "id": int32(1945), + "padding_removed": true, + "length": int64(123456789), + "message": "Testing for padding", }, Fail: false, }, diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go index b06dd964347..5e76ffd894a 100644 --- a/libbeat/processors/dissect/field.go +++ b/libbeat/processors/dissect/field.go @@ -51,6 +51,10 @@ type baseField struct { type dataType uint8 +var ( + dataTypeSeparator = "|" +) + // List of dataTypes. const ( unset dataType = iota @@ -134,9 +138,10 @@ func strToInt(s string, bitSize int) (int64, error) { } func transformType(typ dataType, value string) (interface{}, error) { + value = strings.TrimRight(value, " ") switch typ { case String: - return fmt.Sprintf("%v", value), nil + return value, nil case Long: return strToInt(value, 64) case Integer: @@ -168,8 +173,6 @@ func (f normalField) Apply(b string, m Map) { value, err := transformType(dt, b) if err == nil { m[f.Key()] = value - } else { - errors.Errorf("%s\n", err) } } } @@ -242,7 +245,9 @@ type indirectField struct { func (f indirectField) Apply(b string, m Map) { v, ok := m[f.Key()] if ok { - m[v.(string)] = b + if v, ok := v.(string); ok { + m[v] = b + } return } } @@ -267,7 +272,9 @@ type appendField struct { func (f appendField) Apply(b string, m Map) { v, ok := m[f.Key()] if ok { - m[f.Key()] = v.(string) + f.JoinString() + b + if val, ok := v.(string); ok { + m[f.Key()] = val + f.JoinString() + b + } return } m[f.Key()] = b @@ -285,7 +292,17 @@ func newField(id int, rawKey string, previous delimiter) (field, error) { return newSkipField(id), nil } - key, ordinal, length, greedy := extractKeyParts(rawKey) + key, dataType, ordinal, length, greedy := extractKeyParts(rawKey) + if len(dataType) > 0 { + dt, ok := dataTypeNames[dataType] + if !ok { + return nil, errInvalidDatatype + } + + if dt == unset { + return nil, errMissingDatatype + } + } // Conflicting prefix used. if strings.HasPrefix(key, appendIndirectPrefix) { @@ -311,7 +328,7 @@ func newField(id int, rawKey string, previous delimiter) (field, error) { if strings.HasPrefix(key, indirectFieldPrefix) { return newIndirectField(id, key[1:], length), nil } - return newNormalField(id, key, ordinal, length, greedy), nil + return newNormalField(id, key, dataType, ordinal, length, greedy), nil } func newSkipField(id int) skipField { @@ -353,36 +370,35 @@ func newIndirectField(id int, key string, length int) indirectField { } } -func newNormalField(id int, key string, ordinal int, length int, greedy bool) normalField { - parts := strings.Split(key, "|") - if len(parts) > 1 { - return normalField{ - baseField{ - id: id, - key: parts[0], - ordinal: ordinal, - length: length, - greedy: greedy, - dataType: parts[1], - }, - } - } else { - key = parts[0] - } +func newNormalField(id int, key string, dataType string, ordinal int, length int, greedy bool) normalField { return normalField{ baseField{ - id: id, - key: key, - ordinal: ordinal, - length: length, - greedy: greedy, + id: id, + key: key, + ordinal: ordinal, + length: length, + greedy: greedy, + dataType: dataType, }, } } -func extractKeyParts(rawKey string) (key string, ordinal int, length int, greedy bool) { +func extractKeyParts(rawKey string) (key string, dataType string, ordinal int, length int, greedy bool) { m := suffixRE.FindAllStringSubmatch(rawKey, -1) + if m[0][1] != "" { + parts := strings.Split(m[0][1], dataTypeSeparator) + if len(parts) > 1 { + key = parts[0] + if parts[1] == "" { + dataType = "[unset]" + } else { + dataType = parts[1] + } + } else { + key = m[0][1] + } + } if m[0][3] != "" { ordinal, _ = strconv.Atoi(m[0][3]) } @@ -395,5 +411,5 @@ func extractKeyParts(rawKey string) (key string, ordinal int, length int, greedy greedy = true } - return m[0][1], ordinal, length, greedy + return key, dataType, ordinal, length, greedy } diff --git a/libbeat/processors/dissect/validate_test.go b/libbeat/processors/dissect/validate_test.go index dd19b688355..e1721146cbd 100644 --- a/libbeat/processors/dissect/validate_test.go +++ b/libbeat/processors/dissect/validate_test.go @@ -32,7 +32,7 @@ func TestValidate(t *testing.T) { { name: "when we find reference field for all indirect field", p: &parser{ - fields: []field{newIndirectField(1, "hello", 0), newNormalField(0, "hola", 1, 0, false)}, + fields: []field{newIndirectField(1, "hello", 0), newNormalField(0, "hola", "", 1, 0, false)}, referenceFields: []field{newPointerField(2, "hello", 0)}, }, expectError: false, @@ -40,7 +40,7 @@ func TestValidate(t *testing.T) { { name: "when we cannot find all the reference field for all indirect field", p: &parser{ - fields: []field{newIndirectField(1, "hello", 0), newNormalField(0, "hola", 1, 0, false)}, + fields: []field{newIndirectField(1, "hello", 0), newNormalField(0, "hola", "", 1, 0, false)}, referenceFields: []field{newPointerField(2, "okhello", 0)}, }, expectError: true, From c9c08313d195806e7fecff06963f16ec25372e7a Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Mon, 8 Jun 2020 14:06:37 -0700 Subject: [PATCH 13/21] Move parsing to regexp, move data type after greedy indicator, use const for [unset] --- libbeat/processors/dissect/const.go | 5 ++- libbeat/processors/dissect/dissect_test.go | 2 +- libbeat/processors/dissect/field.go | 46 ++++++++++------------ 3 files changed, 25 insertions(+), 28 deletions(-) diff --git a/libbeat/processors/dissect/const.go b/libbeat/processors/dissect/const.go index ba607ba17c8..18464d111ab 100644 --- a/libbeat/processors/dissect/const.go +++ b/libbeat/processors/dissect/const.go @@ -38,14 +38,17 @@ var ( indirectAppendPrefix = "&+" greedySuffix = "->" pointerFieldPrefix = "*" + dataTypeSeparator = "\\|" numberRE = "\\d{1,2}" + alphaRE = "[[:alpha:]]*" delimiterRE = regexp.MustCompile("(?s)(.*?)%\\{([^}]*?)}") suffixRE = regexp.MustCompile("(.+?)" + // group 1 for key name "(" + ordinalIndicator + "(" + numberRE + ")" + ")?" + // group 2, 3 for ordinal "(" + fixedLengthIndicator + "(" + numberRE + ")" + ")?" + // group 4, 5 for fixed length - "(" + greedySuffix + ")?$") // group 6 for greedy + "(" + greedySuffix + ")?" + // group 6 for greedy + "(" + dataTypeSeparator + "(" + alphaRE + ")?" + ")?$") // group 7,8 for data type separator and data type defaultJoinString = " " diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index 9a71adc1ed7..8e9a05fcf7b 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -67,7 +67,7 @@ func TestDissectConversion(t *testing.T) { }, { Name: "Greedy padding skip test ->", - Tok: "id=%{id|integer->} padding_removed=%{padding_removed|boolean->} length=%{length|long->} msg=\"%{message}\"", + Tok: "id=%{id->|integer} padding_removed=%{padding_removed->|boolean} length=%{length->|long} msg=\"%{message}\"", Msg: "id=1945 padding_removed=true length=123456789 msg=\"Testing for padding\"}", Expected: map[string]interface{}{ "id": int32(1945), diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go index 5e76ffd894a..f82eab10060 100644 --- a/libbeat/processors/dissect/field.go +++ b/libbeat/processors/dissect/field.go @@ -51,10 +51,6 @@ type baseField struct { type dataType uint8 -var ( - dataTypeSeparator = "|" -) - // List of dataTypes. const ( unset dataType = iota @@ -67,15 +63,18 @@ const ( IP ) +var ( + unsetDataType = "[unset]" +) var dataTypeNames = map[string]dataType{ - "[unset]": unset, - "integer": Integer, - "long": Long, - "float": Float, - "double": Double, - "string": String, - "boolean": Boolean, - "ip": IP, + unsetDataType: unset, + "integer": Integer, + "long": Long, + "float": Float, + "double": Double, + "string": String, + "boolean": Boolean, + "ip": IP, } func (f baseField) IsGreedy() bool { @@ -386,19 +385,6 @@ func newNormalField(id int, key string, dataType string, ordinal int, length int func extractKeyParts(rawKey string) (key string, dataType string, ordinal int, length int, greedy bool) { m := suffixRE.FindAllStringSubmatch(rawKey, -1) - if m[0][1] != "" { - parts := strings.Split(m[0][1], dataTypeSeparator) - if len(parts) > 1 { - key = parts[0] - if parts[1] == "" { - dataType = "[unset]" - } else { - dataType = parts[1] - } - } else { - key = m[0][1] - } - } if m[0][3] != "" { ordinal, _ = strconv.Atoi(m[0][3]) } @@ -411,5 +397,13 @@ func extractKeyParts(rawKey string) (key string, dataType string, ordinal int, l greedy = true } - return key, dataType, ordinal, length, greedy + if m[0][7] != "" { + if m[0][8] == "" { + dataType = unsetDataType + } else { + dataType = m[0][8] + } + } + + return m[0][1], dataType, ordinal, length, greedy } From a22150a5b9301d87152f750f71b6771230d1af4f Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Tue, 9 Jun 2020 22:18:04 -0700 Subject: [PATCH 14/21] Refactor code, check | suffix, to panic for missing data type --- libbeat/processors/dissect/config_test.go | 2 +- libbeat/processors/dissect/const.go | 3 +- libbeat/processors/dissect/field.go | 39 +++++++++-------------- 3 files changed, 18 insertions(+), 26 deletions(-) diff --git a/libbeat/processors/dissect/config_test.go b/libbeat/processors/dissect/config_test.go index 9fd4fcac2be..0fa43082d47 100644 --- a/libbeat/processors/dissect/config_test.go +++ b/libbeat/processors/dissect/config_test.go @@ -135,7 +135,7 @@ func TestConfigForDataType(t *testing.T) { }) t.Run("missing data type", func(t *testing.T) { c, err := common.NewConfigFrom(map[string]interface{}{ - "tokenizer": "%{value1|} %{value2|} %{value3|} %{value4|} %{value5|}", + "tokenizer": "%{value1|} %{value2|}", "field": "message", }) if !assert.NoError(t, err) { diff --git a/libbeat/processors/dissect/const.go b/libbeat/processors/dissect/const.go index 18464d111ab..b34af702313 100644 --- a/libbeat/processors/dissect/const.go +++ b/libbeat/processors/dissect/const.go @@ -38,7 +38,8 @@ var ( indirectAppendPrefix = "&+" greedySuffix = "->" pointerFieldPrefix = "*" - dataTypeSeparator = "\\|" + dataTypeIndicator = "|" + dataTypeSeparator = "\\|" // Needed for regexp numberRE = "\\d{1,2}" alphaRE = "[[:alpha:]]*" diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go index f82eab10060..5ac8d4ef56f 100644 --- a/libbeat/processors/dissect/field.go +++ b/libbeat/processors/dissect/field.go @@ -63,18 +63,15 @@ const ( IP ) -var ( - unsetDataType = "[unset]" -) var dataTypeNames = map[string]dataType{ - unsetDataType: unset, - "integer": Integer, - "long": Long, - "float": Float, - "double": Double, - "string": String, - "boolean": Boolean, - "ip": IP, + "[unset]": unset, + "integer": Integer, + "long": Long, + "float": Float, + "double": Double, + "string": String, + "boolean": Boolean, + "ip": IP, } func (f baseField) IsGreedy() bool { @@ -292,15 +289,15 @@ func newField(id int, rawKey string, previous delimiter) (field, error) { } key, dataType, ordinal, length, greedy := extractKeyParts(rawKey) + + // rawKey will have | as suffix when data type is missing + if strings.HasSuffix(rawKey, dataTypeIndicator) { + return nil, errMissingDatatype + } if len(dataType) > 0 { - dt, ok := dataTypeNames[dataType] - if !ok { + if _, ok := dataTypeNames[dataType]; !ok { return nil, errInvalidDatatype } - - if dt == unset { - return nil, errMissingDatatype - } } // Conflicting prefix used. @@ -397,13 +394,7 @@ func extractKeyParts(rawKey string) (key string, dataType string, ordinal int, l greedy = true } - if m[0][7] != "" { - if m[0][8] == "" { - dataType = unsetDataType - } else { - dataType = m[0][8] - } - } + dataType = m[0][8] return m[0][1], dataType, ordinal, length, greedy } From 79a443c9e5aa3ac22dbbd5082aa38d3c1dca44be Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Wed, 10 Jun 2020 07:46:02 -0700 Subject: [PATCH 15/21] Remove [unset], add a new line --- libbeat/processors/dissect/field.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go index 5ac8d4ef56f..f4f192590f5 100644 --- a/libbeat/processors/dissect/field.go +++ b/libbeat/processors/dissect/field.go @@ -53,8 +53,7 @@ type dataType uint8 // List of dataTypes. const ( - unset dataType = iota - Integer + Integer dataType = iota Long Float Double @@ -64,7 +63,6 @@ const ( ) var dataTypeNames = map[string]dataType{ - "[unset]": unset, "integer": Integer, "long": Long, "float": Float, @@ -93,6 +91,7 @@ func (f baseField) Length() int { func (f baseField) Key() string { return f.key } + func (f baseField) DataType() string { return f.dataType } From acb1b6068f52802e14bb30bbf40de7d84841ab27 Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Wed, 10 Jun 2020 20:16:27 -0700 Subject: [PATCH 16/21] Convert data for indirectField use case, add test case, update existing test case --- libbeat/processors/dissect/dissect_test.go | 10 ++++++++ libbeat/processors/dissect/field.go | 28 +++++++++++++-------- libbeat/processors/dissect/validate_test.go | 4 +-- 3 files changed, 29 insertions(+), 13 deletions(-) diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index 8e9a05fcf7b..dd99446c353 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -65,6 +65,16 @@ func TestDissectConversion(t *testing.T) { }, Fail: false, }, + { + Name: "Convert 1 indirect field value", + Tok: "%{?k1}=%{&k1|integer} msg=\"%{message}\"", + Msg: "id=7736 msg=\"Single value OK\"}", + Expected: map[string]interface{}{ + "id": int32(7736), + "message": "Single value OK", + }, + Fail: false, + }, { Name: "Greedy padding skip test ->", Tok: "id=%{id->|integer} padding_removed=%{padding_removed->|boolean} length=%{length->|long} msg=\"%{message}\"", diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go index f4f192590f5..1d4c2a9938d 100644 --- a/libbeat/processors/dissect/field.go +++ b/libbeat/processors/dissect/field.go @@ -160,17 +160,22 @@ func transformType(typ dataType, value string) (interface{}, error) { } } -func (f normalField) Apply(b string, m Map) { - if len(f.dataType) == 0 { - m[f.Key()] = b +func convertData(typ string, b string) interface{} { + if len(typ) == 0 { + return b } else { - if dt, ok := dataTypeNames[f.dataType]; ok { + if dt, ok := dataTypeNames[typ]; ok { value, err := transformType(dt, b) if err == nil { - m[f.Key()] = value + return value } } } + return nil +} + +func (f normalField) Apply(b string, m Map) { + m[f.Key()] = convertData(f.dataType, b) } // skipField is an skip field without a name like this: `%{}`, this is often used to @@ -241,7 +246,7 @@ func (f indirectField) Apply(b string, m Map) { v, ok := m[f.Key()] if ok { if v, ok := v.(string); ok { - m[v] = b + m[v] = convertData(f.dataType, b) } return } @@ -321,7 +326,7 @@ func newField(id int, rawKey string, previous delimiter) (field, error) { } if strings.HasPrefix(key, indirectFieldPrefix) { - return newIndirectField(id, key[1:], length), nil + return newIndirectField(id, key[1:], dataType, length), nil } return newNormalField(id, key, dataType, ordinal, length, greedy), nil } @@ -355,12 +360,13 @@ func newAppendField(id int, key string, ordinal int, length int, greedy bool, pr } } -func newIndirectField(id int, key string, length int) indirectField { +func newIndirectField(id int, key string, dataType string, length int) indirectField { return indirectField{ baseField{ - id: id, - key: key, - length: length, + id: id, + key: key, + length: length, + dataType: dataType, }, } } diff --git a/libbeat/processors/dissect/validate_test.go b/libbeat/processors/dissect/validate_test.go index e1721146cbd..d2043dff054 100644 --- a/libbeat/processors/dissect/validate_test.go +++ b/libbeat/processors/dissect/validate_test.go @@ -32,7 +32,7 @@ func TestValidate(t *testing.T) { { name: "when we find reference field for all indirect field", p: &parser{ - fields: []field{newIndirectField(1, "hello", 0), newNormalField(0, "hola", "", 1, 0, false)}, + fields: []field{newIndirectField(1, "hello", "", 0), newNormalField(0, "hola", "", 1, 0, false)}, referenceFields: []field{newPointerField(2, "hello", 0)}, }, expectError: false, @@ -40,7 +40,7 @@ func TestValidate(t *testing.T) { { name: "when we cannot find all the reference field for all indirect field", p: &parser{ - fields: []field{newIndirectField(1, "hello", 0), newNormalField(0, "hola", "", 1, 0, false)}, + fields: []field{newIndirectField(1, "hello", "", 0), newNormalField(0, "hola", "", 1, 0, false)}, referenceFields: []field{newPointerField(2, "okhello", 0)}, }, expectError: true, From 3f09b106de89142f86a3d657ca09af5a4d2373ea Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Thu, 11 Jun 2020 12:16:45 -0700 Subject: [PATCH 17/21] Remove failure cases from benchmark, since behavior has changed to panic in case of invalid and missing data type --- libbeat/processors/dissect/dissect_test.go | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index dd99446c353..f50660cb9be 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -277,39 +277,23 @@ func BenchmarkDissectNoConversionOneValue(b *testing.B) { b.ReportAllocs() benchmarkConversion("id=%{id} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) } + func BenchmarkDissectWithConversionOneValue(b *testing.B) { b.ReportAllocs() benchmarkConversion("id=%{id|integer} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) } -func BenchmarkDissectWithConversionOneValueMissingType(b *testing.B) { - b.ReportAllocs() - benchmarkConversion("id=%{id|} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) -} -func BenchmarkDissectWithConversionOneValueInvalidType(b *testing.B) { - b.ReportAllocs() - benchmarkConversion("id=%{id|xyz} msg=\"%{message}\"", "id=7736 msg=\"Single value OK\"}", b) -} func BenchmarkDissectNoConversionMultipleValues(b *testing.B) { b.ReportAllocs() benchmarkConversion("id=%{id} status=%{status} duration=%{duration} uptime=%{uptime} success=%{success} msg=\"%{message}\"", "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) } + func BenchmarkDissectWithConversionMultipleValues(b *testing.B) { b.ReportAllocs() benchmarkConversion("id=%{id|integer} status=%{status|integer} duration=%{duration|float} uptime=%{uptime|long} success=%{success|boolean} msg=\"%{message}\"", "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) } -func BenchmarkDissectWithConversionMultipleValuesMissingType(b *testing.B) { - b.ReportAllocs() - benchmarkConversion("id=%{id|} status=%{status|} duration=%{duration|} uptime=%{uptime|} success=%{success|} msg=\"%{message}\"", - "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) -} -func BenchmarkDissectWithConversionMultipleValuesInvalidType(b *testing.B) { - b.ReportAllocs() - benchmarkConversion("id=%{id|abc} status=%{status|def} duration=%{duration|jkl} uptime=%{uptime|tux} success=%{success|xyz} msg=\"%{message}\"", - "id=7736 status=202 duration=0.975 uptime=1588975628 success=true msg=\"Request accepted\"}", b) -} func BenchmarkDissectComplexStackTraceDegradation(b *testing.B) { message := `18-Apr-2018 06:53:20.411 INFO [http-nio-8080-exec-1] org.apache.coyote.http11.Http11Processor.service Error parsing HTTP request header From d3152339c135653159d4ec2c3df46f1c6532f5d2 Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Fri, 12 Jun 2020 16:58:20 -0700 Subject: [PATCH 18/21] Move change log to bottom of the list --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 56b050fe8c3..470bdd96d21 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -267,8 +267,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958] - Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808] - Add TLS support to Kerberos authentication in Elasticsearch. {pull}18607[18607] -- Add data type conversion in `dissect` processor for converting string values to other basic data types. {pull}18683[18683] - Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817] +- Add data type conversion in `dissect` processor for converting string values to other basic data types. {pull}18683[18683] *Auditbeat* From b69a6727fff0ca16e3b581c382015eb34bd220a7 Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Sun, 21 Jun 2020 22:42:09 -0700 Subject: [PATCH 19/21] Rever back changes to mage/check.go, change dissect and processor to dissect and convert only if data type conversion is specified for atleast 1 field. --- dev-tools/mage/check.go | 2 +- libbeat/processors/dissect/dissect.go | 108 ++++++++++++++++++- libbeat/processors/dissect/dissect_test.go | 10 +- libbeat/processors/dissect/field.go | 65 +---------- libbeat/processors/dissect/processor.go | 36 ++++++- libbeat/processors/dissect/processor_test.go | 45 ++++++++ 6 files changed, 193 insertions(+), 73 deletions(-) diff --git a/dev-tools/mage/check.go b/dev-tools/mage/check.go index e44055a8fdb..196eba36d48 100644 --- a/dev-tools/mage/check.go +++ b/dev-tools/mage/check.go @@ -99,7 +99,7 @@ func GitDiffIndex() ([]string, error) { return nil, errors.Wrap(err, "failed to dissect git diff-index output") } - paths := strings.Split(m["paths"].(string), "\t") + paths := strings.Split(m["paths"], "\t") if len(paths) > 1 { modified = append(modified, paths[1]) } else { diff --git a/libbeat/processors/dissect/dissect.go b/libbeat/processors/dissect/dissect.go index fc9e7995026..c1de0d6caf4 100644 --- a/libbeat/processors/dissect/dissect.go +++ b/libbeat/processors/dissect/dissect.go @@ -17,10 +17,20 @@ package dissect -import "fmt" +import ( + "fmt" + "net" + "strconv" + "strings" + + "github.com/elastic/beats/v7/libbeat/common" + + "github.com/pkg/errors" +) // Map represents the keys and their values extracted with the defined tokenizer. -type Map = map[string]interface{} +type Map = map[string]string +type MapConverted = map[string]interface{} // positions represents the start and end position of the keys found in the string. type positions []position @@ -61,6 +71,23 @@ func (d *Dissector) Dissect(s string) (Map, error) { return d.resolve(s, positions), nil } +func (d *Dissector) DissectConvert(s string) (MapConverted, error) { + if len(s) == 0 { + return nil, errEmpty + } + + positions, err := d.extract(s) + if err != nil { + return nil, err + } + + if len(positions) == 0 { + return nil, errParsingFailure + } + + return d.resolveConvert(s, positions), nil +} + // Raw returns the raw tokenizer used to generate the actual parser. func (d *Dissector) Raw() string { return d.raw @@ -161,6 +188,35 @@ func (d *Dissector) resolve(s string, p positions) Map { return m } +func (d *Dissector) resolveConvert(s string, p positions) MapConverted { + lookup := make(common.MapStr, len(p)) + m := make(Map, len(p)) + mc := make(MapConverted, len(p)) + for _, f := range d.parser.fields { + pos := p[f.ID()] + f.Apply(s[pos.start:pos.end], m) // using map[string]string to avoid another set of apply methods + if !f.IsSaveable() { + lookup[f.Key()] = s[pos.start:pos.end] + } else { + key := f.Key() + if k, ok := lookup[f.Key()]; ok { + key = k.(string) + } + v, _ := m[key] + if f.DataType() != "" { + mc[key] = convertData(f.DataType(), v) + } else { + mc[key] = v + } + } + } + + for _, f := range d.parser.referenceFields { + delete(mc, f.Key()) + } + return mc +} + // New creates a new Dissector from a tokenized string. func New(tokenizer string) (*Dissector, error) { p, err := newParser(tokenizer) @@ -174,3 +230,51 @@ func New(tokenizer string) (*Dissector, error) { return &Dissector{parser: p, raw: tokenizer}, nil } + +// strToInt is a helper to interpret a string as either base 10 or base 16. +func strToInt(s string, bitSize int) (int64, error) { + base := 10 + if strings.HasPrefix(s, "0x") || strings.HasPrefix(s, "0X") { + // strconv.ParseInt will accept the '0x' or '0X` prefix only when base is 0. + base = 0 + } + return strconv.ParseInt(s, base, bitSize) +} + +func transformType(typ dataType, value string) (interface{}, error) { + value = strings.TrimRight(value, " ") + switch typ { + case String: + return value, nil + case Long: + return strToInt(value, 64) + case Integer: + i, err := strToInt(value, 32) + return int32(i), err + case Float: + f, err := strconv.ParseFloat(value, 32) + return float32(f), err + case Double: + d, err := strconv.ParseFloat(value, 64) + return float64(d), err + case Boolean: + return strconv.ParseBool(value) + case IP: + if net.ParseIP(value) != nil { + return value, nil + } + return "", errors.New("value is not a valid IP address") + default: + return value, nil + } +} + +func convertData(typ string, b string) interface{} { + if dt, ok := dataTypeNames[typ]; ok { + value, err := transformType(dt, b) + if err == nil { + return value + } + } + return b +} diff --git a/libbeat/processors/dissect/dissect_test.go b/libbeat/processors/dissect/dissect_test.go index f50660cb9be..3543d8c3821 100644 --- a/libbeat/processors/dissect/dissect_test.go +++ b/libbeat/processors/dissect/dissect_test.go @@ -68,10 +68,10 @@ func TestDissectConversion(t *testing.T) { { Name: "Convert 1 indirect field value", Tok: "%{?k1}=%{&k1|integer} msg=\"%{message}\"", - Msg: "id=7736 msg=\"Single value OK\"}", + Msg: "id=8268 msg=\"Single value indirect field\"}", Expected: map[string]interface{}{ - "id": int32(7736), - "message": "Single value OK", + "id": int32(8268), + "message": "Single value indirect field", }, Fail: false, }, @@ -97,12 +97,12 @@ func TestDissectConversion(t *testing.T) { } if test.Fail { - _, err := d.Dissect(test.Msg) + _, err := d.DissectConvert(test.Msg) assert.Error(t, err) return } - r, err := d.Dissect(test.Msg) + r, err := d.DissectConvert(test.Msg) if !assert.NoError(t, err) { return } diff --git a/libbeat/processors/dissect/field.go b/libbeat/processors/dissect/field.go index 1d4c2a9938d..2c697ccf73d 100644 --- a/libbeat/processors/dissect/field.go +++ b/libbeat/processors/dissect/field.go @@ -19,11 +19,8 @@ package dissect import ( "fmt" - "net" "strconv" "strings" - - "github.com/pkg/errors" ) type field interface { @@ -122,60 +119,8 @@ type normalField struct { baseField } -// strToInt is a helper to interpret a string as either base 10 or base 16. -func strToInt(s string, bitSize int) (int64, error) { - base := 10 - if strings.HasPrefix(s, "0x") || strings.HasPrefix(s, "0X") { - // strconv.ParseInt will accept the '0x' or '0X` prefix only when base is 0. - base = 0 - } - return strconv.ParseInt(s, base, bitSize) -} - -func transformType(typ dataType, value string) (interface{}, error) { - value = strings.TrimRight(value, " ") - switch typ { - case String: - return value, nil - case Long: - return strToInt(value, 64) - case Integer: - i, err := strToInt(value, 32) - return int32(i), err - case Float: - f, err := strconv.ParseFloat(value, 32) - return float32(f), err - case Double: - d, err := strconv.ParseFloat(value, 64) - return float64(d), err - case Boolean: - return strconv.ParseBool(value) - case IP: - if net.ParseIP(value) != nil { - return value, nil - } - return "", errors.New("value is not a valid IP address") - default: - return value, nil - } -} - -func convertData(typ string, b string) interface{} { - if len(typ) == 0 { - return b - } else { - if dt, ok := dataTypeNames[typ]; ok { - value, err := transformType(dt, b) - if err == nil { - return value - } - } - } - return nil -} - func (f normalField) Apply(b string, m Map) { - m[f.Key()] = convertData(f.dataType, b) + m[f.Key()] = b } // skipField is an skip field without a name like this: `%{}`, this is often used to @@ -245,9 +190,7 @@ type indirectField struct { func (f indirectField) Apply(b string, m Map) { v, ok := m[f.Key()] if ok { - if v, ok := v.(string); ok { - m[v] = convertData(f.dataType, b) - } + m[v] = b return } } @@ -272,9 +215,7 @@ type appendField struct { func (f appendField) Apply(b string, m Map) { v, ok := m[f.Key()] if ok { - if val, ok := v.(string); ok { - m[f.Key()] = val + f.JoinString() + b - } + m[f.Key()] = v + f.JoinString() + b return } m[f.Key()] = b diff --git a/libbeat/processors/dissect/processor.go b/libbeat/processors/dissect/processor.go index ac812e9ffae..8f611454102 100644 --- a/libbeat/processors/dissect/processor.go +++ b/libbeat/processors/dissect/processor.go @@ -53,7 +53,14 @@ func NewProcessor(c *common.Config) (processors.Processor, error) { // Run takes the event and will apply the tokenizer on the configured field. func (p *processor) Run(event *beat.Event) (*beat.Event, error) { - v, err := event.GetValue(p.config.Field) + var ( + m Map + mc MapConverted + v interface{} + err error + ) + + v, err = event.GetValue(p.config.Field) if err != nil { return event, err } @@ -63,7 +70,18 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { return event, fmt.Errorf("field is not a string, value: `%v`, field: `%s`", v, p.config.Field) } - m, err := p.config.Tokenizer.Dissect(s) + convertDataType := false + for _, f := range p.config.Tokenizer.parser.fields { + if f.DataType() != "" { + convertDataType = true + } + } + + if convertDataType { + mc, err = p.config.Tokenizer.DissectConvert(s) + } else { + m, err = p.config.Tokenizer.Dissect(s) + } if err != nil { if err := common.AddTagsWithKey( event.Fields, @@ -76,7 +94,11 @@ func (p *processor) Run(event *beat.Event) (*beat.Event, error) { return event, err } - event, err = p.mapper(event, mapToMapStr(m)) + if convertDataType { + event, err = p.mapper(event, mapInterfaceToMapStr(mc)) + } else { + event, err = p.mapper(event, mapToMapStr(m)) + } if err != nil { return event, err } @@ -122,3 +144,11 @@ func mapToMapStr(m Map) common.MapStr { } return newMap } + +func mapInterfaceToMapStr(m MapConverted) common.MapStr { + newMap := make(common.MapStr, len(m)) + for k, v := range m { + newMap[k] = v + } + return newMap +} diff --git a/libbeat/processors/dissect/processor_test.go b/libbeat/processors/dissect/processor_test.go index 26a579ae699..e25e5c9356e 100644 --- a/libbeat/processors/dissect/processor_test.go +++ b/libbeat/processors/dissect/processor_test.go @@ -232,3 +232,48 @@ func TestErrorFlagging(t *testing.T) { assert.Error(t, err) }) } + +func TestProcessorConvert(t *testing.T) { + tests := []struct { + name string + c map[string]interface{} + fields common.MapStr + values map[string]interface{} + }{ + { + name: "extract integer", + c: map[string]interface{}{"tokenizer": "userid=%{user_id|integer}"}, + fields: common.MapStr{"message": "userid=7736"}, + values: map[string]interface{}{"dissect.user_id": int32(7736)}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + c, err := common.NewConfigFrom(test.c) + if !assert.NoError(t, err) { + return + } + + processor, err := NewProcessor(c) + if !assert.NoError(t, err) { + return + } + + e := beat.Event{Fields: test.fields} + newEvent, err := processor.Run(&e) + if !assert.NoError(t, err) { + return + } + + for field, value := range test.values { + v, err := newEvent.GetValue(field) + if !assert.NoError(t, err) { + return + } + + assert.Equal(t, value, v) + } + }) + } +} From 49f15eb26b0d8114a4605a12cff49342eb334b8c Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Tue, 23 Jun 2020 07:07:42 -0700 Subject: [PATCH 20/21] Move change log too bottom of the group --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index ee22936fd63..b8903436d40 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -306,8 +306,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808] - Add TLS support to Kerberos authentication in Elasticsearch. {pull}18607[18607] - Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817] -- Add data type conversion in `dissect` processor for converting string values to other basic data types. {pull}18683[18683] - Add support for multiple sets of hints on autodiscover {pull}18883[18883] +- Add data type conversion in `dissect` processor for converting string values to other basic data types. {pull}18683[18683] *Auditbeat* From af75de86ec2dc1c5b43c8f815e1c5ff30a49ee76 Mon Sep 17 00:00:00 2001 From: Premendra Singh Date: Fri, 3 Jul 2020 10:23:15 -0700 Subject: [PATCH 21/21] Move change log to bottom of the list --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f87ee6d1fea..e0a30718d04 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -316,8 +316,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Change ownership of files in docker images so they can be used in secured environments. {pull}12905[12905] - Upgrade k8s.io/client-go and k8s keystore tests. {pull}18817[18817] - Add support for multiple sets of hints on autodiscover {pull}18883[18883] -- Add data type conversion in `dissect` processor for converting string values to other basic data types. {pull}18683[18683] - Add a configurable delay between retries when an app metadata cannot be retrieved by `add_cloudfoundry_metadata`. {pull}19181[19181] +- Add data type conversion in `dissect` processor for converting string values to other basic data types. {pull}18683[18683] *Auditbeat*