Skip to content

Commit

Permalink
Add option to set retain flag on messages in mqtt output (influxdata#…
Browse files Browse the repository at this point in the history
  • Loading branch information
p-kraszewski authored and bitcharmer committed Oct 18, 2019
1 parent c0ff540 commit 083a417
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 2 deletions.
7 changes: 6 additions & 1 deletion plugins/outputs/mqtt/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
# tls_key = "/etc/telegraf/key.pem"
## Use TLS but skip chain & host verification
# insecure_skip_verify = false

## When true, metrics will be sent in one MQTT message per flush. Otherwise,
## metrics are written one metric per MQTT message.
# batch = false

## When true, metric will have RETAIN flag set, making broker cache entries until someone
## actually reads it
# retain = flase

## Data format to output.
# data_format = "influx"
Expand All @@ -56,4 +60,5 @@ This plugin writes to a [MQTT Broker](http://http://mqtt.org/) acting as a mqtt
* `tls_cert`: TLS CERT
* `tls_key`: TLS key
* `insecure_skip_verify`: Use TLS but skip chain & host verification (default: false)
* `retain`: Set `retain` flag when publishing, instructing server to cache metric until someone reads it (default: false)
* `data_format`: [About Telegraf data formats](https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md)
7 changes: 6 additions & 1 deletion plugins/outputs/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ var sampleConfig = `
## When true, metrics will be sent in one MQTT message per flush. Otherwise,
## metrics are written one metric per MQTT message.
# batch = false
## When true, metric will have RETAIN flag set, making broker cache entries until someone
## actually reads it
# retain = flase
## Data format to output.
## Each data format has its own unique set of configuration options, read
Expand All @@ -68,6 +72,7 @@ type MQTT struct {
ClientID string `toml:"client_id"`
tls.ClientConfig
BatchMessage bool `toml:"batch"`
Retain bool

client paho.Client
opts *paho.ClientOptions
Expand Down Expand Up @@ -174,7 +179,7 @@ func (m *MQTT) Write(metrics []telegraf.Metric) error {
}

func (m *MQTT) publish(topic string, body []byte) error {
token := m.client.Publish(topic, byte(m.QoS), false, body)
token := m.client.Publish(topic, byte(m.QoS), m.Retain, body)
token.WaitTimeout(m.Timeout.Duration)
if token.Error() != nil {
return token.Error()
Expand Down

0 comments on commit 083a417

Please sign in to comment.