From ca8fd3db3e33c183a7f39887161a89be33474066 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 24 Jan 2024 15:45:21 +0800 Subject: [PATCH] tools: support tls and refactor (#7745) ref tikv/pd#7703 Signed-off-by: Ryan Leung Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- tools/pd-heartbeat-bench/config-template.toml | 6 +- tools/pd-heartbeat-bench/config/config.go | 51 +++++----- tools/pd-heartbeat-bench/main.go | 98 +++++++++---------- 3 files changed, 74 insertions(+), 81 deletions(-) diff --git a/tools/pd-heartbeat-bench/config-template.toml b/tools/pd-heartbeat-bench/config-template.toml index d2a0fa844fe..4964535a772 100644 --- a/tools/pd-heartbeat-bench/config-template.toml +++ b/tools/pd-heartbeat-bench/config-template.toml @@ -7,9 +7,9 @@ key-length = 56 replica = 3 leader-update-ratio = 0.06 -epoch-update-ratio = 0.04 -space-update-ratio = 0.15 -flow-update-ratio = 0.35 +epoch-update-ratio = 0.0 +space-update-ratio = 0.0 +flow-update-ratio = 0.0 no-update-ratio = 0.0 sample = false diff --git a/tools/pd-heartbeat-bench/config/config.go b/tools/pd-heartbeat-bench/config/config.go index 74c8159ced9..90254014d82 100644 --- a/tools/pd-heartbeat-bench/config/config.go +++ b/tools/pd-heartbeat-bench/config/config.go @@ -1,7 +1,6 @@ package config import ( - "math" "sync/atomic" "github.com/BurntSushi/toml" @@ -21,7 +20,7 @@ const ( defaultEpochUpdateRatio = 0.04 defaultSpaceUpdateRatio = 0.15 defaultFlowUpdateRatio = 0.35 - defaultNoUpdateRatio = 0 + defaultReportRatio = 1 defaultRound = 0 defaultSample = false @@ -39,6 +38,8 @@ type Config struct { Logger *zap.Logger LogProps *log.ZapProperties + Security configutil.SecurityConfig `toml:"security" json:"security"` + StoreCount int `toml:"store-count" json:"store-count"` RegionCount int `toml:"region-count" json:"region-count"` KeyLength int `toml:"key-length" json:"key-length"` @@ -47,7 +48,7 @@ type Config struct { EpochUpdateRatio float64 `toml:"epoch-update-ratio" json:"epoch-update-ratio"` SpaceUpdateRatio float64 `toml:"space-update-ratio" json:"space-update-ratio"` FlowUpdateRatio float64 `toml:"flow-update-ratio" json:"flow-update-ratio"` - NoUpdateRatio float64 `toml:"no-update-ratio" json:"no-update-ratio"` + ReportRatio float64 `toml:"report-ratio" json:"report-ratio"` Sample bool `toml:"sample" json:"sample"` Round int `toml:"round" json:"round"` } @@ -62,6 +63,9 @@ func NewConfig() *Config { fs.StringVar(&cfg.PDAddr, "pd-endpoints", "127.0.0.1:2379", "pd address") fs.StringVar(&cfg.Log.File.Filename, "log-file", "", "log file path") fs.StringVar(&cfg.StatusAddr, "status-addr", "127.0.0.1:20180", "status address") + fs.StringVar(&cfg.Security.CAPath, "cacert", "", "path of file that contains list of trusted TLS CAs") + fs.StringVar(&cfg.Security.CertPath, "cert", "", "path of file that contains X509 certificate in PEM format") + fs.StringVar(&cfg.Security.KeyPath, "key", "", "path of file that contains X509 key in PEM format") return cfg } @@ -133,8 +137,8 @@ func (c *Config) Adjust(meta *toml.MetaData) { if !meta.IsDefined("flow-update-ratio") { configutil.AdjustFloat64(&c.FlowUpdateRatio, defaultFlowUpdateRatio) } - if !meta.IsDefined("no-update-ratio") { - configutil.AdjustFloat64(&c.NoUpdateRatio, defaultNoUpdateRatio) + if !meta.IsDefined("report-ratio") { + configutil.AdjustFloat64(&c.ReportRatio, defaultReportRatio) } if !meta.IsDefined("sample") { c.Sample = defaultSample @@ -143,24 +147,20 @@ func (c *Config) Adjust(meta *toml.MetaData) { // Validate is used to validate configurations func (c *Config) Validate() error { - if c.LeaderUpdateRatio < 0 || c.LeaderUpdateRatio > 1 { - return errors.Errorf("leader-update-ratio must be in [0, 1]") - } - if c.EpochUpdateRatio < 0 || c.EpochUpdateRatio > 1 { - return errors.Errorf("epoch-update-ratio must be in [0, 1]") + if c.ReportRatio < 0 || c.ReportRatio > 1 { + return errors.Errorf("report-ratio must be in [0, 1]") } - if c.SpaceUpdateRatio < 0 || c.SpaceUpdateRatio > 1 { - return errors.Errorf("space-update-ratio must be in [0, 1]") + if c.LeaderUpdateRatio > c.ReportRatio || c.LeaderUpdateRatio < 0 { + return errors.Errorf("leader-update-ratio can not be negative or larger than report-ratio") } - if c.FlowUpdateRatio < 0 || c.FlowUpdateRatio > 1 { - return errors.Errorf("flow-update-ratio must be in [0, 1]") + if c.EpochUpdateRatio > c.ReportRatio || c.EpochUpdateRatio < 0 { + return errors.Errorf("epoch-update-ratio can not be negative or larger than report-ratio") } - if c.NoUpdateRatio < 0 || c.NoUpdateRatio > 1 { - return errors.Errorf("no-update-ratio must be in [0, 1]") + if c.SpaceUpdateRatio > c.ReportRatio || c.SpaceUpdateRatio < 0 { + return errors.Errorf("space-update-ratio can not be negative or larger than report-ratio") } - max := math.Max(c.LeaderUpdateRatio, math.Max(c.EpochUpdateRatio, math.Max(c.SpaceUpdateRatio, c.FlowUpdateRatio))) - if max+c.NoUpdateRatio > 1 { - return errors.Errorf("sum of update-ratio must be in [0, 1]") + if c.FlowUpdateRatio > c.ReportRatio || c.FlowUpdateRatio < 0 { + return errors.Errorf("flow-update-ratio can not be negative or larger than report-ratio") } return nil } @@ -174,11 +174,12 @@ func (c *Config) Clone() *Config { // Options is the option of the heartbeat-bench. type Options struct { + ReportRatio atomic.Value + LeaderUpdateRatio atomic.Value EpochUpdateRatio atomic.Value SpaceUpdateRatio atomic.Value FlowUpdateRatio atomic.Value - NoUpdateRatio atomic.Value } // NewOptions creates a new option. @@ -188,7 +189,7 @@ func NewOptions(cfg *Config) *Options { o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio) o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio) o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio) - o.NoUpdateRatio.Store(cfg.NoUpdateRatio) + o.ReportRatio.Store(cfg.ReportRatio) return o } @@ -212,9 +213,9 @@ func (o *Options) GetFlowUpdateRatio() float64 { return o.FlowUpdateRatio.Load().(float64) } -// GetNoUpdateRatio returns the no update ratio. -func (o *Options) GetNoUpdateRatio() float64 { - return o.NoUpdateRatio.Load().(float64) +// GetReportRatio returns the report ratio. +func (o *Options) GetReportRatio() float64 { + return o.ReportRatio.Load().(float64) } // SetOptions sets the option. @@ -223,5 +224,5 @@ func (o *Options) SetOptions(cfg *Config) { o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio) o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio) o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio) - o.NoUpdateRatio.Store(cfg.NoUpdateRatio) + o.ReportRatio.Store(cfg.ReportRatio) } diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go index 0e1af0de9ca..2a81c3e01df 100644 --- a/tools/pd-heartbeat-bench/main.go +++ b/tools/pd-heartbeat-bench/main.go @@ -22,7 +22,6 @@ import ( "net/http" "os" "os/signal" - "strings" "sync" "sync/atomic" "syscall" @@ -38,12 +37,12 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/spf13/pflag" + "github.com/tikv/pd/client/grpcutil" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/tools/pd-heartbeat-bench/config" "go.etcd.io/etcd/pkg/report" "go.uber.org/zap" - "google.golang.org/grpc" ) const ( @@ -57,19 +56,16 @@ const ( var clusterID uint64 -func trimHTTPPrefix(str string) string { - str = strings.TrimPrefix(str, "http://") - str = strings.TrimPrefix(str, "https://") - return str -} - -func newClient(cfg *config.Config) pdpb.PDClient { - addr := trimHTTPPrefix(cfg.PDAddr) - cc, err := grpc.Dial(addr, grpc.WithInsecure()) +func newClient(ctx context.Context, cfg *config.Config) (pdpb.PDClient, error) { + tlsConfig, err := cfg.Security.ToTLSConfig() if err != nil { - log.Fatal("failed to create gRPC connection", zap.Error(err)) + return nil, err } - return pdpb.NewPDClient(cc) + cc, err := grpcutil.GetClientConn(ctx, cfg.PDAddr, tlsConfig) + if err != nil { + return nil, err + } + return pdpb.NewPDClient(cc), nil } func initClusterID(ctx context.Context, cli pdpb.PDClient) { @@ -255,31 +251,32 @@ func (rs *Regions) init(cfg *config.Config, options *config.Options) []int { func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes []int) { rs.updateRound += 1 - rs.updateLeader = pick(indexes, cfg, options.GetLeaderUpdateRatio()) - rs.updateEpoch = pick(indexes, cfg, options.GetEpochUpdateRatio()) - rs.updateSpace = pick(indexes, cfg, options.GetSpaceUpdateRatio()) - rs.updateFlow = pick(indexes, cfg, options.GetFlowUpdateRatio()) - updatedRegionsMap := make(map[int]*pdpb.RegionHeartbeatRequest) - var awakenRegions []*pdpb.RegionHeartbeatRequest + reportRegions := pick(indexes, cfg.RegionCount, options.GetReportRatio()) + reportCount := len(reportRegions) + rs.updateLeader = pick(reportRegions, reportCount, options.GetLeaderUpdateRatio()) + rs.updateEpoch = pick(reportRegions, reportCount, options.GetEpochUpdateRatio()) + rs.updateSpace = pick(reportRegions, reportCount, options.GetSpaceUpdateRatio()) + rs.updateFlow = pick(reportRegions, reportCount, options.GetFlowUpdateRatio()) + var ( + updatedStatisticsMap = make(map[int]*pdpb.RegionHeartbeatRequest) + awakenRegions []*pdpb.RegionHeartbeatRequest + ) // update leader for _, i := range rs.updateLeader { region := rs.regions[i] region.Leader = region.Region.Peers[rs.updateRound%cfg.Replica] - updatedRegionsMap[i] = region } // update epoch for _, i := range rs.updateEpoch { region := rs.regions[i] region.Region.RegionEpoch.Version += 1 - updatedRegionsMap[i] = region } // update space for _, i := range rs.updateSpace { region := rs.regions[i] region.ApproximateSize = uint64(bytesUnit * rand.Float64()) region.ApproximateKeys = uint64(keysUint * rand.Float64()) - updatedRegionsMap[i] = region } // update flow for _, i := range rs.updateFlow { @@ -292,25 +289,34 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [ Get: uint64(queryUnit * rand.Float64()), Put: uint64(queryUnit * rand.Float64()), } - updatedRegionsMap[i] = region + updatedStatisticsMap[i] = region } // update interval for _, region := range rs.regions { region.Interval.StartTimestamp = region.Interval.EndTimestamp region.Interval.EndTimestamp = region.Interval.StartTimestamp + regionReportInterval } - for _, region := range updatedRegionsMap { + for _, i := range reportRegions { + region := rs.regions[i] + // reset the statistics of the region which is not updated + if _, exist := updatedStatisticsMap[i]; !exist { + region.BytesWritten = 0 + region.BytesRead = 0 + region.KeysWritten = 0 + region.KeysRead = 0 + region.QueryStats = &pdpb.QueryStats{} + } awakenRegions = append(awakenRegions, region) } - noUpdatedRegions := pickNoUpdatedRegions(indexes, cfg, options.GetNoUpdateRatio(), updatedRegionsMap) - for _, i := range noUpdatedRegions { - awakenRegions = append(awakenRegions, rs.regions[i]) - } + rs.awakenRegions.Store(awakenRegions) } func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_RegionHeartbeatClient { - cli := newClient(cfg) + cli, err := newClient(ctx, cfg) + if err != nil { + log.Fatal("create client error", zap.Error(err)) + } stream, err := cli.RegionHeartbeat(ctx) if err != nil { log.Fatal("create stream error", zap.Error(err)) @@ -359,7 +365,7 @@ func (rs *Regions) handleRegionHeartbeat(wg *sync.WaitGroup, stream pdpb.PD_Regi return } } - log.Info("store finish one round region heartbeat", zap.Uint64("store-id", storeID), zap.Duration("cost-time", time.Since(start))) + log.Info("store finish one round region heartbeat", zap.Uint64("store-id", storeID), zap.Duration("cost-time", time.Since(start)), zap.Int("reported-region-count", len(regions))) } // Stores contains store stats with lock. @@ -425,28 +431,11 @@ func (s *Stores) update(rs *Regions) { } } -func pick(slice []int, cfg *config.Config, ratio float64) []int { - rand.Shuffle(cfg.RegionCount, func(i, j int) { +func pick(slice []int, total int, ratio float64) []int { + rand.Shuffle(total, func(i, j int) { slice[i], slice[j] = slice[j], slice[i] }) - return append(slice[:0:0], slice[0:int(float64(cfg.RegionCount)*ratio)]...) -} - -func pickNoUpdatedRegions(slice []int, cfg *config.Config, ratio float64, updatedMap map[int]*pdpb.RegionHeartbeatRequest) []int { - if ratio == 0 { - return nil - } - rand.Shuffle(cfg.RegionCount, func(i, j int) { - slice[i], slice[j] = slice[j], slice[i] - }) - NoUpdatedRegionsNum := int(float64(cfg.RegionCount) * ratio) - res := make([]int, 0, NoUpdatedRegionsNum) - for i := 0; len(res) < NoUpdatedRegionsNum; i++ { - if _, ok := updatedMap[slice[i]]; !ok { - res = append(res, slice[i]) - } - } - return res + return append(slice[:0:0], slice[0:int(float64(total)*ratio)]...) } func main() { @@ -487,7 +476,10 @@ func main() { sig = <-sc cancel() }() - cli := newClient(cfg) + cli, err := newClient(ctx, cfg) + if err != nil { + log.Fatal("create client error", zap.Error(err)) + } initClusterID(ctx, cli) go runHTTPServer(cfg, options) regions := new(Regions) @@ -604,7 +596,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) { newCfg.LeaderUpdateRatio = options.GetLeaderUpdateRatio() newCfg.EpochUpdateRatio = options.GetEpochUpdateRatio() newCfg.SpaceUpdateRatio = options.GetSpaceUpdateRatio() - newCfg.NoUpdateRatio = options.GetNoUpdateRatio() + newCfg.ReportRatio = options.GetReportRatio() if err := c.BindJSON(&newCfg); err != nil { c.String(http.StatusBadRequest, err.Error()) return @@ -622,7 +614,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) { output.LeaderUpdateRatio = options.GetLeaderUpdateRatio() output.EpochUpdateRatio = options.GetEpochUpdateRatio() output.SpaceUpdateRatio = options.GetSpaceUpdateRatio() - output.NoUpdateRatio = options.GetNoUpdateRatio() + output.ReportRatio = options.GetReportRatio() c.IndentedJSON(http.StatusOK, output) })