Skip to content

Commit

Permalink
Save last measurement times in kazhydromet source
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Feb 17, 2021
1 parent 9b6a764 commit f64fb16
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 24 deletions.
79 changes: 55 additions & 24 deletions gateway/kazhydromet/kazhydromet.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ package kazhydromet
import (
"github.com/hg/airmon/influx"
"github.com/hg/airmon/net"
"github.com/hg/airmon/storage"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/pkg/errors"
"log"
"strconv"
"time"
)

const timeFilename = "kazhydromet-times.json"

type measurement struct {
Id string `json:"id"`
Value float64 `json:"value"`
Expand Down Expand Up @@ -47,43 +50,71 @@ type entry struct {
st *station
}

type measTime struct {
LastAt map[string]time.Time `json:"lastAt"`
LastRunAt time.Time `json:"lastRunAt"`
}

type collector struct {
client *net.Client
sender *influx.MeasurementSender
stations map[int64]*station
lastAt map[string]time.Time
lastRunAt time.Time
client *net.Client
sender *influx.MeasurementSender
stations map[int64]*station
time *measTime
}

func lastMapKey(ms *measurement) string {
return strconv.FormatInt(ms.StationId, 10) + "/" + ms.Code
}

func (c *collector) run() error {
entries, err := c.loadData()
if err != nil {
return err
}
return c.saveData(entries)
}

func Collect(sender *influx.MeasurementSender) {
c := collector{
client: net.NewProxiedClient(),
sender: sender,
stations: make(map[int64]*station),
lastAt: make(map[string]time.Time),
time: &measTime{
LastAt: make(map[string]time.Time),
},
}

if savedTime := loadSavedTime(); savedTime != nil {
c.time = savedTime
}

for {
if err := c.run(); err != nil {
log.Println("could not save kazhydromet data: ", err)
entries, err := c.loadData()
if err == nil {
go saveTime(c.time)
err = c.saveData(entries)
}
if err != nil {
log.Print("could not save kazhydromet data: ", err)
}
time.Sleep(20 * time.Minute)
}
}

func saveTime(ms *measTime) {
err := storage.Save(timeFilename, ms)
if err != nil {
log.Print("could not save kazhydromet measurement times: ", err)
}
}

func loadSavedTime() *measTime {
var mt *measTime

err := storage.Load(timeFilename, &mt)
if err == nil {
return mt
}

log.Print("could not parse kazhydromet measurement times: ", err)
return nil
}

func (c *collector) saveData(entries []entry) error {
failed := 0

for _, ent := range entries {
tags := map[string]string{
"city": ent.st.CityRu,
Expand All @@ -102,12 +133,12 @@ func (c *collector) saveData(entries []entry) error {

point := influxdb2.NewPoint("kazhydromet", tags, fields, ent.ms.Date)

if ok := c.sender.Send(point); !ok {
return errors.New("could not save point to db")
if !c.sender.Send(point) {
failed++
}
}

return nil
return errors.New("could not save " + strconv.Itoa(failed) + " points to db")
}

func (c *collector) loadData() ([]entry, error) {
Expand Down Expand Up @@ -146,10 +177,10 @@ func (c *collector) loadData() ([]entry, error) {

key := lastMapKey(meas)

if lastAt, ok := c.lastAt[key]; ok && !meas.Date.After(lastAt) {
if lastAt, ok := c.time.LastAt[key]; ok && !meas.Date.After(lastAt) {
continue
}
c.lastAt[key] = meas.Date
c.time.LastAt[key] = meas.Date

entries = append(entries, entry{ms: meas, st: stat})
}
Expand All @@ -161,15 +192,15 @@ func (c *collector) loadMeasurements() ([]*measurement, error) {
startedAt := time.Now()

url := "http://atmosphera.kz:4004/averages"
if !c.lastRunAt.IsZero() {
url += "?after=" + c.lastRunAt.UTC().Format(time.RFC3339)
if !c.time.LastRunAt.IsZero() {
url += "?after=" + c.time.LastRunAt.UTC().Format(time.RFC3339)
}

var measurements []*measurement

err := c.client.GetJSON(url, &measurements)
if err == nil {
c.lastRunAt = startedAt
c.time.LastRunAt = startedAt
}

return measurements, err
Expand Down
51 changes: 51 additions & 0 deletions gateway/storage/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package storage

import (
"encoding/json"
"io/ioutil"
"os"
"path"
)

func getPath(filename string) (string, error) {
dir := os.Getenv("XDG_CONFIG_HOME")
if dir == "" {
dir = "/var/lib"
}
dir = path.Join(dir, "airmon")

err := os.MkdirAll(dir, 0750)
if err != nil {
return "", err
}

return path.Join(dir, filename), nil
}

func Save(filename string, data interface{}) error {
serialized, err := json.Marshal(data)
if err != nil {
return err
}

fullPath, err := getPath(filename)
if err != nil {
return err
}

return ioutil.WriteFile(fullPath, serialized, 0640)
}

func Load(filename string, data interface{}) error {
fullPath, err := getPath(filename)
if err != nil {
return err
}

serialized, err := ioutil.ReadFile(fullPath)
if err != nil {
return err
}

return json.Unmarshal(serialized, data)
}

0 comments on commit f64fb16

Please sign in to comment.