Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcs: fix rule sync when encountering "no rule left" errors under concurrency #7481

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) {
c.String(http.StatusBadRequest, err.Error())
return
}
c.String(http.StatusOK, state)
c.IndentedJSON(http.StatusOK, state)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reason for this modification? Do we need to modify the corresponding API interface for the non-API mode?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid failure in JSON unmarshalling and maintain consistency with PD mode.

}
184 changes: 120 additions & 64 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"
Expand All @@ -36,6 +37,10 @@
cancel context.CancelFunc
wg sync.WaitGroup

// ruleCommonPathPrefix:
// - Key: /pd/{cluster_id}/rule
// - Value: placement.Rule or placement.RuleGroup
ruleCommonPathPrefix string
// rulesPathPrefix:
// - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id}
// - Value: placement.Rule
Expand All @@ -60,8 +65,18 @@
regionLabeler *labeler.RegionLabeler

ruleWatcher *etcdutil.LoopWatcher
rleungx marked this conversation as resolved.
Show resolved Hide resolved
groupWatcher *etcdutil.LoopWatcher
labelWatcher *etcdutil.LoopWatcher

// pendingDeletion is a structure used to track the rules or rule groups that are marked for deletion.
// If a rule or rule group cannot be deleted immediately due to the absence of rules,
// it will be held here and removed later when a new rule or rule group put event allows for its deletion.
pendingDeletion struct {
syncutil.RWMutex
// key: path, value: [groupID, ruleID]
// The map 'kvs' holds the rules or rule groups that are pending deletion.
// If a rule group needs to be deleted, the ruleID will be an empty string.
kvs map[string][2]string
}
}

// NewWatcher creates a new watcher to watch the Placement Rule change from PD API server.
Expand All @@ -79,22 +94,25 @@
ctx: ctx,
cancel: cancel,
rulesPathPrefix: endpoint.RulesPathPrefix(clusterID),
ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID),
ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID),
regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID),
etcdClient: etcdClient,
ruleStorage: ruleStorage,
checkerController: checkerController,
ruleManager: ruleManager,
regionLabeler: regionLabeler,
pendingDeletion: struct {
syncutil.RWMutex
kvs map[string][2]string
}{
kvs: make(map[string][2]string),
},
}
err := rw.initializeRuleWatcher()
if err != nil {
return nil, err
}
err = rw.initializeGroupWatcher()
if err != nil {
return nil, err
}
err = rw.initializeRegionLabelWatcher()
if err != nil {
return nil, err
Expand All @@ -103,85 +121,87 @@
}

func (rw *Watcher) initializeRuleWatcher() error {
prefixToTrim := rw.rulesPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
err := func() error {
if strings.HasPrefix(string(kv.Key), rw.rulesPathPrefix) {
log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
rule, err := placement.NewRuleFromJSON(kv.Value)
if err != nil {
return err

Check warning on line 130 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L130

Added line #L130 was not covered by tests
}
// Update the suspect key ranges in the checker.
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil {
rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey)
}
return rw.ruleManager.SetRule(rule)
} else if strings.HasPrefix(string(kv.Key), rw.ruleGroupPathPrefix) {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err

Check warning on line 142 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L142

Added line #L142 was not covered by tests
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
} else {
log.Warn("unknown key when update placement rule", zap.String("key", string(kv.Key)))
return nil

Check warning on line 151 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L149-L151

Added lines #L149 - L151 were not covered by tests
}
}()
if err == nil && rw.hasPendingDeletion() {
rw.tryFinishPendingDeletion()
}
return rw.ruleManager.SetRule(rule)
return err
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim))
if err != nil {
return err
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return err
groupID, ruleID, err := func() (string, string, error) {
if strings.HasPrefix(string(kv.Key), rw.rulesPathPrefix) {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
log.Info("delete placement rule", zap.String("key", key))
ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/"))
if err != nil {
return "", "", err

Check warning on line 166 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L166

Added line #L166 was not covered by tests
}
rule, err := placement.NewRuleFromJSON([]byte(ruleJSON))
if err != nil {
return "", "", err

Check warning on line 170 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L170

Added line #L170 was not covered by tests
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rule.GroupID, rule.ID, rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
} else if strings.HasPrefix(string(kv.Key), rw.ruleGroupPathPrefix) {
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/")
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return trimmedKey, "", rw.ruleManager.DeleteRuleGroup(trimmedKey)
} else {
log.Warn("unknown key when delete placement rule", zap.String("key", string(kv.Key)))
lhy1024 marked this conversation as resolved.
Show resolved Hide resolved
return "", "", nil

Check warning on line 183 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L181-L183

Added lines #L181 - L183 were not covered by tests
}
}()
if err != nil && strings.Contains(err.Error(), "no rule left") && groupID != "" {
rw.addPendingDeletion(key, groupID, ruleID)
}
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID)
return nil
}
postEventFn := func() error {
return nil
}
rw.ruleWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-watcher", rw.rulesPathPrefix,
"scheduling-rule-watcher", rw.ruleCommonPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.ruleWatcher.StartWatchLoop()
return rw.ruleWatcher.WaitLoad()
}

