Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/1363: Additional configuration for JSON parser #4351

Merged
merged 21 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions docs/DATA_FORMATS_INPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ but can be overridden using the `name_override` config option.

#### JSON Configuration:

The JSON data format supports specifying "tag keys". If specified, keys
will be searched for in the root-level of the JSON blob. If the key(s) exist,
they will be applied as tags to the Telegraf metrics.
The JSON data format supports specifying "tag keys" and "field keys". If specified, keys
will be searched for in the root-level and any nested lists of the JSON blob. If the key(s) exist,
they will be applied as tags or fields to the Telegraf metrics. If "field_keys" is not specified,
all int and float values will be set as fields by default.

For example, if you had this configuration:

Expand Down Expand Up @@ -173,6 +174,7 @@ For example, if the following configuration:
"my_tag_1",
"my_tag_2"
]
field_keys = ["b_c"]
```

with this JSON output from a command:
Expand All @@ -198,11 +200,11 @@ with this JSON output from a command:
]
```

Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2"
Your Telegraf metrics would get tagged with "my_tag_1" and "my_tag_2" and fielded with "b_c"

```
exec_mycollector,my_tag_1=foo,my_tag_2=baz a=5,b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz a=7,b_c=8
exec_mycollector,my_tag_1=foo,my_tag_2=baz b_c=6
exec_mycollector,my_tag_1=bar,my_tag_2=baz b_c=8
```

# Value:
Expand Down
13 changes: 13 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,18 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
}
}

if node, ok := tbl.Fields["field_keys"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if ary, ok := kv.Value.(*ast.Array); ok {
for _, elem := range ary.Value {
if str, ok := elem.(*ast.String); ok {
c.FieldKeys = append(c.FieldKeys, str.Value)
}
}
}
}
}

if node, ok := tbl.Fields["data_type"]; ok {
if kv, ok := node.(*ast.KeyValue); ok {
if str, ok := kv.Value.(*ast.String); ok {
Expand Down Expand Up @@ -1344,6 +1356,7 @@ func buildParser(name string, tbl *ast.Table) (parsers.Parser, error) {
delete(tbl.Fields, "separator")
delete(tbl.Fields, "templates")
delete(tbl.Fields, "tag_keys")
delete(tbl.Fields, "field_keys")
delete(tbl.Fields, "data_type")
delete(tbl.Fields, "collectd_auth_file")
delete(tbl.Fields, "collectd_security_level")
Expand Down
5 changes: 4 additions & 1 deletion internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ func TestConfig_LoadDirectory(t *testing.T) {
"Testdata did not produce correct memcached metadata.")

ex := inputs.Inputs["exec"]().(*exec.Exec)
p, err := parsers.NewJSONParser("exec", nil, nil)
p, err := parsers.NewParser(&parsers.Config{
MetricName: "exec",
DataFormat: "json",
})
assert.NoError(t, err)
ex.SetParser(p)
ex.Command = "/usr/bin/myothercollector --foo=bar"
Expand Down
15 changes: 12 additions & 3 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func (r runnerMock) Run(e *Exec, command string, acc telegraf.Accumulator) ([]by
}

func TestExec(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(validJson), nil),
Commands: []string{"testcommand arg1"},
Expand All @@ -119,7 +122,10 @@ func TestExec(t *testing.T) {
}

func TestExecMalformed(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock([]byte(malformedJson), nil),
Commands: []string{"badcommand arg1"},
Expand All @@ -132,7 +138,10 @@ func TestExecMalformed(t *testing.T) {
}

func TestCommandError(t *testing.T) {
parser, _ := parsers.NewJSONParser("exec", []string{}, nil)
parser, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "exec",
})
e := &Exec{
runner: newRunnerMock(nil, fmt.Errorf("exit status code 1")),
Commands: []string{"badcommand"},
Expand Down
24 changes: 18 additions & 6 deletions plugins/inputs/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ func TestHTTPwithJSONFormat(t *testing.T) {
URLs: []string{url},
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)

p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand Down Expand Up @@ -63,8 +67,11 @@ func TestHTTPHeaders(t *testing.T) {
URLs: []string{url},
Headers: map[string]string{header: headerValue},
}
metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)

p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand All @@ -83,7 +90,10 @@ func TestInvalidStatusCode(t *testing.T) {
}

metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: metricName,
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand All @@ -105,8 +115,10 @@ func TestMethod(t *testing.T) {
Method: "POST",
}

metricName := "metricName"
p, _ := parsers.NewJSONParser(metricName, nil, nil)
p, _ := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "metricName",
})
plugin.SetParser(p)

var acc testutil.Accumulator
Expand Down
7 changes: 6 additions & 1 deletion plugins/inputs/httpjson/httpjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,12 @@ func (h *HttpJson) gatherServer(
"server": serverURL,
}

parser, err := parsers.NewJSONParser(msrmnt_name, h.TagKeys, tags)
parser, err := parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: msrmnt_name,
TagKeys: h.TagKeys,
DefaultTags: tags,
})
if err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/kafka_consumer/kafka_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)

k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
k.acc = &acc
defer close(k.done)

