diff --git a/gateway/airkaz/airkaz.go b/gateway/airkaz/airkaz.go index 484718f..4ee4bb2 100644 --- a/gateway/airkaz/airkaz.go +++ b/gateway/airkaz/airkaz.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/hg/airmon/influx" "github.com/hg/airmon/net" + "github.com/hg/airmon/storage" "github.com/hg/airmon/tm" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/pkg/errors" @@ -34,42 +35,87 @@ type measurement struct { Hour string `json:"hour"` } +type lastUpdates map[int64]tm.Time + +type collector struct { + sender *influx.MeasurementSender + client *net.Client + lastUpdates lastUpdates +} + +const lastUpdatesFilename = "airkaz-times.json" + func Collect(sender *influx.MeasurementSender) { - client := net.NewProxiedClient() - lastUpdates := map[int64]tm.Time{} + col := collector{ + sender: sender, + client: net.NewProxiedClient(), + lastUpdates: lastUpdates{}, + } + + if last := loadLastUpdates(); last != nil { + col.lastUpdates = *last + } for { - if measurements, err := getResponse(client); err == nil { - log.Print("found ", len(measurements), " airkaz measurements") - - toSave := make([]*measurement, len(measurements)) - - for _, meas := range measurements { - if meas.Error != 0 || meas.Status != "active" || meas.Hour != "now" { - continue - } - if last, ok := lastUpdates[meas.Id]; ok && last == meas.Date { - continue - } - lastUpdates[meas.Id] = meas.Date - toSave = append(toSave, meas) - } - - go func() { - for _, ms := range toSave { - saveMeasurement(ms, sender) - } - }() - } else { - log.Print("could not get response from airkaz: ", err) + if err := col.run(); err != nil { + log.Print("could not save airkaz data: ", err) } - time.Sleep(5 * time.Minute) } } -func getResponse(client *net.Client) ([]*measurement, error) { - body, err := client.Get("https://airkaz.org/") +func loadLastUpdates() *lastUpdates { + var lu *lastUpdates + + err := storage.Load(lastUpdatesFilename, &lu) + if err != nil { + lu = nil + log.Print("could not load airkaz update times: ", err) + } + + return lu +} + +func (c *collector) saveLastUpdates() { + if err := storage.Save(lastUpdatesFilename, c.lastUpdates); err != nil { + log.Print("could not save airkaz update times: ", err) + } +} + +func (c *collector) run() error { + measurements, err := c.getResponse() + if err != nil { + return err + } + + log.Print("found ", len(measurements), " airkaz measurements") + + toSave := make([]measurement, len(measurements)) + + for _, meas := range measurements { + if meas.Error != 0 || meas.Status != "active" || meas.Hour != "now" { + continue + } + if last, ok := c.lastUpdates[meas.Id]; ok && !meas.Date.After(last.Time) { + continue + } + c.lastUpdates[meas.Id] = meas.Date + toSave = append(toSave, meas) + } + + go c.saveLastUpdates() + + go func() { + for _, ms := range toSave { + c.saveMeasurement(&ms) + } + }() + + return nil +} + +func (c *collector) getResponse() ([]measurement, error) { + body, err := c.client.Get("https://airkaz.org/") if err != nil { return nil, err } @@ -79,7 +125,7 @@ func getResponse(client *net.Client) ([]*measurement, error) { return nil, errors.Wrap(err, "measurement json not found in response") } - var measurements []*measurement + var measurements []measurement if err = json.Unmarshal(matches[1], &measurements); err != nil { return nil, errors.Wrap(err, "could not parse response: ") } @@ -87,25 +133,28 @@ func getResponse(client *net.Client) ([]*measurement, error) { return measurements, nil } -func saveMeasurement(meas *measurement, sender *influx.MeasurementSender) { +func (c *collector) saveMeasurement(meas *measurement) { tags := map[string]string{ "city": meas.City, "station": meas.Name, } save := func(kind string, fields map[string]interface{}) { - sender.Send(influxdb2.NewPoint(kind, tags, fields, meas.Date.Time)) + c.sender.Send(influxdb2.NewPoint(kind, tags, fields, meas.Date.Time)) } - if meas.Pm25Curr != nil && meas.Pm10Curr != nil { - save("airkaz:particulates", map[string]interface{}{ - "pm25": uint16(*meas.Pm25Curr), - "pm10": uint16(*meas.Pm10Curr), - }) + pmData := make(map[string]interface{}) + if meas.Pm25Curr != nil { + pmData["pm25"] = uint16(*meas.Pm25Curr) + } + if meas.Pm10Curr != nil { + pmData["pm10"] = uint16(*meas.Pm10Curr) + } + if len(pmData) > 0 { + save("airkaz:particulates", pmData) } - tempData := map[string]interface{}{} - + tempData := make(map[string]interface{}) if meas.TempCurr != nil { tempData["temperature"] = *meas.TempCurr } @@ -115,7 +164,6 @@ func saveMeasurement(meas *measurement, sender *influx.MeasurementSender) { if meas.Press != nil { tempData["pressure"] = *meas.Press } - if len(tempData) > 0 { save("airkaz:temperature", tempData) } diff --git a/gateway/settings/settings.go b/gateway/settings/settings.go index 17843cf..0e66fdf 100644 --- a/gateway/settings/settings.go +++ b/gateway/settings/settings.go @@ -1,3 +1 @@ package settings - - diff --git a/gateway/tm/tm.go b/gateway/tm/tm.go index 34c511a..33d01f5 100644 --- a/gateway/tm/tm.go +++ b/gateway/tm/tm.go @@ -21,6 +21,14 @@ func init() { } } +func (t *Time) MarshalJSON() ([]byte, error) { + if t == nil { + return []byte("null"), nil + } + formatted := t.Time.Format(timeFormat) + return []byte(formatted), nil +} + func (t *Time) UnmarshalJSON(b []byte) (err error) { s := strings.Trim(string(b), `"`) if s == "null" {