diff --git a/pkg/mcs/scheduling/server/apis/v1/api.go b/pkg/mcs/scheduling/server/apis/v1/api.go index b59780b7a61..e6881f2f85c 100644 --- a/pkg/mcs/scheduling/server/apis/v1/api.go +++ b/pkg/mcs/scheduling/server/apis/v1/api.go @@ -1330,5 +1330,5 @@ func checkRegionsReplicated(c *gin.Context) { c.String(http.StatusBadRequest, err.Error()) return } - c.String(http.StatusOK, state) + c.IndentedJSON(http.StatusOK, state) } diff --git a/pkg/mcs/scheduling/server/config/watcher.go b/pkg/mcs/scheduling/server/config/watcher.go index 4ded93ceb1b..c97306d50ba 100644 --- a/pkg/mcs/scheduling/server/config/watcher.go +++ b/pkg/mcs/scheduling/server/config/watcher.go @@ -154,7 +154,7 @@ func (cw *Watcher) initializeConfigWatcher() error { func (cw *Watcher) initializeTTLConfigWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/") value := string(kv.Value) leaseID := kv.Lease resp, err := cw.etcdClient.TimeToLive(cw.ctx, clientv3.LeaseID(leaseID)) @@ -166,7 +166,7 @@ func (cw *Watcher) initializeTTLConfigWatcher() error { return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key)[len(sc.TTLConfigPrefix)+1:] + key := strings.TrimPrefix(string(kv.Key), sc.TTLConfigPrefix+"/") cw.ttl.PutWithTTL(key, nil, 0) return nil } @@ -186,13 +186,14 @@ func (cw *Watcher) initializeTTLConfigWatcher() error { func (cw *Watcher) initializeSchedulerConfigWatcher() error { prefixToTrim := cw.schedulerConfigPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - name := strings.TrimPrefix(string(kv.Key), prefixToTrim) + key := string(kv.Key) + name := strings.TrimPrefix(key, prefixToTrim) log.Info("update scheduler config", zap.String("name", name), zap.String("value", string(kv.Value))) err := cw.storage.SaveSchedulerConfig(name, kv.Value) if err != nil { log.Warn("failed to save scheduler config", - zap.String("event-kv-key", string(kv.Key)), + zap.String("event-kv-key", key), zap.String("trimmed-key", name), zap.Error(err)) return err @@ -204,9 +205,10 @@ func (cw *Watcher) initializeSchedulerConfigWatcher() error { return nil } deleteFn := func(kv *mvccpb.KeyValue) error { - log.Info("remove scheduler config", zap.String("key", string(kv.Key))) + key := string(kv.Key) + log.Info("remove scheduler config", zap.String("key", key)) return cw.storage.RemoveSchedulerConfig( - strings.TrimPrefix(string(kv.Key), prefixToTrim), + strings.TrimPrefix(key, prefixToTrim), ) } postEventFn := func() error { diff --git a/pkg/mcs/scheduling/server/meta/watcher.go b/pkg/mcs/scheduling/server/meta/watcher.go index 6fae537eab9..7832c862f3c 100644 --- a/pkg/mcs/scheduling/server/meta/watcher.go +++ b/pkg/mcs/scheduling/server/meta/watcher.go @@ -73,9 +73,10 @@ func NewWatcher( func (w *Watcher) initializeStoreWatcher() error { putFn := func(kv *mvccpb.KeyValue) error { store := &metapb.Store{} + key := string(kv.Key) if err := proto.Unmarshal(kv.Value, store); err != nil { log.Warn("failed to unmarshal store entry", - zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) + zap.String("event-kv-key", key), zap.Error(err)) return err } origin := w.basicCluster.GetStore(store.GetId()) diff --git a/pkg/mcs/scheduling/server/rule/watcher.go b/pkg/mcs/scheduling/server/rule/watcher.go index 912fb9c01e5..8ccfd02e07c 100644 --- a/pkg/mcs/scheduling/server/rule/watcher.go +++ b/pkg/mcs/scheduling/server/rule/watcher.go @@ -25,6 +25,7 @@ import ( "github.com/tikv/pd/pkg/schedule/placement" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" + "github.com/tikv/pd/pkg/utils/syncutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -36,6 +37,10 @@ type Watcher struct { cancel context.CancelFunc wg sync.WaitGroup + // ruleCommonPathPrefix: + // - Key: /pd/{cluster_id}/rule + // - Value: placement.Rule or placement.RuleGroup + ruleCommonPathPrefix string // rulesPathPrefix: // - Key: /pd/{cluster_id}/rules/{group_id}-{rule_id} // - Value: placement.Rule @@ -60,8 +65,18 @@ type Watcher struct { regionLabeler *labeler.RegionLabeler ruleWatcher *etcdutil.LoopWatcher - groupWatcher *etcdutil.LoopWatcher labelWatcher *etcdutil.LoopWatcher + + // pendingDeletion is a structure used to track the rules or rule groups that are marked for deletion. + // If a rule or rule group cannot be deleted immediately due to the absence of rules, + // it will be held here and removed later when a new rule or rule group put event allows for its deletion. + pendingDeletion struct { + syncutil.RWMutex + // key: path, value: [groupID, ruleID] + // The map 'kvs' holds the rules or rule groups that are pending deletion. + // If a rule group needs to be deleted, the ruleID will be an empty string. + kvs map[string][2]string + } } // NewWatcher creates a new watcher to watch the Placement Rule change from PD API server. @@ -79,6 +94,7 @@ func NewWatcher( ctx: ctx, cancel: cancel, rulesPathPrefix: endpoint.RulesPathPrefix(clusterID), + ruleCommonPathPrefix: endpoint.RuleCommonPathPrefix(clusterID), ruleGroupPathPrefix: endpoint.RuleGroupPathPrefix(clusterID), regionLabelPathPrefix: endpoint.RegionLabelPathPrefix(clusterID), etcdClient: etcdClient, @@ -86,15 +102,17 @@ func NewWatcher( checkerController: checkerController, ruleManager: ruleManager, regionLabeler: regionLabeler, + pendingDeletion: struct { + syncutil.RWMutex + kvs map[string][2]string + }{ + kvs: make(map[string][2]string), + }, } err := rw.initializeRuleWatcher() if err != nil { return nil, err } - err = rw.initializeGroupWatcher() - if err != nil { - return nil, err - } err = rw.initializeRegionLabelWatcher() if err != nil { return nil, err @@ -103,33 +121,73 @@ func NewWatcher( } func (rw *Watcher) initializeRuleWatcher() error { - prefixToTrim := rw.rulesPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update placement rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - rule, err := placement.NewRuleFromJSON(kv.Value) - if err != nil { - return err - } - // Update the suspect key ranges in the checker. - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { - rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) + err := func() error { + key := string(kv.Key) + if strings.HasPrefix(key, rw.rulesPathPrefix) { + log.Info("update placement rule", zap.String("key", key), zap.String("value", string(kv.Value))) + rule, err := placement.NewRuleFromJSON(kv.Value) + if err != nil { + return err + } + // Update the suspect key ranges in the checker. + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + if oldRule := rw.ruleManager.GetRule(rule.GroupID, rule.ID); oldRule != nil { + rw.checkerController.AddSuspectKeyRange(oldRule.StartKey, oldRule.EndKey) + } + return rw.ruleManager.SetRule(rule) + } else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) { + log.Info("update placement rule group", zap.String("key", key), zap.String("value", string(kv.Value))) + ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) + if err != nil { + return err + } + // Add all rule key ranges within the group to the suspect key ranges. + for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + } + return rw.ruleManager.SetRuleGroup(ruleGroup) + } else { + log.Warn("unknown key when update placement rule", zap.String("key", key)) + return nil + } + }() + if err == nil && rw.hasPendingDeletion() { + rw.tryFinishPendingDeletion() } - return rw.ruleManager.SetRule(rule) + return err } deleteFn := func(kv *mvccpb.KeyValue) error { key := string(kv.Key) - log.Info("delete placement rule", zap.String("key", key)) - ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, prefixToTrim)) - if err != nil { - return err - } - rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) - if err != nil { - return err + groupID, ruleID, err := func() (string, string, error) { + if strings.HasPrefix(key, rw.rulesPathPrefix) { + log.Info("delete placement rule", zap.String("key", key)) + ruleJSON, err := rw.ruleStorage.LoadRule(strings.TrimPrefix(key, rw.rulesPathPrefix+"/")) + if err != nil { + return "", "", err + } + rule, err := placement.NewRuleFromJSON([]byte(ruleJSON)) + if err != nil { + return "", "", err + } + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + return rule.GroupID, rule.ID, rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) + } else if strings.HasPrefix(key, rw.ruleGroupPathPrefix) { + log.Info("delete placement rule group", zap.String("key", key)) + trimmedKey := strings.TrimPrefix(key, rw.ruleGroupPathPrefix+"/") + for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { + rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) + } + return trimmedKey, "", rw.ruleManager.DeleteRuleGroup(trimmedKey) + } else { + log.Warn("unknown key when delete placement rule", zap.String("key", key)) + return "", "", nil + } + }() + if err != nil && strings.Contains(err.Error(), "no rule left") && groupID != "" { + rw.addPendingDeletion(key, groupID, ruleID) } - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - return rw.ruleManager.DeleteRule(rule.GroupID, rule.ID) + return err } postEventFn := func() error { return nil @@ -137,7 +195,7 @@ func (rw *Watcher) initializeRuleWatcher() error { rw.ruleWatcher = etcdutil.NewLoopWatcher( rw.ctx, &rw.wg, rw.etcdClient, - "scheduling-rule-watcher", rw.rulesPathPrefix, + "scheduling-rule-watcher", rw.ruleCommonPathPrefix, putFn, deleteFn, postEventFn, clientv3.WithPrefix(), ) @@ -145,47 +203,11 @@ func (rw *Watcher) initializeRuleWatcher() error { return rw.ruleWatcher.WaitLoad() } -func (rw *Watcher) initializeGroupWatcher() error { - prefixToTrim := rw.ruleGroupPathPrefix + "/" - putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update placement rule group", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) - ruleGroup, err := placement.NewRuleGroupFromJSON(kv.Value) - if err != nil { - return err - } - // Add all rule key ranges within the group to the suspect key ranges. - for _, rule := range rw.ruleManager.GetRulesByGroup(ruleGroup.ID) { - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - return rw.ruleManager.SetRuleGroup(ruleGroup) - } - deleteFn := func(kv *mvccpb.KeyValue) error { - key := string(kv.Key) - log.Info("delete placement rule group", zap.String("key", key)) - trimmedKey := strings.TrimPrefix(key, prefixToTrim) - for _, rule := range rw.ruleManager.GetRulesByGroup(trimmedKey) { - rw.checkerController.AddSuspectKeyRange(rule.StartKey, rule.EndKey) - } - return rw.ruleManager.DeleteRuleGroup(trimmedKey) - } - postEventFn := func() error { - return nil - } - rw.groupWatcher = etcdutil.NewLoopWatcher( - rw.ctx, &rw.wg, - rw.etcdClient, - "scheduling-rule-group-watcher", rw.ruleGroupPathPrefix, - putFn, deleteFn, postEventFn, - clientv3.WithPrefix(), - ) - rw.groupWatcher.StartWatchLoop() - return rw.groupWatcher.WaitLoad() -} - func (rw *Watcher) initializeRegionLabelWatcher() error { prefixToTrim := rw.regionLabelPathPrefix + "/" putFn := func(kv *mvccpb.KeyValue) error { - log.Info("update region label rule", zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) + key := string(kv.Key) + log.Info("update region label rule", zap.String("key", key), zap.String("value", string(kv.Value))) rule, err := labeler.NewLabelRuleFromJSON(kv.Value) if err != nil { return err @@ -216,3 +238,37 @@ func (rw *Watcher) Close() { rw.cancel() rw.wg.Wait() } + +func (rw *Watcher) hasPendingDeletion() bool { + rw.pendingDeletion.RLock() + defer rw.pendingDeletion.RUnlock() + return len(rw.pendingDeletion.kvs) > 0 +} + +func (rw *Watcher) addPendingDeletion(path, groupID, ruleID string) { + rw.pendingDeletion.Lock() + defer rw.pendingDeletion.Unlock() + rw.pendingDeletion.kvs[path] = [2]string{groupID, ruleID} +} + +func (rw *Watcher) tryFinishPendingDeletion() { + rw.pendingDeletion.Lock() + defer rw.pendingDeletion.Unlock() + previousLen := len(rw.pendingDeletion.kvs) + for k, v := range rw.pendingDeletion.kvs { + groupID, ruleID := v[0], v[1] + var err error + if ruleID == "" { + err = rw.ruleManager.DeleteRuleGroup(groupID) + } else { + err = rw.ruleManager.DeleteRule(groupID, ruleID) + } + if err == nil { + delete(rw.pendingDeletion.kvs, k) + } + } + // TODO: If the length of the map is changed, it means that some rules or rule groups have been deleted. + // We need to compare the rules and rule groups to make sure sync with etcd, + // rather than just force load all the rules and rule groups. + log.Info("clean pending deletion", zap.Int("current", len(rw.pendingDeletion.kvs)), zap.Int("previous", previousLen)) +} diff --git a/pkg/schedule/checker/rule_checker.go b/pkg/schedule/checker/rule_checker.go index 553ece09e65..95cc77ade5d 100644 --- a/pkg/schedule/checker/rule_checker.go +++ b/pkg/schedule/checker/rule_checker.go @@ -199,7 +199,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region if c.isDownPeer(region, peer) { if c.isStoreDownTimeHitMaxDownTime(peer.GetStoreId()) { ruleCheckerReplaceDownCounter.Inc() - return c.replaceUnexpectRulePeer(region, rf, fit, peer, downStatus) + return c.replaceUnexpectedRulePeer(region, rf, fit, peer, downStatus) } // When witness placement rule is enabled, promotes the witness to voter when region has down voter. if c.isWitnessEnabled() && core.IsVoter(peer) { @@ -211,7 +211,7 @@ func (c *RuleChecker) fixRulePeer(region *core.RegionInfo, fit *placement.Region } if c.isOfflinePeer(peer) { ruleCheckerReplaceOfflineCounter.Inc() - return c.replaceUnexpectRulePeer(region, rf, fit, peer, offlineStatus) + return c.replaceUnexpectedRulePeer(region, rf, fit, peer, offlineStatus) } } // fix loose matched peers. @@ -246,7 +246,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region continue } ruleCheckerNoStoreThenTryReplace.Inc() - op, err := c.replaceUnexpectRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit") + op, err := c.replaceUnexpectedRulePeer(region, oldPeerRuleFit, fit, p, "swap-fit") if err != nil { return nil, err } @@ -267,7 +267,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region } // The peer's store may in Offline or Down, need to be replace. -func (c *RuleChecker) replaceUnexpectRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) { +func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *placement.RuleFit, fit *placement.RegionFit, peer *metapb.Peer, status string) (*operator.Operator, error) { var fastFailover bool // If the store to which the original peer belongs is TiFlash, the new peer cannot be set to witness, nor can it perform fast failover if c.isWitnessEnabled() && !c.cluster.GetStore(peer.StoreId).IsTiFlash() { diff --git a/pkg/schedule/schedulers/shuffle_region.go b/pkg/schedule/schedulers/shuffle_region.go index f1d35e80925..f9bed18d3fa 100644 --- a/pkg/schedule/schedulers/shuffle_region.go +++ b/pkg/schedule/schedulers/shuffle_region.go @@ -139,18 +139,19 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster sche.SchedulerCluste pendingFilter := filter.NewRegionPendingFilter() downFilter := filter.NewRegionDownFilter() replicaFilter := filter.NewRegionReplicatedFilter(cluster) + ranges := s.conf.GetRanges() for _, source := range candidates.Stores { var region *core.RegionInfo if s.conf.IsRoleAllow(roleFollower) { - region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandFollowerRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region == nil && s.conf.IsRoleAllow(roleLeader) { - region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandLeaderRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region == nil && s.conf.IsRoleAllow(roleLearner) { - region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), s.conf.Ranges), nil, + region = filter.SelectOneRegion(cluster.RandLearnerRegions(source.GetID(), ranges), nil, pendingFilter, downFilter, replicaFilter) } if region != nil { diff --git a/pkg/schedule/schedulers/shuffle_region_config.go b/pkg/schedule/schedulers/shuffle_region_config.go index 7d04879c992..552d7ea8bce 100644 --- a/pkg/schedule/schedulers/shuffle_region_config.go +++ b/pkg/schedule/schedulers/shuffle_region_config.go @@ -58,7 +58,9 @@ func (conf *shuffleRegionSchedulerConfig) GetRoles() []string { func (conf *shuffleRegionSchedulerConfig) GetRanges() []core.KeyRange { conf.RLock() defer conf.RUnlock() - return conf.Ranges + ranges := make([]core.KeyRange, len(conf.Ranges)) + copy(ranges, conf.Ranges) + return ranges } func (conf *shuffleRegionSchedulerConfig) IsRoleAllow(role string) bool { diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index cac40db29c5..69b8d0f2f8e 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -31,6 +31,7 @@ const ( serviceMiddlewarePath = "service_middleware" schedulePath = "schedule" gcPath = "gc" + ruleCommonPath = "rule" rulesPath = "rules" ruleGroupPath = "rule_group" regionLabelPath = "region_label" @@ -102,6 +103,11 @@ func RulesPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), rulesPath) } +// RuleCommonPathPrefix returns the path prefix to save the placement rule common config. +func RuleCommonPathPrefix(clusterID uint64) string { + return path.Join(PDRootPath(clusterID), ruleCommonPath) +} + // RuleGroupPathPrefix returns the path prefix to save the placement rule groups. func RuleGroupPathPrefix(clusterID uint64) string { return path.Join(PDRootPath(clusterID), ruleGroupPath) diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 03c2374efc6..d24c5ded98a 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -818,18 +818,20 @@ func (lw *LoopWatcher) watch(ctx context.Context, revision int64) (nextRevision case clientv3.EventTypePut: if err := lw.putFn(event.Kv); err != nil { log.Error("put failed in watch loop", zap.Error(err), - zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + zap.Int64("revision", revision), zap.String("name", lw.name), + zap.String("watch-key", lw.key), zap.ByteString("event-kv-key", event.Kv.Key)) } else { - log.Debug("put in watch loop", zap.String("name", lw.name), + log.Debug("put successfully in watch loop", zap.String("name", lw.name), zap.ByteString("key", event.Kv.Key), zap.ByteString("value", event.Kv.Value)) } case clientv3.EventTypeDelete: if err := lw.deleteFn(event.Kv); err != nil { log.Error("delete failed in watch loop", zap.Error(err), - zap.Int64("revision", revision), zap.String("name", lw.name), zap.String("key", lw.key)) + zap.Int64("revision", revision), zap.String("name", lw.name), + zap.String("watch-key", lw.key), zap.ByteString("event-kv-key", event.Kv.Key)) } else { - log.Debug("delete in watch loop", zap.String("name", lw.name), + log.Debug("delete successfully in watch loop", zap.String("name", lw.name), zap.ByteString("key", event.Kv.Key)) } } diff --git a/tests/server/api/region_test.go b/tests/server/api/region_test.go index 450995a6e5e..452cdb63b39 100644 --- a/tests/server/api/region_test.go +++ b/tests/server/api/region_test.go @@ -24,11 +24,13 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/placement" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" + "go.uber.org/zap" ) type regionTestSuite struct { @@ -248,8 +250,7 @@ func (suite *regionTestSuite) checkScatterRegions(cluster *tests.TestCluster) { } func (suite *regionTestSuite) TestCheckRegionsReplicated() { - // Fixme: after delete+set rule, the key range will be empty, so the test will fail in api mode. - suite.env.RunTestInPDMode(suite.checkRegionsReplicated) + suite.env.RunTestInTwoModes(suite.checkRegionsReplicated) } func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) { @@ -304,6 +305,14 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + return len(respBundle) == 1 && respBundle[0].ID == "5" + }) + tu.Eventually(re, func() bool { err = tu.ReadGetJSON(re, testDialClient, url, &status) suite.NoError(err) @@ -328,9 +337,20 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + log.Info("respBundle", zap.Any("respBundle", respBundle)) + return len(respBundle) == 1 && len(respBundle[0].Rules) == 2 + }) + + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "REPLICATED" + }) // test multiple bundles bundle = append(bundle, placement.GroupBundle{ @@ -347,17 +367,34 @@ func (suite *regionTestSuite) checkRegionsReplicated(cluster *tests.TestCluster) err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) suite.NoError(err) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("INPROGRESS", status) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + if len(respBundle) != 2 { + return false + } + s1 := respBundle[0].ID == "5" && respBundle[1].ID == "6" + s2 := respBundle[0].ID == "6" && respBundle[1].ID == "5" + return s1 || s2 + }) + + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "INPROGRESS" + }) r1 = core.NewTestRegionInfo(2, 1, []byte("a"), []byte("b")) r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 1}, &metapb.Peer{Id: 6, StoreId: 1}, &metapb.Peer{Id: 7, StoreId: 1}) tests.MustPutRegionInfo(re, cluster, r1) - err = tu.ReadGetJSON(re, testDialClient, url, &status) - suite.NoError(err) - suite.Equal("REPLICATED", status) + tu.Eventually(re, func() bool { + err = tu.ReadGetJSON(re, testDialClient, url, &status) + suite.NoError(err) + return status == "REPLICATED" + }) } func (suite *regionTestSuite) checkRegionCount(cluster *tests.TestCluster, count uint64) { diff --git a/tests/server/api/rule_test.go b/tests/server/api/rule_test.go index 0a0c3f2fb2e..de6fb99a8d0 100644 --- a/tests/server/api/rule_test.go +++ b/tests/server/api/rule_test.go @@ -20,17 +20,23 @@ import ( "fmt" "net/http" "net/url" + "sort" + "strconv" + "sync" "testing" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/log" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/schedule/labeler" "github.com/tikv/pd/pkg/schedule/placement" + "github.com/tikv/pd/pkg/utils/syncutil" tu "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/server/config" "github.com/tikv/pd/tests" + "go.uber.org/zap" ) type ruleTestSuite struct { @@ -777,7 +783,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err := tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b1) + suite.assertBundleEqual(bundles[0], b1) // Set b2 := placement.GroupBundle{ @@ -797,14 +803,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { var bundle placement.GroupBundle err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/foo", &bundle) suite.NoError(err) - suite.compareBundle(bundle, b2) + suite.assertBundleEqual(bundle, b2) // GetAll again err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 2) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b2) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b2) // Delete err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/pd", tu.StatusOK(suite.Require())) @@ -814,7 +820,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b2) + suite.assertBundleEqual(bundles[0], b2) // SetAll b2.Rules = append(b2.Rules, &placement.Rule{GroupID: "foo", ID: "baz", Index: 2, Role: placement.Follower, Count: 1}) @@ -829,9 +835,9 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 3) - suite.compareBundle(bundles[0], b2) - suite.compareBundle(bundles[1], b1) - suite.compareBundle(bundles[2], b3) + suite.assertBundleEqual(bundles[0], b2) + suite.assertBundleEqual(bundles[1], b1) + suite.assertBundleEqual(bundles[2], b3) // Delete using regexp err = tu.CheckDelete(testDialClient, urlPrefix+"/placement-rule/"+url.PathEscape("foo.*")+"?regexp", tu.StatusOK(suite.Require())) @@ -841,7 +847,7 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 1) - suite.compareBundle(bundles[0], b1) + suite.assertBundleEqual(bundles[0], b1) // Set id := "rule-without-group-id" @@ -862,14 +868,14 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { // Get err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule/"+id, &bundle) suite.NoError(err) - suite.compareBundle(bundle, b4) + suite.assertBundleEqual(bundle, b4) // GetAll again err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 2) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b4) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b4) // SetAll b5 := placement.GroupBundle{ @@ -890,9 +896,9 @@ func (suite *ruleTestSuite) checkBundle(cluster *tests.TestCluster) { err = tu.ReadGetJSON(re, testDialClient, urlPrefix+"/placement-rule", &bundles) suite.NoError(err) suite.Len(bundles, 3) - suite.compareBundle(bundles[0], b1) - suite.compareBundle(bundles[1], b4) - suite.compareBundle(bundles[2], b5) + suite.assertBundleEqual(bundles[0], b1) + suite.assertBundleEqual(bundles[1], b4) + suite.assertBundleEqual(bundles[2], b5) } func (suite *ruleTestSuite) TestBundleBadRequest() { @@ -925,20 +931,219 @@ func (suite *ruleTestSuite) checkBundleBadRequest(cluster *tests.TestCluster) { } } -func (suite *ruleTestSuite) compareBundle(b1, b2 placement.GroupBundle) { - tu.Eventually(suite.Require(), func() bool { - if b2.ID != b1.ID || b2.Index != b1.Index || b2.Override != b1.Override || len(b2.Rules) != len(b1.Rules) { - return false - } - for i := range b1.Rules { - if !suite.compareRule(b1.Rules[i], b2.Rules[i]) { +func (suite *ruleTestSuite) TestDeleteAndUpdate() { + suite.env.RunTestInTwoModes(suite.checkDeleteAndUpdate) +} + +func (suite *ruleTestSuite) checkDeleteAndUpdate(cluster *tests.TestCluster) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + + bundles := [][]placement.GroupBundle{ + // 1 rule group with 1 rule + {{ + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + }, + }, + }}, + // 2 rule groups with different range rules + {{ + ID: "1", + Index: 1, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 1, Role: placement.Voter, Count: 1, GroupID: "1", + StartKey: []byte("a"), EndKey: []byte("b"), + }, + }, + }, { + ID: "2", + Index: 2, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 2, Role: placement.Voter, Count: 1, GroupID: "2", + StartKey: []byte("b"), EndKey: []byte("c"), + }, + }, + }}, + // 2 rule groups with 1 rule and 2 rules + {{ + ID: "3", + Index: 3, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 3, Role: placement.Voter, Count: 1, GroupID: "3", + }, + }, + }, { + ID: "4", + Index: 4, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 4, Role: placement.Voter, Count: 1, GroupID: "4", + }, + { + ID: "bar", Index: 6, Role: placement.Voter, Count: 1, GroupID: "4", + }, + }, + }}, + // 1 rule group with 2 rules + {{ + ID: "5", + Index: 5, + Rules: []*placement.Rule{ + { + ID: "foo", Index: 5, Role: placement.Voter, Count: 1, GroupID: "5", + }, + { + ID: "bar", Index: 6, Role: placement.Voter, Count: 1, GroupID: "5", + }, + }, + }}, + } + + for _, bundle := range bundles { + data, err := json.Marshal(bundle) + suite.NoError(err) + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err = tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + if len(respBundle) != len(bundle) { return false } - } - return true + log.Info("respBundle", zap.Any("respBundle", respBundle), zap.Any("bundle", bundle)) + sort.Slice(respBundle, func(i, j int) bool { return respBundle[i].ID < respBundle[j].ID }) + sort.Slice(bundle, func(i, j int) bool { return bundle[i].ID < bundle[j].ID }) + for i := range respBundle { + if !suite.compareBundle(respBundle[i], bundle[i]) { + return false + } + } + return true + }) + } +} + +func (suite *ruleTestSuite) TestConcurrency() { + suite.env.RunTestInTwoModes(suite.checkConcurrency) +} + +func (suite *ruleTestSuite) checkConcurrency(cluster *tests.TestCluster) { + // test concurrency of set rule group with different group id + suite.checkConcurrencyWith(cluster, + func(i int) []placement.GroupBundle { + return []placement.GroupBundle{ + { + ID: strconv.Itoa(i), + Index: i, + Rules: []*placement.Rule{ + { + ID: "foo", Index: i, Role: placement.Voter, Count: 1, GroupID: strconv.Itoa(i), + }, + }, + }, + } + }, + func(resp []placement.GroupBundle, i int) bool { + return len(resp) == 1 && resp[0].ID == strconv.Itoa(i) + }, + ) + // test concurrency of set rule with different id + suite.checkConcurrencyWith(cluster, + func(i int) []placement.GroupBundle { + return []placement.GroupBundle{ + { + ID: "pd", + Index: 1, + Rules: []*placement.Rule{ + { + ID: strconv.Itoa(i), Index: i, Role: placement.Voter, Count: 1, GroupID: "pd", + }, + }, + }, + } + }, + func(resp []placement.GroupBundle, i int) bool { + return len(resp) == 1 && resp[0].ID == "pd" && resp[0].Rules[0].ID == strconv.Itoa(i) + }, + ) +} + +func (suite *ruleTestSuite) checkConcurrencyWith(cluster *tests.TestCluster, + genBundle func(int) []placement.GroupBundle, + checkBundle func([]placement.GroupBundle, int) bool) { + re := suite.Require() + leaderServer := cluster.GetLeaderServer() + pdAddr := leaderServer.GetAddr() + urlPrefix := fmt.Sprintf("%s%s/api/v1", pdAddr, apiPrefix) + expectResult := struct { + syncutil.RWMutex + val int + }{} + wg := sync.WaitGroup{} + + for i := 1; i <= 10; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + bundle := genBundle(i) + data, err := json.Marshal(bundle) + suite.NoError(err) + for j := 0; j < 10; j++ { + expectResult.Lock() + err = tu.CheckPostJSON(testDialClient, urlPrefix+"/config/placement-rule", data, tu.StatusOK(re)) + suite.NoError(err) + expectResult.val = i + expectResult.Unlock() + } + }(i) + } + + wg.Wait() + expectResult.RLock() + defer expectResult.RUnlock() + suite.NotZero(expectResult.val) + tu.Eventually(re, func() bool { + respBundle := make([]placement.GroupBundle, 0) + err := tu.CheckGetJSON(testDialClient, urlPrefix+"/config/placement-rule", nil, + tu.StatusOK(re), tu.ExtractJSON(re, &respBundle)) + suite.NoError(err) + suite.Len(respBundle, 1) + return checkBundle(respBundle, expectResult.val) + }) +} + +func (suite *ruleTestSuite) assertBundleEqual(b1, b2 placement.GroupBundle) { + tu.Eventually(suite.Require(), func() bool { + return suite.compareBundle(b1, b2) }) } +func (suite *ruleTestSuite) compareBundle(b1, b2 placement.GroupBundle) bool { + if b2.ID != b1.ID || b2.Index != b1.Index || b2.Override != b1.Override || len(b2.Rules) != len(b1.Rules) { + return false + } + sort.Slice(b1.Rules, func(i, j int) bool { return b1.Rules[i].ID < b1.Rules[j].ID }) + sort.Slice(b2.Rules, func(i, j int) bool { return b2.Rules[i].ID < b2.Rules[j].ID }) + for i := range b1.Rules { + if !suite.compareRule(b1.Rules[i], b2.Rules[i]) { + return false + } + } + return true +} + func (suite *ruleTestSuite) compareRule(r1 *placement.Rule, r2 *placement.Rule) bool { return r2.GroupID == r1.GroupID && r2.ID == r1.ID &&