
Add name, time, path and string field options to JSON parser (influxd…
maxunt authored and rgitzel committed Oct 17, 2018
1 parent d46db66 commit f6c1d43
Showing 15 changed files with 670 additions and 164 deletions.
137 changes: 126 additions & 11 deletions docs/DATA_FORMATS_INPUT.md
@@ -107,9 +107,31 @@ but can be overridden using the `name_override` config option.

#### JSON Configuration:

The JSON data format supports specifying "tag_keys", "json_string_fields", and "json_query".
If specified, keys in "tag_keys" and "json_string_fields" will be searched for in the root level
and in any nested lists of the JSON blob. All int and float values are added as fields by default.
If the keys exist, they will be applied as tags or fields to the Telegraf metrics.
Keys listed in "json_string_fields" have their string values added as fields.

The "json_query" configuration is a gjson path to a JSON object or a
list of JSON objects. If the path leads to an array of scalar values or to a
single scalar, an error will be thrown. If this configuration
is specified, only the result of the query will be parsed and returned as metrics.

The "json_name_key" configuration specifies the key whose value will be
used as the metric name.

Object paths use gjson path syntax, in which nested object keys are
joined with "." to descend into nested JSON objects.
Additional information on gjson paths can be found here: https://github.com/tidwall/gjson#path-syntax

The JSON data format also supports extracting timestamps through the
"json_time_key" and "json_time_format" config options. If "json_time_key" is set,
"json_time_format" must also be specified. "json_time_key" names the
field containing the time value, and "json_time_format"
must be a recognized Go time layout.
If no year is provided, the metrics are given the current year.
More info on time formats can be found here: https://golang.org/pkg/time/#Parse

For example, if you had this configuration:

@@ -127,11 +149,28 @@
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from JSON server response
tag_keys = [
"my_tag_1",
"my_tag_2"
]

## Name of the key whose value will be used as the metric name
# json_name_key = ""

## List of field names to extract from JSON and add as string fields
# json_string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed.
## gjson query paths are described here: https://github.com/tidwall/gjson#path-syntax
# json_query = ""

## Name of the key containing the time value
# json_time_key = ""

## Go time layout used to parse the time value
# json_time_format = ""
```

with this JSON output from a command:
@@ -152,8 +191,9 @@ Your Telegraf metrics would get tagged with "my_tag_1"
exec_mycollector,my_tag_1=foo a=5,b_c=6
```

If the JSON data is an array, then each element of the array is
parsed with the configured settings. Each resulting metric will
be output with the same timestamp.

For example, with the following configuration:

@@ -176,6 +216,19 @@
"my_tag_1",
"my_tag_2"
]

## List of field names to extract from JSON and add as string fields
# json_string_fields = []

## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed.
# json_query = ""

## Name of the key containing the time value
json_time_key = "b_time"

## Go time layout used to parse the time value
json_time_format = "02 Jan 06 15:04 MST"
```

with this JSON output from a command:
@@ -185,27 +238,89 @@
{
"a": 5,
"b": {
"c": 6,
"time":"04 Jan 06 15:04 MST"
},
"my_tag_1": "foo",
"my_tag_2": "baz"
},
{
"a": 7,
"b": {
"c": 8,
"time":"11 Jan 07 15:04 MST"
},
"my_tag_1": "bar",
"my_tag_2": "baz"
}
]
```

Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2", with "b_c" as a field.
Each metric's timestamp is parsed from the flattened "b_time" field using the configured layout.

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6 1136387040000000000
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8 1168527840000000000
```

To parse only a specific portion of your JSON, use the "json_query"
configuration to specify a gjson path to a JSON object.

For example, with the following config:
```toml
[[inputs.exec]]
## Commands array
commands = ["/usr/bin/mycollector --foo=bar"]

## measurement name suffix (for separating different commands)
name_suffix = "_mycollector"

## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "json"

## List of tag names to extract from the queried JSON
tag_keys = ["first"]

