From 8cefba972a5ede1e5886c56a5b36e13e1f0ffb1c Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Tue, 9 Feb 2016 15:03:46 -0700 Subject: [PATCH] MQTT Consumer Input plugin --- Godeps | 3 +- plugins/inputs/EXAMPLE_README.md | 2 +- plugins/inputs/all/all.go | 1 + plugins/inputs/kafka_consumer/README.md | 31 ++- .../inputs/kafka_consumer/kafka_consumer.go | 33 +-- plugins/inputs/mqtt_consumer/README.md | 48 ++++ plugins/inputs/mqtt_consumer/mqtt_consumer.go | 228 ++++++++++++++++++ .../mqtt_consumer/mqtt_consumer_test.go | 186 ++++++++++++++ plugins/inputs/nats_consumer/README.md | 19 +- plugins/outputs/mqtt/mqtt.go | 11 +- plugins/parsers/graphite/parser.go | 18 +- plugins/parsers/influx/parser.go | 4 + plugins/parsers/json/parser.go | 4 + plugins/parsers/registry.go | 5 + 14 files changed, 550 insertions(+), 43 deletions(-) create mode 100644 plugins/inputs/mqtt_consumer/README.md create mode 100644 plugins/inputs/mqtt_consumer/mqtt_consumer.go create mode 100644 plugins/inputs/mqtt_consumer/mqtt_consumer_test.go diff --git a/Godeps b/Godeps index 0b9a1672761dd..005aee939d56e 100644 --- a/Godeps +++ b/Godeps @@ -1,4 +1,4 @@ -git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git dbd8d5c40a582eb9adacde36b47932b3a3ad0034 +git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git 617c801af238c3af2d9e72c5d4a0f02edad03ce5 github.com/Shopify/sarama d37c73f2b2bce85f7fa16b6a550d26c5372892ef github.com/Sirupsen/logrus f7f79f729e0fbe2fcc061db48a9ba0263f588252 github.com/amir/raidman 6a8e089bbe32e6b907feae5ba688841974b3c339 @@ -30,7 +30,6 @@ github.com/naoina/go-stringutil 6b638e95a32d0c1131db0e7fe83775cbea4a0d0b github.com/naoina/toml 751171607256bb66e64c9f0220c00662420c38e9 github.com/nats-io/nats 6a83f1a633cfbfd90aa648ac99fb38c06a8b40df github.com/nsqio/go-nsq 2118015c120962edc5d03325c680daf3163a8b5f -github.com/pborman/uuid dee7705ef7b324f27ceb85a121c61f2c2e8ce988 github.com/pmezard/go-difflib 792786c7400a136282c1664665ae0a8db921c6c2 github.com/prometheus/client_golang 67994f177195311c3ea3d4407ed0175e34a4256f github.com/prometheus/client_model fa8ad6fec33561be4280a8f0514318c79d7f6cb6 diff --git a/plugins/inputs/EXAMPLE_README.md b/plugins/inputs/EXAMPLE_README.md index 16aaac8effd5a..9207cd2ab1cee 100644 --- a/plugins/inputs/EXAMPLE_README.md +++ b/plugins/inputs/EXAMPLE_README.md @@ -4,7 +4,7 @@ The example plugin gathers metrics about example things ### Configuration: -``` +```toml # Description [[inputs.example]] # SampleConfig diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 794885129f252..335d41a329dec 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -21,6 +21,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" _ "github.com/influxdata/telegraf/plugins/inputs/memcached" _ "github.com/influxdata/telegraf/plugins/inputs/mongodb" + _ "github.com/influxdata/telegraf/plugins/inputs/mqtt_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/mysql" _ "github.com/influxdata/telegraf/plugins/inputs/nats_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/nginx" diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index f0d15356fda85..4fdda0c3aa69a 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -1,4 +1,4 @@ -# Kafka Consumer +# Kafka Consumer Input Plugin The [Kafka](http://kafka.apache.org/) consumer plugin polls a specified Kafka topic and adds messages to InfluxDB. The plugin assumes messages follow the @@ -6,6 +6,29 @@ line protocol. [Consumer Group](http://godoc.org/github.com/wvanbergen/kafka/con is used to talk to the Kafka cluster so multiple instances of telegraf can read from the same topic in parallel. +## Configuration + +```toml +# Read metrics from Kafka topic(s) +[[inputs.kafka_consumer]] + ### topic(s) to consume + topics = ["telegraf"] + ### an array of Zookeeper connection strings + zookeeper_peers = ["localhost:2181"] + ### the name of the consumer group + consumer_group = "telegraf_metrics_consumers" + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 + ### Offset (must be either "oldest" or "newest") + offset = "oldest" + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + ## Testing Running integration tests requires running Zookeeper & Kafka. The following @@ -16,9 +39,3 @@ To start Kafka & Zookeeper: ``` docker run -d -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip || docker-machine ip ` --env ADVERTISED_PORT=9092 spotify/kafka ``` - -To run tests: - -``` -go test -``` diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 20ce8ef235d3e..9fa47dee9bf1e 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -19,8 +19,10 @@ type Kafka struct { Topics []string ZookeeperPeers []string Consumer *consumergroup.ConsumerGroup - PointBuffer int - Offset string + MetricBuffer int + // TODO remove PointBuffer, legacy support + PointBuffer int + Offset string parser parsers.Parser @@ -30,7 +32,7 @@ type Kafka struct { in <-chan *sarama.ConsumerMessage // channel for all kafka consumer errors errs <-chan *sarama.ConsumerError - // channel for all incoming parsed kafka points + // channel for all incoming parsed kafka metrics metricC chan telegraf.Metric done chan struct{} @@ -46,8 +48,8 @@ var sampleConfig = ` zookeeper_peers = ["localhost:2181"] ### the name of the consumer group consumer_group = "telegraf_metrics_consumers" - ### Maximum number of points to buffer between collection intervals - point_buffer = 100000 + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 ### Offset (must be either "oldest" or "newest") offset = "oldest" @@ -104,10 +106,13 @@ func (k *Kafka) Start() error { } k.done = make(chan struct{}) - if k.PointBuffer == 0 { - k.PointBuffer = 100000 + if k.PointBuffer == 0 && k.MetricBuffer == 0 { + k.MetricBuffer = 100000 + } else if k.PointBuffer > 0 { + // Legacy support of PointBuffer field TODO remove + k.MetricBuffer = k.PointBuffer } - k.metricC = make(chan telegraf.Metric, k.PointBuffer) + k.metricC = make(chan telegraf.Metric, k.MetricBuffer) // Start the kafka message reader go k.receiver() @@ -128,7 +133,7 @@ func (k *Kafka) receiver() { case msg := <-k.in: metrics, err := k.parser.Parse(msg.Value) if err != nil { - log.Printf("Could not parse kafka message: %s, error: %s", + log.Printf("KAFKA PARSE ERROR\nmessage: %s\nerror: %s", string(msg.Value), err.Error()) } @@ -139,7 +144,7 @@ func (k *Kafka) receiver() { continue default: log.Printf("Kafka Consumer buffer is full, dropping a metric." + - " You may want to increase the point_buffer setting") + " You may want to increase the metric_buffer setting") } } @@ -166,10 +171,10 @@ func (k *Kafka) Stop() { func (k *Kafka) Gather(acc telegraf.Accumulator) error { k.Lock() defer k.Unlock() - npoints := len(k.metricC) - for i := 0; i < npoints; i++ { - point := <-k.metricC - acc.AddFields(point.Name(), point.Fields(), point.Tags(), point.Time()) + nmetrics := len(k.metricC) + for i := 0; i < nmetrics; i++ { + metric := <-k.metricC + acc.AddFields(metric.Name(), metric.Fields(), metric.Tags(), metric.Time()) } return nil } diff --git a/plugins/inputs/mqtt_consumer/README.md b/plugins/inputs/mqtt_consumer/README.md new file mode 100644 index 0000000000000..6f7fa911cb865 --- /dev/null +++ b/plugins/inputs/mqtt_consumer/README.md @@ -0,0 +1,48 @@ +# MQTT Consumer Input Plugin + +The [MQTT](http://mqtt.org/) consumer plugin reads from +specified MQTT topics and adds messages to InfluxDB. +The plugin expects messages in the +[Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md). + +### Configuration: + +```toml +# Read metrics from MQTT topic(s) +[[inputs.mqtt_consumer]] + servers = ["localhost:1883"] + ### MQTT QoS, must be 0, 1, or 2 + qos = 0 + + ### Topics to subscribe to + topics = [ + "telegraf/host01/cpu", + "telegraf/+/mem", + "sensors/#", + ] + + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 + + ### username and password to connect MQTT server. + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + + ### Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ### Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md + data_format = "influx" +``` + +### Tags: + +- All measurements are tagged with the incoming topic, ie +`topic=telegraf/host01/cpu` diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go new file mode 100644 index 0000000000000..8ca0d44b1a281 --- /dev/null +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go @@ -0,0 +1,228 @@ +package mqtt_consumer + +import ( + "fmt" + "log" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" + "github.com/influxdata/telegraf/plugins/parsers" + + "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" +) + +type MQTTConsumer struct { + Servers []string + Topics []string + Username string + Password string + MetricBuffer int + QoS int `toml:"qos"` + + parser parsers.Parser + + // Path to CA file + SSLCA string `toml:"ssl_ca"` + // Path to host cert file + SSLCert string `toml:"ssl_cert"` + // Path to cert key file + SSLKey string `toml:"ssl_key"` + // Use SSL but skip chain & host verification + InsecureSkipVerify bool + + sync.Mutex + client *mqtt.Client + // channel for all incoming parsed mqtt metrics + metricC chan telegraf.Metric + // channel for the topics of all incoming metrics (for tagging metrics) + topicC chan string + // channel of all incoming raw mqtt messages + in chan mqtt.Message + done chan struct{} +} + +var sampleConfig = ` + servers = ["localhost:1883"] + ### MQTT QoS, must be 0, 1, or 2 + qos = 0 + + ### Topics to subscribe to + topics = [ + "telegraf/host01/cpu", + "telegraf/+/mem", + "sensors/#", + ] + + ### Maximum number of metrics to buffer between collection intervals + metric_buffer = 100000 + + ### username and password to connect MQTT server. + # username = "telegraf" + # password = "metricsmetricsmetricsmetrics" + + ### Optional SSL Config + # ssl_ca = "/etc/telegraf/ca.pem" + # ssl_cert = "/etc/telegraf/cert.pem" + # ssl_key = "/etc/telegraf/key.pem" + ### Use SSL but skip chain & host verification + # insecure_skip_verify = false + + ### Data format to consume. This can be "json", "influx" or "graphite" + ### Each data format has it's own unique set of configuration options, read + ### more about them here: + ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md + data_format = "influx" +` + +func (m *MQTTConsumer) SampleConfig() string { + return sampleConfig +} + +func (m *MQTTConsumer) Description() string { + return "Read metrics from MQTT topic(s)" +} + +func (m *MQTTConsumer) SetParser(parser parsers.Parser) { + m.parser = parser +} + +func (m *MQTTConsumer) Start() error { + m.Lock() + defer m.Unlock() + if m.QoS > 2 || m.QoS < 0 { + return fmt.Errorf("MQTT Consumer, invalid QoS value: %d", m.QoS) + } + + opts, err := m.createOpts() + if err != nil { + return err + } + + m.client = mqtt.NewClient(opts) + if token := m.client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + m.in = make(chan mqtt.Message, m.MetricBuffer) + m.done = make(chan struct{}) + if m.MetricBuffer == 0 { + m.MetricBuffer = 100000 + } + m.metricC = make(chan telegraf.Metric, m.MetricBuffer) + m.topicC = make(chan string, m.MetricBuffer) + + topics := make(map[string]byte) + for _, topic := range m.Topics { + topics[topic] = byte(m.QoS) + } + subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage) + subscribeToken.Wait() + if subscribeToken.Error() != nil { + return subscribeToken.Error() + } + + go m.receiver() + + return nil +} + +// receiver() reads all incoming messages from the consumer, and parses them into +// influxdb metric points. +func (m *MQTTConsumer) receiver() { + for { + select { + case <-m.done: + return + case msg := <-m.in: + topic := msg.Topic() + metrics, err := m.parser.Parse(msg.Payload()) + if err != nil { + log.Printf("MQTT PARSE ERROR\nmessage: %s\nerror: %s", + string(msg.Payload()), err.Error()) + } + + for _, metric := range metrics { + select { + case m.metricC <- metric: + m.topicC <- topic + default: + log.Printf("MQTT Consumer buffer is full, dropping a metric." + + " You may want to increase the metric_buffer setting") + } + } + } + } +} + +func (m *MQTTConsumer) recvMessage(_ *mqtt.Client, msg mqtt.Message) { + m.in <- msg +} + +func (m *MQTTConsumer) Stop() { + m.Lock() + defer m.Unlock() + close(m.done) + m.client.Disconnect(200) +} + +func (m *MQTTConsumer) Gather(acc telegraf.Accumulator) error { + m.Lock() + defer m.Unlock() + nmetrics := len(m.metricC) + for i := 0; i < nmetrics; i++ { + metric := <-m.metricC + topic := <-m.topicC + tags := metric.Tags() + tags["topic"] = topic + acc.AddFields(metric.Name(), metric.Fields(), tags, metric.Time()) + } + return nil +} + +func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) { + opts := mqtt.NewClientOptions() + + opts.SetClientID("Telegraf-Consumer-" + internal.RandomString(5)) + + tlsCfg, err := internal.GetTLSConfig( + m.SSLCert, m.SSLKey, m.SSLCA, m.InsecureSkipVerify) + if err != nil { + return nil, err + } + + scheme := "tcp" + if tlsCfg != nil { + scheme = "ssl" + opts.SetTLSConfig(tlsCfg) + } + + user := m.Username + if user == "" { + opts.SetUsername(user) + } + password := m.Password + if password != "" { + opts.SetPassword(password) + } + + if len(m.Servers) == 0 { + return opts, fmt.Errorf("could not get host infomations") + } + for _, host := range m.Servers { + server := fmt.Sprintf("%s://%s", scheme, host) + + opts.AddBroker(server) + } + opts.SetAutoReconnect(true) + opts.SetKeepAlive(time.Second * 60) + return opts, nil +} + +func init() { + inputs.Add("mqtt_consumer", func() telegraf.Input { + return &MQTTConsumer{} + }) +} diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go new file mode 100644 index 0000000000000..be216dfbbbbd6 --- /dev/null +++ b/plugins/inputs/mqtt_consumer/mqtt_consumer_test.go @@ -0,0 +1,186 @@ +package mqtt_consumer + +import ( + "testing" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/parsers" + "github.com/influxdata/telegraf/testutil" + + "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" +) + +const ( + testMsg = "cpu_load_short,host=server01 value=23422.0 1422568543702900257" + testMsgGraphite = "cpu.load.short.graphite 23422 1454780029" + testMsgJSON = "{\"a\": 5, \"b\": {\"c\": 6}}\n" + invalidMsg = "cpu_load_short,host=server01 1422568543702900257" + metricBuffer = 5 +) + +func newTestMQTTConsumer() (*MQTTConsumer, chan mqtt.Message) { + in := make(chan mqtt.Message, metricBuffer) + n := &MQTTConsumer{ + Topics: []string{"telegraf"}, + Servers: []string{"localhost:1883"}, + MetricBuffer: metricBuffer, + in: in, + done: make(chan struct{}), + metricC: make(chan telegraf.Metric, metricBuffer), + topicC: make(chan string, metricBuffer), + } + return n, in +} + +// Test that the parser parses NATS messages into metrics +func TestRunParser(t *testing.T) { + n, in := newTestMQTTConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + in <- mqttMsg(testMsg) + time.Sleep(time.Millisecond) + + if a := len(n.metricC); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } +} + +// Test that the parser ignores invalid messages +func TestRunParserInvalidMsg(t *testing.T) { + n, in := newTestMQTTConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + in <- mqttMsg(invalidMsg) + time.Sleep(time.Millisecond) + + if a := len(n.metricC); a != 0 { + t.Errorf("got %v, expected %v", a, 0) + } +} + +// Test that metrics are dropped when we hit the buffer limit +func TestRunParserRespectsBuffer(t *testing.T) { + n, in := newTestMQTTConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + for i := 0; i < metricBuffer+1; i++ { + in <- mqttMsg(testMsg) + } + time.Sleep(time.Millisecond) + + if a := len(n.metricC); a != metricBuffer { + t.Errorf("got %v, expected %v", a, metricBuffer) + } +} + +// Test that the parser parses line format messages into metrics +func TestRunParserAndGather(t *testing.T) { + n, in := newTestMQTTConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewInfluxParser() + go n.receiver() + in <- mqttMsg(testMsg) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + n.Gather(&acc) + + if a := len(acc.Metrics); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + acc.AssertContainsFields(t, "cpu_load_short", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses graphite format messages into metrics +func TestRunParserAndGatherGraphite(t *testing.T) { + n, in := newTestMQTTConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewGraphiteParser("_", []string{}, nil) + go n.receiver() + in <- mqttMsg(testMsgGraphite) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + n.Gather(&acc) + + if a := len(acc.Metrics); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + acc.AssertContainsFields(t, "cpu_load_short_graphite", + map[string]interface{}{"value": float64(23422)}) +} + +// Test that the parser parses json format messages into metrics +func TestRunParserAndGatherJSON(t *testing.T) { + n, in := newTestMQTTConsumer() + defer close(n.done) + + n.parser, _ = parsers.NewJSONParser("nats_json_test", []string{}, nil) + go n.receiver() + in <- mqttMsg(testMsgJSON) + time.Sleep(time.Millisecond) + + acc := testutil.Accumulator{} + n.Gather(&acc) + + if a := len(acc.Metrics); a != 1 { + t.Errorf("got %v, expected %v", a, 1) + } + acc.AssertContainsFields(t, "nats_json_test", + map[string]interface{}{ + "a": float64(5), + "b_c": float64(6), + }) +} + +func mqttMsg(val string) mqtt.Message { + return &message{ + topic: "telegraf/unit_test", + payload: []byte(val), + } +} + +// Take the message struct from the paho mqtt client library for returning +// a test message interface. +type message struct { + duplicate bool + qos byte + retained bool + topic string + messageID uint16 + payload []byte +} + +func (m *message) Duplicate() bool { + return m.duplicate +} + +func (m *message) Qos() byte { + return m.qos +} + +func (m *message) Retained() bool { + return m.retained +} + +func (m *message) Topic() string { + return m.topic +} + +func (m *message) MessageID() uint16 { + return m.messageID +} + +func (m *message) Payload() []byte { + return m.payload +} diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index b2d027039f86f..31d13297e8d80 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -1,14 +1,15 @@ -# NATS Consumer +# NATS Consumer Input Plugin -The [NATS](http://www.nats.io/about/) consumer plugin reads from +The [NATS](http://www.nats.io/about/) consumer plugin reads from specified NATS subjects and adds messages to InfluxDB. The plugin expects messages -in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md). +in the [Telegraf Input Data Formats](https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md). A [Queue Group](http://www.nats.io/documentation/concepts/nats-queueing/) is used when subscribing to subjects so multiple instances of telegraf can read from a NATS cluster in parallel. ## Configuration -``` + +```toml # Read metrics from NATS subject(s) [[inputs.nats_consumer]] ### urls of NATS servers @@ -21,18 +22,10 @@ from a NATS cluster in parallel. queue_group = "telegraf_consumers" ### Maximum number of metrics to buffer between collection intervals metric_buffer = 100000 - + ### Data format to consume. This can be "json", "influx" or "graphite" ### Each data format has it's own unique set of configuration options, read ### more about them here: ### https://github.com/influxdata/telegraf/blob/master/DATA_FORMATS_INPUT.md data_format = "influx" ``` - -## Testing - -To run tests: - -``` -go test -``` \ No newline at end of file diff --git a/plugins/outputs/mqtt/mqtt.go b/plugins/outputs/mqtt/mqtt.go index 5d2694ff3a205..61f0ef5571a74 100644 --- a/plugins/outputs/mqtt/mqtt.go +++ b/plugins/outputs/mqtt/mqtt.go @@ -11,9 +11,6 @@ import ( "github.com/influxdata/telegraf/plugins/outputs" ) -const MaxRetryCount = 3 -const ClientIdPrefix = "telegraf" - type MQTT struct { Servers []string `toml:"servers"` Username string @@ -21,6 +18,7 @@ type MQTT struct { Database string Timeout internal.Duration TopicPrefix string + QoS int `toml:"qos"` // Path to CA file SSLCA string `toml:"ssl_ca"` @@ -39,6 +37,8 @@ type MQTT struct { var sampleConfig = ` servers = ["localhost:1883"] # required. + ### MQTT QoS, must be 0, 1, or 2 + qos = 0 ### MQTT outputs send metrics to this topic format ### "///" @@ -61,6 +61,9 @@ func (m *MQTT) Connect() error { var err error m.Lock() defer m.Unlock() + if m.QoS > 2 || m.QoS < 0 { + return fmt.Errorf("MQTT Output, invalid QoS value: %d", m.QoS) + } m.opts, err = m.createOpts() if err != nil { @@ -124,7 +127,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error { } func (m *MQTT) publish(topic, body string) error { - token := m.client.Publish(topic, 0, false, body) + token := m.client.Publish(topic, byte(m.QoS), false, body) token.Wait() if token.Error() != nil { return token.Error() diff --git a/plugins/parsers/graphite/parser.go b/plugins/parsers/graphite/parser.go index 74ccd81cb5007..5e88150642ebb 100644 --- a/plugins/parsers/graphite/parser.go +++ b/plugins/parsers/graphite/parser.go @@ -29,6 +29,10 @@ type GraphiteParser struct { matcher *matcher } +func (p *GraphiteParser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} + func NewGraphiteParser( separator string, templates []string, @@ -104,13 +108,14 @@ func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) { metrics := make([]telegraf.Metric, 0) + var errStr string buffer := bytes.NewBuffer(buf) reader := bufio.NewReader(buffer) for { // Read up to the next newline. buf, err := reader.ReadBytes('\n') if err == io.EOF { - return metrics, nil + break } if err != nil && err != io.EOF { return metrics, err @@ -118,10 +123,19 @@ func (p *GraphiteParser) Parse(buf []byte) ([]telegraf.Metric, error) { // Trim the buffer, even though there should be no padding line := strings.TrimSpace(string(buf)) - if metric, err := p.ParseLine(line); err == nil { + metric, err := p.ParseLine(line) + + if err == nil { metrics = append(metrics, metric) + } else { + errStr += err.Error() + "\n" } } + + if errStr != "" { + return metrics, fmt.Errorf(errStr) + } + return metrics, nil } // Parse performs Graphite parsing of a single line. diff --git a/plugins/parsers/influx/parser.go b/plugins/parsers/influx/parser.go index d5a1b8db5c805..345e60b2e3b27 100644 --- a/plugins/parsers/influx/parser.go +++ b/plugins/parsers/influx/parser.go @@ -55,3 +55,7 @@ func (p *InfluxParser) ParseLine(line string) (telegraf.Metric, error) { return metrics[0], nil } + +func (p *InfluxParser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} diff --git a/plugins/parsers/json/parser.go b/plugins/parsers/json/parser.go index d8aa93e01dfd7..e5172ac97b116 100644 --- a/plugins/parsers/json/parser.go +++ b/plugins/parsers/json/parser.go @@ -67,6 +67,10 @@ func (p *JSONParser) ParseLine(line string) (telegraf.Metric, error) { return metrics[0], nil } +func (p *JSONParser) SetDefaultTags(tags map[string]string) { + p.DefaultTags = tags +} + type JSONFlattener struct { Fields map[string]interface{} } diff --git a/plugins/parsers/registry.go b/plugins/parsers/registry.go index 083d497e6f048..982b6bb80c511 100644 --- a/plugins/parsers/registry.go +++ b/plugins/parsers/registry.go @@ -28,6 +28,11 @@ type Parser interface { // ie, "cpu.usage.idle 90" // and parses it into a telegraf metric. ParseLine(line string) (telegraf.Metric, error) + + // SetDefaultTags tells the parser to add all of the given tags + // to each parsed metric. + // NOTE: do _not_ modify the map after you've passed it here!! + SetDefaultTags(tags map[string]string) } // Config is a struct that covers the data types needed for all parser types,