From 06ba68f033c9da2077c10368b9839515e6e5a376 Mon Sep 17 00:00:00 2001 From: Kirill Isakov Date: Sat, 13 Feb 2021 11:56:17 +0600 Subject: [PATCH] MQTT gateway: add support for CO2 measurements --- gateway/build.sh | 10 ++++++++++ gateway/mon/co2.go | 37 +++++++++++++++++++++++++++++++++++++ gateway/mqtt/mqtt.go | 44 +++++++++++++++++++++++++------------------- 3 files changed, 72 insertions(+), 19 deletions(-) create mode 100755 gateway/build.sh create mode 100644 gateway/mon/co2.go diff --git a/gateway/build.sh b/gateway/build.sh new file mode 100755 index 0000000..def2d51 --- /dev/null +++ b/gateway/build.sh @@ -0,0 +1,10 @@ +#!/usr/bin/bash + +set -euo pipefail + +cd "$(dirname "$0")" + +rm --force airmon + +podman run --rm -it -v .:/app golang:buster bash -c 'cd /app && go build .' + diff --git a/gateway/mon/co2.go b/gateway/mon/co2.go new file mode 100644 index 0000000..fddafe9 --- /dev/null +++ b/gateway/mon/co2.go @@ -0,0 +1,37 @@ +package mon + +import ( + "encoding/json" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write" + "log" + "time" +) + +type co2 struct { + Device string `json:"dev"` + Time int64 `json:"time"` + Sensor string `json:"sens"` + Co2 uint16 `json:"co2"` +} + +func (t *co2) toPoint() *influxdb2Write.Point { + return influxdb2.NewPoint("co2", + map[string]string{ + "device": t.Device, + "sensor": t.Sensor, + }, + map[string]interface{}{ + "co2": t.Co2, + }, + time.Unix(t.Time, 0)) +} + +func ParseCarbonDioxide(data []byte) (*influxdb2Write.Point, error) { + var co2 co2 + if err := json.Unmarshal(data, &co2); err != nil { + log.Print("could not parse co2 json: ", err) + return nil, err + } + return co2.toPoint(), nil +} diff --git a/gateway/mqtt/mqtt.go b/gateway/mqtt/mqtt.go index 7092da1..4933541 100644 --- a/gateway/mqtt/mqtt.go +++ b/gateway/mqtt/mqtt.go @@ -54,30 +54,36 @@ type connHandler struct { write chan<- *influxdb2Write.Point } -func (h *connHandler) onConnect(client mqtt.Client) { - checkError := func(token mqtt.Token) { - if token.Wait() && token.Error() != nil { - log.Print(errors.Wrap(token.Error(), "could not subscribe to mqtt topic")) - } - } +type topic struct { + topic string + mapper func(data []byte) (*influxdb2Write.Point, error) +} - token := client.Subscribe("meas/temp", 0, func(client mqtt.Client, msg mqtt.Message) { - if point, err := mon.ParseTemperature(msg.Payload()); err == nil { - h.write <- point - } else { - log.Print("could not parse temperature: ", err) - } - }) - checkError(token) +var topics = []*topic{ + {"meas/part", mon.ParseParticulates}, + {"meas/temp", mon.ParseTemperature}, + {"meas/co2", mon.ParseCarbonDioxide}, +} - token = client.Subscribe("meas/part", 0, func(client mqtt.Client, msg mqtt.Message) { - if point, err := mon.ParseParticulates(msg.Payload()); err == nil { - h.write <- point +func subscribe(t *topic, client mqtt.Client, writeCh chan<- *influxdb2Write.Point) { + token := client.Subscribe(t.topic, 0, func(client mqtt.Client, msg mqtt.Message) { + if point, err := t.mapper(msg.Payload()); err == nil { + writeCh <- point } else { - log.Print("could not parse particulates: ", err) + log.Print("could not parse ", t.topic, " data: ", err) } }) - checkError(token) + + if token.Wait() && token.Error() != nil { + log.Print(errors.Wrap(token.Error(), "could not subscribe to mqtt topic "+t.topic)) + } +} + +func (h *connHandler) onConnect(client mqtt.Client) { + for _, t := range topics { + log.Print("subscribing to topic ", t.topic) + go subscribe(t, client, h.write) + } } func StartMqtt(settings *Settings, write chan<- *influxdb2Write.Point) error {