Skip to content

Commit

Permalink
[Filebeat] Refactor mqtt input (elastic#16014)
Browse files Browse the repository at this point in the history
* Refactor mqtt input

* Fix: comment

* Add unit tests

* Test: input run

* Fix Test: run and stop

* Test: backoff

* Adjust code after review
  • Loading branch information
mtojek committed Feb 5, 2020
1 parent 66845f0 commit be8e14c
Show file tree
Hide file tree
Showing 6 changed files with 792 additions and 270 deletions.
192 changes: 19 additions & 173 deletions filebeat/input/mqtt/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,191 +18,37 @@
package mqtt

import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"io/ioutil"
"strings"
"time"
libmqtt "github.com/eclipse/paho.mqtt.golang"

"gopkg.in/vmihailenco/msgpack.v2"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

MQTT "github.com/eclipse/paho.mqtt.golang"
"github.com/elastic/beats/libbeat/outputs"
)

func (input *mqttInput) newTLSConfig() (*tls.Config, error) {
config := input.config

// Import trusted certificates from CAfile.pem.
// Alternatively, manually add CA certificates to
// default openssl CA bundle.
certpool := x509.NewCertPool()
if config.CA != "" {
logp.Info("[MQTT] Set the CA")
pemCerts, err := ioutil.ReadFile(config.CA)
if err != nil {
return nil, err
}
certpool.AppendCertsFromPEM(pemCerts)
}
func createClientOptions(config mqttInputConfig, onConnectHandler func(client libmqtt.Client)) (*libmqtt.ClientOptions, error) {
clientOptions := libmqtt.NewClientOptions().
SetClientID(config.ClientID).
SetUsername(config.Username).
SetPassword(config.Password).
SetConnectRetry(true).
SetOnConnectHandler(onConnectHandler)

tlsconfig := &tls.Config{
// RootCAs = certs used to verify server cert.
RootCAs: certpool,
// ClientAuth = whether to request cert from server.
// Since the server is set up for SSL, this happens
// anyways.
ClientAuth: tls.NoClientCert,
// ClientCAs = certs used to validate client cert.
ClientCAs: nil,
// InsecureSkipVerify = verify that cert contents
// match server. IP matches what is in cert etc.
InsecureSkipVerify: true,
for _, host := range config.Hosts {
clientOptions.AddBroker(host)
}

// Import client certificate/key pair
if config.ClientCert != "" && config.ClientKey != "" {
logp.Info("[MQTT] Set the Certs")
cert, err := tls.LoadX509KeyPair(config.ClientCert, config.ClientKey)
if config.TLS != nil {
tlsConfig, err := outputs.LoadTLSConfig(config.TLS)
if err != nil {
return nil, err
}

// Certificates = list of certs client sends to server.
tlsconfig.Certificates = []tls.Certificate{cert}
}

// Create tls.Config with desired tls properties
return tlsconfig, nil
}

// Prepare MQTT client
func (input *mqttInput) setupMqttClient() error {
c := input.config

logp.Info("[MQTT] Connect to broker URL: %s", c.Host)

mqttClientOpt := MQTT.NewClientOptions()
mqttClientOpt.SetClientID(c.ClientID)
mqttClientOpt.AddBroker(c.Host)

mqttClientOpt.SetMaxReconnectInterval(1 * time.Second)
mqttClientOpt.SetConnectionLostHandler(input.connectionLostHandler)
mqttClientOpt.SetOnConnectHandler(input.subscribeOnConnect)
mqttClientOpt.SetAutoReconnect(true)

if c.Username != "" {
logp.Info("[MQTT] Broker username: %s", c.Username)
mqttClientOpt.SetUsername(c.Username)
}

if c.Password != "" {
mqttClientOpt.SetPassword(c.Password)
}

if c.SSL == true {
logp.Info("[MQTT] Configure session to use SSL")
tlsconfig, err := input.newTLSConfig()
if err != nil {
return err
}
mqttClientOpt.SetTLSConfig(tlsconfig)
}

input.client = MQTT.NewClient(mqttClientOpt)
return nil
}

func (input *mqttInput) connect() error {
if token := input.client.Connect(); token.WaitTimeout(input.config.WaitClose) && token.Error() != nil {
logp.Err("MQTT Failed to connect")
return token.Error()
}
logp.Info("MQTT Client connected: %t", input.client.IsConnected())
return nil
}

func (input *mqttInput) subscribeOnConnect(client MQTT.Client) {
subscriptions := prepareSubscriptionsForTopics(input.config.Topics, input.config.QoS)

// Mqtt client - Subscribe to every topic in the config file, and bind with message handler
if token := input.client.SubscribeMultiple(subscriptions, input.onMessage); token.WaitTimeout(input.config.WaitClose) && token.Error() != nil {
logp.Error(token.Error())
}
logp.Info("MQTT Subscribed to configured topics")
}

// Mqtt message handler
func (input *mqttInput) onMessage(client MQTT.Client, msg MQTT.Message) {
logp.Debug("MQTT", "MQTT message received: %s", string(msg.Payload()))
var beatEvent beat.Event
eventFields := make(common.MapStr)

// default case
var mqtt = make(common.MapStr)
eventFields["message"] = string(msg.Payload())
if input.config.DecodePayload {
mqtt["fields"] = decodeBytes(msg.Payload())
}

eventFields["is_system_topic"] = strings.HasPrefix(msg.Topic(), "$")
eventFields["topic"] = msg.Topic()

mqtt["id"] = msg.MessageID()
mqtt["retained"] = msg.Retained()
eventFields["mqtt"] = mqtt

// Finally sending the message to elasticsearch
beatEvent.Fields = eventFields
isSent := input.outlet.OnEvent(beatEvent)

logp.Debug("MQTT", "Event sent: %t", isSent)
}

// connectionLostHandler will try to reconnect when connection is lost
func (input *mqttInput) connectionLostHandler(client MQTT.Client, reason error) {
logp.Warn("[MQTT] Connection lost: %s", reason.Error())

//Rerun the input
input.Run()
}

// decodeBytes will try to decode the bytes in the following order
// 1.) Check for msgpack format
// 2.) Check for json format
// 3.) If every check fails, it will
// return the the string representation
func decodeBytes(payload []byte) common.MapStr {
event := make(common.MapStr)

// A msgpack payload must be a json-like object
err := msgpack.Unmarshal(payload, &event)
if err == nil {
logp.Debug("MQTT", "Payload decoded - msgpack")
return event
clientOptions.SetTLSConfig(tlsConfig.BuildModuleConfig(""))
}

err = json.Unmarshal(payload, &event)
if err == nil {
logp.Debug("MQTT", "Payload decoded - as json")
return event
}

logp.Debug("MQTT", "decoded - as text")
return event
return clientOptions, nil
}

// ParseTopics will parse the config file and return a map with topic:QoS
func prepareSubscriptionsForTopics(topics []string, qos int) map[string]byte {
subscriptions := make(map[string]byte)
for _, value := range topics {
// Finally, filling the subscriptions map
subscriptions[value] = byte(qos)
logp.Info("Subscribe to %v with QoS %v", value, qos)
func createClientSubscriptions(config mqttInputConfig) map[string]byte {
subscriptions := map[string]byte{}
for _, topic := range config.Topics {
subscriptions[topic] = byte(config.QoS)
}
return subscriptions
}
Loading

0 comments on commit be8e14c

Please sign in to comment.