This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

restore: remove tiflash replica before restore #194

Merged
merged 15 commits on Mar 18, 2020
19 changes: 19 additions & 0 deletions pkg/restore/client.go
@@ -305,6 +305,25 @@ func (rc *Client) CreateTables(
return rewriteRules, newTables, nil
}

// GetTiFlashStores returns an id list of tiflash stores.
func (rc *Client) GetTiFlashStores() ([]uint64, error) {
Member

Could you add an option to GetAllTiKVStores to return TiFlash stores only?
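
For reference, a minimal sketch of what such an option might look like, assuming the existing imports in pkg/restore/client.go (context, the PD client, and metapb); the StoreBehavior flag and the exact signature are hypothetical and not part of this PR:

// Hypothetical sketch only: a behavior flag on GetAllTiKVStores so callers
// can ask for TiFlash stores instead of adding a separate method.
type StoreBehavior int

const (
    SkipTiFlash StoreBehavior = iota // return TiKV stores only
    TiFlashOnly                      // return TiFlash stores only
)

func GetAllTiKVStores(ctx context.Context, pdClient pd.Client, behavior StoreBehavior) ([]*metapb.Store, error) {
    stores, err := pdClient.GetAllStores(ctx)
    if err != nil {
        return nil, err
    }
    filtered := make([]*metapb.Store, 0, len(stores))
    for _, store := range stores {
        isTiFlash := false
        for _, label := range store.GetLabels() {
            if label.GetKey() == "engine" && label.GetValue() == "tiflash" {
                isTiFlash = true
            }
        }
        // Keep the store only if it matches the requested engine type.
        if isTiFlash == (behavior == TiFlashOnly) {
            filtered = append(filtered, store)
        }
    }
    return filtered, nil
}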

stores, err := rc.pdClient.GetAllStores(rc.ctx)
if err != nil {
return nil, err
}

tiflashStores := make([]uint64, 0)
for _, store := range stores {
for _, label := range store.GetLabels() {
if label.GetKey() == "engine" && label.GetValue() == "tiflash" {
tiflashStores = append(tiflashStores, store.GetId())
}
}
}

return tiflashStores, nil
}

// ExecDDLs executes the queries of the ddl jobs.
func (rc *Client) ExecDDLs(ddlJobs []*model.Job) error {
// Sort the ddl jobs by schema version in ascending order.
25 changes: 20 additions & 5 deletions pkg/restore/db.go
@@ -84,13 +84,13 @@ func (db *DB) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {

// CreateTable executes a CREATE TABLE SQL.
func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
schema := table.Info
createSQL, err := db.se.ShowCreateTable(schema, newIDAllocator(schema.AutoIncID))
tableInfo := table.Info
createSQL, err := db.se.ShowCreateTable(tableInfo, newIDAllocator(tableInfo.AutoIncID))
if err != nil {
log.Error(
"build create table SQL failed",
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", schema.Name),
zap.Stringer("table", tableInfo.Name),
zap.Error(err))
return errors.Trace(err)
}
@@ -119,8 +119,8 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
}
alterAutoIncIDSQL := fmt.Sprintf(
"alter table %s auto_increment = %d",
utils.EncloseName(schema.Name.O),
schema.AutoIncID)
utils.EncloseName(tableInfo.Name.O),
tableInfo.AutoIncID)
err = db.se.Execute(ctx, alterAutoIncIDSQL)
if err != nil {
log.Error("alter AutoIncID failed",
@@ -129,6 +129,21 @@ func (db *DB) CreateTable(ctx context.Context, table *utils.Table) error {
zap.Stringer("table", table.Info.Name),
zap.Error(err))
}

// TODO: remove this after tiflash supports restore
removeTiFlashSQL := fmt.Sprintf(
"alter table %s set tiflash replica 0",
utils.EncloseName(tableInfo.Name.O),
)
err = db.se.Execute(ctx, removeTiFlashSQL)
Collaborator

What if we ignore this error when TiDB doesn't support setting a TiFlash replica?

Contributor Author

It would fail. Do we need to support TiDB versions that don't support TiFlash?
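
A rough sketch of the ignore-on-old-TiDB idea, assuming a strings import is available in db.go; the substring check is a guess and not what this PR does:

err = db.se.Execute(ctx, removeTiFlashSQL)
if err != nil && strings.Contains(err.Error(), "You have an error in your SQL syntax") {
    // Older TiDB versions reject the TIFLASH REPLICA syntax; downgrade the
    // failure to a warning instead of aborting the restore (hypothetical).
    log.Warn("target TiDB does not support SET TIFLASH REPLICA, skipping",
        zap.Stringer("table", table.Info.Name),
        zap.Error(err))
    err = nil
}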

if err != nil {
log.Error("remove tiflash replica failed",
zap.String("query", removeTiFlashSQL),
zap.Stringer("db", table.Db.Name),
zap.Stringer("table", table.Info.Name),
zap.Error(err))
}

return errors.Trace(err)
}

107 changes: 92 additions & 15 deletions pkg/restore/split.go
@@ -32,8 +32,11 @@ const (
ScatterWaitMaxRetryTimes = 64
ScatterWaitInterval = 50 * time.Millisecond
ScatterMaxWaitInterval = time.Second

ScatterWaitUpperInterval = 180 * time.Second

RejectStoreCheckRetryTimes = 64
RejectStoreCheckInterval = 100 * time.Millisecond
RejectStoreMaxCheckInterval = 2 * time.Second
)

// RegionSplitter is an executor of region split by rules.
@@ -60,16 +63,17 @@ func (rs *RegionSplitter) Split(
ctx context.Context,
ranges []rtree.Range,
rewriteRules *RewriteRules,
rejectStores []uint64,
onSplit OnSplitFunc,
) error {
if len(ranges) == 0 {
return nil
}
startTime := time.Now()
// Sort the range for getting the min and max key of the ranges
sortedRanges, err := sortRanges(ranges, rewriteRules)
if err != nil {
return errors.Trace(err)
sortedRanges, errSplit := sortRanges(ranges, rewriteRules)
if errSplit != nil {
return errors.Trace(errSplit)
}
minKey := codec.EncodeBytes([]byte{}, sortedRanges[0].StartKey)
maxKey := codec.EncodeBytes([]byte{}, sortedRanges[len(sortedRanges)-1].EndKey)
@@ -91,12 +95,14 @@
}
interval := SplitRetryInterval
scatterRegions := make([]*RegionInfo, 0)
allRegions := make([]*RegionInfo, 0)
SplitRegions:
for i := 0; i < SplitRetryTimes; i++ {
regions, err1 := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit)
if err1 != nil {
return errors.Trace(err1)
regions, errScan := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit)
if errScan != nil {
return errors.Trace(errScan)
}
allRegions = append(allRegions, regions...)
if len(regions) == 0 {
log.Warn("cannot scan any region")
return nil
@@ -109,24 +115,24 @@
for regionID, keys := range splitKeyMap {
var newRegions []*RegionInfo
region := regionMap[regionID]
newRegions, err = rs.splitAndScatterRegions(ctx, region, keys)
if err != nil {
if strings.Contains(err.Error(), "no valid key") {
newRegions, errSplit = rs.splitAndScatterRegions(ctx, region, keys)
if errSplit != nil {
if strings.Contains(errSplit.Error(), "no valid key") {
for _, key := range keys {
log.Error("no valid key",
zap.Binary("startKey", region.Region.StartKey),
zap.Binary("endKey", region.Region.EndKey),
zap.Binary("key", codec.EncodeBytes([]byte{}, key)))
}
return errors.Trace(err)
return errors.Trace(errSplit)
}
interval = 2 * interval
if interval > SplitMaxRetryInterval {
interval = SplitMaxRetryInterval
}
time.Sleep(interval)
if i > 3 {
log.Warn("splitting regions failed, retry it", zap.Error(err), zap.ByteStrings("keys", keys))
log.Warn("splitting regions failed, retry it", zap.Error(errSplit), zap.ByteStrings("keys", keys))
}
continue SplitRegions
}
@@ -136,10 +142,27 @@
}
break
}
if err != nil {
return errors.Trace(err)
if errSplit != nil {
return errors.Trace(errSplit)
}
log.Info("splitting regions done, wait for scattering regions",
if len(rejectStores) > 0 {
startTime = time.Now()
log.Info("start to wait for removing rejected stores", zap.Uint64s("rejectStores", rejectStores))
storeMap := make(map[uint64]bool)
for _, storeID := range rejectStores {
storeMap[storeID] = true
}
for _, region := range allRegions {
if !rs.waitForRemoveRejectStores(ctx, region, storeMap) {
log.Error("waiting for removing rejected stores failed",
zap.Stringer("region", region.Region))
return errors.New("waiting for removing rejected stores failed")
}
}
log.Info("waiting for removing rejected stores done",
zap.Int("regions", len(allRegions)), zap.Duration("take", time.Since(startTime)))
}
log.Info("start to wait for scattering regions",
zap.Int("regions", len(scatterRegions)), zap.Duration("take", time.Since(startTime)))
startTime = time.Now()
scatterCount := 0
@@ -192,6 +215,30 @@ func (rs *RegionSplitter) isScatterRegionFinished(ctx context.Context, regionID
return ok, nil
}

func (rs *RegionSplitter) hasRejectStorePeer(
ctx context.Context,
regionID uint64,
rejectStores map[uint64]bool,
) (bool, error) {
regionInfo, err := rs.client.GetRegionByID(ctx, regionID)
if err != nil {
return false, err
}
if regionInfo == nil {
return false, nil
}
for _, peer := range regionInfo.Region.GetPeers() {
if rejectStores[peer.GetStoreId()] {
return true, nil
}
}
retryTimes := ctx.Value(retryTimes).(int)
if retryTimes > 10 {
log.Warn("get region info", zap.Stringer("region", regionInfo.Region))
}
Comment on lines +231 to +234
Member

Could you handle the retry outside of the method?

Contributor Author

No, the region info would be printed here.
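
For reference, a sketch of how the attempt count could instead be passed as a parameter rather than through the context, while still logging the region info inside the method; the signature change and the threshold of 10 are illustrative only, not part of this PR:

func (rs *RegionSplitter) hasRejectStorePeer(
    ctx context.Context,
    regionID uint64,
    rejectStores map[uint64]bool,
    attempt int,
) (bool, error) {
    regionInfo, err := rs.client.GetRegionByID(ctx, regionID)
    if err != nil {
        return false, err
    }
    if regionInfo == nil {
        return false, nil
    }
    // The region info is still available for logging here, as noted above.
    if attempt > 10 {
        log.Warn("get region info", zap.Stringer("region", regionInfo.Region))
    }
    for _, peer := range regionInfo.Region.GetPeers() {
        if rejectStores[peer.GetStoreId()] {
            return true, nil
        }
    }
    return false, nil
}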

return false, nil
}

func (rs *RegionSplitter) waitForSplit(ctx context.Context, regionID uint64) {
interval := SplitCheckInterval
for i := 0; i < SplitCheckMaxRetryTimes; i++ {
@@ -237,6 +284,36 @@ func (rs *RegionSplitter) waitForScatterRegion(ctx context.Context, regionInfo *
}
}

func (rs *RegionSplitter) waitForRemoveRejectStores(
ctx context.Context,
regionInfo *RegionInfo,
rejectStores map[uint64]bool,
) bool {
interval := RejectStoreCheckInterval
regionID := regionInfo.Region.GetId()
for i := 0; i < RejectStoreCheckRetryTimes; i++ {
ctx1 := context.WithValue(ctx, retryTimes, i)
ok, err := rs.hasRejectStorePeer(ctx1, regionID, rejectStores)
if err != nil {
log.Warn("wait for rejecting store failed",
zap.Stringer("region", regionInfo.Region),
zap.Error(err))
return false
}
// Do not have any peer in the rejected store, return true
if !ok {
return true
}
interval = 2 * interval
if interval > RejectStoreMaxCheckInterval {
interval = RejectStoreMaxCheckInterval
}
time.Sleep(interval)
}

return false
}

func (rs *RegionSplitter) splitAndScatterRegions(
ctx context.Context, regionInfo *RegionInfo, keys [][]byte,
) ([]*RegionInfo, error) {
2 changes: 1 addition & 1 deletion pkg/restore/split_test.go
@@ -193,7 +193,7 @@ func (s *testRestoreUtilSuite) TestSplit(c *C) {
regionSplitter := NewRegionSplitter(client)

ctx := context.Background()
err := regionSplitter.Split(ctx, ranges, rewriteRules, func(key [][]byte) {})
err := regionSplitter.Split(ctx, ranges, rewriteRules, []uint64{}, func(key [][]byte) {})
if err != nil {
c.Assert(err, IsNil, Commentf("split regions failed: %v", err))
}
7 changes: 6 additions & 1 deletion pkg/restore/util.go
@@ -327,7 +327,12 @@ func SplitRanges(
summary.CollectDuration("split region", elapsed)
}()
splitter := NewRegionSplitter(NewSplitClient(client.GetPDClient(), client.GetTLSConfig()))
return splitter.Split(ctx, ranges, rewriteRules, func(keys [][]byte) {
tiflashStores, err := client.GetTiFlashStores()
if err != nil {
return errors.Trace(err)
}

return splitter.Split(ctx, ranges, rewriteRules, tiflashStores, func(keys [][]byte) {
for range keys {
updateCh <- struct{}{}
}