From 6298d2a9d5dc95ac137ea3ff4c5e63b559f79cf3 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 3 Feb 2016 13:13:03 -0700 Subject: [PATCH] MQTT Consumer ServiceInput plugin --- plugins/inputs/all/all.go | 1 + .../inputs/kafka_consumer/kafka_consumer.go | 31 +-- plugins/inputs/mqtt_consumer/README.md | 39 ++++ plugins/inputs/mqtt_consumer/mqtt_consumer.go | 201 ++++++++++++++++++ .../mqtt_consumer/mqtt_consumer_test.go | 1 + plugins/outputs/mqtt/mqtt.go | 3 - 6 files changed, 260 insertions(+), 16 deletions(-) create mode 100644 plugins/inputs/mqtt_consumer/README.md create mode 100644 plugins/inputs/mqtt_consumer/mqtt_consumer.go create mode 100644 plugins/inputs/mqtt_consumer/mqtt_consumer_test.go diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 52ab428f85a92..b3f652fdfd9a7 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -19,6 +19,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" _ "github.com/influxdata/telegraf/plugins/inputs/memcached" _ "github.com/influxdata/telegraf/plugins/inputs/mongodb" + _ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/mysql" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" _ "github.com/influxdata/telegraf/plugins/inputs/nsq" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 499b2e50bc8e1..42f046462be3f 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -17,8 +17,10 @@ type Kafka struct { Topics []string ZookeeperPeers []string Consumer *consumergroup.ConsumerGroup - PointBuffer int - Offset string + MetricBuffer int + // TODO remove PointBuffer, legacy support + PointBuffer int + Offset string sync.Mutex @@ -26,7 +28,7 @@ type Kafka struct { in <-chan *sarama.ConsumerMessage // channel for all kafka consumer errors errs <-chan *sarama.ConsumerError - // channel for all incoming parsed kafka points + // channel for all incoming parsed kafka metrics metricC chan telegraf.Metric done chan struct{} @@ -42,8 +44,8 @@ var sampleConfig = ` zookeeper_peers = ["localhost:2181"] # the name of the consumer group consumer_group = "telegraf_metrics_consumers" - # Maximum number of points to buffer between collection intervals - point_buffer = 100000 + # Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 # Offset (must be either "oldest" or "newest") offset = "oldest" ` @@ -90,10 +92,13 @@ func (k *Kafka) Start() error { } k.done = make(chan struct{}) - if k.PointBuffer == 0 { - k.PointBuffer = 100000 + if k.PointBuffer == 0 && k.MetricBuffer == 0 { + k.MetricBuffer = 100000 + } else if k.PointBuffer > 0 { + // Legacy support of PointBuffer field TODO remove + k.MetricBuffer = k.PointBuffer } - k.metricC = make(chan telegraf.Metric, k.PointBuffer) + k.metricC = make(chan telegraf.Metric, k.MetricBuffer) // Start the kafka message reader go k.parser() @@ -124,7 +129,7 @@ func (k *Kafka) parser() { continue default: log.Printf("Kafka Consumer buffer is full, dropping a metric." + - " You may want to increase the point_buffer setting") + " You may want to increase the metric_buffer setting") } } @@ -151,10 +156,10 @@ func (k *Kafka) Stop() { func (k *Kafka) Gather(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() - npoints := len(k.metricC) - for i := 0; i < npoints; i++ { - point := <-k.metricC - acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) + nmetrics := len(k.metricC) + for i := 0; i < nmetrics; i++ { + metric := <-k.metricC + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } return nil } diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md new file mode 100644 index 0000000000000..422bfa03fbbbc --- /dev/null +++ b/plugins/inputs/mqtt_consumer/README.md @@ -0,0 +1,39 @@ +# mqtt_consumer Input Plugin + +The example plugin gathers metrics about example things + +### Configuration: + +``` +# Description +[[inputs.example]] + # SampleConfig +``` + +### Measurements & Fields: + + + +- measurement1 + - field1 (type, unit) + - field2 (float, percent) +- measurement2 + - field3 (integer, bytes) + +### Tags: + +- All measurements have the following tags: + - tag1 (optional description) + - tag2 +- measurement2 has the following tags: + - tag3 + +### Example Output: + +Give an example `-test` output here + +``` +$ ./telegraf -config telegraf.conf -input-filter example -test +measurement1,tag1=foo,tag2=bar field1=1i,field2=2.1 1453831884664956455 +measurement2,tag1=foo,tag2=bar,tag3=baz field3=1i 1453831884664956455 +``` diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go new file mode 100644 index 0000000000000..2bc8ff99714c9 --- /dev/null +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -0,0 +1,201 @@ +package mqtt_consumer + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + + "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" +) + +type MQTTConsumer struct { + Servers []string + Topics []string + Username string + Password string + MetricBuffer int + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool + + sync.Mutex + client *mqtt.Client + // channel for all incoming parsed mqtt metrics + metricC chan telegraf.Metric + done chan struct{} + in chan []byte +} + +var sampleConfig = ` + servers = ["localhost:1883"] + + ### Topics to subscribe to + topics = [ + "telegraf/host01/cpu", + "telegraf/host02/mem", + ] + + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 + + ### username and password to connect MQTT server. + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + + ### Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ### Use SSL but skip chain & host verification + # insecure_skip_verify = false +` + +func (m *MQTTConsumer) SampleConfig() string { + return sampleConfig +} + +func (m *MQTTConsumer) Description() string { + return "Read line-protocol metrics from MQTT topic(s)" +} + +func (m *MQTTConsumer) Start() error { + m.Lock() + defer m.Unlock() + + opts, err := m.createOpts() + if err != nil { + return err + } + + m.client = mqtt.NewClient(opts) + if token := m.client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + m.in = make(chan []byte, m.MetricBuffer) + m.done = make(chan struct{}) + if m.MetricBuffer == 0 { + m.MetricBuffer = 100000 + } + m.metricC = make(chan telegraf.Metric, m.MetricBuffer) + + topics := make(map[string]byte) + for _, topic := range m.Topics { + topics[topic] = byte(0) + } + subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage) + subscribeToken.Wait() + if subscribeToken.Error() != nil { + return subscribeToken.Error() + } + + go m.parser() + + return nil +} + +func (m *MQTTConsumer) parser() { + for { + select { + case <-m.done: + return + case msg := <-m.in: + metrics, err := telegraf.ParseMetrics(msg) + if err != nil { + log.Printf("Could not parse MQTT message: %s, error: %s", + string(msg), err.Error()) + } + + for _, metric := range metrics { + select { + case m.metricC <- metric: + continue + default: + log.Printf("MQTT Consumer buffer is full, dropping a metric." + + " You may want to increase the metric_buffer setting") + } + } + } + } +} + +func (m *MQTTConsumer) recvMessage(_ *mqtt.Client, msg mqtt.Message) { + m.in <- msg.Payload() +} + +func (m *MQTTConsumer) Stop() { + m.Lock() + defer m.Unlock() + close(m.done) + m.client.Disconnect(200) +} + +func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { + m.Lock() + defer m.Unlock() + nmetrics := len(m.metricC) + for i := 0; i < nmetrics; i++ { + metric := <-m.metricC + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + return nil +} + +func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { + opts := mqtt.NewClientOptions() + + opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) + + tlsCfg, err := internal.GetTLSConfig( + m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify) + if err != nil { + return nil, err + } + + scheme := "tcp" + if tlsCfg != nil { + scheme = "ssl" + opts.SetTLSConfig(tlsCfg) + } + + user := m.Username + if user == "" { + opts.SetUsername(user) + } + password := m.Password + if password != "" { + opts.SetPassword(password) + } + + if len(m.Servers) == 0 { + return opts, fmt.Errorf("could not get host infomations") + } + for _, host := range m.Servers { + server := fmt.Sprintf("%s://%s", scheme, host) + + opts.AddBroker(server) + } + opts.SetAutoReconnect(true) + // Setting KeepAlive to 0 disables it. + // TODO set KeepAlive to a real value (60s?) when this change is merged: + // https://git.eclipse.org/r/#/c/65850/ + opts.SetKeepAlive(time.Duration(0)) + return opts, nil +} + +func init() { + inputs.Add("mqtt_consumer", func() telegraf.Input { + return &MQTTConsumer{} + }) +} diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go new file mode 100644 index 0000000000000..816ca04d1b097 --- /dev/null +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -0,0 +1 @@ +package mqtt_consumer diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 5d2694ff3a205..a9559730867d6 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -11,9 +11,6 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" ) -const MaxRetryCount = 3 -const ClientIdPrefix = "telegraf" - type MQTT struct { Servers []string `toml:"servers"` Username string