diff --git a/tools/pd-heartbeat-bench/config-template.toml b/tools/pd-heartbeat-bench/config-template.toml index 60a4e011128..d2a0fa844fe 100644 --- a/tools/pd-heartbeat-bench/config-template.toml +++ b/tools/pd-heartbeat-bench/config-template.toml @@ -10,5 +10,6 @@ leader-update-ratio = 0.06 epoch-update-ratio = 0.04 space-update-ratio = 0.15 flow-update-ratio = 0.35 +no-update-ratio = 0.0 sample = false diff --git a/tools/pd-heartbeat-bench/config/config.go b/tools/pd-heartbeat-bench/config/config.go index 0637d991c5e..be669c00666 100644 --- a/tools/pd-heartbeat-bench/config/config.go +++ b/tools/pd-heartbeat-bench/config/config.go @@ -1,6 +1,7 @@ package config import ( + "math" "sync/atomic" "github.com/BurntSushi/toml" @@ -20,6 +21,7 @@ const ( defaultEpochUpdateRatio = 0.04 defaultSpaceUpdateRatio = 0.15 defaultFlowUpdateRatio = 0.35 + defaultNoUpdateRatio = 0 defaultRound = 0 defaultSample = false @@ -45,6 +47,7 @@ type Config struct { EpochUpdateRatio float64 `toml:"epoch-update-ratio" json:"epoch-update-ratio"` SpaceUpdateRatio float64 `toml:"space-update-ratio" json:"space-update-ratio"` FlowUpdateRatio float64 `toml:"flow-update-ratio" json:"flow-update-ratio"` + NoUpdateRatio float64 `toml:"no-update-ratio" json:"no-update-ratio"` Sample bool `toml:"sample" json:"sample"` Round int `toml:"round" json:"round"` } @@ -90,7 +93,7 @@ func (c *Config) Parse(arguments []string) error { } c.Adjust(meta) - return nil + return c.Validate() } // Adjust is used to adjust configurations @@ -129,11 +132,38 @@ func (c *Config) Adjust(meta *toml.MetaData) { if !meta.IsDefined("flow-update-ratio") { configutil.AdjustFloat64(&c.FlowUpdateRatio, defaultFlowUpdateRatio) } + if !meta.IsDefined("no-update-ratio") { + configutil.AdjustFloat64(&c.NoUpdateRatio, defaultNoUpdateRatio) + } if !meta.IsDefined("sample") { c.Sample = defaultSample } } +// Validate is used to validate configurations +func (c *Config) Validate() error { + if c.LeaderUpdateRatio < 0 || c.LeaderUpdateRatio > 1 { + return errors.Errorf("leader-update-ratio must be in [0, 1]") + } + if c.EpochUpdateRatio < 0 || c.EpochUpdateRatio > 1 { + return errors.Errorf("epoch-update-ratio must be in [0, 1]") + } + if c.SpaceUpdateRatio < 0 || c.SpaceUpdateRatio > 1 { + return errors.Errorf("space-update-ratio must be in [0, 1]") + } + if c.FlowUpdateRatio < 0 || c.FlowUpdateRatio > 1 { + return errors.Errorf("flow-update-ratio must be in [0, 1]") + } + if c.NoUpdateRatio < 0 || c.NoUpdateRatio > 1 { + return errors.Errorf("no-update-ratio must be in [0, 1]") + } + max := math.Max(c.LeaderUpdateRatio, math.Max(c.EpochUpdateRatio, math.Max(c.SpaceUpdateRatio, c.FlowUpdateRatio))) + if max+c.NoUpdateRatio > 1 { + return errors.Errorf("sum of update-ratio must be in [0, 1]") + } + return nil +} + // Clone creates a copy of current config. func (c *Config) Clone() *Config { cfg := &Config{} @@ -147,6 +177,7 @@ type Options struct { EpochUpdateRatio atomic.Value SpaceUpdateRatio atomic.Value FlowUpdateRatio atomic.Value + NoUpdateRatio atomic.Value } // NewOptions creates a new option. @@ -156,6 +187,7 @@ func NewOptions(cfg *Config) *Options { o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio) o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio) o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio) + o.NoUpdateRatio.Store(cfg.NoUpdateRatio) return o } @@ -179,10 +211,16 @@ func (o *Options) GetFlowUpdateRatio() float64 { return o.FlowUpdateRatio.Load().(float64) } +// GetNoUpdateRatio returns the no update ratio. +func (o *Options) GetNoUpdateRatio() float64 { + return o.NoUpdateRatio.Load().(float64) +} + // SetOptions sets the option. func (o *Options) SetOptions(cfg *Config) { o.LeaderUpdateRatio.Store(cfg.LeaderUpdateRatio) o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio) o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio) o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio) + o.NoUpdateRatio.Store(cfg.NoUpdateRatio) } diff --git a/tools/pd-heartbeat-bench/main.go b/tools/pd-heartbeat-bench/main.go index e9fe32e067d..0e1af0de9ca 100644 --- a/tools/pd-heartbeat-bench/main.go +++ b/tools/pd-heartbeat-bench/main.go @@ -186,7 +186,8 @@ func newEndKey(id uint64, keyLen int) []byte { // Regions simulates all regions to heartbeat. type Regions struct { - regions []*pdpb.RegionHeartbeatRequest + regions []*pdpb.RegionHeartbeatRequest + awakenRegions atomic.Value updateRound int @@ -258,22 +259,27 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [ rs.updateEpoch = pick(indexes, cfg, options.GetEpochUpdateRatio()) rs.updateSpace = pick(indexes, cfg, options.GetSpaceUpdateRatio()) rs.updateFlow = pick(indexes, cfg, options.GetFlowUpdateRatio()) + updatedRegionsMap := make(map[int]*pdpb.RegionHeartbeatRequest) + var awakenRegions []*pdpb.RegionHeartbeatRequest // update leader for _, i := range rs.updateLeader { region := rs.regions[i] region.Leader = region.Region.Peers[rs.updateRound%cfg.Replica] + updatedRegionsMap[i] = region } // update epoch for _, i := range rs.updateEpoch { region := rs.regions[i] region.Region.RegionEpoch.Version += 1 + updatedRegionsMap[i] = region } // update space for _, i := range rs.updateSpace { region := rs.regions[i] region.ApproximateSize = uint64(bytesUnit * rand.Float64()) region.ApproximateKeys = uint64(keysUint * rand.Float64()) + updatedRegionsMap[i] = region } // update flow for _, i := range rs.updateFlow { @@ -286,12 +292,21 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [ Get: uint64(queryUnit * rand.Float64()), Put: uint64(queryUnit * rand.Float64()), } + updatedRegionsMap[i] = region } // update interval for _, region := range rs.regions { region.Interval.StartTimestamp = region.Interval.EndTimestamp region.Interval.EndTimestamp = region.Interval.StartTimestamp + regionReportInterval } + for _, region := range updatedRegionsMap { + awakenRegions = append(awakenRegions, region) + } + noUpdatedRegions := pickNoUpdatedRegions(indexes, cfg, options.GetNoUpdateRatio(), updatedRegionsMap) + for _, i := range noUpdatedRegions { + awakenRegions = append(awakenRegions, rs.regions[i]) + } + rs.awakenRegions.Store(awakenRegions) } func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_RegionHeartbeatClient { @@ -312,8 +327,14 @@ func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_Regi func (rs *Regions) handleRegionHeartbeat(wg *sync.WaitGroup, stream pdpb.PD_RegionHeartbeatClient, storeID uint64, rep report.Report) { defer wg.Done() - var regions []*pdpb.RegionHeartbeatRequest - for _, region := range rs.regions { + var regions, toUpdate []*pdpb.RegionHeartbeatRequest + updatedRegions := rs.awakenRegions.Load() + if updatedRegions == nil { + toUpdate = rs.regions + } else { + toUpdate = updatedRegions.([]*pdpb.RegionHeartbeatRequest) + } + for _, region := range toUpdate { if region.Leader.StoreId != storeID { continue } @@ -411,6 +432,23 @@ func pick(slice []int, cfg *config.Config, ratio float64) []int { return append(slice[:0:0], slice[0:int(float64(cfg.RegionCount)*ratio)]...) } +func pickNoUpdatedRegions(slice []int, cfg *config.Config, ratio float64, updatedMap map[int]*pdpb.RegionHeartbeatRequest) []int { + if ratio == 0 { + return nil + } + rand.Shuffle(cfg.RegionCount, func(i, j int) { + slice[i], slice[j] = slice[j], slice[i] + }) + NoUpdatedRegionsNum := int(float64(cfg.RegionCount) * ratio) + res := make([]int, 0, NoUpdatedRegionsNum) + for i := 0; len(res) < NoUpdatedRegionsNum; i++ { + if _, ok := updatedMap[slice[i]]; !ok { + res = append(res, slice[i]) + } + } + return res +} + func main() { rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times cfg := config.NewConfig() @@ -562,11 +600,19 @@ func runHTTPServer(cfg *config.Config, options *config.Options) { pprof.Register(engine) engine.PUT("config", func(c *gin.Context) { newCfg := cfg.Clone() + newCfg.FlowUpdateRatio = options.GetFlowUpdateRatio() + newCfg.LeaderUpdateRatio = options.GetLeaderUpdateRatio() + newCfg.EpochUpdateRatio = options.GetEpochUpdateRatio() + newCfg.SpaceUpdateRatio = options.GetSpaceUpdateRatio() + newCfg.NoUpdateRatio = options.GetNoUpdateRatio() if err := c.BindJSON(&newCfg); err != nil { c.String(http.StatusBadRequest, err.Error()) return } - + if err := newCfg.Validate(); err != nil { + c.String(http.StatusBadRequest, err.Error()) + return + } options.SetOptions(newCfg) c.String(http.StatusOK, "Successfully updated the configuration") }) @@ -576,6 +622,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) { output.LeaderUpdateRatio = options.GetLeaderUpdateRatio() output.EpochUpdateRatio = options.GetEpochUpdateRatio() output.SpaceUpdateRatio = options.GetSpaceUpdateRatio() + output.NoUpdateRatio = options.GetNoUpdateRatio() c.IndentedJSON(http.StatusOK, output) })