Skip to content

Commit

Permalink
Cleanups in MQTT gateway
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Dec 4, 2020
1 parent c326f17 commit f9ce460
Show file tree
Hide file tree
Showing 8 changed files with 440 additions and 346 deletions.
148 changes: 148 additions & 0 deletions gateway/airkaz/airkaz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package airkaz

import (
"encoding/json"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/pkg/errors"
"io/ioutil"
"log"
"net/http"
"regexp"
"strings"
"time"
)

var dataRe = regexp.MustCompile(`(?si)<script.*>.*sensors_data\s*=\s*(\[.+])</script`)

const timeFormat = "2006-01-02 15:04:05"

type Time struct {
time.Time
}

var stationLocation *time.Location

func init() {
var err error
if stationLocation, err = time.LoadLocation("Asia/Almaty"); err != nil {
log.Fatal("could not find airkaz timezone")
}
}

func (t *Time) UnmarshalJSON(b []byte) (err error) {
s := strings.Trim(string(b), `"`)
if s == "null" {
t.Time = time.Time{}
return nil
}
t.Time, err = time.ParseInLocation(timeFormat, s, stationLocation)
return err
}

type measurement struct {
Id int64 `json:"id,string"`
City string `json:"city"`
Name string `json:"name"`
Lat float64 `json:"lat,string"`
Lng float64 `json:"lng,string"`
Pm10Curr *float64 `json:"pm10,string"`
Pm10Day *float64 `json:"pm10ss,string"`
Pm25Curr *float64 `json:"pm25,string"`
Pm25Day *float64 `json:"pm25ss,string"`
TempCurr *float64 `json:"temp,string"`
TempDay *float64 `json:"tempss,string"`
Humid *float64 `json:"humid,string"`
Press *float64 `json:"press,string"`
Error int64 `json:"error,string"`
Status string `json:"status"`
Date Time `json:"date"`
Hour string `json:"hour"`
}

func Collect(write chan<- *influxdb2Write.Point) {
client := newProxiedClient()

lastUpdates := map[int64]Time{}

for {
if measurements, err := getResponse(client); err == nil {
log.Print("found ", len(measurements), " airkaz measurements")

for _, meas := range measurements {
if meas.Error != 0 || meas.Status != "active" || meas.Hour != "now" {
continue
}
if last, ok := lastUpdates[meas.Id]; ok && last == meas.Date {
continue
}
lastUpdates[meas.Id] = meas.Date

go saveMeasurement(meas, write)
}
} else {
log.Print("could not get response from airkaz: ", err)
}

time.Sleep(5 * time.Minute)
}
}

func getResponse(client *http.Client) ([]measurement, error) {
resp, err := client.Get("https://airkaz.org/")
if err != nil {
return nil, errors.Wrap(err, "airkaz get failed")
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "could not read response: ")
}

matches := dataRe.FindSubmatch(body)
if matches == nil {
return nil, errors.Wrap(err, "measurement json not found in response")
}

var measurements []measurement
if err = json.Unmarshal(matches[1], &measurements); err != nil {
return nil, errors.Wrap(err, "could not parse response: ")
}

return measurements, nil
}

func saveMeasurement(meas measurement, write chan<- *influxdb2Write.Point) {
tags := map[string]string{
"city": meas.City,
"station": meas.Name,
}

save := func(kind string, fields map[string]interface{}) {
write <- influxdb2.NewPoint(kind, tags, fields, meas.Date.Time)
}

if meas.Pm25Curr != nil && meas.Pm10Curr != nil {
save("airkaz:particulates", map[string]interface{}{
"pm25": uint16(*meas.Pm25Curr),
"pm10": uint16(*meas.Pm10Curr),
})
}

tempData := map[string]interface{}{}

if meas.TempCurr != nil {
tempData["temperature"] = *meas.TempCurr
}
if meas.Humid != nil {
tempData["humidity"] = *meas.Humid
}
if meas.Press != nil {
tempData["pressure"] = *meas.Press
}

if len(tempData) > 0 {
save("airkaz:temperature", tempData)
}
}
26 changes: 26 additions & 0 deletions gateway/airkaz/net.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package airkaz

import (
"context"
"golang.org/x/net/proxy"
"net"
"net/http"
"time"
)

func newProxiedClient() *http.Client {
proxyDialer := proxy.FromEnvironmentUsing(&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
})
return &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return proxyDialer.Dial(network, addr)
},
MaxIdleConns: 1,
IdleConnTimeout: 1 * time.Minute,
TLSHandshakeTimeout: 10 * time.Second,
},
}
}
1 change: 1 addition & 0 deletions gateway/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ go 1.15
require (
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/influxdata/influxdb-client-go/v2 v2.2.0
github.com/pkg/errors v0.9.1
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
)
61 changes: 61 additions & 0 deletions gateway/influx/influx.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package influx

import (
"context"
"errors"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"log"
)

type Settings struct {
Uri string
Org string
Bucket string
Token string
}

func (s *Settings) validate() error {
if s.Uri == "" {
return errors.New("InfluxDB URI not set")
}
if s.Org == "" {
return errors.New("InfluxDB organization not set")
}
if s.Bucket == "" {
return errors.New("InfluxDB bucket is empty")
}
if s.Token == "" {
log.Print("InfluxDB token is empty, set it if you see authentication errors")
}
return nil
}

func newClient(settings Settings) influxdb2Api.WriteAPIBlocking {
client := influxdb2.NewClient(settings.Uri, settings.Token)
writeApi := client.WriteAPIBlocking(settings.Org, settings.Bucket)
return writeApi
}

func NewWriter(settings Settings) (chan<- *influxdb2Write.Point, error) {
if err := settings.validate(); err != nil {
return nil, err
}

writeApi := newClient(settings)
writeCh := make(chan *influxdb2Write.Point, 2000)
ctx := context.Background()

go func() {
for point := range writeCh {
if err := writeApi.WritePoint(ctx, point); err == nil {
log.Print("point written")
} else {
log.Print("could not write point: ", err)
}
}
}()

return writeCh, nil
}
Loading

0 comments on commit f9ce460

Please sign in to comment.