func (rw *Watcher) initializeGroupWatcher() error {
prefixToTrim := rw.ruleGroupPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value)))
ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value)
if err != nil {
return err
}
// Add all rule key ranges within the group to the suspect key ranges.
for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.SetRuleGroup(ruleGroup)
}
deleteFn := func(kv *mvccpb.KeyValue) error {
key := string(kv.Key)
log.Info("delete placement rule group", zap.String("key", key))
trimmedKey := strings.TrimPrefix(key, prefixToTrim)
for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) {
rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey)
}
return rw.ruleManager.DeleteRuleGroup(trimmedKey)
}
postEventFn := func() error {
return nil
}
rw.groupWatcher = etcdutil.NewLoopWatcher(
rw.ctx, &rw.wg,
rw.etcdClient,
"scheduling-rule-group-watcher", rw.ruleGroupPathPrefix,
putFn, deleteFn, postEventFn,
clientv3.WithPrefix(),
)
rw.groupWatcher.StartWatchLoop()
return rw.groupWatcher.WaitLoad()
}

func (rw *Watcher) initializeRegionLabelWatcher() error {
prefixToTrim := rw.regionLabelPathPrefix + "/"
putFn := func(kv *mvccpb.KeyValue) error {
Expand Down Expand Up @@ -216,3 +236,39 @@
rw.cancel()
rw.wg.Wait()
}

// hasPendingDeletion reports whether any rule or rule group deletion is
// still waiting to be applied.
func (rw *Watcher) hasPendingDeletion() bool {
	rw.pendingDeletion.RLock()
	pending := len(rw.pendingDeletion.kvs) != 0
	rw.pendingDeletion.RUnlock()
	return pending
}

// addPendingDeletion records a rule (or a rule group, when ruleID is the
// empty string) whose deletion could not be applied yet, keyed by its etcd
// path, so a later put event can retry it.
func (rw *Watcher) addPendingDeletion(path, groupID, ruleID string) {
	rw.pendingDeletion.Lock()
	rw.pendingDeletion.kvs[path] = [2]string{groupID, ruleID}
	rw.pendingDeletion.Unlock()
}

func (rw *Watcher) tryFinishPendingDeletion() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am worried about if the put and delete can be disordered. If so, the newly added rule might be deleted unexpectedly.

rw.pendingDeletion.Lock()
defer rw.pendingDeletion.Unlock()
originLen := len(rw.pendingDeletion.kvs)
for k, v := range rw.pendingDeletion.kvs {
groupID, ruleID := v[0], v[1]
var err error
if ruleID == "" {
err = rw.ruleManager.DeleteRuleGroup(groupID)

Check warning on line 260 in pkg/mcs/scheduling/server/rule/watcher.go

View check run for this annotation

Codecov / codecov/patch

pkg/mcs/scheduling/server/rule/watcher.go#L260

Added line #L260 was not covered by tests
} else {
err = rw.ruleManager.DeleteRule(groupID, ruleID)
}
if err == nil {
delete(rw.pendingDeletion.kvs, k)
}
}
// If the length of the map is changed, it means that some rules or rule groups have been deleted.
// We need to force load the rules and rule groups to make sure sync with etcd.
if len(rw.pendingDeletion.kvs) != originLen {
rw.ruleWatcher.ForceLoad()
log.Info("force load rules", zap.Int("pending deletion", len(rw.pendingDeletion.kvs)), zap.Int("origin", originLen))
}
}
8 changes: 4 additions & 4 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region
if c.isDownPeer(region, peer) {
if c.isStoreDownTimeHitMaxDownTime(peer.GetStoreId()) {
ruleCheckerReplaceDownCounter.Inc()
return c.replaceUnexpectRulePeer(region, rf, fit, peer, downStatus)
return c.replaceUnexpectedRulePeer(region, rf, fit, peer, downStatus)
}
// When witness placement rule is enabled, promotes the witness to voter when region has down voter.
if c.isWitnessEnabled() && core.IsVoter(peer) {
Expand All @@ -211,7 +211,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region
}
if c.isOfflinePeer(peer) {
ruleCheckerReplaceOfflineCounter.Inc()
return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus)
return c.replaceUnexpectedRulePeer(region, rf, fit, peer, offlineStatus)
}
}
// fix loose matched peers.
Expand Down Expand Up @@ -246,7 +246,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
continue
}
ruleCheckerNoStoreThenTryReplace.Inc()
op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit")
op, err := c.replaceUnexpectedRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit")
if err != nil {
return nil, err
}
Expand All @@ -267,7 +267,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
}

