Skip to content

Commit

Permalink
Fix external data collectors
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Feb 18, 2021
1 parent 30a8072 commit 10d2f33
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 65 deletions.
97 changes: 65 additions & 32 deletions gateway/ceb/ceb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ceb
import (
"github.com/hg/airmon/influx"
"github.com/hg/airmon/net"
"github.com/hg/airmon/storage"
"github.com/hg/airmon/tm"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"log"
Expand All @@ -22,42 +23,74 @@ type measurement struct {
Date tm.Time `json:"cdate"`
}

func Collect(sender *influx.MeasurementSender) {
client := net.NewProxiedClient()
lastAt := time.Time{}
const timeFilename = "ceb-times.json"

type collector struct {
sender *influx.MeasurementSender
client *net.Client
lastAt time.Time
}

func Collect(sender *influx.MeasurementSender) {
col := collector{
sender: sender,
client: net.NewProxiedClient(),
lastAt: loadLastAt(),
}
for {
if measurements, err := getResponse(client); err == nil {
latest := time.Time{}
toSave := make([]*measurement, len(measurements))

log.Print("found ", len(measurements), " ceb measurements")

for _, meas := range measurements {
if meas.Date.After(lastAt) {
latest = meas.Date.Time
toSave = append(toSave, meas)
}
}

if !latest.IsZero() {
lastAt = latest
}

go func() {
for _, ms := range toSave {
saveMeasurement(ms, sender)
}
}()
} else {
log.Print("could not load data: ", err)
if err := col.run(); err != nil {
log.Print("could not get ceb measurements: ", err)
}

time.Sleep(5 * time.Minute)
}
}

func saveMeasurement(ms *measurement, sender *influx.MeasurementSender) {
func (c *collector) run() error {
measurements, err := c.getResponse()
if err != nil {
return err
}

latest := time.Time{}
toSave := make([]measurement, len(measurements))

log.Print("found ", len(measurements), " ceb measurements")

for _, ms := range measurements {
if ms.Date.After(c.lastAt) {
latest = ms.Date.Time
toSave = append(toSave, ms)
}
}

if !latest.IsZero() {
go saveLastAt(latest)
c.lastAt = latest
}

go func() {
for _, ms := range toSave {
c.saveMeasurement(&ms)
}
}()
}

func loadLastAt() time.Time {
var tm time.Time
if err := storage.Load(timeFilename, &tm); err != nil {
log.Print("could not load last ceb time: ", err)
tm = time.Time{}
}
return tm
}

func saveLastAt(tm time.Time) {
if err := storage.Save(timeFilename, tm); err != nil {
log.Print("could not save ceb last time: ", err)
}
}

func (c *collector) saveMeasurement(ms *measurement) {
endOfFormula := strings.Index(ms.PollutantFull, "-")
if endOfFormula <= 0 {
return
Expand All @@ -76,10 +109,10 @@ func saveMeasurement(ms *measurement, sender *influx.MeasurementSender) {
"lon": ms.Lon,
}

sender.Send(influxdb2.NewPoint("ceb", tags, fields, ms.Date.Time))
c.sender.Send(influxdb2.NewPoint("ceb", tags, fields, ms.Date.Time))
}

func getResponse(client *net.Client) (measurements []*measurement, err error) {
err = client.GetJSON("https://ceb-uk.kz/map/ajax.php?markers", &measurements)
func (c *collector) getResponse() (measurements []measurement, err error) {
err = c.client.GetJSON("https://ceb-uk.kz/map/ajax.php?markers", &measurements)
return
}
47 changes: 15 additions & 32 deletions gateway/kazhydromet/kazhydromet.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,13 @@ type entry struct {
st *station
}

type measTime struct {
LastAt map[string]time.Time `json:"lastAt"`
LastRunAt time.Time `json:"lastRunAt"`
}
type measurementTimes map[string]time.Time

type collector struct {
client *net.Client
sender *influx.MeasurementSender
stations map[int64]*station
time *measTime
times measurementTimes
}

func lastMapKey(ms *measurement) string {
Expand All @@ -71,37 +68,35 @@ func Collect(sender *influx.MeasurementSender) {
client: net.NewProxiedClient(),
sender: sender,
stations: make(map[int64]*station),
time: &measTime{
LastAt: make(map[string]time.Time),
},
times: make(measurementTimes),
}

if savedTime := loadSavedTime(); savedTime != nil {
c.time = savedTime
c.times = savedTime
}

for {
entries, err := c.loadData()
if err == nil {
go saveTime(c.time)
go saveTime(c.times)
err = c.saveData(entries)
}
if err != nil {
log.Print("could not save kazhydromet data: ", err)
}
time.Sleep(20 * time.Minute)
time.Sleep(time.Hour)
}
}

func saveTime(ms *measTime) {
func saveTime(ms measurementTimes) {
err := storage.Save(timeFilename, ms)
if err != nil {
log.Print("could not save kazhydromet measurement times: ", err)
}
}

func loadSavedTime() *measTime {
var mt *measTime
func loadSavedTime() measurementTimes {
var mt measurementTimes

err := storage.Load(timeFilename, &mt)
if err == nil {
Expand Down Expand Up @@ -148,6 +143,7 @@ func (c *collector) loadData() ([]entry, error) {
if err != nil {
return nil, err
}
log.Print("loaded ", len(measurements), " kazhydromet measurements")

var entries []entry
for _, meas := range measurements {
Expand Down Expand Up @@ -176,34 +172,21 @@ func (c *collector) loadData() ([]entry, error) {
}

key := lastMapKey(meas)

if lastAt, ok := c.time.LastAt[key]; ok && !meas.Date.After(lastAt) {
if lastAt, ok := c.times[key]; ok && !meas.Date.After(lastAt) {
continue
}
c.time.LastAt[key] = meas.Date
c.times[key] = meas.Date

entries = append(entries, entry{ms: meas, st: stat})
}

return entries, nil
}

func (c *collector) loadMeasurements() ([]*measurement, error) {
startedAt := time.Now()

func (c *collector) loadMeasurements() (measurements []*measurement, err error) {
url := "http://atmosphera.kz:4004/averages"
if !c.time.LastRunAt.IsZero() {
url += "?after=" + c.time.LastRunAt.UTC().Format(time.RFC3339)
}

var measurements []*measurement

err := c.client.GetJSON(url, &measurements)
if err == nil {
c.time.LastRunAt = startedAt
}

return measurements, err
err = c.client.GetJSON(url, &measurements)
return
}

func (c *collector) loadStations() error {
Expand Down
2 changes: 1 addition & 1 deletion gateway/net/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *Client) Get(uri string) ([]byte, error) {

func (c *Client) GetJSON(url string, buf interface{}) error {
data, err := c.Get(url)
if err != nil {
if err == nil {
err = json.Unmarshal(data, buf)
}
return err
Expand Down

0 comments on commit 10d2f33

Please sign in to comment.