diff --git a/tools/go.mod b/tools/go.mod index caafef12b87..aef95dfa660 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -26,6 +26,7 @@ require ( github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.19.0 + github.com/prometheus/common v0.51.1 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.4 @@ -132,7 +133,6 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/prometheus/client_model v0.6.0 // indirect - github.com/prometheus/common v0.51.1 // indirect github.com/prometheus/procfs v0.13.0 // indirect github.com/rs/cors v1.7.0 // indirect github.com/samber/lo v1.37.0 // indirect diff --git a/tools/pd-heartbeat-bench/config/config.go b/tools/pd-heartbeat-bench/config/config.go index 12455d78658..dc5a2a6a047 100644 --- a/tools/pd-heartbeat-bench/config/config.go +++ b/tools/pd-heartbeat-bench/config/config.go @@ -53,6 +53,7 @@ type Config struct { ReportRatio float64 `toml:"report-ratio" json:"report-ratio"` Sample bool `toml:"sample" json:"sample"` Round int `toml:"round" json:"round"` + MetricsAddr string `toml:"metrics-addr" json:"metrics-addr"` } // NewConfig return a set of settings. @@ -69,6 +70,7 @@ func NewConfig() *Config { fs.StringVar(&cfg.Security.CertPath, "cert", "", "path of file that contains X509 certificate in PEM format") fs.StringVar(&cfg.Security.KeyPath, "key", "", "path of file that contains X509 key in PEM format") fs.Uint64Var(&cfg.InitEpochVer, "epoch-ver", 1, "the initial epoch version value") + fs.StringVar(&cfg.MetricsAddr, "metrics-addr", "127.0.0.1:9090", "the address to pull metrics") return cfg } diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go index 77ae6354bff..9d71be1129b 100644 --- a/tools/pd-heartbeat-bench/main.go +++ b/tools/pd-heartbeat-bench/main.go @@ -23,6 +23,7 @@ import ( "net/http" "os" "os/signal" + "strconv" "sync" "sync/atomic" "syscall" @@ -46,6 +47,7 @@ import ( "github.com/tikv/pd/pkg/statistics" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/tools/pd-heartbeat-bench/config" + "github.com/tikv/pd/tools/pd-heartbeat-bench/metrics" "go.etcd.io/etcd/pkg/report" "go.uber.org/zap" ) @@ -528,6 +530,7 @@ func main() { defer heartbeatTicker.Stop() var resolvedTSTicker = time.NewTicker(time.Second) defer resolvedTSTicker.Stop() + withMetric := metrics.InitMetric2Collect(cfg.MetricsAddr) for { select { case <-heartbeatTicker.C: @@ -544,21 +547,19 @@ func main() { wg.Add(1) go regions.handleRegionHeartbeat(wg, streams[id], id, rep) } + if withMetric { + metrics.CollectMetrics(regions.updateRound, time.Second) + } wg.Wait() since := time.Since(startTime).Seconds() close(rep.Results()) regions.result(cfg.RegionCount, since) stats := <-r - log.Info("region heartbeat stats", zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())), - zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)), - zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)), - zap.String("average", fmt.Sprintf("%.4fs", stats.Average)), - zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)), - zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)), - zap.Uint64("max-epoch-version", maxVersion), - ) + log.Info("region heartbeat stats", + metrics.RegionFields(stats, zap.Uint64("max-epoch-version", maxVersion))...) log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since))) + metrics.CollectRegionAndStoreStats(&stats, &since) regions.update(cfg, options) go stores.update(regions) // update stores in background, unusually region heartbeat is slower than store update. case <-resolvedTSTicker.C: @@ -594,6 +595,7 @@ func main() { } func exit(code int) { + metrics.OutputConclusion() os.Exit(code) } @@ -689,6 +691,21 @@ func runHTTPServer(cfg *config.Config, options *config.Options) { c.IndentedJSON(http.StatusOK, output) }) + engine.GET("metrics-collect", func(c *gin.Context) { + second := c.Query("second") + if second == "" { + c.String(http.StatusBadRequest, "missing second") + return + } + secondInt, err := strconv.Atoi(second) + if err != nil { + c.String(http.StatusBadRequest, "invalid second") + return + } + metrics.CollectMetrics(metrics.WarmUpRound, time.Duration(secondInt)*time.Second) + c.IndentedJSON(http.StatusOK, "Successfully collect metrics") + }) + engine.Run(cfg.StatusAddr) } diff --git a/tools/pd-heartbeat-bench/metrics/util.go b/tools/pd-heartbeat-bench/metrics/util.go new file mode 100644 index 00000000000..bf5010e73da --- /dev/null +++ b/tools/pd-heartbeat-bench/metrics/util.go @@ -0,0 +1,231 @@ +// Copyright 2024 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "context" + "fmt" + "math" + "net/url" + "strings" + "time" + + "github.com/pingcap/log" + "github.com/prometheus/client_golang/api" + v1 "github.com/prometheus/client_golang/api/prometheus/v1" + "github.com/prometheus/common/model" + "go.etcd.io/etcd/pkg/report" + "go.uber.org/zap" +) + +var ( + prometheusCli api.Client + finalMetrics2Collect []Metric + avgRegionStats report.Stats + avgStoreTime float64 + collectRound = 1.0 + + metrics2Collect = []Metric{ + {promSQL: cpuMetric, name: "max cpu usage(%)", max: true}, + {promSQL: memoryMetric, name: "max memory usage(G)", max: true}, + {promSQL: goRoutineMetric, name: "max go routines", max: true}, + {promSQL: hbLatency99Metric, name: "99% Heartbeat Latency(ms)"}, + {promSQL: hbLatencyAvgMetric, name: "Avg Heartbeat Latency(ms)"}, + } + + // Prometheus SQL + cpuMetric = `max_over_time(irate(process_cpu_seconds_total{job=~".*pd.*"}[30s])[1h:30s]) * 100` + memoryMetric = `max_over_time(go_memstats_heap_inuse_bytes{job=~".*pd.*"}[1h])/1024/1024/1024` + goRoutineMetric = `max_over_time(go_goroutines{job=~".*pd.*"}[1h])` + hbLatency99Metric = `histogram_quantile(0.99, sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{}[1m])) by (le))` + hbLatencyAvgMetric = `sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_sum{}[1m])) / sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_count{}[1m]))` + + // Heartbeat Performance Duration BreakDown + breakdownNames = []string{ + "AsyncHotStatsDuration", + "CollectRegionStats", + "Other", + "PreCheck", + "RegionGuide", + "SaveCache_CheckOverlaps", + "SaveCache_InvalidRegion", + "SaveCache_SetRegion", + "SaveCache_UpdateSubTree", + } + hbBreakdownMetricByName = func(name string) string { + return fmt.Sprintf(`sum(rate(pd_core_region_heartbeat_breakdown_handle_duration_seconds_sum{name="%s"}[1m]))`, name) + } +) + +type Metric struct { + promSQL string + name string + value float64 + // max indicates whether the metric is a max value + max bool +} + +func InitMetric2Collect(endpoint string) (withMetric bool) { + for _, name := range breakdownNames { + metrics2Collect = append(metrics2Collect, Metric{ + promSQL: hbBreakdownMetricByName(name), + name: name, + }) + } + finalMetrics2Collect = metrics2Collect + + if j := strings.Index(endpoint, "//"); j == -1 { + endpoint = "http://" + endpoint + } + cu, err := url.Parse(endpoint) + if err != nil { + log.Error("parse prometheus url error", zap.Error(err)) + return false + } + prometheusCli, err = NewPrometheusClient(*cu) + if err != nil { + log.Error("create prometheus client error", zap.Error(err)) + return false + } + // check whether the prometheus is available + _, err = getMetric(prometheusCli, goRoutineMetric, time.Now()) + if err != nil { + log.Error("check prometheus availability error, please check the prometheus address", zap.Error(err)) + return false + } + return true +} + +func NewPrometheusClient(prometheusURL url.URL) (api.Client, error) { + client, err := api.NewClient(api.Config{ + Address: prometheusURL.String(), + }) + if err != nil { + return nil, err + } + + return client, nil +} + +// WarmUpRound wait for the first round to warm up +const WarmUpRound = 1 + +func CollectMetrics(curRound int, wait time.Duration) { + if curRound < WarmUpRound { + return + } + // retry 5 times to get average value + res := make([]struct { + sum float64 + count int + }, len(metrics2Collect)) + for i := 0; i < 5; i++ { + for j, m := range metrics2Collect { + r, err := getMetric(prometheusCli, m.promSQL, time.Now()) + if err != nil { + log.Error("get metric error", zap.String("name", m.name), zap.String("prom sql", m.promSQL), zap.Error(err)) + } else if len(r) > 0 { + res[j].sum += r[0] + res[j].count += 1 + } + } + time.Sleep(wait) + } + getRes := func(index int) float64 { + if res[index].count == 0 { + return 0 + } + return res[index].sum / float64(res[index].count) + } + for i := 0; i < len(metrics2Collect); i++ { + metrics2Collect[i].value = getRes(i) + if metrics2Collect[i].max { + finalMetrics2Collect[i].value = max(finalMetrics2Collect[i].value, metrics2Collect[i].value) + } else { + finalMetrics2Collect[i].value = (finalMetrics2Collect[i].value*collectRound + metrics2Collect[i].value) / (collectRound + 1) + } + } + + collectRound += 1 + log.Info("metrics collected", zap.Float64("round", collectRound), zap.String("metrics", formatMetrics(metrics2Collect))) +} + +func getMetric(cli api.Client, query string, ts time.Time) ([]float64, error) { + httpAPI := v1.NewAPI(cli) + val, _, err := httpAPI.Query(context.Background(), query, ts) + if err != nil { + return nil, err + } + valMatrix := val.(model.Vector) + if len(valMatrix) == 0 { + return nil, nil + } + var value []float64 + for i := range valMatrix { + value = append(value, float64(valMatrix[i].Value)) + // judge whether exceeded float maximum value + if math.IsNaN(value[i]) { + return nil, fmt.Errorf("prometheus query result exceeded float maximum value, result=%s", valMatrix[i].String()) + } + } + return value, nil +} + +func formatMetrics(ms []Metric) string { + res := "" + for _, m := range ms { + res += "[" + m.name + "]" + " " + fmt.Sprintf("%.10f", m.value) + " " + } + return res +} + +func CollectRegionAndStoreStats(regionStats *report.Stats, storeTime *float64) { + if regionStats != nil && storeTime != nil { + collect(*regionStats, *storeTime) + } +} + +func collect(regionStats report.Stats, storeTime float64) { + average := func(avg, new float64) float64 { + return (avg*collectRound + new) / (collectRound + 1) + } + + avgRegionStats.Total = time.Duration(average(float64(avgRegionStats.Total), float64(regionStats.Total))) + avgRegionStats.Average = average(avgRegionStats.Average, regionStats.Average) + avgRegionStats.Stddev = average(avgRegionStats.Stddev, regionStats.Stddev) + avgRegionStats.Fastest = average(avgRegionStats.Fastest, regionStats.Fastest) + avgRegionStats.Slowest = average(avgRegionStats.Slowest, regionStats.Slowest) + avgRegionStats.RPS = average(avgRegionStats.RPS, regionStats.RPS) + avgStoreTime = average(avgStoreTime, storeTime) +} + +func OutputConclusion() { + logFields := RegionFields(avgRegionStats, + zap.Float64("avg store time", avgStoreTime), + zap.Float64("current round", collectRound), + zap.String("metrics", formatMetrics(finalMetrics2Collect))) + log.Info("final metrics collected", logFields...) +} + +func RegionFields(stats report.Stats, fields ...zap.Field) []zap.Field { + return append([]zap.Field{ + zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())), + zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)), + zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)), + zap.String("average", fmt.Sprintf("%.4fs", stats.Average)), + zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)), + zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)), + }, fields...) +}