Skip to content

Commit

Permalink
rule: split txn to multi batch (#7600)
Browse files Browse the repository at this point in the history
close #7599

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 22, 2023
1 parent 07f7e78 commit b36b725
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 133 deletions.
17 changes: 7 additions & 10 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
)
Expand All @@ -51,10 +52,6 @@ const (
// Note: Config[TSOKeyspaceGroupIDKey] is only used to judge whether there is keyspace group id.
// It will not update the keyspace group id when merging or splitting.
TSOKeyspaceGroupIDKey = "tso_keyspace_group_id"
// MaxEtcdTxnOps is the max value of operations in an etcd txn. The default limit of etcd txn op is 128.
// We use 120 here to leave some space for other operations.
// See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61
MaxEtcdTxnOps = 120
)

// Config is the interface for keyspace config.
Expand Down Expand Up @@ -710,7 +707,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
zap.Duration("cost", time.Since(start)),
zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount),
zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount),
zap.Int("batch-size", MaxEtcdTxnOps),
zap.Int("batch-size", etcdutil.MaxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
Expand All @@ -734,7 +731,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
if defaultKeyspaceGroup.IsMerging() {
return ErrKeyspaceGroupInMerging(utils.DefaultKeyspaceGroupID)
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, MaxEtcdTxnOps)
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, etcdutil.MaxEtcdTxnOps)
if err != nil {
return err
}
Expand All @@ -744,9 +741,9 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
// If there are less than `MaxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// If there are fewer than `etcdutil.MaxEtcdTxnOps` keyspaces or the next start ID reaches the end,
// there is no need to patrol again.
moreToPatrol = keyspaceNum == MaxEtcdTxnOps
moreToPatrol = keyspaceNum == etcdutil.MaxEtcdTxnOps
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum)
Expand Down Expand Up @@ -785,7 +782,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", MaxEtcdTxnOps),
zap.Int("batch-size", etcdutil.MaxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
Expand All @@ -799,7 +796,7 @@ func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID
err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
if err != nil {
log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", MaxEtcdTxnOps),
zap.Int("batch-size", etcdutil.MaxEtcdTxnOps),
zap.Uint32("start-keyspace-id", startKeyspaceID),
zap.Uint32("end-keyspace-id", endKeyspaceID),
zap.Uint32("current-start-id", currentStartID),
Expand Down
19 changes: 10 additions & 9 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/typeutil"
)

Expand Down Expand Up @@ -408,7 +409,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Expand All @@ -423,7 +424,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
Expand All @@ -433,15 +434,15 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Expand All @@ -456,22 +457,22 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() {
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment with range [MaxEtcdTxnOps/2, MaxEtcdTxnOps/2+MaxEtcdTxnOps+1]
// Patrol the keyspace assignment with range [etcdutil.MaxEtcdTxnOps/2, etcdutil.MaxEtcdTxnOps/2+etcdutil.MaxEtcdTxnOps+1]
// to make sure the range crossing the boundary of etcd transaction operation limit.
var (
startKeyspaceID = uint32(MaxEtcdTxnOps / 2)
endKeyspaceID = startKeyspaceID + MaxEtcdTxnOps + 1
startKeyspaceID = uint32(etcdutil.MaxEtcdTxnOps / 2)
endKeyspaceID = startKeyspaceID + etcdutil.MaxEtcdTxnOps + 1
)
err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID)
re.NoError(err)
// Check if only the keyspaces within the range are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < MaxEtcdTxnOps*2+1; i++ {
for i := 1; i < etcdutil.MaxEtcdTxnOps*2+1; i++ {
keyspaceID := uint32(i)
if keyspaceID >= startKeyspaceID && keyspaceID <= endKeyspaceID {
re.Contains(defaultKeyspaceGroup.Keyspaces, keyspaceID)
Expand Down
4 changes: 2 additions & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uin
// - Load and delete the keyspace groups in the merge list.
// - Load and update the target keyspace group.
// So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction.
if (mergeListNum+1)*2 > MaxEtcdTxnOps {
if (mergeListNum+1)*2 > etcdutil.MaxEtcdTxnOps {
return ErrExceedMaxEtcdTxnOps
}
if slice.Contains(mergeList, utils.DefaultKeyspaceGroupID) {
Expand Down Expand Up @@ -1062,7 +1062,7 @@ func (m *GroupManager) MergeAllIntoDefaultKeyspaceGroup() error {
continue
}
var (
maxBatchSize = MaxEtcdTxnOps/2 - 1
maxBatchSize = etcdutil.MaxEtcdTxnOps/2 - 1
groupsToMerge = make([]uint32, 0, maxBatchSize)
)
for idx, group := range groups.GetAll() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
)

type keyspaceGroupTestSuite struct {
Expand Down Expand Up @@ -449,7 +450,7 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() {
err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5})
re.ErrorContains(err, ErrKeyspaceGroupNotExists(5).Error())
// merge with the number of keyspace groups exceeds the limit
err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, MaxEtcdTxnOps/2))
err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, etcdutil.MaxEtcdTxnOps/2))
re.ErrorIs(err, ErrExceedMaxEtcdTxnOps)
// merge the default keyspace group
err = suite.kgm.MergeKeyspaceGroups(1, []uint32{utils.DefaultKeyspaceGroupID})
Expand Down
9 changes: 9 additions & 0 deletions pkg/schedule/placement/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,15 @@ func (g *RuleGroup) String() string {
return string(b)
}

