From d80f47d7f22e16492eb467a8840affe051830c04 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 3 Feb 2016 13:13:03 -0700 Subject: [PATCH] MQTT Consumer ServiceInput plugin --- metric.go | 18 +- metric_test.go | 6 +- plugins/inputs/all/all.go | 1 + plugins/inputs/exec/exec.go | 2 +- .../inputs/kafka_consumer/kafka_consumer.go | 33 +-- plugins/inputs/mqtt_consumer/README.md | 39 ++++ plugins/inputs/mqtt_consumer/mqtt_consumer.go | 211 ++++++++++++++++++ .../mqtt_consumer/mqtt_consumer_test.go | 1 + plugins/outputs/mqtt/mqtt.go | 11 +- 9 files changed, 298 insertions(+), 24 deletions(-) create mode 100644 plugins/inputs/mqtt_consumer/README.md create mode 100644 plugins/inputs/mqtt_consumer/mqtt_consumer.go create mode 100644 plugins/inputs/mqtt_consumer/mqtt_consumer_test.go diff --git a/metric.go b/metric.go index 99ee30369629b..7aecd2c1fd3fc 100644 --- a/metric.go +++ b/metric.go @@ -63,20 +63,34 @@ func NewMetric( }, nil } +// MetricParser is an object for Parsing incoming metrics. +type MetricParser struct { + // DefaultTags will be added to every parsed metric + DefaultTags map[string]string +} + +func NewMetricParser() *MetricParser { + return &MetricParser{} +} + // ParseMetrics returns a slice of Metrics from a text representation of a // metric (in line-protocol format) // with each metric separated by newlines. If any metrics fail to parse, // a non-nil error will be returned in addition to the metrics that parsed // successfully. -func ParseMetrics(buf []byte) ([]Metric, error) { +func (mp *MetricParser) Parse(buf []byte) ([]Metric, error) { // parse even if the buffer begins with a newline buf = bytes.TrimPrefix(buf, []byte("\n")) points, err := models.ParsePoints(buf) metrics := make([]Metric, len(points)) for i, point := range points { + tags := point.Tags() + for k, v := range mp.DefaultTags { + tags[k] = v + } // Ignore error here because it's impossible that a model.Point // wouldn't parse into client.Point properly - metrics[i], _ = NewMetric(point.Name(), point.Tags(), + metrics[i], _ = NewMetric(point.Name(), tags, point.Fields(), point.Time()) } return metrics, err diff --git a/metric_test.go b/metric_test.go index acf6dee994d72..2e88719c9a893 100644 --- a/metric_test.go +++ b/metric_test.go @@ -28,7 +28,7 @@ cpu,host usage_idle=99 ` func TestParseValidMetrics(t *testing.T) { - metrics, err := ParseMetrics([]byte(validMs)) + metrics, err := NewMetricParser().Parse([]byte(validMs)) assert.NoError(t, err) assert.Len(t, metrics, 1) m := metrics[0] @@ -50,13 +50,13 @@ func TestParseValidMetrics(t *testing.T) { } func TestParseInvalidMetrics(t *testing.T) { - metrics, err := ParseMetrics([]byte(invalidMs)) + metrics, err := NewMetricParser().Parse([]byte(invalidMs)) assert.Error(t, err) assert.Len(t, metrics, 0) } func TestParseValidAndInvalidMetrics(t *testing.T) { - metrics, err := ParseMetrics([]byte(validInvalidMs)) + metrics, err := NewMetricParser().Parse([]byte(validInvalidMs)) assert.Error(t, err) assert.Len(t, metrics, 3) } diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 52ab428f85a92..b3f652fdfd9a7 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -19,6 +19,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" _ "github.com/influxdata/telegraf/plugins/inputs/memcached" _ "github.com/influxdata/telegraf/plugins/inputs/mongodb" + _ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/mysql" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" _ "github.com/influxdata/telegraf/plugins/inputs/nsq" diff --git a/plugins/inputs/exec/exec.go b/plugins/inputs/exec/exec.go index c4bb634bafbde..baed125e8cb86 100644 --- a/plugins/inputs/exec/exec.go +++ b/plugins/inputs/exec/exec.go @@ -91,7 +91,7 @@ func (e *Exec) Gather(acc telegraf.Accumulator) error { acc.AddFields("exec", f.Fields, nil) case "influx": now := time.Now() - metrics, err := telegraf.ParseMetrics(out) + metrics, err := telegraf.NewMetricParser().Parse(out) for _, metric := range metrics { acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), now) } diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 499b2e50bc8e1..40d9fbd63b1c7 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -17,8 +17,10 @@ type Kafka struct { Topics []string ZookeeperPeers []string Consumer *consumergroup.ConsumerGroup - PointBuffer int - Offset string + MetricBuffer int + // TODO remove PointBuffer, legacy support + PointBuffer int + Offset string sync.Mutex @@ -26,7 +28,7 @@ type Kafka struct { in <-chan *sarama.ConsumerMessage // channel for all kafka consumer errors errs <-chan *sarama.ConsumerError - // channel for all incoming parsed kafka points + // channel for all incoming parsed kafka metrics metricC chan telegraf.Metric done chan struct{} @@ -42,8 +44,8 @@ var sampleConfig = ` zookeeper_peers = ["localhost:2181"] # the name of the consumer group consumer_group = "telegraf_metrics_consumers" - # Maximum number of points to buffer between collection intervals - point_buffer = 100000 + # Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 # Offset (must be either "oldest" or "newest") offset = "oldest" ` @@ -90,10 +92,13 @@ func (k *Kafka) Start() error { } k.done = make(chan struct{}) - if k.PointBuffer == 0 { - k.PointBuffer = 100000 + if k.PointBuffer == 0 && k.MetricBuffer == 0 { + k.MetricBuffer = 100000 + } else if k.PointBuffer > 0 { + // Legacy support of PointBuffer field TODO remove + k.MetricBuffer = k.PointBuffer } - k.metricC = make(chan telegraf.Metric, k.PointBuffer) + k.metricC = make(chan telegraf.Metric, k.MetricBuffer) // Start the kafka message reader go k.parser() @@ -112,7 +117,7 @@ func (k *Kafka) parser() { case err := <-k.errs: log.Printf("Kafka Consumer Error: %s\n", err.Error()) case msg := <-k.in: - metrics, err := telegraf.ParseMetrics(msg.Value) + metrics, err := telegraf.NewMetricParser().Parse(msg.Value) if err != nil { log.Printf("Could not parse kafka message: %s, error: %s", string(msg.Value), err.Error()) @@ -124,7 +129,7 @@ func (k *Kafka) parser() { continue default: log.Printf("Kafka Consumer buffer is full, dropping a metric." + - " You may want to increase the point_buffer setting") + " You may want to increase the metric_buffer setting") } } @@ -151,10 +156,10 @@ func (k *Kafka) Stop() { func (k *Kafka) Gather(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() - npoints := len(k.metricC) - for i := 0; i < npoints; i++ { - point := <-k.metricC - acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) + nmetrics := len(k.metricC) + for i := 0; i < nmetrics; i++ { + metric := <-k.metricC + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } return nil } diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md new file mode 100644 index 0000000000000..422bfa03fbbbc --- /dev/null +++ b/plugins/inputs/mqtt_consumer/README.md @@ -0,0 +1,39 @@ +# mqtt_consumer Input Plugin + +The example plugin gathers metrics about example things + +### Configuration: + +``` +# Description +[[inputs.example]] + # SampleConfig +``` + +### Measurements & Fields: + + + +- measurement1 + - field1 (type, unit) + - field2 (float, percent) +- measurement2 + - field3 (integer, bytes) + +### Tags: + +- All measurements have the following tags: + - tag1 (optional description) + - tag2 +- measurement2 has the following tags: + - tag3 + +### Example Output: + +Give an example `-test` output here + +``` +$ ./telegraf -config telegraf.conf -input-filter example -test +measurement1,tag1=foo,tag2=bar field1=1i,field2=2.1 1453831884664956455 +measurement2,tag1=foo,tag2=bar,tag3=baz field3=1i 1453831884664956455 +``` diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go new file mode 100644 index 0000000000000..3d5b818ab9dd8 --- /dev/null +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -0,0 +1,211 @@ +package mqtt_consumer + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + + "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" +) + +type MQTTConsumer struct { + Servers []string + Topics []string + Username string + Password string + MetricBuffer int + QoS int `toml:"qos"` + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool + + sync.Mutex + client *mqtt.Client + // channel for all incoming parsed mqtt metrics + metricC chan telegraf.Metric + done chan struct{} + in chan mqtt.Message +} + +var sampleConfig = ` + servers = ["localhost:1883"] + ### MQTT QoS, must be 0, 1, or 2 + qos = 0 + + ### Topics to subscribe to + topics = [ + "telegraf/host01/cpu", + "telegraf/+/mem", + "sensors/#", + ] + + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 + + ### username and password to connect MQTT server. + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + + ### Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ### Use SSL but skip chain & host verification + # insecure_skip_verify = false +` + +func (m *MQTTConsumer) SampleConfig() string { + return sampleConfig +} + +func (m *MQTTConsumer) Description() string { + return "Read line-protocol metrics from MQTT topic(s)" +} + +func (m *MQTTConsumer) Start() error { + m.Lock() + defer m.Unlock() + if m.QoS > 2 || m.QoS < 0 { + return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) + } + + opts, err := m.createOpts() + if err != nil { + return err + } + + m.client = mqtt.NewClient(opts) + if token := m.client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + m.in = make(chan mqtt.Message, m.MetricBuffer) + m.done = make(chan struct{}) + if m.MetricBuffer == 0 { + m.MetricBuffer = 100000 + } + m.metricC = make(chan telegraf.Metric, m.MetricBuffer) + + topics := make(map[string]byte) + for _, topic := range m.Topics { + topics[topic] = byte(m.QoS) + } + subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage) + subscribeToken.Wait() + if subscribeToken.Error() != nil { + return subscribeToken.Error() + } + + go m.parser() + + return nil +} + +func (m *MQTTConsumer) parser() { + for { + select { + case <-m.done: + return + case msg := <-m.in: + parser := telegraf.NewMetricParser() + parser.DefaultTags = make(map[string]string) + parser.DefaultTags["topic"] = msg.Topic() + metrics, err := parser.Parse(msg.Payload()) + if err != nil { + log.Printf("Could not parse MQTT message: %s, error: %s", + string(msg.Payload()), err.Error()) + } + + for _, metric := range metrics { + select { + case m.metricC <- metric: + continue + default: + log.Printf("MQTT Consumer buffer is full, dropping a metric." + + " You may want to increase the metric_buffer setting") + } + } + } + } +} + +func (m *MQTTConsumer) recvMessage(_ *mqtt.Client, msg mqtt.Message) { + m.in <- msg +} + +func (m *MQTTConsumer) Stop() { + m.Lock() + defer m.Unlock() + close(m.done) + m.client.Disconnect(200) +} + +func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { + m.Lock() + defer m.Unlock() + nmetrics := len(m.metricC) + for i := 0; i < nmetrics; i++ { + metric := <-m.metricC + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) + } + return nil +} + +func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { + opts := mqtt.NewClientOptions() + + opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) + + tlsCfg, err := internal.GetTLSConfig( + m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify) + if err != nil { + return nil, err + } + + scheme := "tcp" + if tlsCfg != nil { + scheme = "ssl" + opts.SetTLSConfig(tlsCfg) + } + + user := m.Username + if user == "" { + opts.SetUsername(user) + } + password := m.Password + if password != "" { + opts.SetPassword(password) + } + + if len(m.Servers) == 0 { + return opts, fmt.Errorf("could not get host infomations") + } + for _, host := range m.Servers { + server := fmt.Sprintf("%s://%s", scheme, host) + + opts.AddBroker(server) + } + opts.SetAutoReconnect(true) + // Setting KeepAlive to 0 disables it. + // TODO set KeepAlive to a real value (60s?) when this change is merged: + // https://git.eclipse.org/r/#/c/65850/ + opts.SetKeepAlive(time.Duration(0)) + return opts, nil +} + +func init() { + inputs.Add("mqtt_consumer", func() telegraf.Input { + return &MQTTConsumer{} + }) +} diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go new file mode 100644 index 0000000000000..816ca04d1b097 --- /dev/null +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -0,0 +1 @@ +package mqtt_consumer diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 5d2694ff3a205..61f0ef5571a74 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -11,9 +11,6 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" ) -const MaxRetryCount = 3 -const ClientIdPrefix = "telegraf" - type MQTT struct { Servers []string `toml:"servers"` Username string @@ -21,6 +18,7 @@ type MQTT struct { Database string Timeout internal.Duration TopicPrefix string + QoS int `toml:"qos"` // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -39,6 +37,8 @@ type MQTT struct { var sampleConfig = ` servers = ["localhost:1883"] # required. + ### MQTT QoS, must be 0, 1, or 2 + qos = 0 ### MQTT outputs send metrics to this topic format ### "///" @@ -61,6 +61,9 @@ func (m *MQTT) Connect() error { var err error m.Lock() defer m.Unlock() + if m.QoS > 2 || m.QoS < 0 { + return fmt.Errorf("MQTT Output, invalid QoS value: %d", m.QoS) + } m.opts, err = m.createOpts() if err != nil { @@ -124,7 +127,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { } func (m *MQTT) publish(topic, body string) error { - token := m.client.Publish(topic, 0, false, body) + token := m.client.Publish(topic, byte(m.QoS), false, body) token.Wait() if token.Error() != nil { return token.Error()