diff --git a/gateway/ceb/ceb.go b/gateway/ceb/ceb.go index cc09206..4df42c5 100644 --- a/gateway/ceb/ceb.go +++ b/gateway/ceb/ceb.go @@ -2,6 +2,7 @@ package ceb import ( "strings" + "sync" "time" "github.com/hg/airmon/influx" @@ -25,9 +26,53 @@ type measurement struct { Date tm.Time `json:"cdate"` } +const dataUrl = "https://ceb-uk.kz/map/ajax.php?markers" const timeFilename = "ceb-times.json" +const staleMaxUpdates = 3 + +type repeat struct { + level float64 + count int +} + +type staleDetector struct { + sync.Mutex + values map[string]*repeat +} + +func newDetector() *staleDetector { + return &staleDetector{ + values: map[string]*repeat{}, + } +} + +func (s *staleDetector) isStale(station string, pollutant string, level float64) bool { + s.Lock() + defer s.Unlock() + + key := station + ":" + pollutant + + rep, ok := s.values[key] + if !ok { + rep = &repeat{} + s.values[key] = rep + } + + if rep.level == level { + rep.count++ + if rep.count >= staleMaxUpdates { + return true + } + } else { + rep.level = level + rep.count = 0 + } + + return false +} type collector struct { + stale *staleDetector sender *influx.MeasurementSender client *net.Client lastAt time.Time @@ -38,6 +83,7 @@ var log = logger.Get(logger.Ceb) func Collect(sender *influx.MeasurementSender) { col := collector{ sender: sender, + stale: newDetector(), client: net.NewProxiedClient(), lastAt: loadLastAt(), } @@ -45,7 +91,7 @@ func Collect(sender *influx.MeasurementSender) { if err := col.run(); err != nil { log.Error("could not get ceb measurements", zap.Error(err)) } - time.Sleep(15 * time.Minute) + time.Sleep(10 * time.Minute) } } @@ -82,12 +128,12 @@ func (c *collector) run() error { } func loadLastAt() time.Time { - var tm time.Time - if err := storage.Load(timeFilename, &tm); err != nil { + var t time.Time + if err := storage.Load(timeFilename, &t); err != nil { log.Error("could not load last ceb time", zap.Error(err)) - tm = time.Time{} + t = time.Time{} } - return tm + return t } func saveLastAt(tm time.Time) { @@ -102,15 +148,27 @@ func (c *collector) saveMeasurement(ms *measurement) { return } + station := ms.Title + pollutant := ms.Pollutant + level := ms.ValueMg * 1000 + + if c.stale.isStale(station, pollutant, level) { + log.Error("value marked as stale and will not be saved", + zap.String("station", ms.Title), + zap.String("pollutant", ms.Pollutant), + zap.Float64("level", ms.ValueMg)) + return + } + tags := map[string]string{ - "station": ms.Title, + "station": station, "address": ms.Address, "formula": ms.PollutantFull[:endOfFormula], - "pollutant": ms.Pollutant, + "pollutant": pollutant, } fields := map[string]interface{}{ - "level_ug": ms.ValueMg * 1000, + "level_ug": level, "lat": ms.Lat, "lon": ms.Lon, } @@ -119,6 +177,6 @@ func (c *collector) saveMeasurement(ms *measurement) { } func (c *collector) getResponse() (measurements []measurement, err error) { - err = c.client.GetJSON("https://ceb-uk.kz/map/ajax.php?markers", &measurements) + err = c.client.GetJSON(dataUrl, &measurements) return } diff --git a/gateway/ceb/ceb_test.go b/gateway/ceb/ceb_test.go new file mode 100644 index 0000000..3004530 --- /dev/null +++ b/gateway/ceb/ceb_test.go @@ -0,0 +1,35 @@ +package ceb + +import "testing" + +const station = "fake-station" +const co2 = "co2" +const so2 = "so2" + +func TestStaleDetector(t *testing.T) { + d := newDetector() + + for i := 0; i < staleMaxUpdates; i++ { + if d.isStale(station, co2, 100) { + t.Fatal("value should not be stale") + } + } + + if d.isStale(station, so2, 50) { + t.Fatal("value uses other pollutant and should not be stale") + } + + for i := 0; i < staleMaxUpdates*2; i++ { + if d.isStale(station, so2, float64(i)) { + t.Fatal("value is changing and should not be stale") + } + } + + if !d.isStale(station, co2, 100) { + t.Fatal("value should be stale") + } + + if d.isStale(station, co2, 42) { + t.Fatal("value has changed and should not be stale") + } +} diff --git a/gateway/go.mod b/gateway/go.mod index ed5b6c7..1c8f571 100644 --- a/gateway/go.mod +++ b/gateway/go.mod @@ -7,6 +7,6 @@ require ( github.com/influxdata/influxdb-client-go/v2 v2.2.0 github.com/pkg/errors v0.9.1 go.uber.org/multierr v1.6.0 // indirect - go.uber.org/zap v1.16.0 // indirect + go.uber.org/zap v1.16.0 golang.org/x/net v0.0.0-20201110031124-69a78807bb2b ) diff --git a/gateway/go.sum b/gateway/go.sum index a178834..3a2331d 100644 --- a/gateway/go.sum +++ b/gateway/go.sum @@ -1,6 +1,8 @@ +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/deepmap/oapi-codegen v1.3.13 h1:9HKGCsdJqE4dnrQ8VerFS0/1ZOJPmAhN+g8xgp8y3K4= github.com/deepmap/oapi-codegen v1.3.13/go.mod h1:WAmG5dWY8/PYHt4vKxlt90NsbHMAOCiteYKZMiIRfOo= @@ -12,14 +14,15 @@ github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeME github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= -github.com/influxdata/influxdb-client-go v1.4.0 h1:+KavOkwhLClHFfYcJMHHnTL5CZQhXJzOm5IKHI9BqJk= github.com/influxdata/influxdb-client-go/v2 v2.2.0 h1:2R/le0s/MZpHtc+ijuXKe2c4KGN14M85mWtGlmg6vec= github.com/influxdata/influxdb-client-go/v2 v2.2.0/go.mod h1:fa/d1lAdUHxuc1jedx30ZfNG573oQTQmUni3N6pcW+0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/labstack/echo/v4 v4.1.11/go.mod h1:i541M3Fj6f76NZtHSj7TXnyM8n2gaodfvfxNnFqi74g= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= @@ -32,11 +35,13 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= @@ -55,6 +60,7 @@ golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -79,12 +85,15 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms= golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM= honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=