Skip to content

Commit

Permalink
Support Fixed length extraction (elastic#17191)
Browse files Browse the repository at this point in the history
Signed-off-by: Peter Deng <yundeng@ebay.com>
  • Loading branch information
newly12 authored and ycombinator committed May 5, 2020
1 parent e0078f2 commit 97c8d4b
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 33 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add config example of how to skip the `add_host_metadata` processor when forwarding logs. {issue}13920[13920] {pull}18153[18153]
- When using the `decode_json_fields` processor, decoded fields are now deep-merged into existing event. {pull}17958[17958]
- Add backoff configuration options for the Kafka output. {issue}16777[16777] {pull}17808[17808]
- Add keystore support for autodiscover static configurations. {pull]16306[16306]
- Add Kerberos support to Elasticsearch output. {pull}17927[17927]
- Add support for fixed length extraction in `dissect` processor. {pull}17191[17191]

*Auditbeat*

Expand Down
12 changes: 10 additions & 2 deletions libbeat/processors/dissect/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ var (
// ` %{key}, %{key/2}`
// into:
// [["", "key" ], [", ", "key/2"]]
delimiterRE = regexp.MustCompile("(?s)(.*?)%\\{([^}]*?)}")
suffixRE = regexp.MustCompile("(.+?)(/(\\d{1,2}))?(->)?$")
ordinalIndicator = "/"
fixedLengthIndicator = "#"

skipFieldPrefix = "?"
appendFieldPrefix = "+"
Expand All @@ -39,6 +39,14 @@ var (
greedySuffix = "->"
pointerFieldPrefix = "*"

numberRE = "\\d{1,2}"

delimiterRE = regexp.MustCompile("(?s)(.*?)%\\{([^}]*?)}")
suffixRE = regexp.MustCompile("(.+?)" + // group 1 for key name
"(" + ordinalIndicator + "(" + numberRE + ")" + ")?" + // group 2, 3 for ordinal
"(" + fixedLengthIndicator + "(" + numberRE + ")" + ")?" + // group 4, 5 for fixed length
"(" + greedySuffix + ")?$") // group 6 for greedy

defaultJoinString = " "

errParsingFailure = errors.New("parsing failure")
Expand Down
34 changes: 28 additions & 6 deletions libbeat/processors/dissect/dissect.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,27 @@ func (d *Dissector) extract(s string) (positions, error) {
// move through all the other delimiters, until we have consumed all of them.
for dl.Next() != nil {
start = offset
end = dl.Next().IndexOf(s, offset)
if end == -1 {
return nil, fmt.Errorf(
"could not find delimiter: `%s` in remaining: `%s`, (offset: %d)",
dl.Delimiter(), s[offset:], offset,
)

// corresponding field of the delimiter
field := d.parser.fields[d.parser.fieldsIdMap[i]]

// for fixed-length field, just step the same size of its length
if field.IsFixedLength() {
end = offset + field.Length()
if end > len(s) {
return nil, fmt.Errorf(
"field length is grater than string length: remaining: `%s`, (offset: %d), field: %s",
s[offset:], offset, field,
)
}
} else {
end = dl.Next().IndexOf(s, offset)
if end == -1 {
return nil, fmt.Errorf(
"could not find delimiter: `%s` in remaining: `%s`, (offset: %d)",
dl.Delimiter(), s[offset:], offset,
)
}
}

offset = end
Expand All @@ -118,6 +133,13 @@ func (d *Dissector) extract(s string) (positions, error) {
dl = dl.Next()
}

field := d.parser.fields[d.parser.fieldsIdMap[i]]

if field.IsFixedLength() && offset+field.Length() != len(s) {
return nil, fmt.Errorf("last fixed length key `%s` (length: %d) does not fit into remaining: `%s`, (offset: %d)",
field, field.Length(), s, offset,
)
}
// If we have remaining contents and have not captured all the requested fields
if offset < len(s) && i < len(d.parser.fields) {
positions[i] = position{start: offset, end: len(s)}
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/dissect/docs/dissect.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ an error; you need to either drop or rename the key before using dissect.
For tokenization to be successful, all keys must be found and extracted, if one of them cannot be
found an error will be logged and no modification is done on the original event.

NOTE: A key can contain any characters except reserved suffix or prefix modifiers: `/`,`&`, `+`
NOTE: A key can contain any characters except reserved suffix or prefix modifiers: `/`,`&`, `+`, `#`
and `?`.

See <<conditions>> for a list of supported conditions.
Expand Down
56 changes: 37 additions & 19 deletions libbeat/processors/dissect/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ type field interface {
MarkGreedy()
IsGreedy() bool
Ordinal() int
Length() int
Key() string
ID() int
Apply(b string, m Map)
String() string
IsSaveable() bool
IsFixedLength() bool
}

type baseField struct {
id int
key string
ordinal int
length int
greedy bool
}

Expand All @@ -53,6 +56,10 @@ func (f baseField) Ordinal() int {
return f.ordinal
}

func (f baseField) Length() int {
return f.length
}

func (f baseField) Key() string {
return f.key
}
Expand All @@ -65,6 +72,10 @@ func (f baseField) IsSaveable() bool {
return true
}

func (f baseField) IsFixedLength() bool {
return f.length > 0
}

func (f baseField) String() string {
return fmt.Sprintf("field: %s, ordinal: %d, greedy: %v", f.key, f.ordinal, f.IsGreedy())
}
Expand Down Expand Up @@ -193,7 +204,7 @@ func newField(id int, rawKey string, previous delimiter) (field, error) {
return newSkipField(id), nil
}

key, ordinal, greedy := extractKeyParts(rawKey)
key, ordinal, length, greedy := extractKeyParts(rawKey)

// Conflicting prefix used.
if strings.HasPrefix(key, appendIndirectPrefix) {
Expand All @@ -205,81 +216,88 @@ func newField(id int, rawKey string, previous delimiter) (field, error) {
}

if strings.HasPrefix(key, skipFieldPrefix) {
return newNamedSkipField(id, key[1:]), nil
return newNamedSkipField(id, key[1:], length), nil
}

if strings.HasPrefix(key, pointerFieldPrefix) {
return newPointerField(id, key[1:]), nil
return newPointerField(id, key[1:], length), nil
}

if strings.HasPrefix(key, appendFieldPrefix) {
return newAppendField(id, key[1:], ordinal, greedy, previous), nil
return newAppendField(id, key[1:], ordinal, length, greedy, previous), nil
}

if strings.HasPrefix(key, indirectFieldPrefix) {
return newIndirectField(id, key[1:]), nil
return newIndirectField(id, key[1:], length), nil
}

return newNormalField(id, key, ordinal, greedy), nil
return newNormalField(id, key, ordinal, length, greedy), nil
}

func newSkipField(id int) skipField {
return skipField{baseField{id: id}}
}

func newNamedSkipField(id int, key string) namedSkipField {
func newNamedSkipField(id int, key string, length int) namedSkipField {
return namedSkipField{
baseField{id: id, key: key},
baseField{id: id, key: key, length: length},
}
}

func newPointerField(id int, key string) pointerField {
func newPointerField(id int, key string, length int) pointerField {
return pointerField{
baseField{id: id, key: key},
baseField{id: id, key: key, length: length},
}
}

func newAppendField(id int, key string, ordinal int, greedy bool, previous delimiter) appendField {
func newAppendField(id int, key string, ordinal int, length int, greedy bool, previous delimiter) appendField {
return appendField{
baseField: baseField{
id: id,
key: key,
ordinal: ordinal,
length: length,
greedy: greedy,
},
previous: previous,
}
}

func newIndirectField(id int, key string) indirectField {
func newIndirectField(id int, key string, length int) indirectField {
return indirectField{
baseField{
id: id,
key: key,
id: id,
key: key,
length: length,
},
}
}

func newNormalField(id int, key string, ordinal int, greedy bool) normalField {
func newNormalField(id int, key string, ordinal int, length int, greedy bool) normalField {
return normalField{
baseField{
id: id,
key: key,
ordinal: ordinal,
length: length,
greedy: greedy,
},
}
}

func extractKeyParts(rawKey string) (key string, ordinal int, greedy bool) {
func extractKeyParts(rawKey string) (key string, ordinal int, length int, greedy bool) {
m := suffixRE.FindAllStringSubmatch(rawKey, -1)

if m[0][3] != "" {
ordinal, _ = strconv.Atoi(m[0][3])
}

if strings.EqualFold(greedySuffix, m[0][4]) {
if m[0][5] != "" {
length, _ = strconv.Atoi(m[0][5])
}

if strings.EqualFold(greedySuffix, m[0][6]) {
greedy = true
}
return m[0][1], ordinal, greedy

return m[0][1], ordinal, length, greedy
}
6 changes: 6 additions & 0 deletions libbeat/processors/dissect/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
type parser struct {
delimiters []delimiter
fields []field
fieldsIdMap map[int]int
referenceFields []field
}

Expand Down Expand Up @@ -81,6 +82,10 @@ func newParser(tokenizer string) (*parser, error) {
sort.Slice(fields, func(i, j int) bool {
return fields[i].Ordinal() < fields[j].Ordinal()
})
fieldsIdMap := make(map[int]int)
for i, f := range fields {
fieldsIdMap[f.ID()] = i
}

// List of fields needed for indirection but don't need to appear in the final event.
var referenceFields []field
Expand All @@ -93,6 +98,7 @@ func newParser(tokenizer string) (*parser, error) {
return &parser{
delimiters: delimiters,
fields: fields,
fieldsIdMap: fieldsIdMap,
referenceFields: referenceFields,
}, nil
}
Expand Down
60 changes: 59 additions & 1 deletion libbeat/processors/dissect/testdata/dissect_tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -230,5 +230,63 @@
},
"skip": false,
"fail": false
},
{
"name": "simple fixed length",
"tok": "%{class#1}%{month#2}%{day#2}",
"msg": "A0118",
"expected": {
"class": "A",
"month": "01",
"day": "18"
},
"skip": false,
"fail": false
},
{
"name": "simple ordered and fixed length field",
"tok": "%{+key/3#1}%{+key/1#1} %{+key/2}",
"msg": "12 3",
"expected": {
"key": "2 3 1"
},
"skip": false,
"fail": false
},
{
"name": "simple padding and fixed length field",
"tok": "%{+key/3#1}%{+key/1#1->} %{+key/2}",
"msg": "12 3",
"expected": {
"key": "2 3 1"
},
"skip": false,
"fail": false
},
{
"name": "mixed pointer and indirect and fixed length",
"tok": "%{*key#5}%{\u0026key#5}",
"msg": "helloworld",
"expected": {
"hello": "world"
},
"skip": false,
"fail": false
},
{
"name": "fails when there is remaining string after the fixed-length key",
"tok": "%{class#1}%{month#2}%{day#2}",
"msg": "A0118 ",
"expected": null,
"skip": false,
"fail": true
},
{
"name": "fails when there is no enough string for the fixed-length key",
"tok": "%{key#10}",
"msg": "foobar",
"expected": null,
"skip": false,
"fail": true
}
]
]
8 changes: 4 additions & 4 deletions libbeat/processors/dissect/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ func TestValidate(t *testing.T) {
{
name: "when we find reference field for all indirect field",
p: &parser{
fields: []field{newIndirectField(1, "hello"), newNormalField(0, "hola", 1, false)},
referenceFields: []field{newPointerField(2, "hello")},
fields: []field{newIndirectField(1, "hello", 0), newNormalField(0, "hola", 1, 0, false)},
referenceFields: []field{newPointerField(2, "hello", 0)},
},
expectError: false,
},
{
name: "when we cannot find all the reference field for all indirect field",
p: &parser{
fields: []field{newIndirectField(1, "hello"), newNormalField(0, "hola", 1, false)},
referenceFields: []field{newPointerField(2, "okhello")},
fields: []field{newIndirectField(1, "hello", 0), newNormalField(0, "hola", 1, 0, false)},
referenceFields: []field{newPointerField(2, "okhello", 0)},
},
expectError: true,
},
Expand Down

0 comments on commit 97c8d4b

Please sign in to comment.