From 8ad1bc9ba93f0e7227bd639d2b91bf942a25779c Mon Sep 17 00:00:00 2001 From: Daniel Liu Date: Tue, 10 Dec 2024 16:08:02 +0800 Subject: [PATCH] metrics/influxdb: reuse code between v1 and v2 reporters (#26963) --- go.mod | 2 +- go.sum | 4 +- metrics/influxdb/influxdb.go | 332 ++++++++++----------------------- metrics/influxdb/influxdbv1.go | 145 ++++++++++++++ metrics/influxdb/influxdbv2.go | 143 +------------- 5 files changed, 254 insertions(+), 372 deletions(-) create mode 100644 metrics/influxdb/influxdbv1.go diff --git a/go.mod b/go.mod index 3160395d2a06f..28a24f8b6b381 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/holiman/uint256 v1.2.4 github.com/huin/goupnp v1.3.0 - github.com/influxdata/influxdb v1.7.9 github.com/jackpal/go-nat-pmp v1.0.2 github.com/julienschmidt/httprouter v1.3.0 github.com/karalabe/hid v1.0.0 @@ -51,6 +50,7 @@ require ( github.com/dop251/goja v0.0.0-20200721192441-a695b0cdd498 github.com/ethereum/c-kzg-4844 v0.4.0 github.com/influxdata/influxdb-client-go/v2 v2.4.0 + github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c github.com/kylelemons/godebug v1.1.0 github.com/mattn/go-isatty v0.0.17 github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible diff --git a/go.sum b/go.sum index 84d68e6c41cac..68019c92aba9e 100644 --- a/go.sum +++ b/go.sum @@ -99,10 +99,10 @@ github.com/holiman/uint256 v1.2.4/go.mod h1:EOMSn4q6Nyt9P6efbI3bueV4e1b3dGlUCXei github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= -github.com/influxdata/influxdb v1.7.9 h1:uSeBTNO4rBkbp1Be5FKRsAmglM9nlx25TzVQRQt1An4= -github.com/influxdata/influxdb v1.7.9/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= github.com/influxdata/influxdb-client-go/v2 v2.4.0 h1:HGBfZYStlx3Kqvsv1h2pJixbCl/jhnFtxpKFAv9Tu5k= github.com/influxdata/influxdb-client-go/v2 v2.4.0/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8= +github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c h1:qSHzRbhzK8RdXOsAdfDgO49TtqC1oZ+acxPrkfTxcCs= +github.com/influxdata/influxdb1-client v0.0.0-20220302092344-a9ab5670611c/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= diff --git a/metrics/influxdb/influxdb.go b/metrics/influxdb/influxdb.go index 9f331183f4139..1e7fb9e11dd48 100644 --- a/metrics/influxdb/influxdb.go +++ b/metrics/influxdb/influxdb.go @@ -2,242 +2,112 @@ package influxdb import ( "fmt" - uurl "net/url" - "time" - "github.com/XinFinOrg/XDPoSChain/log" "github.com/XinFinOrg/XDPoSChain/metrics" - "github.com/influxdata/influxdb/client" ) -type reporter struct { - reg metrics.Registry - interval time.Duration - - url uurl.URL - database string - username string - password string - namespace string - tags map[string]string - - client *client.Client - - cache map[string]int64 -} - -// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval. 
-func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) { - InfluxDBWithTags(r, d, url, database, username, password, namespace, nil) -} - -// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags -func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) { - u, err := uurl.Parse(url) - if err != nil { - log.Warn("unable to parse InfluxDB url %s. err=%v", url, err) - return - } - - rep := &reporter{ - reg: r, - interval: d, - url: *u, - database: database, - username: username, - password: password, - namespace: namespace, - tags: tags, - cache: make(map[string]int64), - } - if err := rep.makeClient(); err != nil { - log.Warn("unable to make InfluxDB client. err=%v", err) - return - } - - rep.run() -} - -func (r *reporter) makeClient() (err error) { - r.client, err = client.NewClient(client.Config{ - URL: r.url, - Username: r.username, - Password: r.password, - Timeout: 10 * time.Second, - }) - - return -} - -func (r *reporter) run() { - intervalTicker := time.NewTicker(r.interval) - pingTicker := time.NewTicker(time.Second * 5) - - defer intervalTicker.Stop() - defer pingTicker.Stop() - - for { - select { - case <-intervalTicker.C: - if err := r.send(); err != nil { - log.Warn("unable to send to InfluxDB. err=%v", err) - } - case <-pingTicker.C: - _, _, err := r.client.Ping() - if err != nil { - log.Warn("got error while sending a ping to InfluxDB, trying to recreate client. err=%v", err) - - if err = r.makeClient(); err != nil { - log.Warn("unable to make InfluxDB client. err=%v", err) - } - } +func readMeter(namespace, name string, i interface{}) (string, map[string]interface{}) { + switch metric := i.(type) { + case metrics.Counter: + measurement := fmt.Sprintf("%s%s.count", namespace, name) + fields := map[string]interface{}{ + "value": metric.Count(), } - } -} - -func (r *reporter) send() error { - var pts []client.Point - - r.reg.Each(func(name string, i interface{}) { - now := time.Now() - namespace := r.namespace - - switch metric := i.(type) { - case metrics.Counter: - count := metric.Count() - pts = append(pts, client.Point{ - Measurement: fmt.Sprintf("%s%s.count", namespace, name), - Tags: r.tags, - Fields: map[string]interface{}{ - "value": count, - }, - Time: now, - }) - case metrics.CounterFloat64: - count := metric.Count() - pts = append(pts, client.Point{ - Measurement: fmt.Sprintf("%s%s.count", namespace, name), - Tags: r.tags, - Fields: map[string]interface{}{ - "value": count, - }, - Time: now, - }) - case metrics.Gauge: - ms := metric.Snapshot() - pts = append(pts, client.Point{ - Measurement: fmt.Sprintf("%s%s.gauge", namespace, name), - Tags: r.tags, - Fields: map[string]interface{}{ - "value": ms.Value(), - }, - Time: now, - }) - case metrics.GaugeFloat64: - ms := metric.Snapshot() - pts = append(pts, client.Point{ - Measurement: fmt.Sprintf("%s%s.gauge", namespace, name), - Tags: r.tags, - Fields: map[string]interface{}{ - "value": ms.Value(), - }, - Time: now, - }) - case metrics.Histogram: - ms := metric.Snapshot() - if ms.Count() > 0 { - ps := ms.Percentiles([]float64{0.25, 0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) - fields := map[string]interface{}{ - "count": ms.Count(), - "max": ms.Max(), - "mean": ms.Mean(), - "min": ms.Min(), - "stddev": ms.StdDev(), - "variance": ms.Variance(), - "p25": ps[0], - "p50": ps[1], - "p75": ps[2], - "p95": ps[3], - "p99": 
ps[4], - "p999": ps[5], - "p9999": ps[6], - } - pts = append(pts, client.Point{ - Measurement: fmt.Sprintf("%s%s.histogram", namespace, name), - Tags: r.tags, - Fields: fields, - Time: now, - }) - } - case metrics.Meter: - ms := metric.Snapshot() - pts = append(pts, client.Point{ - Measurement: fmt.Sprintf("%s%s.meter", namespace, name), - Tags: r.tags, - Fields: map[string]interface{}{ - "count": ms.Count(), - "m1": ms.Rate1(), - "m5": ms.Rate5(), - "m15": ms.Rate15(), - "mean": ms.RateMean(), - }, - Time: now, - }) - case metrics.Timer: - ms := metric.Snapshot() - ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) - pts = append(pts, client.Point{ - Measurement: fmt.Sprintf("%s%s.timer", namespace, name), - Tags: r.tags, - Fields: map[string]interface{}{ - "count": ms.Count(), - "max": ms.Max(), - "mean": ms.Mean(), - "min": ms.Min(), - "stddev": ms.StdDev(), - "variance": ms.Variance(), - "p50": ps[0], - "p75": ps[1], - "p95": ps[2], - "p99": ps[3], - "p999": ps[4], - "p9999": ps[5], - "m1": ms.Rate1(), - "m5": ms.Rate5(), - "m15": ms.Rate15(), - "meanrate": ms.RateMean(), - }, - Time: now, - }) - case metrics.ResettingTimer: - t := metric.Snapshot() - - if len(t.Values()) > 0 { - ps := t.Percentiles([]float64{50, 95, 99}) - val := t.Values() - pts = append(pts, client.Point{ - Measurement: fmt.Sprintf("%s%s.span", namespace, name), - Tags: r.tags, - Fields: map[string]interface{}{ - "count": len(val), - "max": val[len(val)-1], - "mean": t.Mean(), - "min": val[0], - "p50": ps[0], - "p95": ps[1], - "p99": ps[2], - }, - Time: now, - }) - } + return measurement, fields + case metrics.CounterFloat64: + measurement := fmt.Sprintf("%s%s.count", namespace, name) + fields := map[string]interface{}{ + "value": metric.Count(), } - }) - - bps := client.BatchPoints{ - Points: pts, - Database: r.database, + return measurement, fields + case metrics.Gauge: + measurement := fmt.Sprintf("%s%s.gauge", namespace, name) + fields := map[string]interface{}{ + "value": metric.Snapshot().Value(), + } + return measurement, fields + case metrics.GaugeFloat64: + measurement := fmt.Sprintf("%s%s.gauge", namespace, name) + fields := map[string]interface{}{ + "value": metric.Snapshot().Value(), + } + return measurement, fields + case metrics.Histogram: + ms := metric.Snapshot() + if ms.Count() <= 0 { + break + } + ps := ms.Percentiles([]float64{0.25, 0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) + measurement := fmt.Sprintf("%s%s.histogram", namespace, name) + fields := map[string]interface{}{ + "count": ms.Count(), + "max": ms.Max(), + "mean": ms.Mean(), + "min": ms.Min(), + "stddev": ms.StdDev(), + "variance": ms.Variance(), + "p25": ps[0], + "p50": ps[1], + "p75": ps[2], + "p95": ps[3], + "p99": ps[4], + "p999": ps[5], + "p9999": ps[6], + } + return measurement, fields + case metrics.Meter: + ms := metric.Snapshot() + measurement := fmt.Sprintf("%s%s.meter", namespace, name) + fields := map[string]interface{}{ + "count": ms.Count(), + "m1": ms.Rate1(), + "m5": ms.Rate5(), + "m15": ms.Rate15(), + "mean": ms.RateMean(), + } + return measurement, fields + case metrics.Timer: + ms := metric.Snapshot() + ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) + + measurement := fmt.Sprintf("%s%s.timer", namespace, name) + fields := map[string]interface{}{ + "count": ms.Count(), + "max": ms.Max(), + "mean": ms.Mean(), + "min": ms.Min(), + "stddev": ms.StdDev(), + "variance": ms.Variance(), + "p50": ps[0], + "p75": ps[1], + "p95": ps[2], + "p99": ps[3], + "p999": ps[4], + "p9999": ps[5], + 
"m1": ms.Rate1(), + "m5": ms.Rate5(), + "m15": ms.Rate15(), + "meanrate": ms.RateMean(), + } + return measurement, fields + case metrics.ResettingTimer: + t := metric.Snapshot() + if len(t.Values()) == 0 { + break + } + ps := t.Percentiles([]float64{50, 95, 99}) + val := t.Values() + measurement := fmt.Sprintf("%s%s.span", namespace, name) + fields := map[string]interface{}{ + "count": len(val), + "max": val[len(val)-1], + "mean": t.Mean(), + "min": val[0], + "p50": ps[0], + "p95": ps[1], + "p99": ps[2], + } + return measurement, fields } - - _, err := r.client.Write(bps) - return err + return "", nil } diff --git a/metrics/influxdb/influxdbv1.go b/metrics/influxdb/influxdbv1.go new file mode 100644 index 0000000000000..40ae9a3c10b7a --- /dev/null +++ b/metrics/influxdb/influxdbv1.go @@ -0,0 +1,145 @@ +package influxdb + +import ( + "fmt" + uurl "net/url" + "time" + + "github.com/XinFinOrg/XDPoSChain/log" + "github.com/XinFinOrg/XDPoSChain/metrics" + client "github.com/influxdata/influxdb1-client/v2" +) + +type reporter struct { + reg metrics.Registry + interval time.Duration + + url uurl.URL + database string + username string + password string + namespace string + tags map[string]string + + client client.Client + + cache map[string]int64 +} + +// InfluxDB starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval. +func InfluxDB(r metrics.Registry, d time.Duration, url, database, username, password, namespace string) { + InfluxDBWithTags(r, d, url, database, username, password, namespace, nil) +} + +// InfluxDBWithTags starts a InfluxDB reporter which will post the from the given metrics.Registry at each d interval with the specified tags +func InfluxDBWithTags(r metrics.Registry, d time.Duration, url, database, username, password, namespace string, tags map[string]string) { + u, err := uurl.Parse(url) + if err != nil { + log.Warn("Unable to parse InfluxDB", "url", url, "err", err) + return + } + + rep := &reporter{ + reg: r, + interval: d, + url: *u, + database: database, + username: username, + password: password, + namespace: namespace, + tags: tags, + cache: make(map[string]int64), + } + if err := rep.makeClient(); err != nil { + log.Warn("Unable to make InfluxDB client", "err", err) + return + } + + rep.run() +} + +// InfluxDBWithTagsOnce runs once an InfluxDB reporter and post the given metrics.Registry with the specified tags +func InfluxDBWithTagsOnce(r metrics.Registry, url, database, username, password, namespace string, tags map[string]string) error { + u, err := uurl.Parse(url) + if err != nil { + return fmt.Errorf("unable to parse InfluxDB. url: %s, err: %v", url, err) + } + + rep := &reporter{ + reg: r, + url: *u, + database: database, + username: username, + password: password, + namespace: namespace, + tags: tags, + cache: make(map[string]int64), + } + if err := rep.makeClient(); err != nil { + return fmt.Errorf("unable to make InfluxDB client. err: %v", err) + } + + if err := rep.send(); err != nil { + return fmt.Errorf("unable to send to InfluxDB. 
err: %v", err) + } + + return nil +} + +func (r *reporter) makeClient() (err error) { + r.client, err = client.NewHTTPClient(client.HTTPConfig{ + Addr: r.url.String(), + Username: r.username, + Password: r.password, + Timeout: 10 * time.Second, + }) + + return +} + +func (r *reporter) run() { + intervalTicker := time.NewTicker(r.interval) + pingTicker := time.NewTicker(time.Second * 5) + + defer intervalTicker.Stop() + defer pingTicker.Stop() + + for { + select { + case <-intervalTicker.C: + if err := r.send(); err != nil { + log.Warn("Unable to send to InfluxDB", "err", err) + } + case <-pingTicker.C: + _, _, err := r.client.Ping(0) + if err != nil { + log.Warn("Got error while sending a ping to InfluxDB, trying to recreate client", "err", err) + + if err = r.makeClient(); err != nil { + log.Warn("Unable to make InfluxDB client", "err", err) + } + } + } + } +} + +func (r *reporter) send() error { + bps, err := client.NewBatchPoints( + client.BatchPointsConfig{ + Database: r.database, + }) + if err != nil { + return err + } + r.reg.Each(func(name string, i interface{}) { + now := time.Now() + measurement, fields := readMeter(r.namespace, name, i) + if fields == nil { + return + } + if p, err := client.NewPoint(measurement, r.tags, fields, now); err == nil { + bps.AddPoint(p) + } + }) + return r.client.Write(bps) +} diff --git a/metrics/influxdb/influxdbv2.go b/metrics/influxdb/influxdbv2.go index 8a4dbab3c3ca3..15ea660d865c7 100644 --- a/metrics/influxdb/influxdbv2.go +++ b/metrics/influxdb/influxdbv2.go @@ -2,7 +2,6 @@ package influxdb import ( "context" - "fmt" "time" "github.com/XinFinOrg/XDPoSChain/log" @@ -78,145 +77,13 @@ func (r *v2Reporter) run() { func (r *v2Reporter) send() { r.reg.Each(func(name string, i interface{}) { now := time.Now() - namespace := r.namespace - - switch metric := i.(type) { - - case metrics.Counter: - v := metric.Count() - - measurement := fmt.Sprintf("%s%s.count", namespace, name) - fields := map[string]interface{}{ - "value": v, - } - - pt := influxdb2.NewPoint(measurement, r.tags, fields, now) - r.write.WritePoint(pt) - - case metrics.CounterFloat64: - v := metric.Count() - - measurement := fmt.Sprintf("%s%s.count", namespace, name) - fields := map[string]interface{}{ - "value": v, - } - - pt := influxdb2.NewPoint(measurement, r.tags, fields, now) - r.write.WritePoint(pt) - - case metrics.Gauge: - ms := metric.Snapshot() - - measurement := fmt.Sprintf("%s%s.gauge", namespace, name) - fields := map[string]interface{}{ - "value": ms.Value(), - } - - pt := influxdb2.NewPoint(measurement, r.tags, fields, now) - r.write.WritePoint(pt) - - case metrics.GaugeFloat64: - ms := metric.Snapshot() - - measurement := fmt.Sprintf("%s%s.gauge", namespace, name) - fields := map[string]interface{}{ - "value": ms.Value(), - } - - pt := influxdb2.NewPoint(measurement, r.tags, fields, now) - r.write.WritePoint(pt) - - case metrics.Histogram: - ms := metric.Snapshot() - - if ms.Count() > 0 { - ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) - measurement := fmt.Sprintf("%s%s.histogram", namespace, name) - fields := map[string]interface{}{ - "count": ms.Count(), - "max": ms.Max(), - "mean": ms.Mean(), - "min": ms.Min(), - "stddev": ms.StdDev(), - "variance": ms.Variance(), - "p50": ps[0], - "p75": ps[1], - "p95": ps[2], - "p99": ps[3], - "p999": ps[4], - "p9999": ps[5], - } - - pt := influxdb2.NewPoint(measurement, r.tags, fields, now) - r.write.WritePoint(pt) - } - - case metrics.Meter: - ms := metric.Snapshot() - - measurement := 
fmt.Sprintf("%s%s.meter", namespace, name) - fields := map[string]interface{}{ - "count": ms.Count(), - "m1": ms.Rate1(), - "m5": ms.Rate5(), - "m15": ms.Rate15(), - "mean": ms.RateMean(), - } - - pt := influxdb2.NewPoint(measurement, r.tags, fields, now) - r.write.WritePoint(pt) - - case metrics.Timer: - ms := metric.Snapshot() - ps := ms.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999, 0.9999}) - - measurement := fmt.Sprintf("%s%s.timer", namespace, name) - fields := map[string]interface{}{ - "count": ms.Count(), - "max": ms.Max(), - "mean": ms.Mean(), - "min": ms.Min(), - "stddev": ms.StdDev(), - "variance": ms.Variance(), - "p50": ps[0], - "p75": ps[1], - "p95": ps[2], - "p99": ps[3], - "p999": ps[4], - "p9999": ps[5], - "m1": ms.Rate1(), - "m5": ms.Rate5(), - "m15": ms.Rate15(), - "meanrate": ms.RateMean(), - } - - pt := influxdb2.NewPoint(measurement, r.tags, fields, now) - r.write.WritePoint(pt) - - case metrics.ResettingTimer: - t := metric.Snapshot() - - if len(t.Values()) > 0 { - ps := t.Percentiles([]float64{50, 95, 99}) - val := t.Values() - - measurement := fmt.Sprintf("%s%s.span", namespace, name) - fields := map[string]interface{}{ - "count": len(val), - "max": val[len(val)-1], - "mean": t.Mean(), - "min": val[0], - "p50": ps[0], - "p95": ps[1], - "p99": ps[2], - } - - pt := influxdb2.NewPoint(measurement, r.tags, fields, now) - r.write.WritePoint(pt) - } + measurement, fields := readMeter(r.namespace, name, i) + if fields == nil { + return } + pt := influxdb2.NewPoint(measurement, r.tags, fields, now) + r.write.WritePoint(pt) }) - // Force all unwritten data to be sent r.write.Flush() }