Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: MQTT Consumer ServiceInput plugin #644

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,34 @@ func NewMetric(
}, nil
}

// MetricParser is an object for Parsing incoming metrics.
type MetricParser struct {
// DefaultTags will be added to every parsed metric
DefaultTags map[string]string
}

func NewMetricParser() *MetricParser {
return &MetricParser{}
}

// ParseMetrics returns a slice of Metrics from a text representation of a
// metric (in line-protocol format)
// with each metric separated by newlines. If any metrics fail to parse,
// a non-nil error will be returned in addition to the metrics that parsed
// successfully.
func ParseMetrics(buf []byte) ([]Metric, error) {
func (mp *MetricParser) Parse(buf []byte) ([]Metric, error) {
// parse even if the buffer begins with a newline
buf = bytes.TrimPrefix(buf, []byte("\n"))
points, err := models.ParsePoints(buf)
metrics := make([]Metric, len(points))
for i, point := range points {
tags := point.Tags()
for k, v := range mp.DefaultTags {
tags[k] = v
}
// Ignore error here because it's impossible that a model.Point
// wouldn't parse into client.Point properly
metrics[i], _ = NewMetric(point.Name(), point.Tags(),
metrics[i], _ = NewMetric(point.Name(), tags,
point.Fields(), point.Time())
}
return metrics, err
Expand Down
6 changes: 3 additions & 3 deletions metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ cpu,host usage_idle=99
`

func TestParseValidMetrics(t *testing.T) {
metrics, err := ParseMetrics([]byte(validMs))
metrics, err := NewMetricParser().Parse([]byte(validMs))
assert.NoError(t, err)
assert.Len(t, metrics, 1)
m := metrics[0]
Expand All @@ -50,13 +50,13 @@ func TestParseValidMetrics(t *testing.T) {
}

func TestParseInvalidMetrics(t *testing.T) {
metrics, err := ParseMetrics([]byte(invalidMs))
metrics, err := NewMetricParser().Parse([]byte(invalidMs))
assert.Error(t, err)
assert.Len(t, metrics, 0)
}

func TestParseValidAndInvalidMetrics(t *testing.T) {
metrics, err := ParseMetrics([]byte(validInvalidMs))
metrics, err := NewMetricParser().Parse([]byte(validInvalidMs))
assert.Error(t, err)
assert.Len(t, metrics, 3)
}
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
_ "github.com/influxdata/telegraf/plugins/inputs/mongodb"
_ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/mysql"
_ "github.com/influxdata/telegraf/plugins/inputs/nginx"
_ "github.com/influxdata/telegraf/plugins/inputs/nsq"
Expand Down
2 changes: 1 addition & 1 deletion plugins/inputs/exec/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error {
acc.AddFields("exec", f.Fields, nil)
case "influx":
now := time.Now()
metrics, err := telegraf.ParseMetrics(out)
metrics, err := telegraf.NewMetricParser().Parse(out)
for _, metric := range metrics {
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now)
}
Expand Down
33 changes: 19 additions & 14 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,18 @@ type Kafka struct {
Topics []string
ZookeeperPeers []string
Consumer *consumergroup.ConsumerGroup
PointBuffer int
Offset string
MetricBuffer int
// TODO remove PointBuffer, legacy support
PointBuffer int
Offset string

sync.Mutex

// channel for all incoming kafka messages
in <-chan *sarama.ConsumerMessage
// channel for all kafka consumer errors
errs <-chan *sarama.ConsumerError
// channel for all incoming parsed kafka points
// channel for all incoming parsed kafka metrics
metricC chan telegraf.Metric
done chan struct{}

Expand All @@ -42,8 +44,8 @@ var sampleConfig = `
zookeeper_peers = ["localhost:2181"]
# the name of the consumer group
consumer_group = "telegraf_metrics_consumers"
# Maximum number of points to buffer between collection intervals
point_buffer = 100000
# Maximum number of metrics to buffer between collection intervals
metric_buffer = 100000
# Offset (must be either "oldest" or "newest")
offset = "oldest"
`
Expand Down Expand Up @@ -90,10 +92,13 @@ func (k *Kafka) Start() error {
}

k.done = make(chan struct{})
if k.PointBuffer == 0 {
k.PointBuffer = 100000
if k.PointBuffer == 0 && k.MetricBuffer == 0 {
k.MetricBuffer = 100000
} else if k.PointBuffer > 0 {
// Legacy support of PointBuffer field TODO remove
k.MetricBuffer = k.PointBuffer
}
k.metricC = make(chan telegraf.Metric, k.PointBuffer)
k.metricC = make(chan telegraf.Metric, k.MetricBuffer)

// Start the kafka message reader
go k.parser()
Expand All @@ -112,7 +117,7 @@ func (k *Kafka) parser() {
case err := <-k.errs:
log.Printf("Kafka Consumer Error: %s\n", err.Error())
case msg := <-k.in:
metrics, err := telegraf.ParseMetrics(msg.Value)
metrics, err := telegraf.NewMetricParser().Parse(msg.Value)
if err != nil {
log.Printf("Could not parse kafka message: %s, error: %s",
string(msg.Value), err.Error())
Expand All @@ -124,7 +129,7 @@ func (k *Kafka) parser() {
continue
default:
log.Printf("Kafka Consumer buffer is full, dropping a metric." +
" You may want to increase the point_buffer setting")
" You may want to increase the metric_buffer setting")
}
}

Expand All @@ -151,10 +156,10 @@ func (k *Kafka) Stop() {
func (k *Kafka) Gather(acc telegraf.Accumulator) error {
k.Lock()
defer k.Unlock()
npoints := len(k.metricC)
for i := 0; i < npoints; i++ {
point := <-k.metricC
acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time())
nmetrics := len(k.metricC)
for i := 0; i < nmetrics; i++ {
metric := <-k.metricC
acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time())
}
return nil
}
Expand Down
39 changes: 39 additions & 0 deletions plugins/inputs/mqtt_consumer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# mqtt_consumer Input Plugin

The example plugin gathers metrics about example things

### Configuration:

```
# Description
[[inputs.example]]
# SampleConfig
```

### Measurements & Fields:

<optional description>

- measurement1
- field1 (type, unit)
- field2 (float, percent)
- measurement2
- field3 (integer, bytes)

### Tags:

- All measurements have the following tags:
- tag1 (optional description)
- tag2
- measurement2 has the following tags:
- tag3

### Example Output:

Give an example `-test` output here

```
$ ./telegraf -config telegraf.conf -input-filter example -test
measurement1,tag1=foo,tag2=bar field1=1i,field2=2.1 1453831884664956455
measurement2,tag1=foo,tag2=bar,tag3=baz field3=1i 1453831884664956455
```
Loading