Skip to content

Commit

Permalink
Add support for ceb pollution data source
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Feb 14, 2021
1 parent 339a205 commit dec5130
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 36 deletions.
42 changes: 8 additions & 34 deletions gateway/airkaz/airkaz.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,19 @@ package airkaz
import (
"encoding/json"
"github.com/hg/airmon/influx"
"github.com/hg/airmon/net"
"github.com/hg/airmon/tm"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/pkg/errors"
"io/ioutil"
"log"
"net/http"
"regexp"
"strings"
"time"
)

var dataRe = regexp.MustCompile(`(?si)<script.*>.*sensors_data\s*=\s*(\[.+])</script`)

const timeFormat = "2006-01-02 15:04:05"

type Time struct {
time.Time
}

var stationLocation *time.Location

func init() {
var err error
if stationLocation, err = time.LoadLocation("Asia/Almaty"); err != nil {
log.Fatal("could not find airkaz timezone")
}
}

func (t *Time) UnmarshalJSON(b []byte) (err error) {
s := strings.Trim(string(b), `"`)
if s == "null" {
t.Time = time.Time{}
return nil
}
t.Time, err = time.ParseInLocation(timeFormat, s, stationLocation)
return err
}

type measurement struct {
Id int64 `json:"id,string"`
City string `json:"city"`
Expand All @@ -56,21 +32,19 @@ type measurement struct {
Press *float64 `json:"press,string"`
Error int64 `json:"error,string"`
Status string `json:"status"`
Date Time `json:"date"`
Date tm.Time `json:"date"`
Hour string `json:"hour"`
}

func Collect(sender *influx.MeasurementSender) {
client := newProxiedClient()

lastUpdates := map[int64]Time{}
client := net.NewProxiedClient()
lastUpdates := map[int64]tm.Time{}

for {

if measurements, err := getResponse(client); err == nil {
log.Print("found ", len(measurements), " airkaz measurements")

toSave := make([]*measurement, len(measurements))
toSave := make([]measurement, len(measurements))

for _, meas := range measurements {
if meas.Error != 0 || meas.Status != "active" || meas.Hour != "now" {
Expand All @@ -80,7 +54,7 @@ func Collect(sender *influx.MeasurementSender) {
continue
}
lastUpdates[meas.Id] = meas.Date
toSave = append(toSave, &meas)
toSave = append(toSave, meas)
}

go func() {
Expand Down Expand Up @@ -121,7 +95,7 @@ func getResponse(client *http.Client) ([]measurement, error) {
return measurements, nil
}

func saveMeasurement(meas *measurement, sender *influx.MeasurementSender) {
func saveMeasurement(meas measurement, sender *influx.MeasurementSender) {
tags := map[string]string{
"city": meas.City,
"station": meas.Name,
Expand Down
101 changes: 101 additions & 0 deletions gateway/ceb/ceb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package ceb

import (
"encoding/json"
"github.com/hg/airmon/influx"
"github.com/hg/airmon/net"
"github.com/hg/airmon/tm"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/pkg/errors"
"io/ioutil"
"log"
"net/http"
"strings"
"time"
)

type measurement struct {
Id string `json:"id"`
Title string `json:"post_title"`
Address string `json:"address"`
Lat float64 `json:"lat,string"`
Lon float64 `json:"lon,string"`
Pollutant string `json:"desc"`
PollutantFull string `json:"substance"`
ValueMg float64 `json:"value"`
Date tm.Time `json:"cdate"`
}

func Collect(sender *influx.MeasurementSender) {
client := net.NewProxiedClient()
lastAt := time.Time{}

for {
if measurements, err := getResponse(client); err == nil {
latest := time.Time{}
toSave := make([]measurement, len(measurements))

log.Print("found ", len(measurements), " ceb measurements")

for _, meas := range measurements {
if meas.Date.After(lastAt) {
latest = meas.Date.Time
toSave = append(toSave, meas)
}
}

if !latest.IsZero() {
lastAt = latest
}

go func() {
for _, ms := range toSave {
saveMeasurement(ms, sender)
}
}()
} else {
log.Print("could not load data: ", err)
}

time.Sleep(5 * time.Minute)
}
}

func saveMeasurement(ms measurement, sender *influx.MeasurementSender) {
endOfFormula := strings.Index(ms.PollutantFull, "-")
if endOfFormula <= 0 {
return
}

tags := map[string]string{
"station": ms.Title,
"address": ms.Address,
"formula": ms.PollutantFull[:endOfFormula],
"pollutant": ms.Pollutant,
}

fields := map[string]interface{}{
"level_ug": ms.ValueMg * 1000,
"lat": ms.Lat,
"lon": ms.Lon,
}

sender.Send(influxdb2.NewPoint("ceb", tags, fields, ms.Date.Time))
}

func getResponse(client *http.Client) ([]measurement, error) {
resp, err := client.Get("https://ceb-uk.kz/map/ajax.php?markers")
if err != nil {
return nil, errors.Wrap(err, "data fetch failed")
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "could not read response")
}

var measurements []measurement
err = json.Unmarshal(body, &measurements)
return measurements, err
}
2 changes: 2 additions & 0 deletions gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"github.com/hg/airmon/airkaz"
"github.com/hg/airmon/ceb"
"github.com/hg/airmon/influx"
"github.com/hg/airmon/mqtt"
"log"
Expand All @@ -28,6 +29,7 @@ func main() {
}

go airkaz.Collect(sender)
go ceb.Collect(sender)

if err = mqtt.StartMqtt(&mqts, sender); err != nil {
log.Fatal("could not create MQTT client: ", err)
Expand Down
4 changes: 2 additions & 2 deletions gateway/airkaz/net.go → gateway/net/net.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package airkaz
package net

import (
"context"
Expand All @@ -8,7 +8,7 @@ import (
"time"
)

func newProxiedClient() *http.Client {
func NewProxiedClient() *http.Client {
proxyDialer := proxy.FromEnvironmentUsing(&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
Expand Down
32 changes: 32 additions & 0 deletions gateway/tm/tm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package tm

import (
"log"
"strings"
"time"
)

const timeFormat = "2006-01-02 15:04:05"

type Time struct {
time.Time
}

var location *time.Location

func init() {
var err error
if location, err = time.LoadLocation("Asia/Almaty"); err != nil {
log.Fatal("could not find timezone")
}
}

func (t *Time) UnmarshalJSON(b []byte) (err error) {
s := strings.Trim(string(b), `"`)
if s == "null" {
t.Time = time.Time{}
return nil
}
t.Time, err = time.ParseInLocation(timeFormat, s, location)
return err
}

0 comments on commit dec5130

Please sign in to comment.