Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: add new region score formula #3269

Merged
merged 5 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/mock/mockcluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func (mc *Cluster) SetTolerantSizeRatio(v float64) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.TolerantSizeRatio = v })
}

// SetRegionScoreFormulaVersion updates the RegionScoreFormulaVersion configuration.
func (mc *Cluster) SetRegionScoreFormulaVersion(v string) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.RegionScoreFormulaVersion = v })
}

// SetLeaderScheduleLimit updates the LeaderScheduleLimit configuration.
func (mc *Cluster) SetLeaderScheduleLimit(v int) {
mc.updateScheduleConfig(func(s *config.ScheduleConfig) { s.LeaderScheduleLimit = uint64(v) })
Expand Down
2 changes: 1 addition & 1 deletion server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (h *confHandler) Get(w http.ResponseWriter, r *http.Request) {
// @Router /config/default [get]
func (h *confHandler) GetDefault(w http.ResponseWriter, r *http.Request) {
config := config.NewConfig()
err := config.Adjust(nil)
err := config.Adjust(nil, false)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion server/api/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func newStoreInfo(opt *config.ScheduleConfig, store *core.StoreInfo) *StoreInfo
LeaderSize: store.GetLeaderSize(),
RegionCount: store.GetRegionCount(),
RegionWeight: store.GetRegionWeight(),
RegionScore: store.RegionScore(opt.HighSpaceRatio, opt.LowSpaceRatio, 0),
RegionScore: store.RegionScore(opt.RegionScoreFormulaVersion, opt.HighSpaceRatio, opt.LowSpaceRatio, 0, 0),
RegionSize: store.GetRegionSize(),
SendingSnapCount: store.GetSendingSnapCount(),
ReceivingSnapCount: store.GetReceivingSnapCount(),
Expand Down
2 changes: 1 addition & 1 deletion server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ type testCluster struct {
func newTestScheduleConfig() (*config.ScheduleConfig, *config.PersistOptions, error) {
cfg := config.NewConfig()
cfg.Schedule.TolerantSizeRatio = 5
if err := cfg.Adjust(nil); err != nil {
if err := cfg.Adjust(nil, false); err != nil {
return nil, nil, err
}
opt := config.NewPersistOptions(cfg)
Expand Down
48 changes: 28 additions & 20 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (c *Config) Parse(arguments []string) error {
return errors.Errorf("'%s' is an invalid flag", c.flagSet.Arg(0))
}

err = c.Adjust(meta)
err = c.Adjust(meta, false)
return err
}

Expand Down Expand Up @@ -461,7 +461,7 @@ func (m *configMetaData) CheckUndecoded() error {
}

// Adjust is used to adjust the PD configurations.
func (c *Config) Adjust(meta *toml.MetaData) error {
func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
configMetaData := newConfigMetadata(meta)
if err := configMetaData.CheckUndecoded(); err != nil {
c.WarningMsgs = append(c.WarningMsgs, err.Error())
Expand Down Expand Up @@ -538,7 +538,7 @@ func (c *Config) Adjust(meta *toml.MetaData) error {

adjustString(&c.Metric.PushJob, c.Name)

if err := c.Schedule.adjust(configMetaData.Child("schedule")); err != nil {
if err := c.Schedule.adjust(configMetaData.Child("schedule"), reloading); err != nil {
return err
}
if err := c.Replication.adjust(configMetaData.Child("replication")); err != nil {
Expand Down Expand Up @@ -654,6 +654,8 @@ type ScheduleConfig struct {
// HighSpaceRatio is the highest usage ratio of store which regraded as high space.
// High space means there is a lot of spare capacity, and store region score varies directly with used size.
HighSpaceRatio float64 `toml:"high-space-ratio" json:"high-space-ratio"`
// RegionScoreFormulaVersion is used to control the formula used to calculate region score.
RegionScoreFormulaVersion string `toml:"region-score-formula-version" json:"region-score-formula-version"`
// SchedulerMaxWaitingOperator is the max coexist operators for each scheduler.
SchedulerMaxWaitingOperator uint64 `toml:"scheduler-max-waiting-operator" json:"scheduler-max-waiting-operator"`
// WARN: DisableLearner is deprecated.
Expand Down Expand Up @@ -728,22 +730,23 @@ func (c *ScheduleConfig) Clone() *ScheduleConfig {
}

const (
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 20
defaultMaxMergeRegionKeys = 200000
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 2048
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 4
defaultTolerantSizeRatio = 0
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.7
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 20
defaultMaxMergeRegionKeys = 200000
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 2048
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 4
defaultTolerantSizeRatio = 0
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.7
defaultRegionScoreFormulaVersion = "v2"
// defaultHotRegionCacheHitsThreshold is the low hit number threshold of the
// hot region.
defaultHotRegionCacheHitsThreshold = 3
Expand All @@ -754,7 +757,7 @@ const (
defaultEnableCrossTableMerge = true
)

func (c *ScheduleConfig) adjust(meta *configMetaData) error {
func (c *ScheduleConfig) adjust(meta *configMetaData, reloading bool) error {
if !meta.IsDefined("max-snapshot-count") {
adjustUint64(&c.MaxSnapshotCount, defaultMaxSnapshotCount)
}
Expand Down Expand Up @@ -809,6 +812,11 @@ func (c *ScheduleConfig) adjust(meta *configMetaData) error {
adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio)
adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio)

// new cluster:v2, old cluster:v1
if !meta.IsDefined("region-score-formula-version") && !reloading {
adjustString(&c.RegionScoreFormulaVersion, defaultRegionScoreFormulaVersion)
}

adjustSchedulers(&c.Schedulers, DefaultSchedulers)

for k, b := range c.migrateConfigurationMap() {
Expand Down
56 changes: 38 additions & 18 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *testConfigSuite) TestTLS(c *C) {
func (s *testConfigSuite) TestBadFormatJoinAddr(c *C) {
cfg := NewConfig()
cfg.Join = "127.0.0.1:2379" // Wrong join addr without scheme.
c.Assert(cfg.Adjust(nil), NotNil)
c.Assert(cfg.Adjust(nil, false), NotNil)
}

func (s *testConfigSuite) TestReloadConfig(c *C) {
Expand Down Expand Up @@ -114,9 +114,29 @@ func (s *testConfigSuite) TestReloadUpgrade(c *C) {
c.Assert(newOpt.GetPDServerConfig().KeyType, Equals, defaultKeyType) // should be set to default value.
}

func (s *testConfigSuite) TestReloadUpgrade2(c *C) {
opt, err := newTestScheduleOption()
c.Assert(err, IsNil)

// Simulate an old configuration that does not contain ScheduleConfig.
type OldConfig struct {
Replication ReplicationConfig `toml:"replication" json:"replication"`
}
old := &OldConfig{
Replication: *opt.GetReplicationConfig(),
}
storage := core.NewStorage(kv.NewMemoryKV())
c.Assert(storage.SaveConfig(old), IsNil)

newOpt, err := newTestScheduleOption()
c.Assert(err, IsNil)
c.Assert(newOpt.Reload(storage), IsNil)
c.Assert(newOpt.GetScheduleConfig().RegionScoreFormulaVersion, Equals, "") // formulaVersion keep old value when reloading.
}

func (s *testConfigSuite) TestValidation(c *C) {
cfg := NewConfig()
c.Assert(cfg.Adjust(nil), IsNil)
c.Assert(cfg.Adjust(nil, false), IsNil)

cfg.Log.File.Filename = path.Join(cfg.DataDir, "test")
c.Assert(cfg.Validate(), NotNil)
Expand Down Expand Up @@ -154,7 +174,7 @@ leader-schedule-limit = 0
cfg := NewConfig()
meta, err := toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)

// When invalid, use default values.
Expand Down Expand Up @@ -185,7 +205,7 @@ type = "random-merge"
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)
c.Assert(strings.Contains(cfg.WarningMsgs[0], "Config contains undefined item"), IsTrue)

Expand All @@ -200,7 +220,7 @@ type = "random-merge-schedulers"
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, NotNil)

// Check correct schedulers name
Expand All @@ -214,7 +234,7 @@ type = "random-merge"
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)

cfgData = `
Expand All @@ -225,7 +245,7 @@ address = "localhost:9090"
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)

c.Assert(cfg.Metric.PushInterval.Duration, Equals, 35*time.Second)
Expand All @@ -238,7 +258,7 @@ tso-update-physical-interval = "10ms"
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)

c.Assert(cfg.TSOUpdatePhysicalInterval.Duration, Equals, minTSOUpdatePhysicalInterval)
Expand All @@ -249,7 +269,7 @@ tso-update-physical-interval = "15s"
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)

c.Assert(cfg.TSOUpdatePhysicalInterval.Duration, Equals, maxTSOUpdatePhysicalInterval)
Expand All @@ -260,7 +280,7 @@ func (s *testConfigSuite) TestMigrateFlags(c *C) {
cfg := NewConfig()
meta, err := toml.Decode(s, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
return cfg, err
}
cfg, err := load(`
Expand Down Expand Up @@ -290,7 +310,7 @@ disable-make-up-replica = false

func newTestScheduleOption() (*PersistOptions, error) {
cfg := NewConfig()
if err := cfg.Adjust(nil); err != nil {
if err := cfg.Adjust(nil, false); err != nil {
return nil, err
}
opt := NewPersistOptions(cfg)
Expand Down Expand Up @@ -354,7 +374,7 @@ dashboard-address = "foo"
cfg := NewConfig()
meta, err := toml.Decode(t.cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err != nil, Equals, t.hasErr)
if !t.hasErr {
c.Assert(cfg.PDServerCfg.DashboardAddress, Equals, t.dashboardAddress)
Expand All @@ -372,7 +392,7 @@ tidb-cert-path = "/path/client.pem"
cfg := NewConfig()
meta, err := toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)
c.Assert(cfg.Dashboard.TiDBCAPath, Equals, "/path/ca.pem")
c.Assert(cfg.Dashboard.TiDBKeyPath, Equals, "/path/client-key.pem")
Expand All @@ -393,7 +413,7 @@ tidb-cert-path = "/path/client.pem"
cfg = NewConfig()
meta, err = toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)
c.Assert(cfg.Dashboard.EnableTelemetry, Equals, test.EnableTelemetry)
}
Expand All @@ -415,7 +435,7 @@ wait-store-timeout = "120s"
cfg := NewConfig()
meta, err := toml.Decode(cfgData, &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)

c.Assert(cfg.ReplicationMode.ReplicationMode, Equals, "dr-auto-sync")
Expand All @@ -430,20 +450,20 @@ wait-store-timeout = "120s"
cfg = NewConfig()
meta, err = toml.Decode("", &cfg)
c.Assert(err, IsNil)
err = cfg.Adjust(&meta)
err = cfg.Adjust(&meta, false)
c.Assert(err, IsNil)
c.Assert(cfg.ReplicationMode.ReplicationMode, Equals, "majority")
}

func (s *testConfigSuite) TestConfigClone(c *C) {
cfg := &Config{}
cfg.Adjust(nil)
cfg.Adjust(nil, false)
c.Assert(cfg.Clone(), DeepEquals, cfg)

emptyConfigMetaData := newConfigMetadata(nil)

schedule := &ScheduleConfig{}
schedule.adjust(emptyConfigMetaData)
schedule.adjust(emptyConfigMetaData, false)
c.Assert(schedule.Clone(), DeepEquals, schedule)

replication := &ReplicationConfig{}
Expand Down
7 changes: 6 additions & 1 deletion server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,11 @@ func (o *PersistOptions) GetHighSpaceRatio() float64 {
return o.GetScheduleConfig().HighSpaceRatio
}

// GetRegionScoreFormulaVersion returns the formula version config.
func (o *PersistOptions) GetRegionScoreFormulaVersion() string {
return o.GetScheduleConfig().RegionScoreFormulaVersion
}

// GetSchedulerMaxWaitingOperator returns the number of the max waiting operators.
func (o *PersistOptions) GetSchedulerMaxWaitingOperator() uint64 {
return o.getTTLUintOr(schedulerMaxWaitingOperatorKey, o.GetScheduleConfig().SchedulerMaxWaitingOperator)
Expand Down Expand Up @@ -563,7 +568,7 @@ func (o *PersistOptions) Persist(storage *core.Storage) error {
func (o *PersistOptions) Reload(storage *core.Storage) error {
cfg := &Config{}
// pass nil to initialize cfg to default values (all items undefined)
cfg.Adjust(nil)
cfg.Adjust(nil, true)

isExist, err := storage.LoadConfig(cfg)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion server/config/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ func NewTestOptions() *PersistOptions {
RegisterScheduler(d.Type)
}
c := NewConfig()
c.Adjust(nil)
c.Adjust(nil, false)
return NewPersistOptions(c)
}
Loading