From 37d30f6a6f1d67e68836e65945bad0d4c136865d Mon Sep 17 00:00:00 2001
From: husharp
Date: Mon, 3 Jun 2024 17:46:37 +0800
Subject: [PATCH] address comment

Signed-off-by: husharp
---
 tools/pd-heartbeat-bench/main.go         | 21 +++---
 tools/pd-heartbeat-bench/metrics/util.go | 83 +++++++++++++++---------
 2 files changed, 62 insertions(+), 42 deletions(-)

diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go
index 16c72515555..44ded2a9f25 100644
--- a/tools/pd-heartbeat-bench/main.go
+++ b/tools/pd-heartbeat-bench/main.go
@@ -484,7 +484,6 @@ func main() {
 		log.Fatal("initialize logger error", zap.Error(err))
 	}
 
-	metrics.InitMetric2Collect(cfg.MetricsAddr)
 	maxVersion = cfg.InitEpochVer
 	options := config.NewOptions(cfg)
 	// let PD have enough time to start
@@ -531,11 +530,11 @@ func main() {
 	defer heartbeatTicker.Stop()
 	var resolvedTSTicker = time.NewTicker(time.Second)
 	defer resolvedTSTicker.Stop()
+	withMetric := metrics.InitMetric2Collect(cfg.MetricsAddr)
 	for {
 		select {
 		case <-heartbeatTicker.C:
 			if cfg.Round != 0 && regions.updateRound > cfg.Round {
-				metrics.OutputConclusion()
 				exit(0)
 			}
 			rep := newReport(cfg)
@@ -548,21 +547,17 @@ func main() {
 				wg.Add(1)
 				go regions.handleRegionHeartbeat(wg, streams[id], id, rep)
 			}
-			go metrics.CollectMetrics(regions.updateRound, 1*time.Second)
+			if withMetric {
+				metrics.CollectMetrics(regions.updateRound, time.Second)
+			}
 			wg.Wait()
 
 			since := time.Since(startTime).Seconds()
 			close(rep.Results())
 			regions.result(cfg.RegionCount, since)
 			stats := <-r
-			log.Info("region heartbeat stats", zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())),
-				zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)),
-				zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)),
-				zap.String("average", fmt.Sprintf("%.4fs", stats.Average)),
-				zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)),
-				zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
-				zap.Uint64("max-epoch-version", maxVersion),
-			)
+			log.Info("region heartbeat stats",
+				metrics.RegionFields(stats, zap.Uint64("max-epoch-version", maxVersion))...)
 			log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since)))
 			metrics.CollectRegionAndStoreStats(&stats, &since)
 			regions.update(cfg, options)
@@ -600,6 +595,7 @@ func main() {
 }
 
 func exit(code int) {
+	metrics.OutputConclusion()
 	os.Exit(code)
 }
 
@@ -695,7 +691,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
 		c.IndentedJSON(http.StatusOK, output)
 	})
 
-	engine.GET("metrics_collect", func(c *gin.Context) {
+	engine.GET("metrics-collect", func(c *gin.Context) {
 		second := c.Query("second")
 		if second == "" {
 			c.String(http.StatusBadRequest, "missing second")
@@ -706,7 +702,6 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
 			c.String(http.StatusBadRequest, "invalid second")
 			return
 		}
-		metrics.InitMetric2Collect(cfg.MetricsAddr)
 		metrics.CollectMetrics(metrics.WarmUpRound, time.Duration(secondInt)*time.Second)
 		c.IndentedJSON(http.StatusOK, "Successfully collect metrics")
 	})
diff --git a/tools/pd-heartbeat-bench/metrics/util.go b/tools/pd-heartbeat-bench/metrics/util.go
index b7883fb354b..bf5010e73da 100644
--- a/tools/pd-heartbeat-bench/metrics/util.go
+++ b/tools/pd-heartbeat-bench/metrics/util.go
@@ -31,16 +31,16 @@ import (
 )
 
 var (
-	prometheusCli      api.Client
-	avgMetrics2Collect []Metric
-	avgRegionStats     report.Stats
-	avgStoreTime       float64
-	collectRound       = 1.0
+	prometheusCli        api.Client
+	finalMetrics2Collect []Metric
+	avgRegionStats       report.Stats
+	avgStoreTime         float64
+	collectRound         = 1.0
 
 	metrics2Collect = []Metric{
-		{promSQL: cpuMetric, name: "max cpu usage(%)"},
-		{promSQL: memoryMetric, name: "max memory usage(G)"},
-		{promSQL: goRoutineMetric, name: "max go routines"},
+		{promSQL: cpuMetric, name: "max cpu usage(%)", max: true},
+		{promSQL: memoryMetric, name: "max memory usage(G)", max: true},
+		{promSQL: goRoutineMetric, name: "max go routines", max: true},
 		{promSQL: hbLatency99Metric, name: "99% Heartbeat Latency(ms)"},
 		{promSQL: hbLatencyAvgMetric, name: "Avg Heartbeat Latency(ms)"},
 	}
@@ -50,11 +50,10 @@ var (
 	memoryMetric       = `max_over_time(go_memstats_heap_inuse_bytes{job=~".*pd.*"}[1h])/1024/1024/1024`
 	goRoutineMetric    = `max_over_time(go_goroutines{job=~".*pd.*"}[1h])`
 	hbLatency99Metric  = `histogram_quantile(0.99, sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_bucket{}[1m])) by (le))`
-	hbLatencyAvgMetric = `sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_sum{}[1m])) / sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_count{}[1m])) * 1000`
+	hbLatencyAvgMetric = `sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_sum{}[1m])) / sum(rate(pd_scheduler_handle_region_heartbeat_duration_seconds_count{}[1m]))`
 
 	// Heartbeat Performance Duration BreakDown
-	hbBreakdownName = "Heartbeat Performance Duration BreakDown (Accumulation)(ms)"
-	breakdownNames  = []string{
+	breakdownNames = []string{
 		"AsyncHotStatsDuration",
 		"CollectRegionStats",
 		"Other",
@@ -74,16 +73,18 @@ type Metric struct {
 	promSQL string
 	name    string
 	value   float64
+	// max indicates whether the metric is a max value
+	max bool
 }
 
-func InitMetric2Collect(endpoint string) {
+func InitMetric2Collect(endpoint string) (withMetric bool) {
 	for _, name := range breakdownNames {
 		metrics2Collect = append(metrics2Collect, Metric{
 			promSQL: hbBreakdownMetricByName(name),
-			name:    fmt.Sprintf("%s with %s", hbBreakdownName, name),
+			name:    name,
 		})
 	}
-	avgMetrics2Collect = metrics2Collect
+	finalMetrics2Collect = metrics2Collect
 	if j := strings.Index(endpoint, "//"); j == -1 {
 		endpoint = "http://" + endpoint
 	}
@@ -91,13 +92,20 @@ func InitMetric2Collect(endpoint string) {
 	cu, err := url.Parse(endpoint)
 	if err != nil {
 		log.Error("parse prometheus url error", zap.Error(err))
-		return
+		return false
 	}
 	prometheusCli, err = NewPrometheusClient(*cu)
 	if err != nil {
 		log.Error("create prometheus client error", zap.Error(err))
-		return
+		return false
+	}
+	// check whether the prometheus is available
+	_, err = getMetric(prometheusCli, goRoutineMetric, time.Now())
+	if err != nil {
+		log.Error("check prometheus availability error, please check the prometheus address", zap.Error(err))
+		return false
 	}
+	return true
 }
 
 func NewPrometheusClient(prometheusURL url.URL) (api.Client, error) {
@@ -111,7 +119,7 @@ func NewPrometheusClient(prometheusURL url.URL) (api.Client, error) {
 	return client, nil
 }
 
-// wait for the first round to warm up
+// WarmUpRound wait for the first round to warm up
 const WarmUpRound = 1
 
 func CollectMetrics(curRound int, wait time.Duration) {
@@ -143,11 +151,15 @@ func CollectMetrics(curRound int, wait time.Duration) {
 	}
 	for i := 0; i < len(metrics2Collect); i++ {
 		metrics2Collect[i].value = getRes(i)
-		avgMetrics2Collect[i].value = (avgMetrics2Collect[i].value*collectRound + metrics2Collect[i].value) / (collectRound + 1)
+		if metrics2Collect[i].max {
+			finalMetrics2Collect[i].value = max(finalMetrics2Collect[i].value, metrics2Collect[i].value)
+		} else {
+			finalMetrics2Collect[i].value = (finalMetrics2Collect[i].value*collectRound + metrics2Collect[i].value) / (collectRound + 1)
+		}
 	}
 
 	collectRound += 1
-	log.Info("metrics collected", zap.String("metrics", formatMetrics(metrics2Collect)))
+	log.Info("metrics collected", zap.Float64("round", collectRound), zap.String("metrics", formatMetrics(metrics2Collect)))
 }
 
 func getMetric(cli api.Client, query string, ts time.Time) ([]float64, error) {
@@ -171,6 +183,14 @@ func getMetric(cli api.Client, query string, ts time.Time) ([]float64, error) {
 	return value, nil
 }
 
+func formatMetrics(ms []Metric) string {
+	res := ""
+	for _, m := range ms {
+		res += "[" + m.name + "]" + " " + fmt.Sprintf("%.10f", m.value) + " "
+	}
+	return res
+}
+
 func CollectRegionAndStoreStats(regionStats *report.Stats, storeTime *float64) {
 	if regionStats != nil && storeTime != nil {
 		collect(*regionStats, *storeTime)
@@ -191,16 +211,21 @@ func collect(regionStats report.Stats, storeTime float64) {
 	avgStoreTime = average(avgStoreTime, storeTime)
 }
 
-func formatMetrics(ms []Metric) string {
-	res := ""
-	for _, m := range ms {
-		res += "[" + m.name + "]" + " " + fmt.Sprintf("%.10f", m.value) + " "
-	}
-	return res
+func OutputConclusion() {
+	logFields := RegionFields(avgRegionStats,
+		zap.Float64("avg store time", avgStoreTime),
+		zap.Float64("current round", collectRound),
+		zap.String("metrics", formatMetrics(finalMetrics2Collect)))
+	log.Info("final metrics collected", logFields...)
 }
 
-func OutputConclusion() {
-	log.Info("average metrics", zap.Float64("avg store time", avgStoreTime),
-		zap.Any("avg region stats", avgRegionStats),
-		zap.String("metrics", formatMetrics(avgMetrics2Collect)))
+func RegionFields(stats report.Stats, fields ...zap.Field) []zap.Field {
+	return append([]zap.Field{
+		zap.String("total", fmt.Sprintf("%.4fs", stats.Total.Seconds())),
+		zap.String("slowest", fmt.Sprintf("%.4fs", stats.Slowest)),
+		zap.String("fastest", fmt.Sprintf("%.4fs", stats.Fastest)),
+		zap.String("average", fmt.Sprintf("%.4fs", stats.Average)),
+		zap.String("stddev", fmt.Sprintf("%.4fs", stats.Stddev)),
+		zap.String("rps", fmt.Sprintf("%.4f", stats.RPS)),
+	}, fields...)
 }
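
A quick way to exercise the renamed route after this change is to query it over HTTP while the bench tool is running. The sketch below is illustrative only: the listen address and the 30-second window are placeholder values, not part of the patch; only the /metrics-collect path and the "second" query parameter come from the diff above.

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholder address: substitute the HTTP address the bench tool actually listens on.
	resp, err := http.Get("http://127.0.0.1:10080/metrics-collect?second=30")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// The handler responds with a JSON confirmation string after CollectMetrics returns.
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}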