Skip to content

Commit

Permalink
MQTT gateway: minor cleanups and optimizations
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Feb 13, 2021
1 parent 9613627 commit 339a205
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 61 deletions.
20 changes: 14 additions & 6 deletions gateway/airkaz/airkaz.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package airkaz

import (
"encoding/json"
"github.com/hg/airmon/influx"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/pkg/errors"
"io/ioutil"
"log"
Expand Down Expand Up @@ -60,15 +60,18 @@ type measurement struct {
Hour string `json:"hour"`
}

func Collect(write chan<- *influxdb2Write.Point) {
func Collect(sender *influx.MeasurementSender) {
client := newProxiedClient()

lastUpdates := map[int64]Time{}

for {

if measurements, err := getResponse(client); err == nil {
log.Print("found ", len(measurements), " airkaz measurements")

toSave := make([]*measurement, len(measurements))

for _, meas := range measurements {
if meas.Error != 0 || meas.Status != "active" || meas.Hour != "now" {
continue
Expand All @@ -77,9 +80,14 @@ func Collect(write chan<- *influxdb2Write.Point) {
continue
}
lastUpdates[meas.Id] = meas.Date

go saveMeasurement(meas, write)
toSave = append(toSave, &meas)
}

go func() {
for _, ms := range toSave {
saveMeasurement(ms, sender)
}
}()
} else {
log.Print("could not get response from airkaz: ", err)
}
Expand Down Expand Up @@ -113,14 +121,14 @@ func getResponse(client *http.Client) ([]measurement, error) {
return measurements, nil
}

func saveMeasurement(meas measurement, write chan<- *influxdb2Write.Point) {
func saveMeasurement(meas *measurement, sender *influx.MeasurementSender) {
tags := map[string]string{
"city": meas.City,
"station": meas.Name,
}

save := func(kind string, fields map[string]interface{}) {
write <- influxdb2.NewPoint(kind, tags, fields, meas.Date.Time)
sender.Send(influxdb2.NewPoint(kind, tags, fields, meas.Date.Time))
}

if meas.Pm25Curr != nil && meas.Pm10Curr != nil {
Expand Down
59 changes: 43 additions & 16 deletions gateway/influx/influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"log"
"time"
)

type Settings struct {
Expand Down Expand Up @@ -34,28 +35,54 @@ func (s *Settings) validate() error {

func newClient(settings Settings) influxdb2Api.WriteAPIBlocking {
client := influxdb2.NewClient(settings.Uri, settings.Token)
writeApi := client.WriteAPIBlocking(settings.Org, settings.Bucket)
return writeApi
return client.WriteAPIBlocking(settings.Org, settings.Bucket)
}

func NewWriter(settings Settings) (chan<- *influxdb2Write.Point, error) {
if err := settings.validate(); err != nil {
return nil, err
type MeasurementSender struct {
api influxdb2Api.WriteAPIBlocking
ch chan *influxdb2Write.Point
}

func (ms *MeasurementSender) Send(point *influxdb2Write.Point) bool {
for retry := 0; ; retry++ {
select {
case ms.ch <- point:
return true

case <-time.After(5 * time.Second):
log.Print("timed out while trying to send measurement")

if retry >= 3 {
log.Print("could not send measurement, discarding point")
return false
}
<-ms.ch // drop the oldest measurement and retry
}
}
}

writeApi := newClient(settings)
writeCh := make(chan *influxdb2Write.Point, 2000)
func (ms *MeasurementSender) receive() {
ctx := context.Background()

go func() {
for point := range writeCh {
if err := writeApi.WritePoint(ctx, point); err == nil {
log.Print("point written")
} else {
log.Print("could not write point: ", err)
}
for point := range ms.ch {
if err := ms.api.WritePoint(ctx, point); err == nil {
log.Print("point written")
} else {
log.Print("could not write point: ", err)
}
}()
}
}

func NewWriter(settings Settings) (*MeasurementSender, error) {
if err := settings.validate(); err != nil {
return nil, err
}

sender := &MeasurementSender{
ch: make(chan *influxdb2Write.Point, 2000),
api: newClient(settings),
}
go sender.receive()

return writeCh, nil
return sender, nil
}
6 changes: 3 additions & 3 deletions gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ func main() {

flag.Parse()

writeCh, err := influx.NewWriter(infs)
sender, err := influx.NewWriter(infs)
if err != nil {
log.Fatal("could not prepare InfluxDB writer: ", err)
}

go airkaz.Collect(writeCh)
go airkaz.Collect(sender)

if err = mqtt.StartMqtt(&mqts, writeCh); err != nil {
if err = mqtt.StartMqtt(&mqts, sender); err != nil {
log.Fatal("could not create MQTT client: ", err)
}

Expand Down
14 changes: 5 additions & 9 deletions gateway/mon/co2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"log"
"time"
)

Expand All @@ -15,7 +14,7 @@ type co2 struct {
Co2 uint16 `json:"co2"`
}

func (t *co2) toPoint() *influxdb2Write.Point {
func (t *co2) ToPoint() *influxdb2Write.Point {
return influxdb2.NewPoint("co2",
map[string]string{
"device": t.Device,
Expand All @@ -27,11 +26,8 @@ func (t *co2) toPoint() *influxdb2Write.Point {
time.Unix(t.Time, 0))
}

func ParseCarbonDioxide(data []byte) (*influxdb2Write.Point, error) {
var co2 co2
if err := json.Unmarshal(data, &co2); err != nil {
log.Print("could not parse co2 json: ", err)
return nil, err
}
return co2.toPoint(), nil
func ParseCarbonDioxide(data []byte) (PointSource, error) {
ms := &co2{}
err := json.Unmarshal(data, ms)
return ms, err
}
14 changes: 5 additions & 9 deletions gateway/mon/pm.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"log"
"time"
)

Expand All @@ -30,7 +29,7 @@ type particulates struct {
} `json:"cnt"`
}

func (t *particulates) toPoint() *influxdb2Write.Point {
func (t *particulates) ToPoint() *influxdb2Write.Point {
return influxdb2.NewPoint("particulates",
map[string]string{
"device": t.Device,
Expand All @@ -53,11 +52,8 @@ func (t *particulates) toPoint() *influxdb2Write.Point {
time.Unix(t.Time, 0))
}

func ParseParticulates(data []byte) (*influxdb2Write.Point, error) {
var part particulates
if err := json.Unmarshal(data, &part); err != nil {
log.Print("could not parse particulates json: ", err)
return nil, err
}
return part.toPoint(), nil
func ParseParticulates(data []byte) (PointSource, error) {
ms := &particulates{}
err := json.Unmarshal(data, ms)
return ms, err
}
14 changes: 5 additions & 9 deletions gateway/mon/temperature.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/json"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"log"
"time"
)

Expand All @@ -15,7 +14,7 @@ type temperature struct {
Temperature float64 `json:"temp"`
}

func (t *temperature) toPoint() *influxdb2Write.Point {
func (t *temperature) ToPoint() *influxdb2Write.Point {
return influxdb2.NewPoint("temperature",
map[string]string{
"device": t.Device,
Expand All @@ -27,11 +26,8 @@ func (t *temperature) toPoint() *influxdb2Write.Point {
time.Unix(t.Time, 0))
}

func ParseTemperature(data []byte) (*influxdb2Write.Point, error) {
var temp temperature
if err := json.Unmarshal(data, &temp); err != nil {
log.Print("could not parse temp json: ", err)
return nil, err
}
return temp.toPoint(), nil
func ParseTemperature(data []byte) (PointSource, error) {
ms := &temperature{}
err := json.Unmarshal(data, ms)
return ms, err
}
9 changes: 9 additions & 0 deletions gateway/mon/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package mon

import (
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
)

type PointSource interface {
ToPoint() *influxdb2Write.Point
}
18 changes: 9 additions & 9 deletions gateway/mqtt/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package mqtt

import (
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/hg/airmon/influx"
"github.com/hg/airmon/mon"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/pkg/errors"
"log"
"strings"
Expand Down Expand Up @@ -51,12 +51,12 @@ func newMqttClient(settings *Settings, onConn mqtt.OnConnectHandler) mqtt.Client
}

type connHandler struct {
write chan<- *influxdb2Write.Point
sender *influx.MeasurementSender
}

type topic struct {
topic string
mapper func(data []byte) (*influxdb2Write.Point, error)
mapper func(data []byte) (mon.PointSource, error)
}

var topics = []*topic{
Expand All @@ -65,10 +65,10 @@ var topics = []*topic{
{"meas/co2", mon.ParseCarbonDioxide},
}

func subscribe(t *topic, client mqtt.Client, writeCh chan<- *influxdb2Write.Point) {
func subscribe(t *topic, client mqtt.Client, sender *influx.MeasurementSender) {
token := client.Subscribe(t.topic, 0, func(client mqtt.Client, msg mqtt.Message) {
if point, err := t.mapper(msg.Payload()); err == nil {
writeCh <- point
if ms, err := t.mapper(msg.Payload()); err == nil {
sender.Send(ms.ToPoint())
} else {
log.Print("could not parse ", t.topic, " data: ", err)
}
Expand All @@ -82,16 +82,16 @@ func subscribe(t *topic, client mqtt.Client, writeCh chan<- *influxdb2Write.Poin
func (h *connHandler) onConnect(client mqtt.Client) {
for _, t := range topics {
log.Print("subscribing to topic ", t.topic)
go subscribe(t, client, h.write)
go subscribe(t, client, h.sender)
}
}

func StartMqtt(settings *Settings, write chan<- *influxdb2Write.Point) error {
func StartMqtt(settings *Settings, sender *influx.MeasurementSender) error {
if err := settings.validate(); err != nil {
return err
}

handler := connHandler{write}
handler := connHandler{sender}
client := newMqttClient(settings, handler.onConnect)

token := client.Connect()
Expand Down

0 comments on commit 339a205

Please sign in to comment.