Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tools: support changing the percentage of heartbeat #7698

Merged
merged 4 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tools/pd-heartbeat-bench/config-template.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ leader-update-ratio = 0.06
epoch-update-ratio = 0.04
space-update-ratio = 0.15
flow-update-ratio = 0.35
no-update-ratio = 0.0

sample = false
13 changes: 13 additions & 0 deletions tools/pd-heartbeat-bench/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const (
defaultEpochUpdateRatio = 0.04
defaultSpaceUpdateRatio = 0.15
defaultFlowUpdateRatio = 0.35
defaultNoUpdateRatio = 0
defaultRound = 0
defaultSample = false

Expand All @@ -45,6 +46,7 @@ type Config struct {
EpochUpdateRatio float64 `toml:"epoch-update-ratio" json:"epoch-update-ratio"`
SpaceUpdateRatio float64 `toml:"space-update-ratio" json:"space-update-ratio"`
FlowUpdateRatio float64 `toml:"flow-update-ratio" json:"flow-update-ratio"`
NoUpdateRatio float64 `toml:"no-update-ratio" json:"no-update-ratio"`
Sample bool `toml:"sample" json:"sample"`
Round int `toml:"round" json:"round"`
}
Expand Down Expand Up @@ -129,6 +131,9 @@ func (c *Config) Adjust(meta *toml.MetaData) {
if !meta.IsDefined("flow-update-ratio") {
configutil.AdjustFloat64(&c.FlowUpdateRatio, defaultFlowUpdateRatio)
}
if !meta.IsDefined("no-update-ratio") {
configutil.AdjustFloat64(&c.NoUpdateRatio, defaultNoUpdateRatio)
}
if !meta.IsDefined("sample") {
c.Sample = defaultSample
}
Expand All @@ -147,6 +152,7 @@ type Options struct {
EpochUpdateRatio atomic.Value
SpaceUpdateRatio atomic.Value
FlowUpdateRatio atomic.Value
NoUpdateRatio atomic.Value
}

// NewOptions creates a new option.
Expand All @@ -156,6 +162,7 @@ func NewOptions(cfg *Config) *Options {
o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio)
o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio)
o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio)
o.NoUpdateRatio.Store(cfg.NoUpdateRatio)
return o
}

Expand All @@ -179,10 +186,16 @@ func (o *Options) GetFlowUpdateRatio() float64 {
return o.FlowUpdateRatio.Load().(float64)
}

// GetNoUpdateRatio returns the no update ratio.
func (o *Options) GetNoUpdateRatio() float64 {
return o.NoUpdateRatio.Load().(float64)
}

