Skip to content

Commit

Permalink
Add outside air pollution data collection
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Dec 3, 2020
1 parent 930257e commit c326f17
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 1 deletion.
2 changes: 1 addition & 1 deletion gateway/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ go 1.15
require (
github.com/eclipse/paho.mqtt.golang v1.2.0
github.com/influxdata/influxdb-client-go/v2 v2.2.0
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b // indirect
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
)
163 changes: 163 additions & 0 deletions gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@ import (
"context"
"encoding/json"
"flag"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"regexp"
"strings"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api"
influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
"golang.org/x/net/proxy"
)

var reAirkaz = regexp.MustCompile(`<script>.*sensors_data\s*=\s*(\[.+\])</script>`)

type temperature struct {
Device string `json:"dev"`
Time int64 `json:"time"`
Expand Down Expand Up @@ -114,7 +122,160 @@ func saveParticulates(write influxdb2Api.WriteAPIBlocking, data []byte) error {
return nil
}

const airkazTimeFormat = "2006-01-02 15:04:05"

type AirkazTime struct {
time.Time
}

var airkazLocation *time.Location

func (t *AirkazTime) UnmarshalJSON(b []byte) (err error) {
s := strings.Trim(string(b), `"`)
if s == "null" {
t.Time = time.Time{}
return nil
}
t.Time, err = time.ParseInLocation(airkazTimeFormat, s, airkazLocation)
return err
}

type AirkazInfo struct {
Id int64 `json:"id,string"`
City string `json:"city"`
Name string `json:"name"`
Lat float64 `json:"lat,string"`
Lng float64 `json:"lng,string"`
Pm10Curr *float64 `json:"pm10,string"`
Pm10Day *float64 `json:"pm10ss,string"`
Pm25Curr *float64 `json:"pm25,string"`
Pm25Day *float64 `json:"pm25ss,string"`
TempCurr *float64 `json:"temp,string"`
TempDay *float64 `json:"tempss,string"`
Humid *float64 `json:"humid,string"`
Press *float64 `json:"press,string"`
Error int64 `json:"error,string"`
Status string `json:"status"`
Date AirkazTime `json:"date"`
Hour string `json:"hour"`
}

func newProxiedClient() *http.Client {
proxyDialer := proxy.FromEnvironmentUsing(&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
})
return &http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return proxyDialer.Dial(network, addr)
},
MaxIdleConns: 1,
IdleConnTimeout: 1 * time.Minute,
TLSHandshakeTimeout: 10 * time.Second,
},
}
}

func saveAirkazPoints(meas AirkazInfo, write influxdb2Api.WriteAPIBlocking) {
tags := map[string]string{
"city": meas.City,
"station": meas.Name,
}

save := func(kind string, fields map[string]interface{}) {
point := influxdb2.NewPoint(kind, tags, fields, meas.Date.Time)
err := write.WritePoint(context.Background(), point)

if err == nil {
log.Print("airkaz ", kind, " point written: ", meas.City, " -> ", meas.Name)
} else {
log.Print("could not save airkaz ", kind, " point: ", err)
}
}

if meas.Pm25Curr != nil && meas.Pm10Curr != nil {
save("airkaz:particulates", map[string]interface{}{
"pm25": uint16(*meas.Pm25Curr),
"pm10": uint16(*meas.Pm10Curr),
})
}

tempData := map[string]interface{}{}

if meas.TempCurr != nil {
tempData["temperature"] = *meas.TempCurr
}
if meas.Humid != nil {
tempData["humidity"] = *meas.Humid
}
if meas.Press != nil {
tempData["pressure"] = *meas.Press
}

if len(tempData) > 0 {
save("airkaz:temperature", tempData)
}
}

func collectAirkaz(write influxdb2Api.WriteAPIBlocking) {
client := newProxiedClient()

lastUpdates := map[int64]AirkazTime{}

for {
time.Sleep(5 * time.Second)

resp, err := client.Get("https://airkaz.org/")
if err != nil {
log.Print("airkaz get failed: ", err)
continue
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Print("could not read response: ", err)
continue
}

matches := reAirkaz.FindSubmatch(body)
if matches == nil {
log.Print("measurement json not found in response")
continue
}

var measurements []AirkazInfo
if err = json.Unmarshal(matches[1], &measurements); err != nil {
log.Print("could not parse response: ", err)
continue
}

log.Print("found ", len(measurements), " airkaz measurements")

for _, meas := range measurements {
if meas.Error != 0 || meas.Status != "active" || meas.Hour != "now" {
continue
}
if last, ok := lastUpdates[meas.Id]; ok && last == meas.Date {
continue
}
lastUpdates[meas.Id] = meas.Date

go saveAirkazPoints(meas, write)
}

time.Sleep(5 * time.Minute)
}
}

func main() {
var err error
airkazLocation, err = time.LoadLocation("Asia/Almaty")
if err != nil {
log.Fatal("could not find airkaz timezone")
}

mqttBroker := flag.String("mqtt.broker", "tcp://localhost:1883", "the broker URI")
mqttUser := flag.String("mqtt.user", "", "MQTT username")
mqttPassword := flag.String("mqtt.pass", "", "MQTT password")
Expand Down Expand Up @@ -160,6 +321,8 @@ func main() {
log.Fatal("empty broker URI")
}

go collectAirkaz(write)

opts := mqtt.NewClientOptions()
opts.SetResumeSubs(true)
opts.AddBroker(*mqttBroker)
Expand Down

0 comments on commit c326f17

Please sign in to comment.