Skip to content

Commit

Permalink
Auto reconnect MQTT gateway on connection failure
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Jan 30, 2021
1 parent f9ce460 commit 7312205
Showing 1 changed file with 29 additions and 18 deletions.
47 changes: 29 additions & 18 deletions gateway/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *Settings) validate() error {
return nil
}

func newMqttClient(settings *Settings) mqtt.Client {
func newMqttClient(settings *Settings, onConn mqtt.OnConnectHandler) mqtt.Client {
opts := mqtt.NewClientOptions()
opts.SetResumeSubs(true)
opts.AddBroker(settings.Broker)
Expand All @@ -40,6 +40,8 @@ func newMqttClient(settings *Settings) mqtt.Client {
opts.SetPassword(settings.Pass)
}

opts.SetOnConnectHandler(onConn)

opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) {
log.Print("mqtt msg received:", string(msg.Payload()))
})
Expand All @@ -48,38 +50,47 @@ func newMqttClient(settings *Settings) mqtt.Client {
return mqttClient
}

func StartMqtt(settings *Settings, write chan<- *influxdb2Write.Point) error {
if err := settings.validate(); err != nil {
return err
}

client := newMqttClient(settings)
type connHandler struct {
write chan<- *influxdb2Write.Point
}

token := client.Connect()
if token.Wait() && token.Error() != nil {
return errors.Wrap(token.Error(), "mqtt connect failed")
func (h *connHandler) onConnect(client mqtt.Client) {
checkError := func(token mqtt.Token) {
if token.Wait() && token.Error() != nil {
log.Print(errors.Wrap(token.Error(), "could not subscribe to mqtt topic"))
}
}

client.Subscribe("meas/temp", 0, func(client mqtt.Client, msg mqtt.Message) {
token := client.Subscribe("meas/temp", 0, func(client mqtt.Client, msg mqtt.Message) {
if point, err := mon.ParseTemperature(msg.Payload()); err == nil {
write <- point
h.write <- point
} else {
log.Print("could not parse temperature: ", err)
}
})
if token.Wait() && token.Error() != nil {
return errors.Wrap(token.Error(), "could not subscribe to mqtt topic")
}
checkError(token)

client.Subscribe("meas/part", 0, func(client mqtt.Client, msg mqtt.Message) {
token = client.Subscribe("meas/part", 0, func(client mqtt.Client, msg mqtt.Message) {
if point, err := mon.ParseParticulates(msg.Payload()); err == nil {
write <- point
h.write <- point
} else {
log.Print("could not parse particulates: ", err)
}
})
checkError(token)
}

func StartMqtt(settings *Settings, write chan<- *influxdb2Write.Point) error {
if err := settings.validate(); err != nil {
return err
}

handler := connHandler{write}
client := newMqttClient(settings, handler.onConnect)

token := client.Connect()
if token.Wait() && token.Error() != nil {
return errors.Wrap(token.Error(), "could not subscribe to mqtt topic")
return errors.Wrap(token.Error(), "mqtt connect failed")
}

return nil
Expand Down

0 comments on commit 7312205

Please sign in to comment.