// SetOptions sets the option.
func (o *Options) SetOptions(cfg *Config) {
o.LeaderUpdateRatio.Store(cfg.LeaderUpdateRatio)
o.EpochUpdateRatio.Store(cfg.EpochUpdateRatio)
o.SpaceUpdateRatio.Store(cfg.SpaceUpdateRatio)
o.FlowUpdateRatio.Store(cfg.FlowUpdateRatio)
o.NoUpdateRatio.Store(cfg.NoUpdateRatio)
}
55 changes: 52 additions & 3 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,8 @@ func newEndKey(id uint64, keyLen int) []byte {

// Regions simulates all regions to heartbeat.
type Regions struct {
regions []*pdpb.RegionHeartbeatRequest
regions []*pdpb.RegionHeartbeatRequest
awakenRegions atomic.Value

updateRound int

Expand Down Expand Up @@ -258,22 +259,27 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [
rs.updateEpoch = pick(indexes, cfg, options.GetEpochUpdateRatio())
rs.updateSpace = pick(indexes, cfg, options.GetSpaceUpdateRatio())
rs.updateFlow = pick(indexes, cfg, options.GetFlowUpdateRatio())
updatedRegionsMap := make(map[int]*pdpb.RegionHeartbeatRequest)
var awakenRegions []*pdpb.RegionHeartbeatRequest

// update leader
for _, i := range rs.updateLeader {
region := rs.regions[i]
region.Leader = region.Region.Peers[rs.updateRound%cfg.Replica]
updatedRegionsMap[i] = region
}
// update epoch
for _, i := range rs.updateEpoch {
region := rs.regions[i]
region.Region.RegionEpoch.Version += 1
updatedRegionsMap[i] = region
}
// update space
for _, i := range rs.updateSpace {
region := rs.regions[i]
region.ApproximateSize = uint64(bytesUnit * rand.Float64())
region.ApproximateKeys = uint64(keysUint * rand.Float64())
updatedRegionsMap[i] = region
}
// update flow
for _, i := range rs.updateFlow {
Expand All @@ -286,12 +292,21 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options, indexes [
Get: uint64(queryUnit * rand.Float64()),
Put: uint64(queryUnit * rand.Float64()),
}
updatedRegionsMap[i] = region
}
// update interval
for _, region := range rs.regions {
region.Interval.StartTimestamp = region.Interval.EndTimestamp
region.Interval.EndTimestamp = region.Interval.StartTimestamp + regionReportInterval
}
for _, region := range updatedRegionsMap {
awakenRegions = append(awakenRegions, region)
}
noUpdatedRegions := pickNoUpdatedRegions(indexes, cfg, options.GetNoUpdateRatio(), updatedRegionsMap)
for _, i := range noUpdatedRegions {
awakenRegions = append(awakenRegions, rs.regions[i])
}
rs.awakenRegions.Store(awakenRegions)
}

func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_RegionHeartbeatClient {
Expand All @@ -312,8 +327,14 @@ func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_Regi

func (rs *Regions) handleRegionHeartbeat(wg *sync.WaitGroup, stream pdpb.PD_RegionHeartbeatClient, storeID uint64, rep report.Report) {
defer wg.Done()
var regions []*pdpb.RegionHeartbeatRequest
for _, region := range rs.regions {
var regions, toUpdate []*pdpb.RegionHeartbeatRequest
updatedRegions := rs.awakenRegions.Load()
if updatedRegions == nil {
toUpdate = rs.regions
} else {
toUpdate = updatedRegions.([]*pdpb.RegionHeartbeatRequest)
}
for _, region := range toUpdate {
if region.Leader.StoreId != storeID {
continue
}
Expand Down Expand Up @@ -411,6 +432,28 @@ func pick(slice []int, cfg *config.Config, ratio float64) []int {
return append(slice[:0:0], slice[0:int(float64(cfg.RegionCount)*ratio)]...)
}

func pickNoUpdatedRegions(slice []int, cfg *config.Config, ratio float64, updatedMap map[int]*pdpb.RegionHeartbeatRequest) []int {
if ratio == 0 {
return nil
}
rand.Shuffle(cfg.RegionCount, func(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
})
NoUpdatedRegionsNum := int(float64(cfg.RegionCount) * ratio)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And do we need check the NoUpdatedRegionsNum + len(updatedMap) should less than len(slice)

res := make([]int, 0, NoUpdatedRegionsNum)
i := 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
i := 0
for i := 0; len(res) < NoUpdatedRegionsNum; i++ {
if _, ok := updatedMap[slice[i]]; !ok {
res = append(res, slice[i])
}
}

for {
if len(res) == NoUpdatedRegionsNum {
break
}
if _, ok := updatedMap[slice[i]]; !ok {
res = append(res, slice[i])
}
i++
}
return res
}

func main() {
rand.New(rand.NewSource(0)) // Ensure consistent behavior multiple times
cfg := config.NewConfig()
Expand Down Expand Up @@ -562,6 +605,11 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
pprof.Register(engine)
engine.PUT("config", func(c *gin.Context) {
newCfg := cfg.Clone()
newCfg.FlowUpdateRatio = options.GetFlowUpdateRatio()
newCfg.LeaderUpdateRatio = options.GetLeaderUpdateRatio()
newCfg.EpochUpdateRatio = options.GetEpochUpdateRatio()
newCfg.SpaceUpdateRatio = options.GetSpaceUpdateRatio()
newCfg.NoUpdateRatio = options.GetNoUpdateRatio()
if err := c.BindJSON(&newCfg); err != nil {
c.String(http.StatusBadRequest, err.Error())
return
Expand All @@ -576,6 +624,7 @@ func runHTTPServer(cfg *config.Config, options *config.Options) {
output.LeaderUpdateRatio = options.GetLeaderUpdateRatio()
output.EpochUpdateRatio = options.GetEpochUpdateRatio()
output.SpaceUpdateRatio = options.GetSpaceUpdateRatio()
output.NoUpdateRatio = options.GetNoUpdateRatio()

c.IndentedJSON(http.StatusOK, output)
})
Expand Down
Loading