From 339a205298ad3cc45ad11af3c9689cb112f7d8ed Mon Sep 17 00:00:00 2001 From: Kirill Isakov Date: Sun, 14 Feb 2021 01:55:32 +0600 Subject: [PATCH] MQTT gateway: minor cleanups and optimizations --- gateway/airkaz/airkaz.go | 20 +++++++++---- gateway/influx/influx.go | 59 +++++++++++++++++++++++++++----------- gateway/main.go | 6 ++-- gateway/mon/co2.go | 14 ++++----- gateway/mon/pm.go | 14 ++++----- gateway/mon/temperature.go | 14 ++++----- gateway/mon/types.go | 9 ++++++ gateway/mqtt/mqtt.go | 18 ++++++------ 8 files changed, 93 insertions(+), 61 deletions(-) create mode 100644 gateway/mon/types.go diff --git a/gateway/airkaz/airkaz.go b/gateway/airkaz/airkaz.go index 3944ce4..d79eb63 100644 --- a/gateway/airkaz/airkaz.go +++ b/gateway/airkaz/airkaz.go @@ -2,8 +2,8 @@ package airkaz import ( "encoding/json" + "github.com/hg/airmon/influx" influxdb2 "github.com/influxdata/influxdb-client-go/v2" - influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/pkg/errors" "io/ioutil" "log" @@ -60,15 +60,18 @@ type measurement struct { Hour string `json:"hour"` } -func Collect(write chan<- *influxdb2Write.Point) { +func Collect(sender *influx.MeasurementSender) { client := newProxiedClient() lastUpdates := map[int64]Time{} for { + if measurements, err := getResponse(client); err == nil { log.Print("found ", len(measurements), " airkaz measurements") + toSave := make([]*measurement, len(measurements)) + for _, meas := range measurements { if meas.Error != 0 || meas.Status != "active" || meas.Hour != "now" { continue @@ -77,9 +80,14 @@ func Collect(write chan<- *influxdb2Write.Point) { continue } lastUpdates[meas.Id] = meas.Date - - go saveMeasurement(meas, write) + toSave = append(toSave, &meas) } + + go func() { + for _, ms := range toSave { + saveMeasurement(ms, sender) + } + }() } else { log.Print("could not get response from airkaz: ", err) } @@ -113,14 +121,14 @@ func getResponse(client *http.Client) ([]measurement, error) { return measurements, nil } -func saveMeasurement(meas measurement, write chan<- *influxdb2Write.Point) { +func saveMeasurement(meas *measurement, sender *influx.MeasurementSender) { tags := map[string]string{ "city": meas.City, "station": meas.Name, } save := func(kind string, fields map[string]interface{}) { - write <- influxdb2.NewPoint(kind, tags, fields, meas.Date.Time) + sender.Send(influxdb2.NewPoint(kind, tags, fields, meas.Date.Time)) } if meas.Pm25Curr != nil && meas.Pm10Curr != nil { diff --git a/gateway/influx/influx.go b/gateway/influx/influx.go index 29a9c6e..28cba6e 100644 --- a/gateway/influx/influx.go +++ b/gateway/influx/influx.go @@ -7,6 +7,7 @@ import ( influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write" "log" + "time" ) type Settings struct { @@ -34,28 +35,54 @@ func (s *Settings) validate() error { func newClient(settings Settings) influxdb2Api.WriteAPIBlocking { client := influxdb2.NewClient(settings.Uri, settings.Token) - writeApi := client.WriteAPIBlocking(settings.Org, settings.Bucket) - return writeApi + return client.WriteAPIBlocking(settings.Org, settings.Bucket) } -func NewWriter(settings Settings) (chan<- *influxdb2Write.Point, error) { - if err := settings.validate(); err != nil { - return nil, err +type MeasurementSender struct { + api influxdb2Api.WriteAPIBlocking + ch chan *influxdb2Write.Point +} + +func (ms *MeasurementSender) Send(point *influxdb2Write.Point) bool { + for retry := 0; ; retry++ { + select { + case ms.ch <- point: + return true + + case <-time.After(5 * time.Second): + log.Print("timed out while trying to send measurement") + + if retry >= 3 { + log.Print("could not send measurement, discarding point") + return false + } + <-ms.ch // drop the oldest measurement and retry + } } +} - writeApi := newClient(settings) - writeCh := make(chan *influxdb2Write.Point, 2000) +func (ms *MeasurementSender) receive() { ctx := context.Background() - go func() { - for point := range writeCh { - if err := writeApi.WritePoint(ctx, point); err == nil { - log.Print("point written") - } else { - log.Print("could not write point: ", err) - } + for point := range ms.ch { + if err := ms.api.WritePoint(ctx, point); err == nil { + log.Print("point written") + } else { + log.Print("could not write point: ", err) } - }() + } +} + +func NewWriter(settings Settings) (*MeasurementSender, error) { + if err := settings.validate(); err != nil { + return nil, err + } + + sender := &MeasurementSender{ + ch: make(chan *influxdb2Write.Point, 2000), + api: newClient(settings), + } + go sender.receive() - return writeCh, nil + return sender, nil } diff --git a/gateway/main.go b/gateway/main.go index fde92ba..7697e52 100644 --- a/gateway/main.go +++ b/gateway/main.go @@ -22,14 +22,14 @@ func main() { flag.Parse() - writeCh, err := influx.NewWriter(infs) + sender, err := influx.NewWriter(infs) if err != nil { log.Fatal("could not prepare InfluxDB writer: ", err) } - go airkaz.Collect(writeCh) + go airkaz.Collect(sender) - if err = mqtt.StartMqtt(&mqts, writeCh); err != nil { + if err = mqtt.StartMqtt(&mqts, sender); err != nil { log.Fatal("could not create MQTT client: ", err) } diff --git a/gateway/mon/co2.go b/gateway/mon/co2.go index fddafe9..39918d2 100644 --- a/gateway/mon/co2.go +++ b/gateway/mon/co2.go @@ -4,7 +4,6 @@ import ( "encoding/json" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write" - "log" "time" ) @@ -15,7 +14,7 @@ type co2 struct { Co2 uint16 `json:"co2"` } -func (t *co2) toPoint() *influxdb2Write.Point { +func (t *co2) ToPoint() *influxdb2Write.Point { return influxdb2.NewPoint("co2", map[string]string{ "device": t.Device, @@ -27,11 +26,8 @@ func (t *co2) toPoint() *influxdb2Write.Point { time.Unix(t.Time, 0)) } -func ParseCarbonDioxide(data []byte) (*influxdb2Write.Point, error) { - var co2 co2 - if err := json.Unmarshal(data, &co2); err != nil { - log.Print("could not parse co2 json: ", err) - return nil, err - } - return co2.toPoint(), nil +func ParseCarbonDioxide(data []byte) (PointSource, error) { + ms := &co2{} + err := json.Unmarshal(data, ms) + return ms, err } diff --git a/gateway/mon/pm.go b/gateway/mon/pm.go index 0fefe2b..76a2b20 100644 --- a/gateway/mon/pm.go +++ b/gateway/mon/pm.go @@ -4,7 +4,6 @@ import ( "encoding/json" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write" - "log" "time" ) @@ -30,7 +29,7 @@ type particulates struct { } `json:"cnt"` } -func (t *particulates) toPoint() *influxdb2Write.Point { +func (t *particulates) ToPoint() *influxdb2Write.Point { return influxdb2.NewPoint("particulates", map[string]string{ "device": t.Device, @@ -53,11 +52,8 @@ func (t *particulates) toPoint() *influxdb2Write.Point { time.Unix(t.Time, 0)) } -func ParseParticulates(data []byte) (*influxdb2Write.Point, error) { - var part particulates - if err := json.Unmarshal(data, &part); err != nil { - log.Print("could not parse particulates json: ", err) - return nil, err - } - return part.toPoint(), nil +func ParseParticulates(data []byte) (PointSource, error) { + ms := &particulates{} + err := json.Unmarshal(data, ms) + return ms, err } diff --git a/gateway/mon/temperature.go b/gateway/mon/temperature.go index c39c2b8..1402fe8 100644 --- a/gateway/mon/temperature.go +++ b/gateway/mon/temperature.go @@ -4,7 +4,6 @@ import ( "encoding/json" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write" - "log" "time" ) @@ -15,7 +14,7 @@ type temperature struct { Temperature float64 `json:"temp"` } -func (t *temperature) toPoint() *influxdb2Write.Point { +func (t *temperature) ToPoint() *influxdb2Write.Point { return influxdb2.NewPoint("temperature", map[string]string{ "device": t.Device, @@ -27,11 +26,8 @@ func (t *temperature) toPoint() *influxdb2Write.Point { time.Unix(t.Time, 0)) } -func ParseTemperature(data []byte) (*influxdb2Write.Point, error) { - var temp temperature - if err := json.Unmarshal(data, &temp); err != nil { - log.Print("could not parse temp json: ", err) - return nil, err - } - return temp.toPoint(), nil +func ParseTemperature(data []byte) (PointSource, error) { + ms := &temperature{} + err := json.Unmarshal(data, ms) + return ms, err } diff --git a/gateway/mon/types.go b/gateway/mon/types.go new file mode 100644 index 0000000..04e1283 --- /dev/null +++ b/gateway/mon/types.go @@ -0,0 +1,9 @@ +package mon + +import ( + influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write" +) + +type PointSource interface { + ToPoint() *influxdb2Write.Point +} diff --git a/gateway/mqtt/mqtt.go b/gateway/mqtt/mqtt.go index 4933541..9ecac3e 100644 --- a/gateway/mqtt/mqtt.go +++ b/gateway/mqtt/mqtt.go @@ -2,8 +2,8 @@ package mqtt import ( mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/hg/airmon/influx" "github.com/hg/airmon/mon" - influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write" "github.com/pkg/errors" "log" "strings" @@ -51,12 +51,12 @@ func newMqttClient(settings *Settings, onConn mqtt.OnConnectHandler) mqtt.Client } type connHandler struct { - write chan<- *influxdb2Write.Point + sender *influx.MeasurementSender } type topic struct { topic string - mapper func(data []byte) (*influxdb2Write.Point, error) + mapper func(data []byte) (mon.PointSource, error) } var topics = []*topic{ @@ -65,10 +65,10 @@ var topics = []*topic{ {"meas/co2", mon.ParseCarbonDioxide}, } -func subscribe(t *topic, client mqtt.Client, writeCh chan<- *influxdb2Write.Point) { +func subscribe(t *topic, client mqtt.Client, sender *influx.MeasurementSender) { token := client.Subscribe(t.topic, 0, func(client mqtt.Client, msg mqtt.Message) { - if point, err := t.mapper(msg.Payload()); err == nil { - writeCh <- point + if ms, err := t.mapper(msg.Payload()); err == nil { + sender.Send(ms.ToPoint()) } else { log.Print("could not parse ", t.topic, " data: ", err) } @@ -82,16 +82,16 @@ func subscribe(t *topic, client mqtt.Client, writeCh chan<- *influxdb2Write.Poin func (h *connHandler) onConnect(client mqtt.Client) { for _, t := range topics { log.Print("subscribing to topic ", t.topic) - go subscribe(t, client, h.write) + go subscribe(t, client, h.sender) } } -func StartMqtt(settings *Settings, write chan<- *influxdb2Write.Point) error { +func StartMqtt(settings *Settings, sender *influx.MeasurementSender) error { if err := settings.validate(); err != nil { return err } - handler := connHandler{write} + handler := connHandler{sender} client := newMqttClient(settings, handler.onConnect) token := client.Connect()