## List of field names to extract from JSON and add as string fields
json_string_fields = ["last"]

## gjson query path to specify a specific chunk of JSON to be parsed with
## the above configuration. If not specified, the whole file will be parsed.
json_query = "obj.friends"

## Name of the key containing the time value
# json_time_key = ""

## Go time layout used to parse the time value
# json_time_format = ""
```

with this JSON as input:
```json
{
"obj": {
"name": {"first": "Tom", "last": "Anderson"},
"age":37,
"children": ["Sara","Alex","Jack"],
"fav.movie": "Deer Hunter",
"friends": [
{"first": "Dale", "last": "Murphy", "age": 44},
{"first": "Roger", "last": "Craig", "age": 68},
{"first": "Jane", "last": "Murphy", "age": 47}
]
}
}
```
You would receive 3 metrics tagged with "first", with "last" and "age" as fields:

```
exec_mycollector,first=Dale last="Murphy",age=44
exec_mycollector,first=Roger last="Craig",age=68
exec_mycollector,first=Jane last="Murphy",age=47
```

# Value:
49 changes: 49 additions & 0 deletions internal/config/config.go
@@ -1261,6 +1261,50 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}

if node, ok := tbl.Fields["json_string_fields"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.JSONStringFields = append(c.JSONStringFields, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["json_name_key"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONNameKey = str.Value
}
}
}

if node, ok := tbl.Fields["json_query"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONQuery = str.Value
}
}
}

if node, ok := tbl.Fields["json_time_key"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONTimeKey = str.Value
}
}
}

if node, ok := tbl.Fields["json_time_format"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
c.JSONTimeFormat = str.Value
}
}
}

if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
@@ -1405,6 +1449,11 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "json_string_fields")
delete(tbl.Fields, "json_query")
delete(tbl.Fields, "json_name_key")
delete(tbl.Fields, "json_time_key")
delete(tbl.Fields, "json_time_format")
delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")
5 changes: 4 additions & 1 deletion internal/config/config_test.go
@@ -143,7 +143,10 @@ func TestConfig_LoadDirectory(t *testing.T) {
"Testdata did not produce correct memcached metadata.")

ex := inputs.Inputs["exec"]().(*exec.Exec)
p, err := parsers.NewParser(&parsers.Config{
MetricName: "exec",
DataFormat: "json",
})
assert.NoError(t, err)
ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar"
15 changes: 12 additions & 3 deletions plugins/inputs/exec/exec_test.go
@@ -93,7 +93,10 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
}

func TestExec(t *testing.T) {
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"},
@@ -119,7 +122,10 @@ }
}

func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"},
@@ -132,7 +138,10 @@ func TestExecMalformed(t *testing.T) {
}

func TestCommandError(t *testing.T) {
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
24 changes: 18 additions & 6 deletions plugins/inputs/http/http_test.go
@@ -26,7 +26,11 @@ func TestHTTPwithJSONFormat(t *testing.T) {
URLs: []string{url},
}
metricName := "metricName"

p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
@@ -63,8 +67,11 @@ func TestHTTPHeaders(t *testing.T) {
URLs: []string{url},
Headers: map[string]string{header: headerValue},
}
metricName := "metricName"

p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
@@ -83,7 +90,10 @@ func TestInvalidStatusCode(t *testing.T) {
}

metricName := "metricName"
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: metricName,
})
plugin.SetParser(p)

var acc testutil.Accumulator
@@ -105,8 +115,10 @@ func TestMethod(t *testing.T) {
Method: "POST",
}

metricName := "metricName"
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
7 changes: 6 additions & 1 deletion plugins/inputs/httpjson/httpjson.go
@@ -181,7 +181,12 @@ func (h *HttpJson) gatherServer(
"server": serverURL,
}

parser, err := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: msrmnt_name,
TagKeys: h.TagKeys,
DefaultTags: tags,
})
if err != nil {
return err
}
5 changes: 4 additions & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)

k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)
@@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)

k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)