// Clone returns a newly allocated RuleGroup carrying the same ID, Index and
// Override values as g. Changing the returned group does not affect g.
func (g *RuleGroup) Clone() *RuleGroup {
	dup := new(RuleGroup)
	dup.ID = g.ID
	dup.Index = g.Index
	dup.Override = g.Override
	return dup
}

// Rules are ordered by (GroupID, Index, ID).
func compareRule(a, b *Rule) int {
switch {
Expand Down
166 changes: 97 additions & 69 deletions pkg/schedule/placement/rule_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
"golang.org/x/exp/slices"
Expand Down Expand Up @@ -157,61 +158,61 @@ func (m *RuleManager) loadRules() error {
toSave []*Rule
toDelete []string
)
return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
err = m.storage.LoadRules(txn, func(k, v string) {
r, err := NewRuleFromJSON([]byte(v))
if err != nil {
log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
toDelete = append(toDelete, k)
return
}
err = m.AdjustRule(r, "")
if err != nil {
log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err))
toDelete = append(toDelete, k)
return
}
_, ok := m.ruleConfig.rules[r.Key()]
if ok {
log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
toDelete = append(toDelete, k)
return
}
if k != r.StoreKey() {
log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
toDelete = append(toDelete, k)
toSave = append(toSave, r)
}
m.ruleConfig.rules[r.Key()] = r
})
// load rules from storage
err := m.storage.LoadRules(func(k, v string) {
r, err := NewRuleFromJSON([]byte(v))
if err != nil {
return err
log.Error("failed to unmarshal rule value", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
toDelete = append(toDelete, k)
return
}

for _, s := range toSave {
if err = m.storage.SaveRule(txn, s.StoreKey(), s); err != nil {
return err
}
err = m.AdjustRule(r, "")
if err != nil {
log.Error("rule is in bad format", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule, err))
toDelete = append(toDelete, k)
return
}
for _, d := range toDelete {
if err = m.storage.DeleteRule(txn, d); err != nil {
return err
}
_, ok := m.ruleConfig.rules[r.Key()]
if ok {
log.Error("duplicated rule key", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
toDelete = append(toDelete, k)
return
}
return nil
if k != r.StoreKey() {
log.Error("mismatch data key, need to restore", zap.String("rule-key", k), zap.String("rule-value", v), errs.ZapError(errs.ErrLoadRule))
toDelete = append(toDelete, k)
toSave = append(toSave, r)
}
m.ruleConfig.rules[r.Key()] = r
})
if err != nil {
return err
}
// re-save the rules that were stored under a mismatched key and delete the invalid or stale entries
var batch []func(kv.Txn) error
for _, s := range toSave {
localRule := s
batch = append(batch, func(txn kv.Txn) error {
return m.storage.SaveRule(txn, localRule.StoreKey(), localRule)
})
}
for _, d := range toDelete {
localKey := d
batch = append(batch, func(txn kv.Txn) error {
return m.storage.DeleteRule(txn, localKey)
})
}
return m.runBatchOpInTxn(batch)
}

func (m *RuleManager) loadGroups() error {
return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
return m.storage.LoadRuleGroups(txn, func(k, v string) {
g, err := NewRuleGroupFromJSON([]byte(v))
if err != nil {
log.Error("failed to unmarshal rule group", zap.String("group-id", k), errs.ZapError(errs.ErrLoadRuleGroup, err))
return
}
m.ruleConfig.groups[g.ID] = g
})
return m.storage.LoadRuleGroups(func(k, v string) {
g, err := NewRuleGroupFromJSON([]byte(v))
if err != nil {
log.Error("failed to unmarshal rule group", zap.String("group-id", k), errs.ZapError(errs.ErrLoadRuleGroup, err))
return
}
m.ruleConfig.groups[g.ID] = g
})
}

Expand Down Expand Up @@ -492,30 +493,35 @@ func (m *RuleManager) TryCommitPatch(patch *RuleConfigPatch) error {
}

func (m *RuleManager) savePatch(p *ruleConfig) error {
return m.storage.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
for key, r := range p.rules {
if r == nil {
r = &Rule{GroupID: key[0], ID: key[1]}
err = m.storage.DeleteRule(txn, r.StoreKey())
} else {
err = m.storage.SaveRule(txn, r.StoreKey(), r)
}
if err != nil {
return err
}
var batch []func(kv.Txn) error
// add rules to batch
for key, r := range p.rules {
localKey, localRule := key, r
if r == nil {
rule := &Rule{GroupID: localKey[0], ID: localKey[1]}
batch = append(batch, func(txn kv.Txn) error {
return m.storage.DeleteRule(txn, rule.StoreKey())
})
} else {
batch = append(batch, func(txn kv.Txn) error {
return m.storage.SaveRule(txn, localRule.StoreKey(), localRule)
})
}
for id, g := range p.groups {
if g.isDefault() {
err = m.storage.DeleteRuleGroup(txn, id)
} else {
err = m.storage.SaveRuleGroup(txn, id, g)
}
if err != nil {
return err
}
}
// add groups to batch
for id, g := range p.groups {
localID, localGroup := id, g
if g.isDefault() {
batch = append(batch, func(txn kv.Txn) error {
return m.storage.DeleteRuleGroup(txn, localID)
})
} else {
batch = append(batch, func(txn kv.Txn) error {
return m.storage.SaveRuleGroup(txn, localID, localGroup)
})
}
return nil
})
}
return m.runBatchOpInTxn(batch)
}

// SetRules inserts or updates lots of Rules at once.
Expand Down Expand Up @@ -808,6 +814,28 @@ func (m *RuleManager) IsInitialized() bool {
return m.initialized
}

// runBatchOpInTxn applies the operations in batch, in order, in consecutive
// chunks of at most etcdutil.MaxEtcdTxnOps operations. Each chunk goes through
// its own m.storage.RunInTxn call, so the batch as a whole is not one
// transaction. The first error from an operation or from RunInTxn is returned
// immediately and the remaining chunks are not attempted.
func (m *RuleManager) runBatchOpInTxn(batch []func(kv.Txn) error) error {
	pending := batch
	for len(pending) > 0 {
		// Cut the next chunk off the front, capped at the per-transaction limit.
		size := etcdutil.MaxEtcdTxnOps
		if size > len(pending) {
			size = len(pending)
		}
		chunk := pending[:size]
		pending = pending[size:]

		applyChunk := func(txn kv.Txn) error {
			for _, apply := range chunk {
				if opErr := apply(txn); opErr != nil {
					return opErr
				}
			}
			return nil
		}
		if err := m.storage.RunInTxn(m.ctx, applyChunk); err != nil {
			return err
		}
	}
	return nil
}

// checkRule check the rule whether will have RuleFit after FitRegion
// in order to reduce the calculation.
func checkRule(rule *Rule, stores []*core.StoreInfo) bool {
Expand Down
Loading

0 comments on commit b36b725

Please sign in to comment.