Skip to content

Commit

Permalink
keyspace: patrol the keyspace assignment in batch (tikv#6411)
Browse files Browse the repository at this point in the history
ref tikv#6232

Patrol the keyspace assignment in batch.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
  • Loading branch information
JmPotato authored and zeminzhou committed May 10, 2023
1 parent 95d1ea2 commit 251721b
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 59 deletions.
155 changes: 96 additions & 59 deletions pkg/keyspace/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ const (
UserKindKey = "user_kind"
// TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config.
TSOKeyspaceGroupIDKey = "tso_keyspace_group_id"
// keyspacePatrolBatchSize is the batch size for keyspace assignment patrol.
keyspacePatrolBatchSize = 256
)

// Config is the interface for keyspace config.
Expand All @@ -72,6 +74,8 @@ type Manager struct {
config Config
// kgm is the keyspace group manager of the server.
kgm *GroupManager
// nextPatrolStartID is the next start id of keyspace assignment patrol.
nextPatrolStartID uint32
}

// CreateKeyspaceRequest represents necessary arguments to create a keyspace.
Expand All @@ -94,13 +98,14 @@ func NewKeyspaceManager(
kgm *GroupManager,
) *Manager {
return &Manager{
metaLock: syncutil.NewLockGroup(syncutil.WithHash(keyspaceIDHash)),
idAllocator: idAllocator,
store: store,
cluster: cluster,
ctx: ctx,
config: config,
kgm: kgm,
metaLock: syncutil.NewLockGroup(syncutil.WithHash(keyspaceIDHash)),
idAllocator: idAllocator,
store: store,
cluster: cluster,
ctx: ctx,
config: config,
kgm: kgm,
nextPatrolStartID: utils.DefaultKeyspaceID,
}
}

Expand Down Expand Up @@ -570,64 +575,96 @@ func (manager *Manager) allocID() (uint32, error) {

// PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (manager *Manager) PatrolKeyspaceAssignment() error {
// TODO: since the number of keyspaces might be large, we should consider to assign them in batches.
return manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
if err != nil {
return err
}
if defaultKeyspaceGroup == nil {
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, utils.DefaultKeyspaceID, 0)
if err != nil {
return err
}
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, len(keyspaces))
)
defer func() {
for _, id := range keyspaceIDsToUnlock {
manager.metaLock.Unlock(id)
var (
// The current start ID of the patrol, used for logging.
currentStartID = manager.nextPatrolStartID
// The next start ID of the patrol, used for the next patrol.
nextStartID = currentStartID
moreToPatrol = true
err error
)
for moreToPatrol {
err = manager.store.RunInTxn(manager.ctx, func(txn kv.Txn) error {
defaultKeyspaceGroup, err := manager.kgm.store.LoadKeyspaceGroup(txn, utils.DefaultKeyspaceGroupID)
if err != nil {
return err
}
if defaultKeyspaceGroup == nil {
return errors.Errorf("default keyspace group %d not found", utils.DefaultKeyspaceGroupID)
}
if defaultKeyspaceGroup.IsSplitting() {
return ErrKeyspaceGroupInSplit
}
keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, keyspacePatrolBatchSize)
if err != nil {
return err
}
}()
for _, ks := range keyspaces {
if ks == nil {
continue
keyspaceNum := len(keyspaces)
// If there are more than one keyspace, update the current and next start IDs.
if keyspaceNum > 0 {
currentStartID = keyspaces[0].GetId()
nextStartID = keyspaces[keyspaceNum-1].GetId() + 1
}
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
ks.Config = make(map[string]string, 1)
} else {
// If the keyspace already has a group ID, skip it.
_, ok := ks.Config[TSOKeyspaceGroupIDKey]
if ok {
// If there are less than `keyspacePatrolBatchSize` keyspaces,
// we have reached the end of the keyspace list.
moreToPatrol = keyspaceNum == keyspacePatrolBatchSize
var (
assigned = false
keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum)
)
defer func() {
for _, id := range keyspaceIDsToUnlock {
manager.metaLock.Unlock(id)
}
}()
for _, ks := range keyspaces {
if ks == nil {
continue
}
manager.metaLock.Lock(ks.Id)
if ks.Config == nil {
ks.Config = make(map[string]string, 1)
} else if _, ok := ks.Config[TSOKeyspaceGroupIDKey]; ok {
// If the keyspace already has a group ID, skip it.
manager.metaLock.Unlock(ks.Id)
continue
}
// Unlock the keyspace meta lock after the whole txn.
keyspaceIDsToUnlock = append(keyspaceIDsToUnlock, ks.Id)
// If the keyspace doesn't have a group ID, assign it to the default keyspace group.
if !slice.Contains(defaultKeyspaceGroup.Keyspaces, ks.Id) {
defaultKeyspaceGroup.Keyspaces = append(defaultKeyspaceGroup.Keyspaces, ks.Id)
// Only save the keyspace group meta if any keyspace is assigned to it.
assigned = true
}
ks.Config[TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(utils.DefaultKeyspaceGroupID), 10)
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID),
zap.Uint32("keyspace-id", ks.Id), zap.Error(err))
return err
}
}
// Unlock the keyspace meta lock after the whole txn.
keyspaceIDsToUnlock = append(keyspaceIDsToUnlock, ks.Id)
// If the keyspace doesn't have a group ID, assign it to the default keyspace group.
if !slice.Contains(defaultKeyspaceGroup.Keyspaces, ks.Id) {
defaultKeyspaceGroup.Keyspaces = append(defaultKeyspaceGroup.Keyspaces, ks.Id)
assigned = true
}
ks.Config[TSOKeyspaceGroupIDKey] = strconv.FormatUint(uint64(utils.DefaultKeyspaceGroupID), 10)
err = manager.store.SaveKeyspaceMeta(txn, ks)
if err != nil {
log.Error("[keyspace] failed to save keyspace meta during patrol",
zap.Uint32("ID", ks.Id), zap.Error(err))
return err
if assigned {
err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
if err != nil {
log.Error("[keyspace] failed to save default keyspace group meta during patrol",
zap.Int("batch-size", keyspacePatrolBatchSize),
zap.Uint32("current-start-id", currentStartID),
zap.Uint32("next-start-id", nextStartID), zap.Error(err))
return err
}
}
return nil
})
if err != nil {
return err
}
if assigned {
return manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup)
}
return nil
})
// If all keyspaces in the current batch are assigned, update the next start ID.
manager.nextPatrolStartID = nextStartID
}
return nil
}
33 changes: 33 additions & 0 deletions pkg/keyspace/keyspace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,36 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() {
re.NotNil(defaultKeyspaceGroup)
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(111))
}

func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() {
re := suite.Require()
// Create some keyspaces without any keyspace group.
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
now := time.Now().Unix()
err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: uint32(i),
Name: strconv.Itoa(i),
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
}
// Check if all the keyspaces are not attached to the default group.
defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
// Patrol the keyspace assignment.
err = suite.manager.PatrolKeyspaceAssignment()
re.NoError(err)
// Check if all the keyspaces are attached to the default group.
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID)
re.NoError(err)
re.NotNil(defaultKeyspaceGroup)
for i := 1; i < keyspacePatrolBatchSize*2+1; i++ {
re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i))
}
}

0 comments on commit 251721b

Please sign in to comment.