Skip to content

Commit

Permalink
MQTT gateway: add support for CO2 measurements
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Feb 13, 2021
1 parent 1356b17 commit 06ba68f
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 19 deletions.
10 changes: 10 additions & 0 deletions gateway/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/usr/bin/bash

set -euo pipefail

cd "$(dirname "$0")"

rm --force airmon

podman run --rm -it -v .:/app golang:buster bash -c 'cd /app && go build .'

37 changes: 37 additions & 0 deletions gateway/mon/co2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package mon

import (
"encoding/json"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"log"
"time"
)

type co2 struct {
Device string `json:"dev"`
Time int64 `json:"time"`
Sensor string `json:"sens"`
Co2 uint16 `json:"co2"`
}

func (t *co2) toPoint() *influxdb2Write.Point {
return influxdb2.NewPoint("co2",
map[string]string{
"device": t.Device,
"sensor": t.Sensor,
},
map[string]interface{}{
"co2": t.Co2,
},
time.Unix(t.Time, 0))
}

func ParseCarbonDioxide(data []byte) (*influxdb2Write.Point, error) {
var co2 co2
if err := json.Unmarshal(data, &co2); err != nil {
log.Print("could not parse co2 json: ", err)
return nil, err
}
return co2.toPoint(), nil
}
44 changes: 25 additions & 19 deletions gateway/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,30 +54,36 @@ type connHandler struct {
write chan<- *influxdb2Write.Point
}

func (h *connHandler) onConnect(client mqtt.Client) {
checkError := func(token mqtt.Token) {
if token.Wait() && token.Error() != nil {
log.Print(errors.Wrap(token.Error(), "could not subscribe to mqtt topic"))
}
}
type topic struct {
topic string
mapper func(data []byte) (*influxdb2Write.Point, error)
}

token := client.Subscribe("meas/temp", 0, func(client mqtt.Client, msg mqtt.Message) {
if point, err := mon.ParseTemperature(msg.Payload()); err == nil {
h.write <- point
} else {
log.Print("could not parse temperature: ", err)
}
})
checkError(token)
var topics = []*topic{
{"meas/part", mon.ParseParticulates},
{"meas/temp", mon.ParseTemperature},
{"meas/co2", mon.ParseCarbonDioxide},
}

token = client.Subscribe("meas/part", 0, func(client mqtt.Client, msg mqtt.Message) {
if point, err := mon.ParseParticulates(msg.Payload()); err == nil {
h.write <- point
func subscribe(t *topic, client mqtt.Client, writeCh chan<- *influxdb2Write.Point) {
token := client.Subscribe(t.topic, 0, func(client mqtt.Client, msg mqtt.Message) {
if point, err := t.mapper(msg.Payload()); err == nil {
writeCh <- point
} else {
log.Print("could not parse particulates: ", err)
log.Print("could not parse ", t.topic, " data: ", err)
}
})
checkError(token)

if token.Wait() && token.Error() != nil {
log.Print(errors.Wrap(token.Error(), "could not subscribe to mqtt topic "+t.topic))
}
}

func (h *connHandler) onConnect(client mqtt.Client) {
for _, t := range topics {
log.Print("subscribing to topic ", t.topic)
go subscribe(t, client, h.write)
}
}

func StartMqtt(settings *Settings, write chan<- *influxdb2Write.Point) error {
Expand Down

0 comments on commit 06ba68f

Please sign in to comment.