diff --git a/br/cmd/br/debug.go b/br/cmd/br/debug.go
index 8525c6bd61993..02adb0c713ddc 100644
--- a/br/cmd/br/debug.go
+++ b/br/cmd/br/debug.go
@@ -229,7 +229,7 @@ func newBackupMetaValidateCommand() *cobra.Command {
 					Name: indexInfo.Name,
 				}
 			}
-			rules := restore.GetRewriteRules(newTable, table.Info, 0)
+			rules := restore.GetRewriteRules(newTable, table.Info, 0, true)
 			rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
 			tableIDMap[table.Info.ID] = int64(tableID)
 		}
@@ -322,7 +322,7 @@ func encodeBackupMetaCommand() *cobra.Command {
 			if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
 				return errors.Trace(err)
 			}
-			_, s, err := task.GetStorage(ctx, &cfg)
+			_, s, err := task.GetStorage(ctx, cfg.Storage, &cfg)
 			if err != nil {
 				return errors.Trace(err)
 			}
diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go
index b43c7bd482d86..29944fa9e2691 100644
--- a/br/cmd/br/main.go
+++ b/br/cmd/br/main.go
@@ -50,7 +50,7 @@ func main() {
 		NewRestoreCommand(),
 		NewStreamCommand(),
 	)
-	// Ouputs cmd.Print to stdout.
+	// Outputs cmd.Print to stdout.
 	rootCmd.SetOut(os.Stdout)
 	rootCmd.SetArgs(os.Args[1:])
diff --git a/br/cmd/br/stream.go b/br/cmd/br/stream.go
index e821a0ceef34c..a1174f5b0dc1a 100644
--- a/br/cmd/br/stream.go
+++ b/br/cmd/br/stream.go
@@ -44,8 +44,9 @@ func NewStreamCommand() *cobra.Command {
 		newStreamStartCommand(),
 		newStreamStopCommand(),
 		newStreamPauseCommand(),
-		newStreamResumeComamnd(),
-		newStreamStatusComamnd(),
+		newStreamResumeCommand(),
+		newStreamStatusCommand(),
+		newStreamRestoreCommand(),
 	)
 	return command
 }
@@ -94,7 +95,7 @@ func newStreamPauseCommand() *cobra.Command {
 	return command
 }
 
-func newStreamResumeComamnd() *cobra.Command {
+func newStreamResumeCommand() *cobra.Command {
 	command := &cobra.Command{
 		Use:   "resume",
 		Short: "resume a stream task",
@@ -108,7 +109,7 @@ func newStreamResumeComamnd() *cobra.Command {
 	return command
 }
 
-func newStreamStatusComamnd() *cobra.Command {
+func newStreamStatusCommand() *cobra.Command {
 	command := &cobra.Command{
 		Use:   "status",
 		Short: "get status of a stream task",
@@ -122,19 +123,50 @@ func newStreamStatusComamnd() *cobra.Command {
 	return command
 }
 
+// TODO maybe we should use `br restore stream` rather than `br stream restore`
+// because the restore and stream tasks have no common flags.
+func newStreamRestoreCommand() *cobra.Command {
+	command := &cobra.Command{
+		Use:   "restore",
+		Short: "restore a stream backup",
+		Args:  cobra.NoArgs,
+		RunE: func(command *cobra.Command, _ []string) error {
+			return streamCommand(command, task.StreamRestore)
+		},
+	}
+	task.DefineFilterFlags(command, acceptAllTables)
+	task.DefineStreamRestoreFlags(command.Flags())
+	return command
+}
+
 func streamCommand(command *cobra.Command, cmdName string) error {
 	var cfg task.StreamConfig
-	if err := cfg.ParseCommonFromFlags(command.Flags()); err != nil {
-		command.SilenceUsage = false
+	var err error
+	defer func() {
+		if err != nil {
+			command.SilenceUsage = false
+		}
+	}()
+	if err = cfg.Config.ParseFromFlags(command.Flags()); err != nil {
 		return errors.Trace(err)
 	}
 
-	if cmdName == task.StreamStart {
-		if err := cfg.ParseStreamStartFromFlags(command.Flags()); err != nil {
+	switch cmdName {
+	case task.StreamRestore:
+		if err = cfg.ParseStreamRestoreFromFlags(command.Flags()); err != nil {
+			return errors.Trace(err)
+		}
+	case task.StreamStart:
+		if err = cfg.ParseStreamStartFromFlags(command.Flags()); err != nil {
+			return errors.Trace(err)
+		}
+		// TODO use `br restore stream` rather than `br stream restore`
+		fallthrough
+	default:
+		if err = cfg.ParseStreamCommonFromFlags(command.Flags()); err != nil {
 			return errors.Trace(err)
 		}
 	}
-
 	ctx := GetDefaultContext()
 	if cfg.EnableOpenTracing {
 		var store *appdash.MemoryStore
diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go
index c62eb1c3af6d0..9968faaefc8cf 100644
--- a/br/pkg/restore/client.go
+++ b/br/pkg/restore/client.go
@@ -150,6 +150,10 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
 	return nil
 }
 
+func (rc *Client) GetDomain() *domain.Domain {
+	return rc.dom
+}
+
 // GetPDClient returns a pd client.
 func (rc *Client) GetPDClient() pd.Client {
 	return rc.pdClient
@@ -174,13 +178,19 @@ func (rc *Client) Close() {
 	log.Info("Restore client closed")
 }
 
+func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool) {
+	metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
+	importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
+	rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, rc.rateLimit)
+}
+
 // InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient.
 func (rc *Client) InitBackupMeta(
 	c context.Context,
 	backupMeta *backuppb.BackupMeta,
 	backend *backuppb.StorageBackend,
-	externalStorage storage.ExternalStorage,
 	reader *metautil.MetaReader) error {
+
 	if !backupMeta.IsRawKv {
 		databases, err := utils.LoadBackupTables(c, reader)
 		if err != nil {
@@ -203,11 +213,9 @@ func (rc *Client) InitBackupMeta(
 		rc.ddlJobs = ddlJobs
 	}
 	rc.backupMeta = backupMeta
-	log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))
 
-	metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
-	importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
-	rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv, rc.rateLimit)
+	rc.InitClients(backend, backupMeta.IsRawKv)
 
+	log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))
 	return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
 }
@@ -428,7 +436,7 @@ func (rc *Client) createTable(
 			table.Info.IsCommonHandle,
 			newTableInfo.IsCommonHandle)
 	}
-	rules := GetRewriteRules(newTableInfo, table.Info, newTS)
+	rules := GetRewriteRules(newTableInfo, table.Info, newTS, true)
 	et := CreatedTable{
 		RewriteRule: rules,
 		Table:       newTableInfo,
@@ -595,8 +603,8 @@ func drainFilesByRange(files []*backuppb.File, supportMulti bool) ([]*backuppb.F
 	return files[:idx], files[idx:]
 }
 
-// RestoreFiles tries to restore the files.
-func (rc *Client) RestoreFiles(
+// RestoreSSTFiles tries to restore the files.
+func (rc *Client) RestoreSSTFiles(
 	ctx context.Context,
 	files []*backuppb.File,
 	rewriteRules *RewriteRules,
@@ -614,7 +622,7 @@ func (rc *Client) RestoreFiles(
 	log.Debug("start to restore files", zap.Int("files", len(files)))
 
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
-		span1 := span.Tracer().StartSpan("Client.RestoreFiles", opentracing.ChildOf(span.Context()))
+		span1 := span.Tracer().StartSpan("Client.RestoreSSTFiles", opentracing.ChildOf(span.Context()))
 		defer span1.Finish()
 		ctx = opentracing.ContextWithSpan(ctx, span1)
 	}
@@ -637,7 +645,7 @@ func (rc *Client) RestoreFiles(
 					zap.Duration("take", time.Since(fileStart)))
 				updateCh.Inc()
 			}()
-			return rc.fileImporter.Import(ectx, filesReplica, rewriteRules, rc.cipher)
+			return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher)
 		})
 	}
@@ -678,7 +686,7 @@ func (rc *Client) RestoreRaw(
 		rc.workerPool.ApplyOnErrorGroup(eg, func() error {
 			defer updateCh.Inc()
-			return rc.fileImporter.Import(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher)
+			return rc.fileImporter.ImportSSTFiles(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher)
 		})
 	}
 	if err := eg.Wait(); err != nil {
@@ -1198,6 +1206,103 @@ func (rc *Client) PreCheckTableClusterIndex(
 	return nil
 }
 
+const (
+	streamBackupMetaPrefix = "v1_backupmeta"
+)
+
+// ReadStreamMetaByTS is used for the streaming task. It collects all meta files by TS.
+func (rc *Client) ReadStreamMetaByTS(ctx context.Context, restoreTS uint64) ([]*backuppb.Metadata, error) {
+	streamBackupMetaFiles := make([]*backuppb.Metadata, 0)
+	err := rc.storage.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
+		if strings.Contains(path, streamBackupMetaPrefix) {
+			m := &backuppb.Metadata{}
+			b, err := rc.storage.ReadFile(ctx, path)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			err = m.Unmarshal(b)
+			if err != nil {
+				return errors.Trace(err)
+			}
+			// TODO find a way to filter some unnecessary meta files.
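+			// Note: every object whose path contains "v1_backupmeta" is collected here;
+			// narrowing the walk by TS is exactly the TODO above.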
+ log.Debug("backup stream collect meta file", zap.String("file", path)) + streamBackupMetaFiles = append(streamBackupMetaFiles, m) + } + return nil + }) + if err != nil { + return nil, errors.Trace(err) + } + return streamBackupMetaFiles, nil +} + +// ReadStreamDataFiles is used for streaming task. collect all meta file by TS. +func (rc *Client) ReadStreamDataFiles(ctx context.Context, metas []*backuppb.Metadata, restoreTS uint64) ([]*backuppb.DataFileInfo, error) { + streamBackupDataFiles := make([]*backuppb.DataFileInfo, 0) + for _, m := range metas { + for _, d := range m.Files { + if d.MinTs > restoreTS { + continue + } + streamBackupDataFiles = append(streamBackupDataFiles, d) + log.Debug("backup stream collect data file", zap.String("file", d.Path)) + } + } + return streamBackupDataFiles, nil +} + +func (rc *Client) RestoreKVFiles(ctx context.Context, rules map[int64]*RewriteRules, files []*backuppb.DataFileInfo) error { + var err error + start := time.Now() + defer func() { + elapsed := time.Since(start) + if err == nil { + log.Info("Restore KV files", zap.Duration("take", elapsed)) + summary.CollectSuccessUnit("files", len(files), elapsed) + } + }() + + log.Debug("start to restore files", zap.Int("files", len(files))) + + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("Client.RestoreKVFiles", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + + eg, ectx := errgroup.WithContext(ctx) + for _, file := range files { + filesReplica := file + // get rewrite rule from table id + rule, ok := rules[filesReplica.TableId] + if !ok { + // TODO handle new created table + // For this version we do not handle new created table after full backup. + // in next version we will perform rewrite and restore meta key to restore new created tables. + // so we can simply skip the file that doesn't have the rule here. + log.Info("skip file due to table id not matched", zap.String("file", file.Path)) + continue + } + rc.workerPool.ApplyOnErrorGroup(eg, func() error { + fileStart := time.Now() + defer func() { + log.Debug("import files done", zap.String("name", file.Path), zap.Duration("take", time.Since(fileStart))) + }() + return rc.fileImporter.ImportKVFiles(ectx, filesReplica, rule) + }) + } + + if err = eg.Wait(); err != nil { + summary.CollectFailureUnit("file", err) + log.Error( + "restore files failed", + zap.Error(err), + ) + return errors.Trace(err) + } + return nil +} + func transferBoolToValue(enable bool) string { if enable { return "ON" diff --git a/br/pkg/restore/import.go b/br/pkg/restore/import.go index aafb144959202..26aa2e2fbd009 100644 --- a/br/pkg/restore/import.go +++ b/br/pkg/restore/import.go @@ -42,6 +42,12 @@ const ( // ImporterClient is used to import a file to TiKV. 
 type ImporterClient interface {
+	ApplyKVFile(
+		ctx context.Context,
+		storeID uint64,
+		req *import_sstpb.ApplyRequest,
+	) (*import_sstpb.ApplyResponse, error)
+
 	DownloadSST(
 		ctx context.Context,
 		storeID uint64,
@@ -92,6 +98,18 @@ func NewImportClient(metaClient SplitClient, tlsConf *tls.Config, keepaliveConf
 	}
 }
 
+func (ic *importClient) ApplyKVFile(
+	ctx context.Context,
+	storeID uint64,
+	req *import_sstpb.ApplyRequest,
+) (*import_sstpb.ApplyResponse, error) {
+	client, err := ic.GetImportClient(ctx, storeID)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return client.Apply(ctx, req)
+}
+
 func (ic *importClient) DownloadSST(
 	ctx context.Context,
 	storeID uint64,
@@ -259,9 +277,88 @@ func (importer *FileImporter) SetRawRange(startKey, endKey []byte) error {
 	return nil
 }
 
-// Import tries to import a file.
+func (importer *FileImporter) ImportKVFiles(
+	ctx context.Context,
+	file *backuppb.DataFileInfo,
+	rule *RewriteRules,
+) error {
+	startTime := time.Now()
+	log.Debug("import kv files", zap.String("file", file.Path))
+	var startKey, endKey []byte
+	start, end, err := rewriteFileKeys(file, rule)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(startKey) == 0 || bytes.Compare(startKey, start) > 0 {
+		startKey = start
+	}
+	if bytes.Compare(endKey, end) < 0 {
+		endKey = end
+	}
+
+	log.Debug("rewrite file keys",
+		zap.String("name", file.Path),
+		logutil.Key("startKey", startKey),
+		logutil.Key("endKey", endKey))
+
+	err = utils.WithRetry(ctx, func() error {
+		tctx, cancel := context.WithTimeout(ctx, importScanRegionTime)
+		defer cancel()
+		// Scan regions covered by the file range
+		regionInfos, errScanRegion := PaginateScanRegion(
+			tctx, importer.metaClient, startKey, endKey, ScanRegionPaginationLimit)
+		if errScanRegion != nil {
+			return errors.Trace(errScanRegion)
+		}
+
+		log.Debug("scan regions", zap.String("name", file.Path), zap.Int("count", len(regionInfos)))
+		// Try to download and apply the file to every region
+	regionLoop:
+		for _, regionInfo := range regionInfos {
+			info := regionInfo
+			// Try to download file.
+			errDownload := utils.WithRetry(ctx, func() error {
+				return importer.downloadAndApplyKVFile(ctx, file, rule, info)
+			}, utils.NewDownloadSSTBackoffer())
+			if errDownload != nil {
+				for _, e := range multierr.Errors(errDownload) {
+					switch errors.Cause(e) { // nolint:errorlint
+					case berrors.ErrKVRewriteRuleNotFound, berrors.ErrKVRangeIsEmpty:
+						// Skip this region
+						log.Warn("download file skipped",
+							logutil.Region(info.Region),
+							logutil.Key("startKey", startKey),
+							logutil.Key("endKey", endKey),
+							logutil.Key("fileStart", file.StartKey),
+							logutil.Key("fileEnd", file.EndKey),
+							logutil.ShortError(e))
+						continue regionLoop
+					}
+				}
+				log.Error("download and apply file failed",
+					logutil.Region(info.Region),
+					logutil.Key("startKey", startKey),
+					logutil.Key("endKey", endKey),
+					logutil.Key("fileStart", file.StartKey),
+					logutil.Key("fileEnd", file.EndKey),
+					logutil.ShortError(errDownload))
+				return errors.Trace(errDownload)
+			}
+			log.Debug("download and apply file done",
+				zap.String("file", file.Path),
+				zap.Stringer("take", time.Since(startTime)),
+				logutil.Key("fileStart", file.StartKey),
+				logutil.Key("fileEnd", file.EndKey),
+			)
+		}
+		return nil
+	}, utils.NewImportSSTBackoffer())
+	return errors.Trace(err)
+}
+
+// ImportSSTFiles tries to import a batch of SST files.
 // All rules must contain encoded keys.
-func (importer *FileImporter) Import(
+func (importer *FileImporter) ImportSSTFiles(
 	ctx context.Context,
 	files []*backuppb.File,
 	rewriteRules *RewriteRules,
@@ -626,3 +723,35 @@ func (importer *FileImporter) ingestSSTs(
 	resp, err := importer.importClient.MultiIngest(ctx, leader.GetStoreId(), req)
 	return resp, errors.Trace(err)
 }
+
+func (importer *FileImporter) downloadAndApplyKVFile(
+	ctx context.Context,
+	file *backuppb.DataFileInfo,
+	rules *RewriteRules,
+	regionInfo *RegionInfo,
+) error {
+	leader := regionInfo.Leader
+	if leader == nil {
+		return errors.Annotatef(berrors.ErrPDLeaderNotFound,
+			"region id %d has no leader", regionInfo.Region.Id)
+	}
+	// Get the rewrite rule for the file.
+	fileRule := findMatchedRewriteRule(file, rules)
+	if fileRule == nil {
+		return errors.Trace(berrors.ErrKVRewriteRuleNotFound)
+	}
+	rule := import_sstpb.RewriteRule{
+		OldKeyPrefix: encodeKeyPrefix(fileRule.GetOldKeyPrefix()),
+		NewKeyPrefix: encodeKeyPrefix(fileRule.GetNewKeyPrefix()),
+	}
+
+	req := &import_sstpb.ApplyRequest{
+		Name:           file.Path,
+		StorageBackend: importer.backend,
+		Cf:             file.Cf,
+		RewriteRule:    rule,
+	}
+	log.Debug("apply kv file", logutil.Leader(leader))
+	_, err := importer.importClient.ApplyKVFile(ctx, leader.GetStoreId(), req)
+	return errors.Trace(err)
+}
diff --git a/br/pkg/restore/pipeline_items.go b/br/pkg/restore/pipeline_items.go
index 1bd7502f30642..0046e65c7cd5c 100644
--- a/br/pkg/restore/pipeline_items.go
+++ b/br/pkg/restore/pipeline_items.go
@@ -355,10 +355,10 @@ func (b *tikvSender) restoreWorker(ctx context.Context, ranges <-chan drainResul
 			return
 		}
 		files := r.result.Files()
-		// There has been a worker in the `RestoreFiles` procedure.
+		// There has been a worker in the `RestoreSSTFiles` procedure.
 		// Spawning a raw goroutine won't make too many requests to TiKV.
 		eg.Go(func() error {
-			e := b.client.RestoreFiles(ectx, files, r.result.RewriteRules, b.updateCh)
+			e := b.client.RestoreSSTFiles(ectx, files, r.result.RewriteRules, b.updateCh)
 			if e != nil {
 				return e
 			}
diff --git a/br/pkg/restore/util.go b/br/pkg/restore/util.go
index 812d87b09cec6..9fd32968dc3c5 100644
--- a/br/pkg/restore/util.go
+++ b/br/pkg/restore/util.go
@@ -32,9 +32,20 @@ var (
 	quoteRegexp = regexp.MustCompile("`(?:[^`]|``)*`")
 )
 
+// ApplyedFile is implemented by the two file types we apply for now:
+// 1. SST files, used by full backup/restore.
+// 2. KV files, used by PITR restore.
+type ApplyedFile interface {
+	GetStartKey() []byte
+	GetEndKey() []byte
+}
+
 // GetRewriteRules returns the rewrite rule of the new table and the old table.
+// getDetailRule is used for normal backup & restore:
+// if set to true, we collect detailed rules such as tXXX_r and tYYY_i;
+// if set to false, we only collect table-level rules containing the table_id, i.e. tXXX and tYYY.
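+// For example, with getDetailRule=true the rules map t{oldID}_r -> t{newID}_r and
+// t{oldID}_i{indexID} -> t{newID}_i{indexID}, while with getDetailRule=false a single
+// rule maps the bare table prefix t{oldID} -> t{newID}.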
 func GetRewriteRules(
-	newTable, oldTable *model.TableInfo, newTimeStamp uint64,
+	newTable, oldTable *model.TableInfo, newTimeStamp uint64, getDetailRule bool,
 ) *RewriteRules {
 	tableIDs := make(map[int64]int64)
 	tableIDs[oldTable.ID] = newTable.ID
@@ -58,15 +69,23 @@ func GetRewriteRules(
 
 	dataRules := make([]*import_sstpb.RewriteRule, 0)
 	for oldTableID, newTableID := range tableIDs {
-		dataRules = append(dataRules, &import_sstpb.RewriteRule{
-			OldKeyPrefix: append(tablecodec.EncodeTablePrefix(oldTableID), recordPrefixSep...),
-			NewKeyPrefix: append(tablecodec.EncodeTablePrefix(newTableID), recordPrefixSep...),
-			NewTimestamp: newTimeStamp,
-		})
-		for oldIndexID, newIndexID := range indexIDs {
+		if getDetailRule {
+			dataRules = append(dataRules, &import_sstpb.RewriteRule{
+				OldKeyPrefix: append(tablecodec.EncodeTablePrefix(oldTableID), recordPrefixSep...),
+				NewKeyPrefix: append(tablecodec.EncodeTablePrefix(newTableID), recordPrefixSep...),
+				NewTimestamp: newTimeStamp,
+			})
+			for oldIndexID, newIndexID := range indexIDs {
+				dataRules = append(dataRules, &import_sstpb.RewriteRule{
+					OldKeyPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexID),
+					NewKeyPrefix: tablecodec.EncodeTableIndexPrefix(newTableID, newIndexID),
+					NewTimestamp: newTimeStamp,
+				})
+			}
+		} else {
 			dataRules = append(dataRules, &import_sstpb.RewriteRule{
-				OldKeyPrefix: tablecodec.EncodeTableIndexPrefix(oldTableID, oldIndexID),
-				NewKeyPrefix: tablecodec.EncodeTableIndexPrefix(newTableID, newIndexID),
+				OldKeyPrefix: tablecodec.EncodeTablePrefix(oldTableID),
+				NewKeyPrefix: tablecodec.EncodeTablePrefix(newTableID),
 				NewTimestamp: newTimeStamp,
 			})
 		}
@@ -307,7 +326,19 @@ func ValidateFileRewriteRule(file *backuppb.File, rewriteRules *RewriteRules) er
 	return nil
 }
 
-// Rewrites a raw key and returns a encoded key.
+// Rewrites an encoded key and returns an encoded key.
+func rewriteEncodedKey(key []byte, rewriteRules *RewriteRules) ([]byte, *import_sstpb.RewriteRule) {
+	if rewriteRules == nil {
+		return key, nil
+	}
+	if len(key) > 0 {
+		_, rawKey, _ := codec.DecodeBytes(key, nil)
+		return rewriteRawKey(rawKey, rewriteRules)
+	}
+	return nil, nil
+}
+
+// Rewrites a raw key with a raw-key rewrite rule and returns an encoded key.
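+// Unlike rewriteEncodedKey above, the key passed in here is raw, i.e. not codec-encoded.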
 func rewriteRawKey(key []byte, rewriteRules *RewriteRules) ([]byte, *import_sstpb.RewriteRule) {
 	if rewriteRules == nil {
 		return codec.EncodeBytes([]byte{}, key), nil
@@ -355,32 +386,43 @@ func SplitRanges(
 	})
 }
 
-func findMatchedRewriteRule(file *backuppb.File, rules *RewriteRules) *import_sstpb.RewriteRule {
+func findMatchedRewriteRule(file ApplyedFile, rules *RewriteRules) *import_sstpb.RewriteRule {
 	startID := tablecodec.DecodeTableID(file.GetStartKey())
 	endID := tablecodec.DecodeTableID(file.GetEndKey())
 	if startID != endID {
 		return nil
 	}
-	_, rule := rewriteRawKey(file.StartKey, rules)
+	_, rule := rewriteRawKey(file.GetStartKey(), rules)
+	if rule == nil {
+		// fall back to encoded key
+		_, rule = rewriteEncodedKey(file.GetStartKey(), rules)
+	}
 	return rule
 }
 
-func rewriteFileKeys(file *backuppb.File, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) {
+func rewriteFileKeys(file ApplyedFile, rewriteRules *RewriteRules) (startKey, endKey []byte, err error) {
 	startID := tablecodec.DecodeTableID(file.GetStartKey())
 	endID := tablecodec.DecodeTableID(file.GetEndKey())
 	var rule *import_sstpb.RewriteRule
 	if startID == endID {
 		startKey, rule = rewriteRawKey(file.GetStartKey(), rewriteRules)
 		if rewriteRules != nil && rule == nil {
-			log.Error("cannot find rewrite rule",
+			// fall back to encoded key
+			log.Info("cannot find rewrite rule with raw key format",
 				logutil.Key("startKey", file.GetStartKey()),
 				zap.Reflect("rewrite data", rewriteRules.Data))
-			err = errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule for start key")
+			startKey, rule = rewriteEncodedKey(file.GetStartKey(), rewriteRules)
+			if rule == nil {
+				err = errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule for start key")
+			}
 			return
 		}
 		endKey, rule = rewriteRawKey(file.GetEndKey(), rewriteRules)
 		if rewriteRules != nil && rule == nil {
-			err = errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule for end key")
+			endKey, rule = rewriteEncodedKey(file.GetEndKey(), rewriteRules)
+			if rewriteRules != nil && rule == nil {
+				err = errors.Annotate(berrors.ErrRestoreInvalidRewrite, "cannot find rewrite rule for end key")
+			}
 			return
 		}
 	} else {
diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go
index 1fdd9be727684..cfe92e2646751 100644
--- a/br/pkg/task/common.go
+++ b/br/pkg/task/common.go
@@ -531,9 +531,10 @@ func NewMgr(ctx context.Context,
 
 // GetStorage gets the storage backend from the config.
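+// storageName is passed in explicitly so that callers can open a storage other than
+// cfg.Storage; stream restore uses this to read the separate full-backup storage.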
 func GetStorage(
 	ctx context.Context,
+	storageName string,
 	cfg *Config,
 ) (*backuppb.StorageBackend, storage.ExternalStorage, error) {
-	u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
+	u, err := storage.ParseBackend(storageName, &cfg.BackendOptions)
 	if err != nil {
 		return nil, nil, errors.Trace(err)
 	}
@@ -558,7 +559,7 @@ func ReadBackupMeta(
 	fileName string,
 	cfg *Config,
 ) (*backuppb.StorageBackend, storage.ExternalStorage, *backuppb.BackupMeta, error) {
-	u, s, err := GetStorage(ctx, cfg)
+	u, s, err := GetStorage(ctx, cfg.Storage, cfg)
 	if err != nil {
 		return nil, nil, nil, errors.Trace(err)
 	}
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index a80549d005905..582c45bdd003b 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -280,7 +280,7 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 		}
 	}
 	reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
-	if err = client.InitBackupMeta(c, backupMeta, u, s, reader); err != nil {
+	if err = client.InitBackupMeta(c, backupMeta, u, reader); err != nil {
 		return errors.Trace(err)
 	}
diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go
index bb1dfddc9ce2d..0536c04dcef72 100644
--- a/br/pkg/task/restore_raw.go
+++ b/br/pkg/task/restore_raw.go
@@ -93,7 +93,7 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
 		return errors.Trace(err)
 	}
 	reader := metautil.NewMetaReader(backupMeta, s, &cfg.CipherInfo)
-	if err = client.InitBackupMeta(c, backupMeta, u, s, reader); err != nil {
+	if err = client.InitBackupMeta(c, backupMeta, u, reader); err != nil {
 		return errors.Trace(err)
 	}
diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go
index 5ab36436999a8..14b52579fd68a 100644
--- a/br/pkg/task/stream.go
+++ b/br/pkg/task/stream.go
@@ -29,6 +29,8 @@ import (
 	berrors "github.com/pingcap/tidb/br/pkg/errors"
 	"github.com/pingcap/tidb/br/pkg/glue"
 	"github.com/pingcap/tidb/br/pkg/logutil"
+	"github.com/pingcap/tidb/br/pkg/metautil"
+	"github.com/pingcap/tidb/br/pkg/restore"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/stream"
 	"github.com/pingcap/tidb/br/pkg/summary"
@@ -41,40 +43,56 @@ import (
 )
 
 const (
-	flagStreamTaskName        = "task-name"
-	flagStreamTaskNameDefault = "all" // used for get status for all of tasks.
-	flagStreamStartTS         = "start-ts"
-	flagStreamEndTS           = "end-ts"
-	flagGCSafePointTTS        = "gc-ttl"
+	flagStreamTaskName          = "task-name"
+	flagStreamTaskNameDefault   = "all" // used to get the status of all tasks.
+	flagStreamStartTS           = "start-ts"
+	flagStreamEndTS             = "end-ts"
+	flagGCSafePointTTS          = "gc-ttl"
+	flagStreamRestoreTS         = "restore-ts"
+	flagStreamFullBackupStorage = "full-backup-storage"
 )
 
 var (
-	StreamStart  = "stream start"
-	StreamStop   = "stream stop"
-	StreamPause  = "stream pause"
-	StreamResume = "stream resume"
-	StreamStatus = "stream status"
+	StreamStart   = "stream start"
+	StreamStop    = "stream stop"
+	StreamPause   = "stream pause"
+	StreamResume  = "stream resume"
+	StreamStatus  = "stream status"
+	StreamRestore = "stream restore"
 )
 
 var StreamCommandMap = map[string]func(c context.Context, g glue.Glue, cmdName string, cfg *StreamConfig) error{
-	StreamStart:  RunStreamStart,
-	StreamStop:   RunStreamStop,
-	StreamPause:  RunStreamPause,
-	StreamResume: RunStreamResume,
-	StreamStatus: RunStreamStatus,
+	StreamStart:   RunStreamStart,
+	StreamStop:    RunStreamStop,
+	StreamPause:   RunStreamPause,
+	StreamResume:  RunStreamResume,
+	StreamStatus:  RunStreamStatus,
+	StreamRestore: RunStreamRestore,
 }
 
 // StreamConfig specifies the configure about backup stream
 type StreamConfig struct {
 	// common part that all of stream commands need
 	Config
 
-	TaskName string `json:"task-name" toml:"task-name"`
-	// startTs usually equals the tso of full-backup, but user can reset it
+	// FullBackupStorage is used to find the mapping between table name and table id during restoration.
+	// If not specified, we cannot apply the KV files directly.
+	FullBackupStorage string `json:"full-backup-storage" toml:"full-backup-storage"`
+	TaskName          string `json:"task-name" toml:"task-name"`
+
+	// StartTS usually equals the tso of full-backup, but the user can reset it
 	StartTS uint64 `json:"start-ts" toml:"start-ts"`
 	EndTS   uint64 `json:"end-ts" toml:"end-ts"`
 	// SafePointTTL ensures TiKV can scan entries not being GC at [startTS, currentTS]
-	SafePointTTL int64 `json:"saft-point-ttl" toml:"saft-point-ttl"`
+	SafePointTTL int64  `json:"safe-point-ttl" toml:"safe-point-ttl"`
+	RestoreTS    uint64 `json:"restore-ts" toml:"restore-ts"`
+}
+
+func (sc *StreamConfig) adjustRestoreConfig() {
+	sc.Config.adjust()
+	if sc.Concurrency == 0 {
+		sc.Concurrency = 32
+	}
 }
 
 // DefineStreamStartFlags defines flags used for `stream start`
@@ -88,6 +106,13 @@ func DefineStreamStartFlags(flags *pflag.FlagSet) {
 		"the TTL (in seconds) that PD holds for BR's GC safepoint")
 }
 
+// DefineStreamRestoreFlags defines flags used for `stream restore`
+func DefineStreamRestoreFlags(flags *pflag.FlagSet) {
+	flags.String(flagStreamRestoreTS, "", "restore ts, used to restore kv data.\n"+
+		"support TSO or datetime, e.g. '400036290571534337' or '2018-05-11 01:42:23'")
+	flags.String(flagStreamFullBackupStorage, "", "the full backup storage, used to find the mapping between table id and table name")
+}
+
 // DefineStreamCommonFlags define common flags for `stream task`
 func DefineStreamCommonFlags(flags *pflag.FlagSet) {
 	flags.String(flagStreamTaskName, "", "The task name for backup stream log.")
@@ -99,6 +124,23 @@ func DefineStreamStatusCommonFlags(flags *pflag.FlagSet) {
 	)
 }
 
+func (cfg *StreamConfig) ParseStreamRestoreFromFlags(flags *pflag.FlagSet) error {
+	tsString, err := flags.GetString(flagStreamRestoreTS)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if cfg.RestoreTS, err = ParseTSString(tsString); err != nil {
+		return errors.Trace(err)
+	}
+	if cfg.FullBackupStorage, err = flags.GetString(flagStreamFullBackupStorage); err != nil {
+		return errors.Trace(err)
+	}
+	if len(cfg.FullBackupStorage) == 0 {
+		return errors.New("must specify full backup storage")
+	}
+	return nil
+}
+
 // ParseStreamStartFromFlags parse parameters for `stream start`
 func (cfg *StreamConfig) ParseStreamStartFromFlags(flags *pflag.FlagSet) error {
 	tsString, err := flags.GetString(flagStreamStartTS)
@@ -130,12 +172,9 @@ func (cfg *StreamConfig) ParseStreamStartFromFlags(flags *pflag.FlagSet) error {
 	return nil
 }
 
-// ParseCommonFromFlags parse parameters for `stream task`
-func (cfg *StreamConfig) ParseCommonFromFlags(flags *pflag.FlagSet) error {
+// ParseStreamCommonFromFlags parse parameters for `stream task`
+func (cfg *StreamConfig) ParseStreamCommonFromFlags(flags *pflag.FlagSet) error {
 	var err error
-	if err = cfg.Config.ParseFromFlags(flags); err != nil {
-		return errors.Trace(err)
-	}
 
 	cfg.TaskName, err = flags.GetString(flagStreamTaskName)
 	if err != nil {
@@ -167,12 +206,11 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, needStora
 		}
 	}()
 
-	client, err := backup.NewBackupClient(ctx, mgr)
-	if err != nil {
-		return nil, errors.Trace(err)
-	}
-
 	// just stream start need Storage
+
+	s := &streamMgr{
+		Cfg: cfg,
+		mgr: mgr,
+	}
 	if needStorage {
 		backend, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
 		if err != nil {
@@ -184,15 +222,15 @@ func NewStreamMgr(ctx context.Context, cfg *StreamConfig, g glue.Glue, needStora
 			SendCredentials: cfg.SendCreds,
 			SkipCheckPath:   cfg.SkipCheckPath,
 		}
-		if err = client.SetStorage(ctx, backend, &opts); err != nil {
+		client, err := backup.NewBackupClient(ctx, mgr)
+		if err != nil {
 			return nil, errors.Trace(err)
 		}
-	}
 
-	s := &streamMgr{
-		Cfg: cfg,
-		mgr: mgr,
-		bc:  client,
+		if err = client.SetStorage(ctx, backend, &opts); err != nil {
+			return nil, errors.Trace(err)
+		}
+		s.bc = client
 	}
 	return s, nil
 }
@@ -258,7 +296,7 @@ func (s *streamMgr) setGCSafePoint(ctx context.Context) error {
 }
 
 func (s *streamMgr) getTS(ctx context.Context) (uint64, error) {
-	p, l, err := s.mgr.PdController.GetPDClient().GetTS(ctx)
+	p, l, err := s.mgr.GetPDClient().GetTS(ctx)
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
@@ -506,7 +544,7 @@ func RunStreamStatus(
 
 	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
 		span1 := span.Tracer().StartSpan(
-			"task.RunStreamStatusRunStreamStatusRunStreamStatus",
+			"task.RunStreamStatus",
 			opentracing.ChildOf(span.Context()),
 		)
 		defer span1.Finish()
@@ -544,3 +582,153 @@ func RunStreamStatus(
 	}
 	return nil
 }
+
+// RunStreamRestore starts the restore job.
+func RunStreamRestore(
+	c context.Context,
+	g glue.Glue,
+	cmdName string,
+	cfg *StreamConfig,
+) error {
+	cfg.adjustRestoreConfig()
+
+	ctx, cancelFn := context.WithCancel(c)
+	defer cancelFn()
+
+	if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+		span1 := span.Tracer().StartSpan(
+			"task.RunStreamRestore",
+			opentracing.ChildOf(span.Context()),
+		)
+		defer span1.Finish()
+		ctx = opentracing.ContextWithSpan(ctx, span1)
+	}
+	streamMgr, err := NewStreamMgr(ctx, cfg, g, false)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer streamMgr.close()
+
+	keepaliveCfg := GetKeepalive(&cfg.Config)
+	keepaliveCfg.PermitWithoutStream = true
+	client, err := restore.NewRestoreClient(g, streamMgr.mgr.GetPDClient(), streamMgr.mgr.GetStorage(), streamMgr.mgr.GetTLSConfig(), keepaliveCfg)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	defer client.Close()
+
+	u, err := storage.ParseBackend(cfg.Storage, &cfg.BackendOptions)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	opts := storage.ExternalStorageOptions{
+		NoCredentials:   cfg.NoCreds,
+		SendCredentials: cfg.SendCreds,
+		SkipCheckPath:   cfg.SkipCheckPath,
+	}
+
+	if err = client.SetStorage(ctx, u, &opts); err != nil {
+		return errors.Trace(err)
+	}
+	client.SetRateLimit(cfg.RateLimit)
+	client.SetCrypter(&cfg.CipherInfo)
+	client.SetConcurrency(uint(cfg.Concurrency))
+	client.SetSwitchModeInterval(cfg.SwitchModeInterval)
+	err = client.LoadRestoreStores(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	client.InitClients(u, false)
+
+	currentTs, err := streamMgr.getTS(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	if cfg.RestoreTS == 0 {
+		cfg.RestoreTS = currentTs
+	}
+	log.Info("start restore on point", zap.Uint64("ts", cfg.RestoreTS))
+
+	// get full backup meta to generate rewrite rules.
+	fullBackupTables, err := initFullBackupTables(ctx, cfg.FullBackupStorage, cfg)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	rewriteRules, err := initRewriteRules(client, fullBackupTables)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// read meta files by the given ts.
+	metas, err := client.ReadStreamMetaByTS(ctx, cfg.RestoreTS)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(metas) == 0 {
+		log.Info("nothing to restore.")
+		return nil
+	}
+	// read data files by the given ts.
+	datas, err := client.ReadStreamDataFiles(ctx, metas, cfg.RestoreTS)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
+	// TODO split put and delete files
+	// perform restore kv files
+	return client.RestoreKVFiles(ctx, rewriteRules, datas)
+}
+
+func initFullBackupTables(ctx context.Context, fullBackupStorage string, cfg *StreamConfig) (map[string]*metautil.Table, error) {
+	_, s, err := GetStorage(ctx, fullBackupStorage, &cfg.Config)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	metaData, err := s.ReadFile(ctx, metautil.MetaFile)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	backupMeta := &backuppb.BackupMeta{}
+	err = backupMeta.Unmarshal(metaData)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	reader := metautil.NewMetaReader(backupMeta, s, nil)
+
+	// read full backup databases to get map[table]table.Info
+	databases, err := utils.LoadBackupTables(ctx, reader)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	tables := make(map[string]*metautil.Table)
+	for _, db := range databases {
+		dbName := db.Info.Name.O
+		if name, ok := utils.GetSysDBName(db.Info.Name); utils.IsSysDB(name) && ok {
+			dbName = name
+		}
+		for _, table := range db.Tables {
+			if !cfg.TableFilter.MatchTable(dbName, table.Info.Name.O) {
+				continue
+			}
+			tables[utils.UniqueID(dbName, table.Info.Name.String())] = table
+		}
+	}
+	return tables, nil
+}
+
+func initRewriteRules(client *restore.Client, tables map[string]*metautil.Table) (map[int64]*restore.RewriteRules, error) {
+	// compare the tables in the cluster with map[table]table.Info to generate rewrite rules.
+	rules := make(map[int64]*restore.RewriteRules)
+	for _, t := range tables {
+		newTableInfo, err := client.GetTableSchema(client.GetDomain(), t.DB.Name, t.Info.Name)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		// We don't handle index rules in PITR, since we only support PITR on tables that do not yet exist.
+		rules[t.Info.ID] = restore.GetRewriteRules(newTableInfo, t.Info, 0, false)
+	}
+	return rules, nil
+}
diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go
index 5a21ad8f26a98..8bb9ba0317ca5 100644
--- a/br/pkg/utils/backoff.go
+++ b/br/pkg/utils/backoff.go
@@ -60,7 +60,7 @@ func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
 	} else {
 		e := errors.Cause(err)
 		switch e { // nolint:errorlint
-		case berrors.ErrKVEpochNotMatch, berrors.ErrKVDownloadFailed, berrors.ErrKVIngestFailed:
+		case berrors.ErrKVEpochNotMatch, berrors.ErrKVDownloadFailed, berrors.ErrKVIngestFailed, berrors.ErrPDLeaderNotFound:
 			bo.delayTime = 2 * bo.delayTime
 			bo.attempt--
 		case berrors.ErrKVRangeIsEmpty, berrors.ErrKVRewriteRuleNotFound:
diff --git a/br/pkg/utils/schema.go b/br/pkg/utils/schema.go
index f6374576f43ed..571bfd86bac8a 100644
--- a/br/pkg/utils/schema.go
+++ b/br/pkg/utils/schema.go
@@ -114,3 +114,13 @@ func GetSysDBName(tempDB model.CIStr) (string, bool) {
 	}
 	return tempDB.O[len(temporaryDBNamePrefix):], true
 }
+
+// UniqueID quotes a full table name as the unique identifier `schema`.`table`.
+func UniqueID(schema string, table string) string {
+	return fmt.Sprintf("`%s`.`%s`", EscapeName(schema), EscapeName(table))
+}
+
+// EscapeName replaces all "`" in name with "``"
+func EscapeName(name string) string {
+	return strings.Replace(name, "`", "``", -1)
+}
diff --git a/go.mod b/go.mod
index f626c01b4ddd3..1bc9fcaf2fccb 100644
--- a/go.mod
+++ b/go.mod
@@ -103,4 +103,4 @@ replace github.com/pingcap/tidb/parser => ./parser
 
 // fix potential security issue(CVE-2020-26160) introduced by indirect dependency.
 replace github.com/dgrijalva/jwt-go => github.com/form3tech-oss/jwt-go v3.2.6-0.20210809144907-32ab6a8243d7+incompatible
 
-replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20211206061549-14a3e9888bd3
+replace github.com/pingcap/kvproto => github.com/pingcap/kvproto v0.0.0-20220121033106-11c47a2c2f8e
diff --git a/go.sum b/go.sum
index 59030af41f80a..79d6acf2daf5b 100644
--- a/go.sum
+++ b/go.sum
@@ -576,8 +576,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL
 github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059/go.mod h1:fMRU1BA1y+r89AxUoaAar4JjrhUkVDt0o0Np6V8XbDQ=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E=
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
-github.com/pingcap/kvproto v0.0.0-20211206061549-14a3e9888bd3 h1:t+8v9l4pz8Iu15ZT7EupxzyletI7sL0vN6Vz7fkeeUA=
-github.com/pingcap/kvproto v0.0.0-20211206061549-14a3e9888bd3/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
+github.com/pingcap/kvproto v0.0.0-20220121033106-11c47a2c2f8e h1:nqImXVDSjM5gXpNK8Goc5yDYzp7djfAx+IohMUOGHnc=
+github.com/pingcap/kvproto v0.0.0-20220121033106-11c47a2c2f8e/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
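
For reference, a hypothetical invocation of the `br stream restore` subcommand wired up above; the storage URLs and the timestamp are placeholders, and `-s` is BR's existing shorthand for the common `--storage` flag (here pointing at the log-backup location):

    br stream restore \
        --full-backup-storage 's3://bucket/full-backup' \
        --restore-ts '2018-05-11 01:42:23' \
        -s 's3://bucket/log-backup'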