Skip to content

Commit

Permalink
Minor cleanups in airkaz collector
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Feb 18, 2021
1 parent f64fb16 commit 30a8072
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 41 deletions.
126 changes: 87 additions & 39 deletions gateway/airkaz/airkaz.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"github.com/hg/airmon/influx"
"github.com/hg/airmon/net"
"github.com/hg/airmon/storage"
"github.com/hg/airmon/tm"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/pkg/errors"
Expand Down Expand Up @@ -34,42 +35,87 @@ type measurement struct {
Hour string `json:"hour"`
}

type lastUpdates map[int64]tm.Time

type collector struct {
sender *influx.MeasurementSender
client *net.Client
lastUpdates lastUpdates
}

const lastUpdatesFilename = "airkaz-times.json"

func Collect(sender *influx.MeasurementSender) {
client := net.NewProxiedClient()
lastUpdates := map[int64]tm.Time{}
col := collector{
sender: sender,
client: net.NewProxiedClient(),
lastUpdates: lastUpdates{},
}

if last := loadLastUpdates(); last != nil {
col.lastUpdates = *last
}

for {
if measurements, err := getResponse(client); err == nil {
log.Print("found ", len(measurements), " airkaz measurements")

toSave := make([]*measurement, len(measurements))

for _, meas := range measurements {
if meas.Error != 0 || meas.Status != "active" || meas.Hour != "now" {
continue
}
if last, ok := lastUpdates[meas.Id]; ok && last == meas.Date {
continue
}
lastUpdates[meas.Id] = meas.Date
toSave = append(toSave, meas)
}

go func() {
for _, ms := range toSave {
saveMeasurement(ms, sender)
}
}()
} else {
log.Print("could not get response from airkaz: ", err)
if err := col.run(); err != nil {
log.Print("could not save airkaz data: ", err)
}

time.Sleep(5 * time.Minute)
}
}

func getResponse(client *net.Client) ([]*measurement, error) {
body, err := client.Get("https://airkaz.org/")
func loadLastUpdates() *lastUpdates {
var lu *lastUpdates

err := storage.Load(lastUpdatesFilename, &lu)
if err != nil {
lu = nil
log.Print("could not load airkaz update times: ", err)
}

return lu
}

func (c *collector) saveLastUpdates() {
if err := storage.Save(lastUpdatesFilename, c.lastUpdates); err != nil {
log.Print("could not save airkaz update times: ", err)
}
}

func (c *collector) run() error {
measurements, err := c.getResponse()
if err != nil {
return err
}

log.Print("found ", len(measurements), " airkaz measurements")

toSave := make([]measurement, len(measurements))

for _, meas := range measurements {
if meas.Error != 0 || meas.Status != "active" || meas.Hour != "now" {
continue
}
if last, ok := c.lastUpdates[meas.Id]; ok && !meas.Date.After(last.Time) {
continue
}
c.lastUpdates[meas.Id] = meas.Date
toSave = append(toSave, meas)
}

go c.saveLastUpdates()

go func() {
for _, ms := range toSave {
c.saveMeasurement(&ms)
}
}()

return nil
}

func (c *collector) getResponse() ([]measurement, error) {
body, err := c.client.Get("https://airkaz.org/")
if err != nil {
return nil, err
}
Expand All @@ -79,33 +125,36 @@ func getResponse(client *net.Client) ([]*measurement, error) {
return nil, errors.Wrap(err, "measurement json not found in response")
}

var measurements []*measurement
var measurements []measurement
if err = json.Unmarshal(matches[1], &measurements); err != nil {
return nil, errors.Wrap(err, "could not parse response: ")
}

return measurements, nil
}

func saveMeasurement(meas *measurement, sender *influx.MeasurementSender) {
func (c *collector) saveMeasurement(meas *measurement) {
tags := map[string]string{
"city": meas.City,
"station": meas.Name,
}

save := func(kind string, fields map[string]interface{}) {
sender.Send(influxdb2.NewPoint(kind, tags, fields, meas.Date.Time))
c.sender.Send(influxdb2.NewPoint(kind, tags, fields, meas.Date.Time))
}

if meas.Pm25Curr != nil && meas.Pm10Curr != nil {
save("airkaz:particulates", map[string]interface{}{
"pm25": uint16(*meas.Pm25Curr),
"pm10": uint16(*meas.Pm10Curr),
})
pmData := make(map[string]interface{})
if meas.Pm25Curr != nil {
pmData["pm25"] = uint16(*meas.Pm25Curr)
}
if meas.Pm10Curr != nil {
pmData["pm10"] = uint16(*meas.Pm10Curr)
}
if len(pmData) > 0 {
save("airkaz:particulates", pmData)
}

tempData := map[string]interface{}{}

tempData := make(map[string]interface{})
if meas.TempCurr != nil {
tempData["temperature"] = *meas.TempCurr
}
Expand All @@ -115,7 +164,6 @@ func saveMeasurement(meas *measurement, sender *influx.MeasurementSender) {
if meas.Press != nil {
tempData["pressure"] = *meas.Press
}

if len(tempData) > 0 {
save("airkaz:temperature", tempData)
}
Expand Down
2 changes: 0 additions & 2 deletions gateway/settings/settings.go
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
package settings


8 changes: 8 additions & 0 deletions gateway/tm/tm.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ func init() {
}
}

func (t *Time) MarshalJSON() ([]byte, error) {
if t == nil {
return []byte("null"), nil
}
formatted := t.Time.Format(timeFormat)
return []byte(formatted), nil
}

func (t *Time) UnmarshalJSON(b []byte) (err error) {
s := strings.Trim(string(b), `"`)
if s == "null" {
Expand Down

0 comments on commit 30a8072

Please sign in to comment.