Skip to content

Commit

Permalink
schedule: fix split-merge-interval update (#8405)
Browse files Browse the repository at this point in the history
close #8404

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
lhy1024 and ti-chi-bot[bot] committed Jul 17, 2024
1 parent 5ec6af4 commit 1ad446e
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 2 deletions.
9 changes: 7 additions & 2 deletions pkg/schedule/checker/merge_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ const (
mergeOptionValueDeny = "deny"
)

var gcInterval = time.Minute

// MergeChecker ensures region to merge with adjacent region when size is small
type MergeChecker struct {
PauseController
Expand All @@ -57,7 +59,7 @@ type MergeChecker struct {

// NewMergeChecker creates a merge checker.
func NewMergeChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *MergeChecker {
splitCache := cache.NewIDTTL(ctx, time.Minute, conf.GetSplitMergeInterval())
splitCache := cache.NewIDTTL(ctx, gcInterval, conf.GetSplitMergeInterval())
return &MergeChecker{
cluster: cluster,
conf: conf,
Expand Down Expand Up @@ -88,13 +90,16 @@ func (m *MergeChecker) Check(region *core.RegionInfo) []*operator.Operator {
return nil
}

// update the split cache.
// It must be called before the following merge checker logic.
m.splitCache.UpdateTTL(m.conf.GetSplitMergeInterval())

expireTime := m.startTime.Add(m.conf.GetSplitMergeInterval())
if time.Now().Before(expireTime) {
mergeCheckerRecentlyStartCounter.Inc()
return nil
}

m.splitCache.UpdateTTL(m.conf.GetSplitMergeInterval())
if m.splitCache.Exists(region.GetID()) {
mergeCheckerRecentlySplitCounter.Inc()
return nil
Expand Down
16 changes: 16 additions & 0 deletions pkg/schedule/checker/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestMergeCheckerTestSuite(t *testing.T) {

func (suite *mergeCheckerTestSuite) SetupTest() {
cfg := mockconfig.NewTestOptions()
gcInterval = 100 * time.Millisecond
suite.ctx, suite.cancel = context.WithCancel(context.Background())
suite.cluster = mockcluster.NewCluster(suite.ctx, cfg)
suite.cluster.SetMaxMergeRegionSize(2)
Expand Down Expand Up @@ -84,6 +85,7 @@ func (suite *mergeCheckerTestSuite) SetupTest() {
}

func (suite *mergeCheckerTestSuite) TearDownTest() {
gcInterval = time.Minute
suite.cancel()
}

Expand Down Expand Up @@ -234,6 +236,7 @@ func (suite *mergeCheckerTestSuite) TestBasic() {
ops = suite.mc.Check(suite.regions[3])
re.Nil(ops)

// issue #4616
suite.cluster.SetSplitMergeInterval(500 * time.Millisecond)
ops = suite.mc.Check(suite.regions[2])
re.Nil(ops)
Expand All @@ -245,6 +248,19 @@ func (suite *mergeCheckerTestSuite) TestBasic() {
re.NotNil(ops)
ops = suite.mc.Check(suite.regions[3])
re.NotNil(ops)

// issue #8405
suite.mc.startTime = time.Now()
suite.cluster.SetSplitMergeInterval(time.Second)
suite.cluster.SetSplitMergeInterval(time.Hour)
suite.mc.RecordRegionSplit([]uint64{suite.regions[2].GetID()})
suite.cluster.SetSplitMergeInterval(time.Second)
suite.mc.Check(suite.regions[2]) // trigger the config update
time.Sleep(time.Second) // wait for the cache to gc
ops = suite.mc.Check(suite.regions[2])
re.NotNil(ops)
ops = suite.mc.Check(suite.regions[3])
re.NotNil(ops)
}

func (suite *mergeCheckerTestSuite) TestMatchPeers() {
Expand Down

0 comments on commit 1ad446e

Please sign in to comment.