Skip to content

Commit

Permalink
Allow exec plugin to parse line-protocol
Browse files Browse the repository at this point in the history
closes #613
  • Loading branch information
sparrc committed Jan 29, 2016
1 parent 338341a commit 4f6ee8c
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 34 deletions.
3 changes: 3 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package telegraf

import (
"bytes"
"time"

"github.com/influxdata/influxdb/client/v2"
Expand Down Expand Up @@ -68,6 +69,8 @@ func NewMetric(
// a non-nil error will be returned in addition to the metrics that parsed
// successfully.
func ParseMetrics(buf []byte) ([]Metric, error) {
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
points, err := models.ParsePoints(buf)
metrics := make([]Metric, len(points))
for i, point := range points {
Expand Down
73 changes: 55 additions & 18 deletions plugins/inputs/exec/README.md
Original file line number Diff line number Diff line change
@@ -1,28 +1,39 @@
# Exec Plugin
# Exec Input Plugin

The exec plugin can execute arbitrary commands which output JSON. Then it flattens JSON and finds
all numeric values, treating them as floats.
The exec plugin can execute arbitrary commands which output JSON or
InfluxDB [line-protocol](https://docs.influxdata.com/influxdb/v0.9/write_protocols/line/).

For example, if you have a json-returning command called mycollector, you could
setup the exec plugin with:
If using JSON, only numeric values are parsed and turned into floats. Booleans
and strings will be ignored.

### Configuration

```
# Read flattened metrics from one or more commands that output JSON to stdout
[[inputs.exec]]
command = "/usr/bin/mycollector --output=json"
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
interval = "10s"
```

The name suffix is appended to exec as "exec_name_suffix" to identify the input stream.
Other options for modifying the measurement names are:

The interval is used to determine how often a particular command should be run. Each
time the exec plugin runs, it will only run a particular command if it has been at least
`interval` seconds since the exec plugin last ran the command.
```
name_override = "measurement_name"
name_prefix = "prefix_"
```

### Example 1

# Sample
Let's say that we have the above configuration, and mycollector outputs the
following JSON:

Let's say that we have a command with the name_suffix "_mycollector", which gives the following output:
```json
{
"a": 0.5,
Expand All @@ -33,13 +44,39 @@ Let's say that we have a command with the name_suffix "_mycollector", which give
}
```

The collected metrics will be stored as field values under the same measurement "exec_mycollector":
The collected metrics will be stored as fields under the measurement
"exec_mycollector":

```
exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567
exec_mycollector a=0.5,b_c=0.1,b_d=5 1452815002357578567
```

Other options for modifying the measurement names are:
### Example 2

Now let's say we have the following configuration:

```
name_override = "newname"
name_prefix = "prefix_"
[[inputs.exec]]
# the command to run
command = "/usr/bin/line_protocol_collector"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "influx"
```

And line_protocol_collector outputs the following line protocol:

```
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
```

You will get data in InfluxDB exactly as it is defined above,
tags are cpu=cpuN, host=foo, and datacenter=us-east with fields usage_idle
and usage_busy. They will receive a timestamp at collection time.
47 changes: 31 additions & 16 deletions plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"os/exec"
"time"

"github.com/gonuts/go-shellquote"

Expand All @@ -14,18 +15,20 @@ import (
)

const sampleConfig = `
# NOTE This plugin only reads numerical measurements, strings and booleans
# will be ignored.
# the command to run
command = "/usr/bin/mycollector --foo=bar"
# Data format to consume. This can be "json" or "influx" (line-protocol)
# NOTE json only reads numerical measurements, strings and booleans are ignored.
data_format = "json"
# measurement name suffix (for separating different commands)
name_suffix = "_mycollector"
`

type Exec struct {
Command string
Command string
DataFormat string

runner Runner
}
Expand Down Expand Up @@ -71,20 +74,32 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
return err
}

var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
e.Command, err)
}

f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
switch e.DataFormat {
case "", "json":
var jsonOut interface{}
err = json.Unmarshal(out, &jsonOut)
if err != nil {
return fmt.Errorf("exec: unable to parse output of '%s' as JSON, %s",
e.Command, err)
}

f := internal.JSONFlattener{}
err = f.FlattenJSON("", jsonOut)
if err != nil {
return err
}
acc.AddFields("exec", f.Fields, nil)
case "influx":
now := time.Now()
metrics, err := telegraf.ParseMetrics(out)
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
}
return err
default:
return fmt.Errorf("Unsupported data format: %s. Must be either json "+
"or influx.", e.DataFormat)
}

acc.AddFields("exec", f.Fields, nil)
return nil
}

Expand Down
73 changes: 73 additions & 0 deletions plugins/inputs/exec/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@ const malformedJson = `
"status": "green",
`

const lineProtocol = "cpu,host=foo,datacenter=us-east usage_idle=99,usage_busy=1"

const lineProtocolMulti = `
cpu,cpu=cpu0,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu1,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu2,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu3,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu4,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu5,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
cpu,cpu=cpu6,host=foo,datacenter=us-east usage_idle=99,usage_busy=1
`

type runnerMock struct {
out []byte
err error
Expand Down Expand Up @@ -97,3 +109,64 @@ func TestCommandError(t *testing.T) {
require.Error(t, err)
assert.Equal(t, acc.NFields(), 0, "No new points should have been added")
}

func TestLineProtocolParse(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "line-protocol",
DataFormat: "influx",
}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.NoError(t, err)

fields := map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}
tags := map[string]string{
"host": "foo",
"datacenter": "us-east",
}
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
}

func TestLineProtocolParseMultiple(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocolMulti), nil),
Command: "line-protocol",
DataFormat: "influx",
}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.NoError(t, err)

fields := map[string]interface{}{
"usage_idle": float64(99),
"usage_busy": float64(1),
}
tags := map[string]string{
"host": "foo",
"datacenter": "us-east",
}
cpuTags := []string{"cpu0", "cpu1", "cpu2", "cpu3", "cpu4", "cpu5", "cpu6"}

for _, cpu := range cpuTags {
tags["cpu"] = cpu
acc.AssertContainsTaggedFields(t, "cpu", fields, tags)
}
}

func TestInvalidDataFormat(t *testing.T) {
e := &Exec{
runner: newRunnerMock([]byte(lineProtocol), nil),
Command: "bad data format",
DataFormat: "FooBar",
}

var acc testutil.Accumulator
err := e.Gather(&acc)
require.Error(t, err)
}

0 comments on commit 4f6ee8c

Please sign in to comment.