k.parser, _ = parsers.NewJSONParser("kafka_json_test", []string{}, nil)
k.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "kafka_json_test",
})
go k.receiver()
in <- saramaMsg(testMsgJSON)
acc.Wait(1)
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/mqtt_consumer/mqtt_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)

n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
go n.receiver()
in <- mqttMsg(testMsgJSON)

Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/nats_consumer/nats_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,10 @@ func TestRunParserAndGatherJSON(t *testing.T) {
n.acc = &acc
defer close(n.done)

n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil)
n.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "nats_json_test",
})
n.wg.Add(1)
go n.receiver()
in <- natsMsg(testMsgJSON)
Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/tcp_listener/tcp_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)

listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.wg.Add(1)
go listener.tcpParser()

Expand Down
5 changes: 4 additions & 1 deletion plugins/inputs/udp_listener/udp_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,10 @@ func TestRunParserJSONMsg(t *testing.T) {
listener.acc = &acc
defer close(listener.done)

listener.parser, _ = parsers.NewJSONParser("udp_json_test", []string{}, nil)
listener.parser, _ = parsers.NewParser(&parsers.Config{
DataFormat: "json",
MetricName: "udp_json_test",
})
listener.wg.Add(1)
go listener.udpParser()

Expand Down
12 changes: 12 additions & 0 deletions plugins/parsers/json/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var (
type JSONParser struct {
MetricName string
TagKeys []string
FieldKeys []string
DefaultTags map[string]string
}

Expand Down Expand Up @@ -86,6 +87,17 @@ func (p *JSONParser) switchFieldToTag(tags map[string]string, fields map[string]
}
}

//if field_keys is specified, only those values should be reported as fields
if len(p.FieldKeys) > 0 {
nFields := make(map[string]interface{})
for _, name := range p.FieldKeys {
if fields[name] != nil {
nFields[name] = fields[name]
}
}
return tags, nFields
}

//remove any additional string/bool values from fields
for k := range fields {
switch fields[k].(type) {
Expand Down
23 changes: 15 additions & 8 deletions plugins/parsers/json/parser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package json

import (
"log"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -448,22 +449,28 @@ func TestJSONParseNestedArray(t *testing.T) {
"total_devices": 5,
"total_threads": 10,
"shares": {
"total": 5,
"accepted": 5,
"rejected": 0,
"avg_find_time": 4,
"tester": "work",
"tester2": "don't want this",
"tester3": 7.93
"total": 5,
"accepted": 5,
"rejected": 0,
"avg_find_time": 4,
"tester": "work",
"tester2": "don't want this",
"tester3": {
"hello":"sup",
"fun":"money",
"break":9
}
}
}`

parser := JSONParser{
MetricName: "json_test",
TagKeys: []string{"total_devices", "total_threads", "shares_tester", "shares_tester3"},
TagKeys: []string{"total_devices", "total_threads", "shares_tester3_fun"},
FieldKeys: []string{"shares_tester", "shares_tester3_break"},
}

metrics, err := parser.Parse([]byte(testString))
log.Printf("m[0] name: %v, tags: %v, fields: %v", metrics[0].Name(), metrics[0].Tags(), metrics[0].Fields())
require.NoError(t, err)
require.Equal(t, len(parser.TagKeys), len(metrics[0].Tags()))
}
22 changes: 20 additions & 2 deletions plugins/parsers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ type Config struct {

// TagKeys only apply to JSON data
TagKeys []string
// FieldKeys only apply to JSON
FieldKeys []string
// MetricName applies to JSON & value. This will be the name of the measurement.
MetricName string

Expand Down Expand Up @@ -95,8 +97,8 @@ func NewParser(config *Config) (Parser, error) {
var parser Parser
switch config.DataFormat {
case "json":
parser, err = NewJSONParser(config.MetricName,
config.TagKeys, config.DefaultTags)
parser, err = newJSONParser(config.MetricName,
config.TagKeys, config.FieldKeys, config.DefaultTags)
case "value":
parser, err = NewValueParser(config.MetricName,
config.DataType, config.DefaultTags)
Expand Down Expand Up @@ -126,6 +128,22 @@ func NewParser(config *Config) (Parser, error) {
return parser, err
}

func newJSONParser(
metricName string,
tagKeys []string,
fieldKeys []string,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally we want to avoid changing exported function signatures, as it destabilizes packages. Perhaps what we could do is add a newJSONParser that takes different (or functional) parameters. We could then update those things that call NewJSONParser directly to call NewParser, which will in turn call newJSONParser.

defaultTags map[string]string,
) (Parser, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning an error isn't useful here. Where it's an unexported function not matching any interface, I'd remove it, or remove the function altogether and just instantiate a JSONParser on line 120

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No error is ever returned by the new_Parser functions, but i thought i'd include it here to match the format of the rest of the parsers

parser := &json.JSONParser{
MetricName: metricName,
TagKeys: tagKeys,
FieldKeys: fieldKeys,
DefaultTags: defaultTags,
}
return parser, nil
}

//Deprecated: Use NewParser to get a JSONParser object
func NewJSONParser(
metricName string,
tagKeys []string,
Expand Down