From 34892e13b3d72f5d413f1f30eab65ff230f3c003 Mon Sep 17 00:00:00 2001 From: Daniel Nelson Date: Tue, 9 Jul 2019 18:56:49 -0700 Subject: [PATCH] Support string field glob matching in json parser --- plugins/parsers/json/README.md | 2 +- plugins/parsers/json/parser.go | 119 ++++++++------ plugins/parsers/json/parser_test.go | 244 ++++++++++++++++++---------- plugins/parsers/registry.go | 62 ++----- 4 files changed, 245 insertions(+), 182 deletions(-) diff --git a/plugins/parsers/json/README.md b/plugins/parsers/json/README.md index 60e1f3f9e61d2..10bcf21bddf26 100644 --- a/plugins/parsers/json/README.md +++ b/plugins/parsers/json/README.md @@ -31,7 +31,7 @@ ignored unless specified in the `tag_key` or `json_string_fields` options. "my_tag_2" ] - ## String fields is an array of keys that should be added as string fields. + ## Array of glob pattern strings keys that should be added as string fields. json_string_fields = [] ## Name key is the key to use as the measurement name. diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index ebe31fd23972f..6df5d91966bf7 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -9,30 +9,61 @@ import ( "strings" "time" - "github.com/tidwall/gjson" - "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/filter" "github.com/influxdata/telegraf/internal" "github.com/influxdata/telegraf/metric" + "github.com/tidwall/gjson" ) var ( utf8BOM = []byte("\xef\xbb\xbf") ) -type JSONParser struct { - MetricName string - TagKeys []string - StringFields []string - JSONNameKey string - JSONQuery string - JSONTimeKey string - JSONTimeFormat string - JSONTimezone string - DefaultTags map[string]string +type Config struct { + MetricName string + TagKeys []string + NameKey string + StringFields []string + Query string + TimeKey string + TimeFormat string + Timezone string + DefaultTags map[string]string +} + +type Parser struct { + metricName string + tagKeys []string + stringFields filter.Filter + nameKey string + query string + timeKey string + timeFormat string + timezone string + defaultTags map[string]string +} + +func New(config *Config) (*Parser, error) { + stringFilter, err := filter.Compile(config.StringFields) + if err != nil { + return nil, err + } + + return &Parser{ + metricName: config.MetricName, + tagKeys: config.TagKeys, + nameKey: config.NameKey, + stringFields: stringFilter, + query: config.Query, + timeKey: config.TimeKey, + timeFormat: config.TimeFormat, + timezone: config.Timezone, + defaultTags: config.DefaultTags, + }, nil } -func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { +func (p *Parser) parseArray(buf []byte) ([]telegraf.Metric, error) { metrics := make([]telegraf.Metric, 0) var jsonOut []map[string]interface{} @@ -50,9 +81,9 @@ func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { return metrics, nil } -func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) { +func (p *Parser) parseObject(metrics []telegraf.Metric, jsonOut map[string]interface{}) ([]telegraf.Metric, error) { tags := make(map[string]string) - for k, v := range p.DefaultTags { + for k, v := range p.defaultTags { tags[k] = v } @@ -62,33 +93,35 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i return nil, err } + name := p.metricName + //checks if json_name_key is set - if p.JSONNameKey != "" { - switch field := f.Fields[p.JSONNameKey].(type) { + if p.nameKey != "" { + switch field := f.Fields[p.nameKey].(type) { case string: - p.MetricName = field + name = field } } //if time key is specified, set it to nTime nTime := time.Now().UTC() - if p.JSONTimeKey != "" { - if p.JSONTimeFormat == "" { + if p.timeKey != "" { + if p.timeFormat == "" { err := fmt.Errorf("use of 'json_time_key' requires 'json_time_format'") return nil, err } - if f.Fields[p.JSONTimeKey] == nil { + if f.Fields[p.timeKey] == nil { err := fmt.Errorf("JSON time key could not be found") return nil, err } - nTime, err = internal.ParseTimestampWithLocation(f.Fields[p.JSONTimeKey], p.JSONTimeFormat, p.JSONTimezone) + nTime, err = internal.ParseTimestampWithLocation(f.Fields[p.timeKey], p.timeFormat, p.timezone) if err != nil { return nil, err } - delete(f.Fields, p.JSONTimeKey) + delete(f.Fields, p.timeKey) //if the year is 0, set to current year if nTime.Year() == 0 { @@ -97,7 +130,7 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i } tags, nFields := p.switchFieldToTag(tags, f.Fields) - metric, err := metric.New(p.MetricName, tags, nFields, nTime) + metric, err := metric.New(name, tags, nFields, nTime) if err != nil { return nil, err } @@ -108,8 +141,8 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i //search for TagKeys that match fieldnames and add them to tags //will delete any strings/bools that shouldn't be fields //assumes that any non-numeric values in TagKeys should be displayed as tags -func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]interface{}) (map[string]string, map[string]interface{}) { - for _, name := range p.TagKeys { +func (p *Parser) switchFieldToTag(tags map[string]string, fields map[string]interface{}) (map[string]string, map[string]interface{}) { + for _, name := range p.tagKeys { //switch any fields in tagkeys into tags if fields[name] == nil { continue @@ -130,31 +163,23 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string] } //remove any additional string/bool values from fields - for k := range fields { - //check if field is in StringFields - sField := false - for _, v := range p.StringFields { - if v == k { - sField = true - } - } - if sField { - continue - } - - switch fields[k].(type) { + for fk := range fields { + switch fields[fk].(type) { case string: - delete(fields, k) + if p.stringFields != nil && p.stringFields.Match(fk) { + continue + } + delete(fields, fk) case bool: - delete(fields, k) + delete(fields, fk) } } return tags, fields } -func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { - if p.JSONQuery != "" { - result := gjson.GetBytes(buf, p.JSONQuery) +func (p *Parser) Parse(buf []byte) ([]telegraf.Metric, error) { + if p.query != "" { + result := gjson.GetBytes(buf, p.query) buf = []byte(result.Raw) if !result.IsArray() && !result.IsObject() { err := fmt.Errorf("E! Query path must lead to a JSON object or array of objects, but lead to: %v", result.Type) @@ -181,7 +206,7 @@ func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { return p.parseArray(buf) } -func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) { +func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { metrics, err := p.Parse([]byte(line + "\n")) if err != nil { @@ -195,8 +220,8 @@ func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) { return metrics[0], nil } -func (p *JSONParser) SetDefaultTags(tags map[string]string) { - p.DefaultTags = tags +func (p *Parser) SetDefaultTags(tags map[string]string) { + p.defaultTags = tags } type JSONFlattener struct { diff --git a/plugins/parsers/json/parser_test.go b/plugins/parsers/json/parser_test.go index 2db9ad78f196f..f656a96a132d8 100644 --- a/plugins/parsers/json/parser_test.go +++ b/plugins/parsers/json/parser_test.go @@ -53,9 +53,10 @@ const validJSONArrayTags = ` ` func TestParseValidJSON(t *testing.T) { - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", - } + }) + require.NoError(t, err) // Most basic vanilla test metrics, err := parser.Parse([]byte(validJSON)) @@ -102,9 +103,10 @@ func TestParseValidJSON(t *testing.T) { } func TestParseLineValidJSON(t *testing.T) { - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", - } + }) + require.NoError(t, err) // Most basic vanilla test metric, err := parser.ParseLine(validJSON) @@ -138,11 +140,12 @@ func TestParseLineValidJSON(t *testing.T) { } func TestParseInvalidJSON(t *testing.T) { - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", - } + }) + require.NoError(t, err) - _, err := parser.Parse([]byte(invalidJSON)) + _, err = parser.Parse([]byte(invalidJSON)) require.Error(t, err) _, err = parser.Parse([]byte(invalidJSON2)) require.Error(t, err) @@ -152,10 +155,12 @@ func TestParseInvalidJSON(t *testing.T) { func TestParseWithTagKeys(t *testing.T) { // Test that strings not matching tag keys are ignored - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", TagKeys: []string{"wrongtagkey"}, - } + }) + require.NoError(t, err) + metrics, err := parser.Parse([]byte(validJSONTags)) require.NoError(t, err) require.Len(t, metrics, 1) @@ -167,10 +172,12 @@ func TestParseWithTagKeys(t *testing.T) { require.Equal(t, map[string]string{}, metrics[0].Tags()) // Test that single tag key is found and applied - parser = JSONParser{ + parser, err = New(&Config{ MetricName: "json_test", TagKeys: []string{"mytag"}, - } + }) + require.NoError(t, err) + metrics, err = parser.Parse([]byte(validJSONTags)) require.NoError(t, err) require.Len(t, metrics, 1) @@ -184,10 +191,11 @@ func TestParseWithTagKeys(t *testing.T) { }, metrics[0].Tags()) // Test that both tag keys are found and applied - parser = JSONParser{ + parser, err = New(&Config{ MetricName: "json_test", TagKeys: []string{"mytag", "othertag"}, - } + }) + require.NoError(t, err) metrics, err = parser.Parse([]byte(validJSONTags)) require.NoError(t, err) require.Len(t, metrics, 1) @@ -204,10 +212,11 @@ func TestParseWithTagKeys(t *testing.T) { func TestParseLineWithTagKeys(t *testing.T) { // Test that strings not matching tag keys are ignored - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", TagKeys: []string{"wrongtagkey"}, - } + }) + require.NoError(t, err) metric, err := parser.ParseLine(validJSONTags) require.NoError(t, err) require.Equal(t, "json_test", metric.Name()) @@ -218,10 +227,12 @@ func TestParseLineWithTagKeys(t *testing.T) { require.Equal(t, map[string]string{}, metric.Tags()) // Test that single tag key is found and applied - parser = JSONParser{ + parser, err = New(&Config{ MetricName: "json_test", TagKeys: []string{"mytag"}, - } + }) + require.NoError(t, err) + metric, err = parser.ParseLine(validJSONTags) require.NoError(t, err) require.Equal(t, "json_test", metric.Name()) @@ -234,10 +245,12 @@ func TestParseLineWithTagKeys(t *testing.T) { }, metric.Tags()) // Test that both tag keys are found and applied - parser = JSONParser{ + parser, err = New(&Config{ MetricName: "json_test", TagKeys: []string{"mytag", "othertag"}, - } + }) + require.NoError(t, err) + metric, err = parser.ParseLine(validJSONTags) require.NoError(t, err) require.Equal(t, "json_test", metric.Name()) @@ -252,13 +265,14 @@ func TestParseLineWithTagKeys(t *testing.T) { } func TestParseValidJSONDefaultTags(t *testing.T) { - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", TagKeys: []string{"mytag"}, DefaultTags: map[string]string{ "t4g": "default", }, - } + }) + require.NoError(t, err) // Most basic vanilla test metrics, err := parser.Parse([]byte(validJSON)) @@ -288,13 +302,14 @@ func TestParseValidJSONDefaultTags(t *testing.T) { // Test that default tags are overridden by tag keys func TestParseValidJSONDefaultTagsOverride(t *testing.T) { - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", TagKeys: []string{"mytag"}, DefaultTags: map[string]string{ "mytag": "default", }, - } + }) + require.NoError(t, err) // Most basic vanilla test metrics, err := parser.Parse([]byte(validJSON)) @@ -323,9 +338,10 @@ func TestParseValidJSONDefaultTagsOverride(t *testing.T) { // Test that json arrays can be parsed func TestParseValidJSONArray(t *testing.T) { - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_array_test", - } + }) + require.NoError(t, err) // Most basic vanilla test metrics, err := parser.Parse([]byte(validJSONArray)) @@ -358,10 +374,12 @@ func TestParseValidJSONArray(t *testing.T) { func TestParseArrayWithTagKeys(t *testing.T) { // Test that strings not matching tag keys are ignored - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_array_test", TagKeys: []string{"wrongtagkey"}, - } + }) + require.NoError(t, err) + metrics, err := parser.Parse([]byte(validJSONArrayTags)) require.NoError(t, err) require.Len(t, metrics, 2) @@ -380,10 +398,12 @@ func TestParseArrayWithTagKeys(t *testing.T) { require.Equal(t, map[string]string{}, metrics[1].Tags()) // Test that single tag key is found and applied - parser = JSONParser{ + parser, err = New(&Config{ MetricName: "json_array_test", TagKeys: []string{"mytag"}, - } + }) + require.NoError(t, err) + metrics, err = parser.Parse([]byte(validJSONArrayTags)) require.NoError(t, err) require.Len(t, metrics, 2) @@ -406,10 +426,12 @@ func TestParseArrayWithTagKeys(t *testing.T) { }, metrics[1].Tags()) // Test that both tag keys are found and applied - parser = JSONParser{ + parser, err = New(&Config{ MetricName: "json_array_test", TagKeys: []string{"mytag", "othertag"}, - } + }) + require.NoError(t, err) + metrics, err = parser.Parse([]byte(validJSONArrayTags)) require.NoError(t, err) require.Len(t, metrics, 2) @@ -437,12 +459,13 @@ func TestParseArrayWithTagKeys(t *testing.T) { var jsonBOM = []byte("\xef\xbb\xbf[{\"value\":17}]") func TestHttpJsonBOM(t *testing.T) { - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", - } + }) + require.NoError(t, err) // Most basic vanilla test - _, err := parser.Parse(jsonBOM) + _, err = parser.Parse(jsonBOM) require.NoError(t, err) } @@ -466,15 +489,16 @@ func TestJSONParseNestedArray(t *testing.T) { } }` - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"}, - } + }) + require.NoError(t, err) metrics, err := parser.Parse([]byte(testString)) log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields()) require.NoError(t, err) - require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags())) + require.Equal(t, 3, len(metrics[0].Tags())) } func TestJSONQueryErrorOnArray(t *testing.T) { @@ -494,13 +518,14 @@ func TestJSONQueryErrorOnArray(t *testing.T) { } }` - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", TagKeys: []string{}, - JSONQuery: "shares.myArr", - } + Query: "shares.myArr", + }) + require.NoError(t, err) - _, err := parser.Parse([]byte(testString)) + _, err = parser.Parse([]byte(testString)) require.Error(t, err) } @@ -527,11 +552,12 @@ func TestArrayOfObjects(t *testing.T) { "more_stuff":"junk" }` - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", TagKeys: []string{"ice"}, - JSONQuery: "meta.shares", - } + Query: "meta.shares", + }) + require.NoError(t, err) metrics, err := parser.Parse([]byte(testString)) require.NoError(t, err) @@ -553,12 +579,13 @@ func TestUseCaseJSONQuery(t *testing.T) { } }` - parser := JSONParser{ + parser, err := New(&Config{ MetricName: "json_test", StringFields: []string{"last"}, TagKeys: []string{"first"}, - JSONQuery: "obj.friends", - } + Query: "obj.friends", + }) + require.NoError(t, err) metrics, err := parser.Parse([]byte(testString)) require.NoError(t, err) @@ -588,11 +615,12 @@ func TestTimeParser(t *testing.T) { } ]` - parser := JSONParser{ - MetricName: "json_test", - JSONTimeKey: "b_time", - JSONTimeFormat: "02 Jan 06 15:04 MST", - } + parser, err := New(&Config{ + MetricName: "json_test", + TimeKey: "b_time", + TimeFormat: "02 Jan 06 15:04 MST", + }) + require.NoError(t, err) metrics, err := parser.Parse([]byte(testString)) require.NoError(t, err) require.Equal(t, 2, len(metrics)) @@ -604,12 +632,13 @@ func TestTimeParserWithTimezone(t *testing.T) { "time": "04 Jan 06 15:04" }` - parser := JSONParser{ - MetricName: "json_test", - JSONTimeKey: "time", - JSONTimeFormat: "02 Jan 06 15:04", - JSONTimezone: "America/New_York", - } + parser, err := New(&Config{ + MetricName: "json_test", + TimeKey: "time", + TimeFormat: "02 Jan 06 15:04", + Timezone: "America/New_York", + }) + require.NoError(t, err) metrics, err := parser.Parse([]byte(testString)) require.NoError(t, err) require.Equal(t, 1, len(metrics)) @@ -638,11 +667,13 @@ func TestUnixTimeParser(t *testing.T) { } ]` - parser := JSONParser{ - MetricName: "json_test", - JSONTimeKey: "b_time", - JSONTimeFormat: "unix", - } + parser, err := New(&Config{ + MetricName: "json_test", + TimeKey: "b_time", + TimeFormat: "unix", + }) + require.NoError(t, err) + metrics, err := parser.Parse([]byte(testString)) require.NoError(t, err) require.Equal(t, 2, len(metrics)) @@ -671,11 +702,13 @@ func TestUnixMsTimeParser(t *testing.T) { } ]` - parser := JSONParser{ - MetricName: "json_test", - JSONTimeKey: "b_time", - JSONTimeFormat: "unix_ms", - } + parser, err := New(&Config{ + MetricName: "json_test", + TimeKey: "b_time", + TimeFormat: "unix_ms", + }) + require.NoError(t, err) + metrics, err := parser.Parse([]byte(testString)) require.NoError(t, err) require.Equal(t, 2, len(metrics)) @@ -693,11 +726,12 @@ func TestTimeErrors(t *testing.T) { "my_tag_2": "baz" }` - parser := JSONParser{ - MetricName: "json_test", - JSONTimeKey: "b_time", - JSONTimeFormat: "02 January 06 15:04 MST", - } + parser, err := New(&Config{ + MetricName: "json_test", + TimeKey: "b_time", + TimeFormat: "02 January 06 15:04 MST", + }) + require.NoError(t, err) metrics, err := parser.Parse([]byte(testString)) require.Error(t, err) @@ -712,11 +746,12 @@ func TestTimeErrors(t *testing.T) { "my_tag_2": "baz" }` - parser = JSONParser{ - MetricName: "json_test", - JSONTimeKey: "b_time", - JSONTimeFormat: "02 January 06 15:04 MST", - } + parser, err = New(&Config{ + MetricName: "json_test", + TimeKey: "b_time", + TimeFormat: "02 January 06 15:04 MST", + }) + require.NoError(t, err) metrics, err = parser.Parse([]byte(testString2)) log.Printf("err: %v", err) @@ -736,9 +771,10 @@ func TestNameKey(t *testing.T) { "my_tag_2": "baz" }` - parser := JSONParser{ - JSONNameKey: "b_c", - } + parser, err := New(&Config{ + NameKey: "b_c", + }) + require.NoError(t, err) metrics, err := parser.Parse([]byte(testString)) require.NoError(t, err) @@ -751,11 +787,12 @@ func TestTimeKeyDelete(t *testing.T) { "value": 42 }` - parser := JSONParser{ - MetricName: "json", - JSONTimeKey: "timestamp", - JSONTimeFormat: "unix", - } + parser, err := New(&Config{ + MetricName: "json", + TimeKey: "timestamp", + TimeFormat: "unix", + }) + require.NoError(t, err) metrics, err := parser.Parse([]byte(data)) require.NoError(t, err) @@ -768,3 +805,38 @@ func TestTimeKeyDelete(t *testing.T) { testutil.RequireMetricsEqual(t, expected, metrics) } + +func TestStringFieldGlob(t *testing.T) { + data := ` +{ + "color": "red", + "status": "error", + "time": "1541183052" +} +` + + parser, err := New(&Config{ + MetricName: "json", + StringFields: []string{"*"}, + TimeKey: "time", + TimeFormat: "unix", + }) + require.NoError(t, err) + + actual, err := parser.Parse([]byte(data)) + require.NoError(t, err) + + expected := []telegraf.Metric{ + testutil.MustMetric( + "json", + map[string]string{}, + map[string]interface{}{ + "color": "red", + "status": "error", + }, + time.Unix(1541183052, 0), + ), + } + + testutil.RequireMetricsEqual(t, expected, actual) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 2e8d20819396c..9e4ea2b1f8a43 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -70,7 +70,7 @@ type Config struct { // TagKeys only apply to JSON data TagKeys []string `toml:"tag_keys"` - // FieldKeys only apply to JSON + // Array of glob pattern strings keys that should be added as string fields. JSONStringFields []string `toml:"json_string_fields"` JSONNameKey string `toml:"json_name_key"` @@ -153,15 +153,19 @@ func NewParser(config *Config) (Parser, error) { var parser Parser switch config.DataFormat { case "json": - parser = newJSONParser(config.MetricName, - config.TagKeys, - config.JSONNameKey, - config.JSONStringFields, - config.JSONQuery, - config.JSONTimeKey, - config.JSONTimeFormat, - config.JSONTimezone, - config.DefaultTags) + parser, err = json.New( + &json.Config{ + MetricName: config.MetricName, + TagKeys: config.TagKeys, + NameKey: config.JSONNameKey, + StringFields: config.JSONStringFields, + Query: config.JSONQuery, + TimeKey: config.JSONTimeKey, + TimeFormat: config.JSONTimeFormat, + Timezone: config.JSONTimezone, + DefaultTags: config.DefaultTags, + }, + ) case "value": parser, err = NewValueParser(config.MetricName, config.DataType, config.DefaultTags) @@ -283,31 +287,6 @@ func newCSVParser(metricName string, return parser, nil } -func newJSONParser( - metricName string, - tagKeys []string, - jsonNameKey string, - stringFields []string, - jsonQuery string, - timeKey string, - timeFormat string, - timezone string, - defaultTags map[string]string, -) Parser { - parser := &json.JSONParser{ - MetricName: metricName, - TagKeys: tagKeys, - StringFields: stringFields, - JSONNameKey: jsonNameKey, - JSONQuery: jsonQuery, - JSONTimeKey: timeKey, - JSONTimeFormat: timeFormat, - JSONTimezone: timezone, - DefaultTags: defaultTags, - } - return parser -} - func newGrokParser(metricName string, patterns []string, nPatterns []string, cPatterns string, cPatternFiles []string, @@ -326,19 +305,6 @@ func newGrokParser(metricName string, return &parser, err } -func NewJSONParser( - metricName string, - tagKeys []string, - defaultTags map[string]string, -) (Parser, error) { - parser := &json.JSONParser{ - MetricName: metricName, - TagKeys: tagKeys, - DefaultTags: defaultTags, - } - return parser, nil -} - func NewNagiosParser() (Parser, error) { return &nagios.NagiosParser{}, nil }