From 039d2e48617cb49d6b266f8a6ec25e522fe40539 Mon Sep 17 00:00:00 2001 From: maxunt Date: Wed, 22 Aug 2018 19:26:48 -0700 Subject: [PATCH] Add name, time, path and string field options to JSON parser (#4351) --- docs/DATA_FORMATS_INPUT.md | 137 +++++- internal/config/config.go | 49 ++ internal/config/config_test.go | 5 +- plugins/inputs/exec/exec_test.go | 15 +- plugins/inputs/http/http_test.go | 24 +- plugins/inputs/httpjson/httpjson.go | 7 +- .../kafka_consumer/kafka_consumer_test.go | 5 +- .../kafka_consumer_legacy_test.go | 5 +- .../mqtt_consumer/mqtt_consumer_test.go | 5 +- .../nats_consumer/nats_consumer_test.go | 5 +- .../inputs/tcp_listener/tcp_listener_test.go | 5 +- .../inputs/udp_listener/udp_listener_test.go | 5 +- plugins/parsers/json/parser.go | 73 ++- plugins/parsers/json/parser_test.go | 447 +++++++++++++----- plugins/parsers/registry.go | 47 +- 15 files changed, 670 insertions(+), 164 deletions(-) diff --git a/docs/DATA_FORMATS_INPUT.md b/docs/DATA_FORMATS_INPUT.md index 7f7c94930e1e6..6e1b6a75178da 100644 --- a/docs/DATA_FORMATS_INPUT.md +++ b/docs/DATA_FORMATS_INPUT.md @@ -107,9 +107,31 @@ but can be overridden using the `name_override` config option. #### JSON Configuration: -The JSON data format supports specifying "tag keys". If specified, keys -will be searched for in the root-level of the JSON blob. If the key(s) exist, -they will be applied as tags to the Telegraf metrics. +The JSON data format supports specifying "tag_keys", "json_string_fields", and "json_query". +If specified, keys in "tag_keys" and "json_string_fields" will be searched for in the root-level +and any nested lists of the JSON blob. All int and float values are added to fields by default. +If the key(s) exist, they will be applied as tags or fields to the Telegraf metrics. +If "json_string_fields" is specified, the string will be added as a field. + +The "json_query" configuration is a gjson path to a JSON object or +list of JSON objects.
If this path leads to an array of values or +single data point an error will be thrown. If this configuration +is specified, only the result of the query will be parsed and returned as metrics. + +The "json_name_key" configuration specifies the key of the field whose value will be +added as the metric name. + +Object paths are specified using gjson path format, which is denoted by object keys +concatenated with "." to go deeper in nested JSON objects. +Additional information on gjson paths can be found here: https://github.com/tidwall/gjson#path-syntax + +The JSON data format also supports extracting time values through the +config "json_time_key" and "json_time_format". If "json_time_key" is set, +"json_time_format" must be specified. The "json_time_key" describes the +name of the field containing time information. The "json_time_format" +must be a recognized Go time format. +If there is no year provided, the metrics will have the current year. +More info on time formats can be found here: https://golang.org/pkg/time/#Parse For example, if you had this configuration: @@ -127,11 +149,28 @@ For example, if you had this configuration: ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md data_format = "json" - ## List of tag names to extract from top-level of JSON server response + ## List of tag names to extract from JSON server response tag_keys = [ "my_tag_1", "my_tag_2" ] + + ## The json path specifying where to extract the metric name from + # json_name_key = "" + + ## List of field names to extract from JSON and add as string fields + # json_string_fields = [] + + ## gjson query path to specify a specific chunk of JSON to be parsed with + ## the above configuration. If not specified, the whole file will be parsed.
+ ## gjson query paths are described here: https://github.com/tidwall/gjson#path-syntax + # json_query = "" + + ## holds the name of the tag of timestamp + # json_time_key = "" + + ## holds the format of timestamp to be parsed + # json_time_format = "" ``` with this JSON output from a command: @@ -152,8 +191,9 @@ Your Telegraf metrics would get tagged with "my_tag_1" exec_mycollector,my_tag_1=foo a=5,b_c=6 ``` -If the JSON data is an array, then each element of the array is parsed with the configured settings. -Each resulting metric will be output with the same timestamp. +If the JSON data is an array, then each element of the array is +parsed with the configured settings. Each resulting metric will +be output with the same timestamp. For example, if the following configuration: @@ -176,6 +216,19 @@ For example, if the following configuration: "my_tag_1", "my_tag_2" ] + + ## List of field names to extract from JSON and add as string fields + # json_string_fields = [] + + ## gjson query path to specify a specific chunk of JSON to be parsed with + ## the above configuration.
If not specified, the whole file will be parsed + # json_query = "" + + ## holds the name of the tag of timestamp + json_time_key = "b_time" + + ## holds the format of timestamp to be parsed + json_time_format = "02 Jan 06 15:04 MST" ``` with this JSON output from a command: @@ -185,7 +238,8 @@ with this JSON output from a command: { "a": 5, "b": { - "c": 6 + "c": 6, + "time":"04 Jan 06 15:04 MST" }, "my_tag_1": "foo", "my_tag_2": "baz" @@ -193,7 +247,8 @@ with this JSON output from a command: { "a": 7, "b": { - "c": 8 + "c": 8, + "time":"11 Jan 07 15:04 MST" }, "my_tag_1": "bar", "my_tag_2": "baz" @@ -201,11 +256,71 @@ with this JSON output from a command: ] ``` -Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" +Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "a" and "b_c". +The metric's time will be a time.Time object, as specified by "b_time". + +``` +exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6 1136387040000000000 +exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8 1168527840000000000 +``` + +If you want to only use a specific portion of your JSON, use the "json_query" +configuration to specify a path to a JSON object. + +For example, with the following config: +```toml +[[inputs.exec]] + ## Commands array + commands = ["/usr/bin/mycollector --foo=bar"] + + ## measurement name suffix (for separating different commands) + name_suffix = "_mycollector" + + ## Data format to consume. + ## Each data format has its own unique set of configuration options, read + ## more about them here: + ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md + data_format = "json" + + ## List of tag names to extract from top-level of JSON server response + tag_keys = ["first"] + + ## List of field names to extract from JSON and add as string fields + json_string_fields = ["last"] + + ## gjson query path to specify a specific chunk of JSON to be parsed with + ## the above configuration.
If not specified, the whole file will be parsed + json_query = "obj.friends" + + ## holds the name of the tag of timestamp + # json_time_key = "" + + ## holds the format of timestamp to be parsed + # json_time_format = "" +``` + +with this JSON as input: +```json +{ + "obj": { + "name": {"first": "Tom", "last": "Anderson"}, + "age":37, + "children": ["Sara","Alex","Jack"], + "fav.movie": "Deer Hunter", + "friends": [ + {"first": "Dale", "last": "Murphy", "age": 44}, + {"first": "Roger", "last": "Craig", "age": 68}, + {"first": "Jane", "last": "Murphy", "age": 47} + ] + } +} +``` +You would receive 3 metrics tagged with "first", and fielded with "last" and "age" ``` -exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6 -exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8 +exec_mycollector,first=Dale last="Murphy",age=44 +exec_mycollector,first=Roger last="Craig",age=68 +exec_mycollector,first=Jane last="Murphy",age=47 ``` # Value: diff --git a/internal/config/config.go b/internal/config/config.go index 21c71d94695c2..5926f6132533f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1261,6 +1261,50 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { } } + if node, ok := tbl.Fields["json_string_fields"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if ary, ok := kv.Value.(*ast.Array); ok { + for _, elem := range ary.Value { + if str, ok := elem.(*ast.String); ok { + c.JSONStringFields = append(c.JSONStringFields, str.Value) + } + } + } + } + } + + if node, ok := tbl.Fields["json_name_key"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.JSONNameKey = str.Value + } + } + } + + if node, ok := tbl.Fields["json_query"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.JSONQuery = str.Value + } + } + } + + if node, ok := tbl.Fields["json_time_key"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if
str, ok := kv.Value.(*ast.String); ok { + c.JSONTimeKey = str.Value + } + } + } + + if node, ok := tbl.Fields["json_time_format"]; ok { + if kv, ok := node.(*ast.KeyValue); ok { + if str, ok := kv.Value.(*ast.String); ok { + c.JSONTimeFormat = str.Value + } + } + } + if node, ok := tbl.Fields["data_type"]; ok { if kv, ok := node.(*ast.KeyValue); ok { if str, ok := kv.Value.(*ast.String); ok { @@ -1405,6 +1449,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) { delete(tbl.Fields, "separator") delete(tbl.Fields, "templates") delete(tbl.Fields, "tag_keys") + delete(tbl.Fields, "string_fields") + delete(tbl.Fields, "json_query") + delete(tbl.Fields, "json_name_key") + delete(tbl.Fields, "json_time_key") + delete(tbl.Fields, "json_time_format") delete(tbl.Fields, "data_type") delete(tbl.Fields, "collectd_auth_file") delete(tbl.Fields, "collectd_security_level") diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 3498d815d0078..b136fec8cd6e2 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -143,7 +143,10 @@ func TestConfig_LoadDirectory(t *testing.T) { "Testdata did not produce correct memcached metadata.") ex := inputs.Inputs["exec"]().(*exec.Exec) - p, err := parsers.NewJSONParser("exec", nil, nil) + p, err := parsers.NewParser(&parsers.Config{ + MetricName: "exec", + DataFormat: "json", + }) assert.NoError(t, err) ex.SetParser(p) ex.Command = "/usr/bin/myothercollector --foo=bar" diff --git a/plugins/inputs/exec/exec_test.go b/plugins/inputs/exec/exec_test.go index c7c181b179f99..0bfeece54feb7 100644 --- a/plugins/inputs/exec/exec_test.go +++ b/plugins/inputs/exec/exec_test.go @@ -93,7 +93,10 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by } func TestExec(t *testing.T) { - parser, _ := parsers.NewJSONParser("exec", []string{}, nil) + parser, _ := parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "exec", + }) e := &Exec{ 
runner: newRunnerMock([]byte(validJson), nil), Commands: []string{"testcommand arg1"}, @@ -119,7 +122,10 @@ func TestExec(t *testing.T) { } func TestExecMalformed(t *testing.T) { - parser, _ := parsers.NewJSONParser("exec", []string{}, nil) + parser, _ := parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "exec", + }) e := &Exec{ runner: newRunnerMock([]byte(malformedJson), nil), Commands: []string{"badcommand arg1"}, @@ -132,7 +138,10 @@ func TestExecMalformed(t *testing.T) { } func TestCommandError(t *testing.T) { - parser, _ := parsers.NewJSONParser("exec", []string{}, nil) + parser, _ := parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "exec", + }) e := &Exec{ runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")), Commands: []string{"badcommand"}, diff --git a/plugins/inputs/http/http_test.go b/plugins/inputs/http/http_test.go index 486edabc94911..4cd465bceb1a4 100644 --- a/plugins/inputs/http/http_test.go +++ b/plugins/inputs/http/http_test.go @@ -26,7 +26,11 @@ func TestHTTPwithJSONFormat(t *testing.T) { URLs: []string{url}, } metricName := "metricName" - p, _ := parsers.NewJSONParser(metricName, nil, nil) + + p, _ := parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "metricName", + }) plugin.SetParser(p) var acc testutil.Accumulator @@ -63,8 +67,11 @@ func TestHTTPHeaders(t *testing.T) { URLs: []string{url}, Headers: map[string]string{header: headerValue}, } - metricName := "metricName" - p, _ := parsers.NewJSONParser(metricName, nil, nil) + + p, _ := parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "metricName", + }) plugin.SetParser(p) var acc testutil.Accumulator @@ -83,7 +90,10 @@ func TestInvalidStatusCode(t *testing.T) { } metricName := "metricName" - p, _ := parsers.NewJSONParser(metricName, nil, nil) + p, _ := parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: metricName, + }) plugin.SetParser(p) var acc testutil.Accumulator @@ -105,8 
+115,10 @@ func TestMethod(t *testing.T) { Method: "POST", } - metricName := "metricName" - p, _ := parsers.NewJSONParser(metricName, nil, nil) + p, _ := parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "metricName", + }) plugin.SetParser(p) var acc testutil.Accumulator diff --git a/plugins/inputs/httpjson/httpjson.go b/plugins/inputs/httpjson/httpjson.go index c7324dee4330a..e09eafc94c16a 100644 --- a/plugins/inputs/httpjson/httpjson.go +++ b/plugins/inputs/httpjson/httpjson.go @@ -181,7 +181,12 @@ func (h *HttpJson) gatherServer( "server": serverURL, } - parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags) + parser, err := parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: msrmnt_name, + TagKeys: h.TagKeys, + DefaultTags: tags, + }) if err != nil { return err } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer_test.go b/plugins/inputs/kafka_consumer/kafka_consumer_test.go index 9a585d6ede4ca..18f7f80bead79 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer_test.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer_test.go @@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) { k.acc = &acc defer close(k.done) - k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) + k.parser, _ = parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "kafka_json_test", + }) go k.receiver() in <- saramaMsg(testMsgJSON) acc.Wait(1) diff --git a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go index 630aca163a12e..38bc48290a37e 100644 --- a/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go +++ b/plugins/inputs/kafka_consumer_legacy/kafka_consumer_legacy_test.go @@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) { k.acc = &acc defer close(k.done) - k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil) + k.parser, _ = 
parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "kafka_json_test", + }) go k.receiver() in <- saramaMsg(testMsgJSON) acc.Wait(1) diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go index eb5e3048cdd54..a2e5deaa8a94a 100644 --- a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -172,7 +172,10 @@ func TestRunParserAndGatherJSON(t *testing.T) { n.acc = &acc defer close(n.done) - n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) + n.parser, _ = parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "nats_json_test", + }) go n.receiver() in <- mqttMsg(testMsgJSON) diff --git a/plugins/inputs/nats_consumer/nats_consumer_test.go b/plugins/inputs/nats_consumer/nats_consumer_test.go index a0b84ff2e97fb..a1f499554c392 100644 --- a/plugins/inputs/nats_consumer/nats_consumer_test.go +++ b/plugins/inputs/nats_consumer/nats_consumer_test.go @@ -108,7 +108,10 @@ func TestRunParserAndGatherJSON(t *testing.T) { n.acc = &acc defer close(n.done) - n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) + n.parser, _ = parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "nats_json_test", + }) n.wg.Add(1) go n.receiver() in <- natsMsg(testMsgJSON) diff --git a/plugins/inputs/tcp_listener/tcp_listener_test.go b/plugins/inputs/tcp_listener/tcp_listener_test.go index 1063cb5c142de..6ff40ad87d6ab 100644 --- a/plugins/inputs/tcp_listener/tcp_listener_test.go +++ b/plugins/inputs/tcp_listener/tcp_listener_test.go @@ -300,7 +300,10 @@ func TestRunParserJSONMsg(t *testing.T) { listener.acc = &acc defer close(listener.done) - listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) + listener.parser, _ = parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "udp_json_test", + }) listener.wg.Add(1) go listener.tcpParser() diff --git 
a/plugins/inputs/udp_listener/udp_listener_test.go b/plugins/inputs/udp_listener/udp_listener_test.go index e0e0e862ee0f1..49115434a7911 100644 --- a/plugins/inputs/udp_listener/udp_listener_test.go +++ b/plugins/inputs/udp_listener/udp_listener_test.go @@ -193,7 +193,10 @@ func TestRunParserJSONMsg(t *testing.T) { listener.acc = &acc defer close(listener.done) - listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil) + listener.parser, _ = parsers.NewParser(&parsers.Config{ + DataFormat: "json", + MetricName: "udp_json_test", + }) listener.wg.Add(1) go listener.udpParser() diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index 62d17c76aca75..9fb0816fe040f 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -11,6 +11,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" + "github.com/tidwall/gjson" ) var ( @@ -18,9 +19,14 @@ var ( ) type JSONParser struct { - MetricName string - TagKeys []string - DefaultTags map[string]string + MetricName string + TagKeys []string + StringFields []string + JSONNameKey string + JSONQuery string + JSONTimeKey string + JSONTimeFormat string + DefaultTags map[string]string } func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { @@ -34,6 +40,9 @@ func (p *JSONParser) parseArray(buf []byte) ([]telegraf.Metric, error) { } for _, item := range jsonOut { metrics, err = p.parseObject(metrics, item) + if err != nil { + return nil, err + } } return metrics, nil } @@ -51,10 +60,42 @@ func (p *JSONParser) parseObject(metrics []telegraf.Metric, jsonOut map[string]i return nil, err } - tags, nFields := p.switchFieldToTag(tags, f.Fields) + //checks if json_name_key is set + if p.JSONNameKey != "" { + p.MetricName = f.Fields[p.JSONNameKey].(string) + } - metric, err := metric.New(p.MetricName, tags, nFields, time.Now().UTC()) + //if time key is specified, set it to nTime + nTime := time.Now().UTC() + if p.JSONTimeKey 
!= "" { + if p.JSONTimeFormat == "" { + err := fmt.Errorf("use of 'json_time_key' requires 'json_time_format'") + return nil, err + } + if f.Fields[p.JSONTimeKey] == nil { + err := fmt.Errorf("JSON time key could not be found") + return nil, err + } + + timeStr, ok := f.Fields[p.JSONTimeKey].(string) + if !ok { + err := fmt.Errorf("time: %v could not be converted to string", f.Fields[p.JSONTimeKey]) + return nil, err + } + nTime, err = time.Parse(p.JSONTimeFormat, timeStr) + if err != nil { + return nil, err + } + + //if the year is 0, set to current year + if nTime.Year() == 0 { + nTime = nTime.AddDate(time.Now().Year(), 0, 0) + } + } + + tags, nFields := p.switchFieldToTag(tags, f.Fields) + metric, err := metric.New(p.MetricName, tags, nFields, nTime) if err != nil { return nil, err } @@ -88,6 +129,17 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string] //remove any additional string/bool values from fields for k := range fields { + //check if field is in StringFields + sField := false + for _, v := range p.StringFields { + if v == k { + sField = true + } + } + if sField { + continue + } + switch fields[k].(type) { case string: delete(fields, k) @@ -99,6 +151,15 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string] } func (p *JSONParser) Parse(buf []byte) ([]telegraf.Metric, error) { + if p.JSONQuery != "" { + result := gjson.GetBytes(buf, p.JSONQuery) + buf = []byte(result.Raw) + if !result.IsArray() && !result.IsObject() { + err := fmt.Errorf("E! 
Query path must lead to a JSON object or array of objects, but lead to: %v", result.Type) + return nil, err + } + } + buf = bytes.TrimSpace(buf) buf = bytes.TrimPrefix(buf, utf8BOM) if len(buf) == 0 { @@ -126,7 +187,7 @@ func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) { } if len(metrics) < 1 { - return nil, fmt.Errorf("Can not parse the line: %s, for data format: influx ", line) + return nil, fmt.Errorf("can not parse the line: %s, for data format: json ", line) } return metrics[0], nil diff --git a/plugins/parsers/json/parser_test.go b/plugins/parsers/json/parser_test.go index c26b209a2e10f..39e43bece15e2 100644 --- a/plugins/parsers/json/parser_test.go +++ b/plugins/parsers/json/parser_test.go @@ -1,9 +1,10 @@ package json import ( + "fmt" + "log" "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -55,46 +56,46 @@ func TestParseValidJSON(t *testing.T) { // Most basic vanilla test metrics, err := parser.Parse([]byte(validJSON)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{}, metrics[0].Tags()) + require.Equal(t, map[string]string{}, metrics[0].Tags()) // Test that newlines are fine metrics, err = parser.Parse([]byte(validJSONNewline)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "d": float64(7), "b_d": float64(8), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{}, metrics[0].Tags()) + require.Equal(t, 
map[string]string{}, metrics[0].Tags()) // Test that strings without TagKeys defined are ignored metrics, err = parser.Parse([]byte(validJSONTags)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{}, metrics[0].Tags()) + require.Equal(t, map[string]string{}, metrics[0].Tags()) // Test that whitespace only will parse as an empty list of metrics metrics, err = parser.Parse([]byte("\n\t")) - assert.NoError(t, err) - assert.Len(t, metrics, 0) + require.NoError(t, err) + require.Len(t, metrics, 0) // Test that an empty string will parse as an empty list of metrics metrics, err = parser.Parse([]byte("")) - assert.NoError(t, err) - assert.Len(t, metrics, 0) + require.NoError(t, err) + require.Len(t, metrics, 0) } func TestParseLineValidJSON(t *testing.T) { @@ -104,33 +105,33 @@ func TestParseLineValidJSON(t *testing.T) { // Most basic vanilla test metric, err := parser.ParseLine(validJSON) - assert.NoError(t, err) - assert.Equal(t, "json_test", metric.Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Equal(t, "json_test", metric.Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metric.Fields()) - assert.Equal(t, map[string]string{}, metric.Tags()) + require.Equal(t, map[string]string{}, metric.Tags()) // Test that newlines are fine metric, err = parser.ParseLine(validJSONNewline) - assert.NoError(t, err) - assert.Equal(t, "json_test", metric.Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Equal(t, "json_test", metric.Name()) + require.Equal(t, map[string]interface{}{ "d": float64(7), "b_d": float64(8), }, metric.Fields()) - 
assert.Equal(t, map[string]string{}, metric.Tags()) + require.Equal(t, map[string]string{}, metric.Tags()) // Test that strings without TagKeys defined are ignored metric, err = parser.ParseLine(validJSONTags) - assert.NoError(t, err) - assert.Equal(t, "json_test", metric.Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Equal(t, "json_test", metric.Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metric.Fields()) - assert.Equal(t, map[string]string{}, metric.Tags()) + require.Equal(t, map[string]string{}, metric.Tags()) } func TestParseInvalidJSON(t *testing.T) { @@ -139,11 +140,11 @@ func TestParseInvalidJSON(t *testing.T) { } _, err := parser.Parse([]byte(invalidJSON)) - assert.Error(t, err) + require.Error(t, err) _, err = parser.Parse([]byte(invalidJSON2)) - assert.Error(t, err) + require.Error(t, err) _, err = parser.ParseLine(invalidJSON) - assert.Error(t, err) + require.Error(t, err) } func TestParseWithTagKeys(t *testing.T) { @@ -153,14 +154,14 @@ func TestParseWithTagKeys(t *testing.T) { TagKeys: []string{"wrongtagkey"}, } metrics, err := parser.Parse([]byte(validJSONTags)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{}, metrics[0].Tags()) + require.Equal(t, map[string]string{}, metrics[0].Tags()) // Test that single tag key is found and applied parser = JSONParser{ @@ -168,14 +169,14 @@ func TestParseWithTagKeys(t *testing.T) { TagKeys: []string{"mytag"}, } metrics, err = parser.Parse([]byte(validJSONTags)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, 
map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "mytag": "foobar", }, metrics[0].Tags()) @@ -185,14 +186,14 @@ func TestParseWithTagKeys(t *testing.T) { TagKeys: []string{"mytag", "othertag"}, } metrics, err = parser.Parse([]byte(validJSONTags)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "mytag": "foobar", "othertag": "baz", }, metrics[0].Tags()) @@ -205,13 +206,13 @@ func TestParseLineWithTagKeys(t *testing.T) { TagKeys: []string{"wrongtagkey"}, } metric, err := parser.ParseLine(validJSONTags) - assert.NoError(t, err) - assert.Equal(t, "json_test", metric.Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Equal(t, "json_test", metric.Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metric.Fields()) - assert.Equal(t, map[string]string{}, metric.Tags()) + require.Equal(t, map[string]string{}, metric.Tags()) // Test that single tag key is found and applied parser = JSONParser{ @@ -219,13 +220,13 @@ func TestParseLineWithTagKeys(t *testing.T) { TagKeys: []string{"mytag"}, } metric, err = parser.ParseLine(validJSONTags) - assert.NoError(t, err) - assert.Equal(t, "json_test", metric.Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Equal(t, "json_test", metric.Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), 
"b_c": float64(6), }, metric.Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "mytag": "foobar", }, metric.Tags()) @@ -235,13 +236,13 @@ func TestParseLineWithTagKeys(t *testing.T) { TagKeys: []string{"mytag", "othertag"}, } metric, err = parser.ParseLine(validJSONTags) - assert.NoError(t, err) - assert.Equal(t, "json_test", metric.Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Equal(t, "json_test", metric.Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metric.Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "mytag": "foobar", "othertag": "baz", }, metric.Tags()) @@ -258,25 +259,25 @@ func TestParseValidJSONDefaultTags(t *testing.T) { // Most basic vanilla test metrics, err := parser.Parse([]byte(validJSON)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{"t4g": "default"}, metrics[0].Tags()) + require.Equal(t, map[string]string{"t4g": "default"}, metrics[0].Tags()) // Test that tagkeys and default tags are applied metrics, err = parser.Parse([]byte(validJSONTags)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "t4g": "default", "mytag": "foobar", }, metrics[0].Tags()) @@ -294,25 +295,25 @@ func 
TestParseValidJSONDefaultTagsOverride(t *testing.T) { // Most basic vanilla test metrics, err := parser.Parse([]byte(validJSON)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{"mytag": "default"}, metrics[0].Tags()) + require.Equal(t, map[string]string{"mytag": "default"}, metrics[0].Tags()) // Test that tagkeys override default tags metrics, err = parser.Parse([]byte(validJSONTags)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "mytag": "foobar", }, metrics[0].Tags()) } @@ -325,31 +326,31 @@ func TestParseValidJSONArray(t *testing.T) { // Most basic vanilla test metrics, err := parser.Parse([]byte(validJSONArray)) - assert.NoError(t, err) - assert.Len(t, metrics, 1) - assert.Equal(t, "json_array_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 1) + require.Equal(t, "json_array_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{}, metrics[0].Tags()) + require.Equal(t, map[string]string{}, metrics[0].Tags()) // Basic multiple datapoints metrics, err = parser.Parse([]byte(validJSONArrayMultiple)) - assert.NoError(t, err) - assert.Len(t, metrics, 2) - assert.Equal(t, 
"json_array_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 2) + require.Equal(t, "json_array_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{}, metrics[1].Tags()) - assert.Equal(t, "json_array_test", metrics[1].Name()) - assert.Equal(t, map[string]interface{}{ + require.Equal(t, map[string]string{}, metrics[1].Tags()) + require.Equal(t, "json_array_test", metrics[1].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(7), "b_c": float64(8), }, metrics[1].Fields()) - assert.Equal(t, map[string]string{}, metrics[1].Tags()) + require.Equal(t, map[string]string{}, metrics[1].Tags()) } func TestParseArrayWithTagKeys(t *testing.T) { @@ -359,21 +360,21 @@ func TestParseArrayWithTagKeys(t *testing.T) { TagKeys: []string{"wrongtagkey"}, } metrics, err := parser.Parse([]byte(validJSONArrayTags)) - assert.NoError(t, err) - assert.Len(t, metrics, 2) - assert.Equal(t, "json_array_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 2) + require.Equal(t, "json_array_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{}, metrics[0].Tags()) + require.Equal(t, map[string]string{}, metrics[0].Tags()) - assert.Equal(t, "json_array_test", metrics[1].Name()) - assert.Equal(t, map[string]interface{}{ + require.Equal(t, "json_array_test", metrics[1].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(7), "b_c": float64(8), }, metrics[1].Fields()) - assert.Equal(t, map[string]string{}, metrics[1].Tags()) + require.Equal(t, map[string]string{}, metrics[1].Tags()) // Test that single tag key is found and applied parser = JSONParser{ @@ -381,23 +382,23 @@ func TestParseArrayWithTagKeys(t *testing.T) { 
TagKeys: []string{"mytag"}, } metrics, err = parser.Parse([]byte(validJSONArrayTags)) - assert.NoError(t, err) - assert.Len(t, metrics, 2) - assert.Equal(t, "json_array_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 2) + require.Equal(t, "json_array_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "mytag": "foo", }, metrics[0].Tags()) - assert.Equal(t, "json_array_test", metrics[1].Name()) - assert.Equal(t, map[string]interface{}{ + require.Equal(t, "json_array_test", metrics[1].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(7), "b_c": float64(8), }, metrics[1].Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "mytag": "bar", }, metrics[1].Tags()) @@ -407,24 +408,24 @@ func TestParseArrayWithTagKeys(t *testing.T) { TagKeys: []string{"mytag", "othertag"}, } metrics, err = parser.Parse([]byte(validJSONArrayTags)) - assert.NoError(t, err) - assert.Len(t, metrics, 2) - assert.Equal(t, "json_array_test", metrics[0].Name()) - assert.Equal(t, map[string]interface{}{ + require.NoError(t, err) + require.Len(t, metrics, 2) + require.Equal(t, "json_array_test", metrics[0].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(5), "b_c": float64(6), }, metrics[0].Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "mytag": "foo", "othertag": "baz", }, metrics[0].Tags()) - assert.Equal(t, "json_array_test", metrics[1].Name()) - assert.Equal(t, map[string]interface{}{ + require.Equal(t, "json_array_test", metrics[1].Name()) + require.Equal(t, map[string]interface{}{ "a": float64(7), "b_c": float64(8), }, metrics[1].Fields()) - assert.Equal(t, map[string]string{ + require.Equal(t, map[string]string{ "mytag": "bar", "othertag": "baz", }, metrics[1].Tags()) @@ 
-439,7 +440,7 @@ func TestHttpJsonBOM(t *testing.T) { // Most basic vanilla test _, err := parser.Parse(jsonBOM) - assert.NoError(t, err) + require.NoError(t, err) } //for testing issue #4260 @@ -448,22 +449,212 @@ func TestJSONParseNestedArray(t *testing.T) { "total_devices": 5, "total_threads": 10, "shares": { - "total": 5, - "accepted": 5, - "rejected": 0, - "avg_find_time": 4, - "tester": "work", - "tester2": "don't want this", - "tester3": 7.93 + "total": 5, + "accepted": 5, + "rejected": 0, + "avg_find_time": 4, + "tester": "work", + "tester2": "don't want this", + "tester3": { + "hello":"sup", + "fun":"money", + "break":9 + } } }` parser := JSONParser{ MetricName: "json_test", - TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"}, + TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"}, } metrics, err := parser.Parse([]byte(testString)) + log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields()) require.NoError(t, err) require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags())) } + +func TestJSONQueryErrorOnArray(t *testing.T) { + testString := `{ + "total_devices": 5, + "total_threads": 10, + "shares": { + "total": 5, + "accepted": 6, + "test_string": "don't want this", + "test_obj": { + "hello":"sup", + "fun":"money", + "break":9 + }, + "myArr":[4,5,6] + } + }` + + parser := JSONParser{ + MetricName: "json_test", + TagKeys: []string{}, + JSONQuery: "shares.myArr", + } + + _, err := parser.Parse([]byte(testString)) + require.Error(t, err) +} + +func TestArrayOfObjects(t *testing.T) { + testString := `{ + "meta": { + "info":9, + "shares": [{ + "channel": 6, + "time": 1130, + "ice":"man" + }, + { + "channel": 5, + "time": 1030, + "ice":"bucket" + }, + { + "channel": 10, + "time": 330, + "ice":"cream" + }] + }, + "more_stuff":"junk" + }` + + parser := JSONParser{ + MetricName: "json_test", + TagKeys: []string{"ice"}, + JSONQuery: "meta.shares", + } + + 
metrics, err := parser.Parse([]byte(testString)) + require.NoError(t, err) + require.Equal(t, 3, len(metrics)) +} + +func TestUseCaseJSONQuery(t *testing.T) { + testString := `{ + "obj": { + "name": {"first": "Tom", "last": "Anderson"}, + "age":37, + "children": ["Sara","Alex","Jack"], + "fav.movie": "Deer Hunter", + "friends": [ + {"first": "Dale", "last": "Murphy", "age": 44}, + {"first": "Roger", "last": "Craig", "age": 68}, + {"first": "Jane", "last": "Murphy", "age": 47} + ] + } + }` + + parser := JSONParser{ + MetricName: "json_test", + StringFields: []string{"last"}, + TagKeys: []string{"first"}, + JSONQuery: "obj.friends", + } + + metrics, err := parser.Parse([]byte(testString)) + require.NoError(t, err) + require.Equal(t, 3, len(metrics)) + require.Equal(t, metrics[0].Fields()["last"], "Murphy") +} + +func TestTimeParser(t *testing.T) { + testString := `[ + { + "a": 5, + "b": { + "c": 6, + "time":"04 Jan 06 15:04 MST" + }, + "my_tag_1": "foo", + "my_tag_2": "baz" + }, + { + "a": 7, + "b": { + "c": 8, + "time":"11 Jan 07 15:04 MST" + }, + "my_tag_1": "bar", + "my_tag_2": "baz" + } + ]` + + parser := JSONParser{ + MetricName: "json_test", + JSONTimeKey: "b_time", + JSONTimeFormat: "02 Jan 06 15:04 MST", + } + metrics, err := parser.Parse([]byte(testString)) + require.NoError(t, err) + require.Equal(t, 2, len(metrics)) + require.Equal(t, false, metrics[0].Time() == metrics[1].Time()) +} + +func TestTimeErrors(t *testing.T) { + testString := `{ + "a": 5, + "b": { + "c": 6, + "time":"04 Jan 06 15:04 MST" + }, + "my_tag_1": "foo", + "my_tag_2": "baz" + }` + + parser := JSONParser{ + MetricName: "json_test", + JSONTimeKey: "b_time", + JSONTimeFormat: "02 January 06 15:04 MST", + } + + metrics, err := parser.Parse([]byte(testString)) + require.Error(t, err) + require.Equal(t, 0, len(metrics)) + + testString2 := `{ + "a": 5, + "b": { + "c": 6 + }, + "my_tag_1": "foo", + "my_tag_2": "baz" + }` + + parser = JSONParser{ + MetricName: "json_test", + JSONTimeKey: 
"b_time", + JSONTimeFormat: "02 January 06 15:04 MST", + } + + metrics, err = parser.Parse([]byte(testString2)) + log.Printf("err: %v", err) + require.Error(t, err) + require.Equal(t, 0, len(metrics)) + require.Equal(t, fmt.Errorf("JSON time key could not be found"), err) +} + +func TestNameKey(t *testing.T) { + testString := `{ + "a": 5, + "b": { + "c": "this is my name", + "time":"04 Jan 06 15:04 MST" + }, + "my_tag_1": "foo", + "my_tag_2": "baz" + }` + + parser := JSONParser{ + JSONNameKey: "b_c", + } + + metrics, err := parser.Parse([]byte(testString)) + require.NoError(t, err) + require.Equal(t, "this is my name", metrics[0].Name()) +} diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index e198cb2cb96c6..89fdc9a105022 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -59,9 +59,22 @@ type Config struct { // TagKeys only apply to JSON data TagKeys []string + // FieldKeys only apply to JSON + JSONStringFields []string + + JSONNameKey string // MetricName applies to JSON & value. This will be the name of the measurement. 
MetricName string + // JSONQuery holds a gjson path selecting the part of the document the JSON parser should parse + JSONQuery string + + // JSONTimeKey is the key of the JSON field that contains the metric timestamp + JSONTimeKey string + + // JSONTimeFormat is the Go time layout used to parse the JSONTimeKey value + JSONTimeFormat string + // Authentication file for collectd CollectdAuthFile string // One of none (default), sign, or encrypt @@ -108,8 +121,14 @@ func NewParser(config *Config) (Parser, error) { var parser Parser switch config.DataFormat { case "json": - parser, err = NewJSONParser(config.MetricName, - config.TagKeys, config.DefaultTags) + parser = newJSONParser(config.MetricName, + config.TagKeys, + config.JSONNameKey, + config.JSONStringFields, + config.JSONQuery, + config.JSONTimeKey, + config.JSONTimeFormat, + config.DefaultTags) case "value": parser, err = NewValueParser(config.MetricName, config.DataType, config.DefaultTags) @@ -151,6 +170,30 @@ func NewParser(config *Config) (Parser, error) { return parser, err } +func newJSONParser( + metricName string, + tagKeys []string, + jsonNameKey string, + stringFields []string, + jsonQuery string, + timeKey string, + timeFormat string, + defaultTags map[string]string, +) Parser { + parser := &json.JSONParser{ + MetricName: metricName, + TagKeys: tagKeys, + StringFields: stringFields, + JSONNameKey: jsonNameKey, + JSONQuery: jsonQuery, + JSONTimeKey: timeKey, + JSONTimeFormat: timeFormat, + DefaultTags: defaultTags, + } + return parser +} + +//Deprecated: Use NewParser to get a JSONParser object func newGrokParser(metricName string, patterns []string, nPatterns []string,