// The peer's store may in Offline or Down, need to be replace.
func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) {
var fastFailover bool
// If the store to which the original peer belongs is TiFlash, the new peer cannot be set to witness, nor can it perform fast failover
if c.isWitnessEnabled() && !c.cluster.GetStore(peer.StoreId).IsTiFlash() {
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
serviceMiddlewarePath = "service_middleware"
schedulePath = "schedule"
gcPath = "gc"
ruleCommonPath = "rule"
rulesPath = "rules"
ruleGroupPath = "rule_group"
regionLabelPath = "region_label"
Expand Down Expand Up @@ -102,6 +103,11 @@ func RulesPathPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), rulesPath)
}

// RuleCommonPathPrefix returns the path prefix to save the placement rule common config.
func RuleCommonPathPrefix(clusterID uint64) string {
	root := PDRootPath(clusterID)
	return path.Join(root, ruleCommonPath)
}

// RuleGroupPathPrefix returns the path prefix to save the placement rule groups.
func RuleGroupPathPrefix(clusterID uint64) string {
return path.Join(PDRootPath(clusterID), ruleGroupPath)
Expand Down
46 changes: 42 additions & 4 deletions tests/server/api/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,11 @@ func (suite *regionTestSuite) TestCheckRegionsReplicated() {
func(conf *config.Config, serverName string) {
conf.Replication.EnablePlacementRules = true
})
env.RunTestInPDMode(suite.checkRegionsReplicated)
env.RunTestInTwoModes(suite.checkRegionsReplicated)
}

func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) {
suite.pauseRuleChecker(cluster)
leader := cluster.GetLeaderServer()
urlPrefix := leader.GetAddr() + "/pd/api/v1"
re := suite.Require()
Expand Down Expand Up @@ -271,6 +272,14 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster)
err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re))
suite.NoError(err)

tu.Eventually(re, func() bool {
respBundle := make([]placement.GroupBundle, 0)
err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil,
tu.StatusOK(re), tu.ExtractJSON(re, &respBundle))
suite.NoError(err)
return len(respBundle) == 1 && respBundle[0].ID == "5"
})

tu.Eventually(re, func() bool {
err = tu.ReadGetJSON(re, testDialClient, url, &status)
suite.NoError(err)
Expand Down Expand Up @@ -314,9 +323,24 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster)
err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re))
suite.NoError(err)

err = tu.ReadGetJSON(re, testDialClient, url, &status)
suite.NoError(err)
suite.Equal("INPROGRESS", status)
tu.Eventually(re, func() bool {
respBundle := make([]placement.GroupBundle, 0)
err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil,
tu.StatusOK(re), tu.ExtractJSON(re, &respBundle))
suite.NoError(err)
if len(respBundle) != 2 {
return false
}
s1 := respBundle[0].ID == "5" && respBundle[1].ID == "6"
s2 := respBundle[0].ID == "6" && respBundle[1].ID == "5"
return s1 || s2
})

tu.Eventually(re, func() bool {
err = tu.ReadGetJSON(re, testDialClient, url, &status)
suite.NoError(err)
return status == "INPROGRESS"
})

r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b"))
r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}, &metapb.Peer{Id: 6, StoreId: 1}, &metapb.Peer{Id: 7, StoreId: 1})
Expand All @@ -338,3 +362,17 @@ func (suite *regionTestSuite) checkRegionCount(cluster *tests.TestCluster, count
})
}
}

// pauseRuleChecker will pause rule checker to avoid unexpected operator.
func (suite *regionTestSuite) pauseRuleChecker(cluster *tests.TestCluster) {
	re := suite.Require()
	addr := cluster.GetLeaderServer().GetAddr()
	// Pause the "rule" checker via the PD checker API, then read the state
	// back to confirm the pause took effect.
	url := fmt.Sprintf("%s/pd/api/v1/checker/%s", addr, "rule")
	re.NoError(tu.CheckPostJSON(testDialClient, url, []byte(`{"delay":1000}`), tu.StatusOK(re)))
	resp := make(map[string]interface{})
	re.NoError(tu.ReadGetJSON(re, testDialClient, url, &resp))
	re.True(resp["paused"].(bool))
}
Loading
Loading