diff --git a/gateway/airkaz/airkaz.go b/gateway/airkaz/airkaz.go index 4ee4bb2..4a10188 100644 --- a/gateway/airkaz/airkaz.go +++ b/gateway/airkaz/airkaz.go @@ -3,12 +3,13 @@ package airkaz import ( "encoding/json" "github.com/hg/airmon/influx" + "github.com/hg/airmon/logger" "github.com/hg/airmon/net" "github.com/hg/airmon/storage" "github.com/hg/airmon/tm" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/pkg/errors" - "log" + "go.uber.org/zap" "regexp" "time" ) @@ -45,6 +46,8 @@ type collector struct { const lastUpdatesFilename = "airkaz-times.json" +var log = logger.Get(logger.Airkaz) + func Collect(sender *influx.MeasurementSender) { col := collector{ sender: sender, @@ -58,7 +61,7 @@ func Collect(sender *influx.MeasurementSender) { for { if err := col.run(); err != nil { - log.Print("could not save airkaz data: ", err) + log.Error("could not save airkaz data", zap.Error(err)) } time.Sleep(5 * time.Minute) } @@ -70,7 +73,7 @@ func loadLastUpdates() *lastUpdates { err := storage.Load(lastUpdatesFilename, &lu) if err != nil { lu = nil - log.Print("could not load airkaz update times: ", err) + log.Error("could not load airkaz update times", zap.Error(err)) } return lu @@ -78,7 +81,7 @@ func loadLastUpdates() *lastUpdates { func (c *collector) saveLastUpdates() { if err := storage.Save(lastUpdatesFilename, c.lastUpdates); err != nil { - log.Print("could not save airkaz update times: ", err) + log.Error("could not save airkaz update times", zap.Error(err)) } } @@ -88,7 +91,7 @@ func (c *collector) run() error { return err } - log.Print("found ", len(measurements), " airkaz measurements") + log.Info("measurements loaded", zap.Int("count", len(measurements))) toSave := make([]measurement, len(measurements)) diff --git a/gateway/ceb/ceb.go b/gateway/ceb/ceb.go index c1938fd..6580c11 100644 --- a/gateway/ceb/ceb.go +++ b/gateway/ceb/ceb.go @@ -2,11 +2,12 @@ package ceb import ( "github.com/hg/airmon/influx" + "github.com/hg/airmon/logger" "github.com/hg/airmon/net" "github.com/hg/airmon/storage" "github.com/hg/airmon/tm" influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "log" + "go.uber.org/zap" "strings" "time" ) @@ -31,6 +32,8 @@ type collector struct { lastAt time.Time } +var log = logger.Get(logger.Ceb) + func Collect(sender *influx.MeasurementSender) { col := collector{ sender: sender, @@ -39,7 +42,7 @@ func Collect(sender *influx.MeasurementSender) { } for { if err := col.run(); err != nil { - log.Print("could not get ceb measurements: ", err) + log.Error("could not get ceb measurements", zap.Error(err)) } time.Sleep(5 * time.Minute) } @@ -54,7 +57,7 @@ func (c *collector) run() error { latest := time.Time{} toSave := make([]measurement, len(measurements)) - log.Print("found ", len(measurements), " ceb measurements") + log.Info("loaded measurements", zap.Int("count", len(measurements))) for _, ms := range measurements { if ms.Date.After(c.lastAt) { @@ -73,12 +76,14 @@ func (c *collector) run() error { c.saveMeasurement(&ms) } }() + + return nil } func loadLastAt() time.Time { var tm time.Time if err := storage.Load(timeFilename, &tm); err != nil { - log.Print("could not load last ceb time: ", err) + log.Error("could not load last ceb time", zap.Error(err)) tm = time.Time{} } return tm @@ -86,7 +91,7 @@ func loadLastAt() time.Time { func saveLastAt(tm time.Time) { if err := storage.Save(timeFilename, tm); err != nil { - log.Print("could not save ceb last time: ", err) + log.Error("could not save ceb last time", zap.Error(err)) } } diff --git a/gateway/go.mod b/gateway/go.mod index 879201f..ed5b6c7 100644 --- a/gateway/go.mod +++ b/gateway/go.mod @@ -6,5 +6,7 @@ require ( github.com/eclipse/paho.mqtt.golang v1.2.0 github.com/influxdata/influxdb-client-go/v2 v2.2.0 github.com/pkg/errors v0.9.1 + go.uber.org/multierr v1.6.0 // indirect + go.uber.org/zap v1.16.0 // indirect golang.org/x/net v0.0.0-20201110031124-69a78807bb2b ) diff --git a/gateway/go.sum b/gateway/go.sum index bd2ee1b..a178834 100644 --- a/gateway/go.sum +++ b/gateway/go.sum @@ -1,3 +1,4 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -10,11 +11,13 @@ github.com/getkin/kin-openapi v0.13.0/go.mod h1:WGRs2ZMM1Q8LR1QBEwUxC6RJEfaBcD0s github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/influxdata/influxdb-client-go v1.4.0 h1:+KavOkwhLClHFfYcJMHHnTL5CZQhXJzOm5IKHI9BqJk= github.com/influxdata/influxdb-client-go/v2 v2.2.0 h1:2R/le0s/MZpHtc+ijuXKe2c4KGN14M85mWtGlmg6vec= github.com/influxdata/influxdb-client-go/v2 v2.2.0/go.mod h1:fa/d1lAdUHxuc1jedx30ZfNG573oQTQmUni3N6pcW+0= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -30,16 +33,31 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.1.0/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM= +go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -57,10 +75,16 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/gateway/influx/influx.go b/gateway/influx/influx.go index 7423c58..f4a8a3f 100644 --- a/gateway/influx/influx.go +++ b/gateway/influx/influx.go @@ -3,14 +3,18 @@ package influx import ( "context" "errors" + "flag" + "github.com/hg/airmon/logger" influxdb2 "github.com/influxdata/influxdb-client-go/v2" influxdb2Api "github.com/influxdata/influxdb-client-go/v2/api" influxdb2Write "github.com/influxdata/influxdb-client-go/v2/api/write" - "log" + "go.uber.org/zap" "os" "time" ) +var log = logger.Get(logger.Influx) + type Settings struct { Uri string Org string @@ -18,6 +22,13 @@ type Settings struct { Token string } +func (s *Settings) AddFlags() { + flag.StringVar(&s.Uri, "influx.uri", "http://localhost:8086", "InfluxDB server URI") + flag.StringVar(&s.Org, "influx.org", "home", "InfluxDB organization name") + flag.StringVar(&s.Bucket, "influx.bucket", "airmon", "InfluxDB server URI") + flag.StringVar(&s.Token, "influx.token", "", "InfluxDB access token") +} + func (s *Settings) SetFromEnvironment() { if s.Uri == "" { s.Uri = os.Getenv("INFLUX_URI") @@ -44,7 +55,7 @@ func (s *Settings) validate() error { return errors.New("InfluxDB bucket is empty") } if s.Token == "" { - log.Print("InfluxDB token is empty, set it if you see authentication errors") + log.Warn("token is empty, set it if you see authentication errors") } return nil } @@ -57,6 +68,7 @@ func newClient(settings Settings) influxdb2Api.WriteAPIBlocking { type MeasurementSender struct { api influxdb2Api.WriteAPIBlocking ch chan *influxdb2Write.Point + ctx context.Context } func (ms *MeasurementSender) Send(point *influxdb2Write.Point) bool { @@ -66,10 +78,10 @@ func (ms *MeasurementSender) Send(point *influxdb2Write.Point) bool { return true case <-time.After(5 * time.Second): - log.Print("timed out while trying to send measurement") + log.Error("timed out while trying to send measurement") if retry >= 3 { - log.Print("could not send measurement, discarding point") + log.Error("could not send measurement, discarding point") return false } <-ms.ch // drop the oldest measurement and retry @@ -78,18 +90,16 @@ func (ms *MeasurementSender) Send(point *influxdb2Write.Point) bool { } func (ms *MeasurementSender) receive() { - ctx := context.Background() - for point := range ms.ch { - if err := ms.api.WritePoint(ctx, point); err == nil { - log.Print("point written") + if err := ms.api.WritePoint(ms.ctx, point); err == nil { + log.Debug("point written") } else { - log.Print("could not write point: ", err) + log.Error("could not write point", zap.Error(err)) } } } -func NewWriter(settings Settings) (*MeasurementSender, error) { +func NewSender(settings Settings) (*MeasurementSender, error) { if err := settings.validate(); err != nil { return nil, err } @@ -97,6 +107,7 @@ func NewWriter(settings Settings) (*MeasurementSender, error) { sender := &MeasurementSender{ ch: make(chan *influxdb2Write.Point, 2000), api: newClient(settings), + ctx: context.Background(), } go sender.receive() diff --git a/gateway/kazhydromet/kazhydromet.go b/gateway/kazhydromet/kazhydromet.go index 7b63061..f54c9a8 100644 --- a/gateway/kazhydromet/kazhydromet.go +++ b/gateway/kazhydromet/kazhydromet.go @@ -2,16 +2,21 @@ package kazhydromet import ( "github.com/hg/airmon/influx" + "github.com/hg/airmon/logger" "github.com/hg/airmon/net" "github.com/hg/airmon/storage" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/pkg/errors" - "log" + "go.uber.org/zap" "strconv" "time" ) -const timeFilename = "kazhydromet-times.json" +var log = logger.Get(logger.Kazhydromet) + +const ( + timeFilename = "kazhydromet-times.json" +) type measurement struct { Id string `json:"id"` @@ -82,7 +87,7 @@ func Collect(sender *influx.MeasurementSender) { err = c.saveData(entries) } if err != nil { - log.Print("could not save kazhydromet data: ", err) + log.Error("could not save kazhydromet data", zap.Error(err)) } time.Sleep(time.Hour) } @@ -91,7 +96,7 @@ func Collect(sender *influx.MeasurementSender) { func saveTime(ms measurementTimes) { err := storage.Save(timeFilename, ms) if err != nil { - log.Print("could not save kazhydromet measurement times: ", err) + log.Error("could not save kazhydromet measurement times", zap.Error(err)) } } @@ -103,7 +108,7 @@ func loadSavedTime() measurementTimes { return mt } - log.Print("could not parse kazhydromet measurement times: ", err) + log.Error("could not parse kazhydromet measurement times", zap.Error(err)) return nil } @@ -143,7 +148,8 @@ func (c *collector) loadData() ([]entry, error) { if err != nil { return nil, err } - log.Print("loaded ", len(measurements), " kazhydromet measurements") + + log.Info("loaded measurements", zap.Int("count", len(measurements))) var entries []entry for _, meas := range measurements { @@ -153,20 +159,20 @@ func (c *collector) loadData() ([]entry, error) { if refreshedStations { // We have already refreshed station data in this iteration. // It's an invalid station id, there's nothing we can do. - log.Print("station id ", meas.StationId, " not found") + log.Error("station not found", zap.Int64("id", meas.StationId)) continue } refreshedStations = true if err = c.loadStations(); err != nil { - log.Print("could not load station data: ", err) + log.Error("could not load station data", zap.Error(err)) continue } stat = c.stations[meas.StationId] if stat == nil { - log.Print("station id ", meas.StationId, " not found") + log.Error("station not found", zap.Int64("id", meas.StationId)) continue } } diff --git a/gateway/logger/logger.go b/gateway/logger/logger.go new file mode 100644 index 0000000..66544eb --- /dev/null +++ b/gateway/logger/logger.go @@ -0,0 +1,38 @@ +package logger + +import ( + "go.uber.org/zap" + "sync" +) + +var mu = sync.Mutex{} +var loggers = make(map[string]*zap.Logger) + +const ( + Main = "main" + Airkaz = "airkaz" + Ceb = "ceb" + Influx = "influx" + Kazhydromet = "kazhydromet" + Time = "time" + Mqtt = "mqtt" + Net = "net" + Storage = "storage" +) + +func Get(name string) *zap.Logger { + if logger, ok := loggers[name]; ok { + return logger + } + + mu.Lock() + defer mu.Unlock() + + if logger, ok := loggers[name]; ok { + return logger + } + + logger, _ := zap.NewProduction() + loggers[name] = logger.Named(name) + return logger +} diff --git a/gateway/main.go b/gateway/main.go index a304404..870bd12 100644 --- a/gateway/main.go +++ b/gateway/main.go @@ -2,27 +2,26 @@ package main import ( "flag" + "fmt" "github.com/hg/airmon/airkaz" "github.com/hg/airmon/ceb" "github.com/hg/airmon/influx" "github.com/hg/airmon/kazhydromet" + "github.com/hg/airmon/logger" "github.com/hg/airmon/mqtt" - "log" + "go.uber.org/zap" "os" "runtime/pprof" ) func main() { - var mqts mqtt.Settings - flag.StringVar(&mqts.Broker, "mqtt.broker", "tcp://localhost:1883", "the broker URI") - flag.StringVar(&mqts.User, "mqtt.user", "", "MQTT username") - flag.StringVar(&mqts.Pass, "mqtt.pass", "", "MQTT password") + log := logger.Get(logger.Main) - var infs influx.Settings - flag.StringVar(&infs.Uri, "influx.uri", "http://localhost:8086", "InfluxDB server URI") - flag.StringVar(&infs.Org, "influx.org", "home", "InfluxDB organization name") - flag.StringVar(&infs.Bucket, "influx.bucket", "airmon", "InfluxDB server URI") - flag.StringVar(&infs.Token, "influx.token", "", "InfluxDB access token") + mqts := mqtt.Settings{} + mqts.AddFlags() + + infs := influx.Settings{} + infs.AddFlags() profile := flag.String("profile", "", "write cpu profile to this file") @@ -33,7 +32,7 @@ func main() { if *profile != "" { f, err := os.Create(*profile) if err != nil { - log.Fatal("could not create cpu profile file: ", err) + log.Fatal("could not create cpu profile file", zap.Error(err)) } defer f.Close() @@ -41,9 +40,9 @@ func main() { defer pprof.StopCPUProfile() } - sender, err := influx.NewWriter(infs) + sender, err := influx.NewSender(infs) if err != nil { - log.Fatal("could not prepare InfluxDB writer: ", err) + log.Fatal("could not prepare InfluxDB writer", zap.Error(err)) } go kazhydromet.Collect(sender) @@ -51,10 +50,10 @@ func main() { go ceb.Collect(sender) if err = mqtt.StartMqtt(&mqts, sender); err != nil { - log.Fatal("could not create MQTT client: ", err) + log.Fatal("could not create MQTT client", zap.Error(err)) } - log.Print("started, press Ctrl+C to terminate") + fmt.Fprint(os.Stderr, "started, press Ctrl+C to terminate") select {} } diff --git a/gateway/mqtt/mqtt.go b/gateway/mqtt/mqtt.go index 4da7907..a99707e 100644 --- a/gateway/mqtt/mqtt.go +++ b/gateway/mqtt/mqtt.go @@ -1,21 +1,31 @@ package mqtt import ( + "flag" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/hg/airmon/influx" + "github.com/hg/airmon/logger" "github.com/hg/airmon/mon" "github.com/pkg/errors" - "log" + "go.uber.org/zap" "os" "strings" ) +var log = logger.Get(logger.Mqtt) + type Settings struct { Broker string User string Pass string } +func (s *Settings) AddFlags() { + flag.StringVar(&s.Broker, "mqtt.broker", "tcp://localhost:1883", "the broker URI") + flag.StringVar(&s.User, "mqtt.user", "", "MQTT username") + flag.StringVar(&s.Pass, "mqtt.pass", "", "MQTT password") +} + func (s *Settings) SetFromEnvironment() { if s.Broker == "" { s.Broker = os.Getenv("MQTT_BROKER") @@ -33,10 +43,11 @@ func (s *Settings) validate() error { return errors.New("MQTT broker is empty") } if s.User != "" { - log.Print("using MQTT username ", s.User) + log.Info("using username", zap.String("username", s.User)) } if s.Pass != "" { - log.Print("using MQTT password ", strings.Repeat("*", len(s.Pass))) + log.Info("using MQTT password", + zap.String("password", strings.Repeat("*", len(s.Pass)))) } return nil } @@ -56,7 +67,7 @@ func newMqttClient(settings *Settings, onConn mqtt.OnConnectHandler) mqtt.Client opts.SetOnConnectHandler(onConn) opts.SetDefaultPublishHandler(func(client mqtt.Client, msg mqtt.Message) { - log.Print("mqtt msg received:", string(msg.Payload())) + log.Info("message received", zap.String("message", string(msg.Payload()))) }) mqttClient := mqtt.NewClient(opts) @@ -83,18 +94,22 @@ func subscribe(t *topic, client mqtt.Client, sender *influx.MeasurementSender) { if ms, err := t.mapper(msg.Payload()); err == nil { sender.Send(ms.ToPoint()) } else { - log.Print("could not parse ", t.topic, " data: ", err) + log.Error("could not parse data", + zap.String("topic", t.topic), + zap.Error(err)) } }) if token.Wait() && token.Error() != nil { - log.Print(errors.Wrap(token.Error(), "could not subscribe to mqtt topic "+t.topic)) + log.Error("could not subscribe to mqtt", + zap.String("topic", t.topic), + zap.Error(token.Error())) } } func (h *connHandler) onConnect(client mqtt.Client) { for _, t := range topics { - log.Print("subscribing to topic ", t.topic) + log.Info("subscribing to topic", zap.String("topic", t.topic)) go subscribe(t, client, h.sender) } } diff --git a/gateway/net/net.go b/gateway/net/net.go index b8c2f3c..5b06f7b 100644 --- a/gateway/net/net.go +++ b/gateway/net/net.go @@ -3,7 +3,9 @@ package net import ( "context" "encoding/json" + "github.com/hg/airmon/logger" "github.com/pkg/errors" + "go.uber.org/zap" "golang.org/x/net/proxy" "io/ioutil" "math/rand" @@ -22,6 +24,8 @@ var userAgents = []string{ "Mozilla/5.0 (Android 11; Mobile; rv:68.0) Gecko/68.0 Firefox/85.0", } +var log = logger.Get(logger.Net) + type Client struct { client *http.Client } @@ -76,6 +80,7 @@ func (c *Client) Get(uri string) ([]byte, error) { resp, err := c.client.Do(req) if err != nil { + log.Error("get failed", zap.Error(err)) return nil, errors.Wrap(err, "airkaz get failed") } defer resp.Body.Close() @@ -88,5 +93,10 @@ func (c *Client) GetJSON(url string, buf interface{}) error { if err == nil { err = json.Unmarshal(data, buf) } + if err != nil { + log.Error("getJson failed", + zap.String("url", url), + zap.Error(err)) + } return err } diff --git a/gateway/settings/settings.go b/gateway/settings/settings.go deleted file mode 100644 index 0e66fdf..0000000 --- a/gateway/settings/settings.go +++ /dev/null @@ -1 +0,0 @@ -package settings diff --git a/gateway/storage/storage.go b/gateway/storage/storage.go index 204de26..1ca2bd1 100644 --- a/gateway/storage/storage.go +++ b/gateway/storage/storage.go @@ -2,11 +2,15 @@ package storage import ( "encoding/json" + "github.com/hg/airmon/logger" + "go.uber.org/zap" "io/ioutil" "os" "path" ) +var log = logger.Get(logger.Storage) + func getPath(filename string) (string, error) { dir := os.Getenv("XDG_CONFIG_HOME") if dir == "" { @@ -15,7 +19,12 @@ func getPath(filename string) (string, error) { dir = path.Join(dir, "airmon") err := os.MkdirAll(dir, 0750) + if err != nil { + log.Error("could not create data directory", + zap.String("path", dir), + zap.Error(err)) + return "", err } @@ -23,17 +32,28 @@ func getPath(filename string) (string, error) { } func Save(filename string, data interface{}) error { - serialized, err := json.Marshal(data) + fullPath, err := getPath(filename) if err != nil { return err } - fullPath, err := getPath(filename) + serialized, err := json.Marshal(data) if err != nil { + log.Error("could not serialize data", + zap.Any("data", data), + zap.Error(err)) + return err } - return ioutil.WriteFile(fullPath, serialized, 0640) + err = ioutil.WriteFile(fullPath, serialized, 0640) + if err != nil { + log.Error("could not write file", + zap.String("path", fullPath), + zap.ByteString("data", serialized)) + } + + return err } func Load(filename string, data interface{}) error { @@ -44,8 +64,16 @@ func Load(filename string, data interface{}) error { serialized, err := ioutil.ReadFile(fullPath) if err != nil { + log.Error("could not read file", zap.String("path", fullPath)) return err } - return json.Unmarshal(serialized, data) + err = json.Unmarshal(serialized, data) + if err != nil { + log.Error("could not deserialize data", + zap.String("path", fullPath), + zap.ByteString("data", serialized)) + } + + return err } diff --git a/gateway/tm/tm.go b/gateway/tm/tm.go index 33d01f5..da23436 100644 --- a/gateway/tm/tm.go +++ b/gateway/tm/tm.go @@ -1,12 +1,18 @@ package tm import ( - "log" + "github.com/hg/airmon/logger" + "go.uber.org/zap" "strings" "time" ) -const timeFormat = "2006-01-02 15:04:05" +const ( + timeFormat = "2006-01-02 15:04:05" + tzName = "Asia/Almaty" +) + +var log = logger.Get(logger.Time) type Time struct { time.Time @@ -16,8 +22,8 @@ var location *time.Location func init() { var err error - if location, err = time.LoadLocation("Asia/Almaty"); err != nil { - log.Fatal("could not find timezone") + if location, err = time.LoadLocation(tzName); err != nil { + log.Fatal("could not find timezone", zap.String("timezone", tzName)) } }