restore_data: remove wait apply phase (pingcap#50316) (pingcap#50470) (pingcap#36)

close pingcap#50312, close pingcap#50315

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2 people authored and GitHub Enterprise committed Jan 23, 2024
1 parent 23b557e commit 991ee45
Showing 1 changed file with 1 addition and 41 deletions.
42 changes: 1 addition & 41 deletions br/pkg/restore/data.go
@@ -140,10 +140,6 @@ func doRecoveryData(ctx context.Context, resolveTS uint64, allStores []*metapb.S
 		return totalRegions, recoveryError{error: err, atStage: StageRecovering}
 	}
 
-	if err := recovery.WaitApply(ctx); err != nil {
-		return totalRegions, recoveryError{error: err, atStage: StageRecovering}
-	}
-
 	if err := recovery.PrepareFlashbackToVersion(ctx, resolveTS, restoreTS-1); err != nil {
 		return totalRegions, recoveryError{error: err, atStage: StageFlashback}
 	}
@@ -228,7 +224,7 @@ func (recovery *Recovery) ReadRegionMeta(ctx context.Context) error {
 	totalStores := len(recovery.allStores)
 	workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, common.MaxStoreConcurrency)), "Collect Region Meta") // TODO: int overflow?
 
-	// TODO: optimize the ErroGroup when TiKV is panic
+	// TODO: optimize the ErrorGroup when TiKV is panic
 	metaChan := make(chan StoreMeta, 1024)
 	defer close(metaChan)
 
@@ -399,42 +395,6 @@ func (recovery *Recovery) SpawnTiKVShutDownWatchers(ctx context.Context) {
 	go mainLoop()
 }
 
-// WaitApply send wait apply to all tikv ensure all region peer apply log into the last
-func (recovery *Recovery) WaitApply(ctx context.Context) (err error) {
-	eg, ectx := errgroup.WithContext(ctx)
-	totalStores := len(recovery.allStores)
-	workers := utils.NewWorkerPool(uint(mathutil.Min(totalStores, common.MaxStoreConcurrency)), "wait apply")
-
-	for _, store := range recovery.allStores {
-		if err := ectx.Err(); err != nil {
-			break
-		}
-		storeAddr := getStoreAddress(recovery.allStores, store.Id)
-		storeId := store.Id
-
-		workers.ApplyOnErrorGroup(eg, func() error {
-			recoveryClient, conn, err := recovery.newRecoveryClient(ectx, storeAddr)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			defer conn.Close()
-			log.Info("send wait apply to tikv", zap.String("tikv address", storeAddr), zap.Uint64("store id", storeId))
-			req := &recovpb.WaitApplyRequest{StoreId: storeId}
-			_, err = recoveryClient.WaitApply(ectx, req)
-			if err != nil {
-				log.Error("wait apply failed", zap.Uint64("store id", storeId))
-				return errors.Trace(err)
-			}
-
-			recovery.progress.Inc()
-			log.Info("wait apply execution success", zap.Uint64("store id", storeId))
-			return nil
-		})
-	}
-	// Wait for all TiKV instances force leader and wait apply to last log.
-	return eg.Wait()
-}
-
 // prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state
 func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) {
 	retryErr := utils.WithRetry(
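
For reference, the removed WaitApply phase is a bounded fan-out of one RPC per TiKV store on top of errgroup, where the first error cancels the rest. Below is a minimal, self-contained sketch of that pattern, not BR's actual implementation: it bounds concurrency with errgroup's SetLimit instead of BR's WorkerPool, and waitApplyOnStore is a hypothetical stand-in for the real recovery-client WaitApply RPC.

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// waitApplyOnStore is a hypothetical stand-in for the per-store WaitApply RPC;
// the removed BR code dials a recovery client for the store address and calls
// recoveryClient.WaitApply there.
func waitApplyOnStore(ctx context.Context, storeID uint64) error {
	fmt.Printf("wait apply sent to store %d\n", storeID)
	return nil
}

// waitApplyAll mirrors the shape of the removed phase: one task per store,
// bounded concurrency, and the first error cancels the remaining tasks.
func waitApplyAll(ctx context.Context, storeIDs []uint64, maxConcurrency int) error {
	eg, ectx := errgroup.WithContext(ctx)
	eg.SetLimit(maxConcurrency) // the BR code bounds concurrency with a WorkerPool instead

	for _, storeID := range storeIDs {
		if ectx.Err() != nil {
			break
		}
		storeID := storeID // capture the loop variable, as the original does with storeId
		eg.Go(func() error {
			return waitApplyOnStore(ectx, storeID)
		})
	}
	// Wait for every store to acknowledge, or for the first failure.
	return eg.Wait()
}

func main() {
	if err := waitApplyAll(context.Background(), []uint64{1, 2, 3}, 2); err != nil {
		fmt.Println("wait apply failed:", err)
	}
}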
