br-stream: implement restore command (#31821)
* add restore command

* coarsely implement restore
3pointer authored Feb 11, 2022
1 parent e8c1fcf commit 66ba596
Showing 15 changed files with 594 additions and 87 deletions.
4 changes: 2 additions & 2 deletions br/cmd/br/debug.go
@@ -229,7 +229,7 @@ func newBackupMetaValidateCommand() *cobra.Command {
Name: indexInfo.Name,
}
}
rules := restore.GetRewriteRules(newTable, table.Info, 0)
rules := restore.GetRewriteRules(newTable, table.Info, 0, true)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Info.ID] = int64(tableID)
}
@@ -322,7 +322,7 @@ func encodeBackupMetaCommand() *cobra.Command {
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return errors.Trace(err)
}
_, s, err := task.GetStorage(ctx, &cfg)
_, s, err := task.GetStorage(ctx, cfg.Storage, &cfg)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion br/cmd/br/main.go
@@ -50,7 +50,7 @@ func main() {
NewRestoreCommand(),
NewStreamCommand(),
)
// Ouputs cmd.Print to stdout.
// Outputs cmd.Print to stdout.
rootCmd.SetOut(os.Stdout)

rootCmd.SetArgs(os.Args[1:])
50 changes: 41 additions & 9 deletions br/cmd/br/stream.go
@@ -44,8 +44,9 @@ func NewStreamCommand() *cobra.Command {
newStreamStartCommand(),
newStreamStopCommand(),
newStreamPauseCommand(),
newStreamResumeComamnd(),
newStreamStatusComamnd(),
newStreamResumeCommand(),
newStreamStatusCommand(),
newStreamRestoreCommand(),
)
return command
}
@@ -94,7 +95,7 @@ func newStreamPauseCommand() *cobra.Command {
return command
}

func newStreamResumeComamnd() *cobra.Command {
func newStreamResumeCommand() *cobra.Command {
command := &cobra.Command{
Use: "resume",
Short: "resume a stream task",
@@ -108,7 +109,7 @@ func newStreamResumeComamnd() *cobra.Command {
return command
}

func newStreamStatusComamnd() *cobra.Command {
func newStreamStatusCommand() *cobra.Command {
command := &cobra.Command{
Use: "status",
Short: "get status of a stream task",
@@ -122,19 +123,50 @@ func newStreamStatusComamnd() *cobra.Command {
return command
}

// TODO maybe we should use `br restore stream` rather than `br stream restore`
// because the restore and stream tasks have no common flags.
func newStreamRestoreCommand() *cobra.Command {
command := &cobra.Command{
Use: "restore",
Short: "restore stream backups",
Args: cobra.NoArgs,
RunE: func(command *cobra.Command, _ []string) error {
return streamCommand(command, task.StreamRestore)
},
}
task.DefineFilterFlags(command, acceptAllTables)
task.DefineStreamRestoreFlags(command.Flags())
return command
}

func streamCommand(command *cobra.Command, cmdName string) error {
var cfg task.StreamConfig
if err := cfg.ParseCommonFromFlags(command.Flags()); err != nil {
command.SilenceUsage = false
var err error
defer func() {
if err != nil {
command.SilenceUsage = false
}
}()
if err = cfg.Config.ParseFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}

if cmdName == task.StreamStart {
if err := cfg.ParseStreamStartFromFlags(command.Flags()); err != nil {
switch cmdName {
case task.StreamRestore:
if err = cfg.ParseStreamRestoreFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
case task.StreamStart:
if err = cfg.ParseStreamStartFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
// TODO use `br restore stream` rather than `br stream restore`
fallthrough
default:
if err = cfg.ParseStreamCommonFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
}

ctx := GetDefaultContext()
if cfg.EnableOpenTracing {
var store *appdash.MemoryStore
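For readers trying the new verb: it registers under `br stream` like the existing subcommands, so it can be driven through the root command. A minimal sketch, not part of the commit — only the common `--storage` flag is shown, since the names of any flags registered by `task.DefineStreamRestoreFlags` do not appear in this diff:

```go
package main

import "log"

// Hypothetical driver for the new subcommand, e.g. from a test in the
// br/cmd/br package. The --storage flag comes from the common task config
// parsed by cfg.Config.ParseFromFlags; stream-restore-specific flags are
// omitted because this diff does not show their names.
func runStreamRestore() {
	cmd := NewStreamCommand()
	cmd.SetArgs([]string{"restore", "--storage", "s3://bucket/prefix"})
	if err := cmd.Execute(); err != nil {
		log.Fatal(err)
	}
}
```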
127 changes: 116 additions & 11 deletions br/pkg/restore/client.go
@@ -150,6 +150,10 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
return nil
}

func (rc *Client) GetDomain() *domain.Domain {
return rc.dom
}

// GetPDClient returns a pd client.
func (rc *Client) GetPDClient() pd.Client {
return rc.pdClient
@@ -174,13 +178,19 @@ func (rc *Client) Close() {
log.Info("Restore client closed")
}

func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool) {
metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, rc.rateLimit)
}
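`InitClients` factors the importer construction out of `InitBackupMeta`. A plausible motivation, given the rest of this commit, is that the stream-restore path has no BackupMeta to load but still needs a file importer; a minimal sketch under that assumption, not part of the commit:

```go
// Sketch only: a caller restoring stream backups could set up the importer
// directly, skipping InitBackupMeta (which requires a BackupMeta).
// client and backend are assumed to come from the caller's setup.
func initForStreamRestore(client *restore.Client, backend *backuppb.StorageBackend) {
	client.InitClients(backend, false /* stream backups are not raw-kv */)
}
```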

// InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient.
func (rc *Client) InitBackupMeta(
c context.Context,
backupMeta *backuppb.BackupMeta,
backend *backuppb.StorageBackend,
externalStorage storage.ExternalStorage,
reader *metautil.MetaReader) error {

if !backupMeta.IsRawKv {
databases, err := utils.LoadBackupTables(c, reader)
if err != nil {
@@ -203,11 +213,9 @@ func (rc *Client) InitBackupMeta(
rc.ddlJobs = ddlJobs
}
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv, rc.rateLimit)
rc.InitClients(backend, backupMeta.IsRawKv)
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
}

@@ -428,7 +436,7 @@ func (rc *Client) createTable(
table.Info.IsCommonHandle,
newTableInfo.IsCommonHandle)
}
rules := GetRewriteRules(newTableInfo, table.Info, newTS)
rules := GetRewriteRules(newTableInfo, table.Info, newTS, true)
et := CreatedTable{
RewriteRule: rules,
Table: newTableInfo,
@@ -595,8 +603,8 @@ func drainFilesByRange(files []*backuppb.File, supportMulti bool) ([]*backuppb.F
return files[:idx], files[idx:]
}

// RestoreFiles tries to restore the files.
func (rc *Client) RestoreFiles(
// RestoreSSTFiles tries to restore the files.
func (rc *Client) RestoreSSTFiles(
ctx context.Context,
files []*backuppb.File,
rewriteRules *RewriteRules,
@@ -614,7 +622,7 @@ func (rc *Client) RestoreFiles(
log.Debug("start to restore files", zap.Int("files", len(files)))

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.RestoreFiles", opentracing.ChildOf(span.Context()))
span1 := span.Tracer().StartSpan("Client.RestoreSSTFiles", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
@@ -637,7 +645,7 @@ func (rc *Client) RestoreFiles(
zap.Duration("take", time.Since(fileStart)))
updateCh.Inc()
}()
return rc.fileImporter.Import(ectx, filesReplica, rewriteRules, rc.cipher)
return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher)
})
}

@@ -678,7 +686,7 @@ func (rc *Client) RestoreRaw(
rc.workerPool.ApplyOnErrorGroup(eg,
func() error {
defer updateCh.Inc()
return rc.fileImporter.Import(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher)
return rc.fileImporter.ImportSSTFiles(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher)
})
}
if err := eg.Wait(); err != nil {
@@ -1198,6 +1206,103 @@ func (rc *Client) PreCheckTableClusterIndex(
return nil
}

const (
streamBackupMetaPrefix = "v1_backupmeta"
)

// ReadStreamMetaByTS is used for streaming tasks. It collects all meta files by TS.
func (rc *Client) ReadStreamMetaByTS(ctx context.Context, restoreTS uint64) ([]*backuppb.Metadata, error) {
streamBackupMetaFiles := make([]*backuppb.Metadata, 0)
err := rc.storage.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
if strings.Contains(path, streamBackupMetaPrefix) {
m := &backuppb.Metadata{}
b, err := rc.storage.ReadFile(ctx, path)
if err != nil {
return errors.Trace(err)
}
err = m.Unmarshal(b)
if err != nil {
return errors.Trace(err)
}
// TODO find a way to filter some unnecessary meta files.
log.Debug("backup stream collect meta file", zap.String("file", path))
streamBackupMetaFiles = append(streamBackupMetaFiles, m)
}
return nil
})
if err != nil {
return nil, errors.Trace(err)
}
return streamBackupMetaFiles, nil
}

// ReadStreamDataFiles is used for streaming tasks. It collects all data files whose MinTs is not later than the given restoreTS.
func (rc *Client) ReadStreamDataFiles(ctx context.Context, metas []*backuppb.Metadata, restoreTS uint64) ([]*backuppb.DataFileInfo, error) {
streamBackupDataFiles := make([]*backuppb.DataFileInfo, 0)
for _, m := range metas {
for _, d := range m.Files {
if d.MinTs > restoreTS {
continue
}
streamBackupDataFiles = append(streamBackupDataFiles, d)
log.Debug("backup stream collect data file", zap.String("file", d.Path))
}
}
return streamBackupDataFiles, nil
}
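Taken together, the two readers above form the collection phase of a stream restore: walk the storage for `v1_backupmeta` entries, then keep only the data files whose `MinTs` does not exceed the restore timestamp. A minimal sketch of the composition, not part of the commit, assuming an initialized client:

```go
// Sketch only: compose the two readers above. rc is assumed to be an
// initialized *restore.Client; restoreTS is the target timestamp.
func collectStreamFiles(ctx context.Context, rc *restore.Client, restoreTS uint64) ([]*backuppb.DataFileInfo, error) {
	metas, err := rc.ReadStreamMetaByTS(ctx, restoreTS)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return rc.ReadStreamDataFiles(ctx, metas, restoreTS)
}
```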

func (rc *Client) RestoreKVFiles(ctx context.Context, rules map[int64]*RewriteRules, files []*backuppb.DataFileInfo) error {
var err error
start := time.Now()
defer func() {
elapsed := time.Since(start)
if err == nil {
log.Info("Restore KV files", zap.Duration("take", elapsed))
summary.CollectSuccessUnit("files", len(files), elapsed)
}
}()

log.Debug("start to restore files", zap.Int("files", len(files)))

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("Client.RestoreKVFiles", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

eg, ectx := errgroup.WithContext(ctx)
for _, file := range files {
filesReplica := file
// get rewrite rule from table id
rule, ok := rules[filesReplica.TableId]
if !ok {
// TODO handle new created table
// For this version we do not handle new created table after full backup.
// in next version we will perform rewrite and restore meta key to restore new created tables.
// so we can simply skip the file that doesn't have the rule here.
log.Info("skip file due to table id not matched", zap.String("file", file.Path))
continue
}
rc.workerPool.ApplyOnErrorGroup(eg, func() error {
fileStart := time.Now()
defer func() {
log.Debug("import files done", zap.String("name", file.Path), zap.Duration("take", time.Since(fileStart)))
}()
return rc.fileImporter.ImportKVFiles(ectx, filesReplica, rule)
})
}

if err = eg.Wait(); err != nil {
summary.CollectFailureUnit("file", err)
log.Error(
"restore files failed",
zap.Error(err),
)
return errors.Trace(err)
}
return nil
}
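`RestoreKVFiles` looks up rewrite rules by the table ID recorded in each data file and skips files with no matching rule (see the TODO above about tables created after the full backup). A minimal sketch of a caller, not part of the commit — the `CreatedTable` fields used for keying are an assumption based on the `createTable` hunk earlier in this diff:

```go
// Sketch only: build the rewrite-rule map keyed by the upstream
// (backup-side) table ID and feed it to RestoreKVFiles. createdTables and
// dataFiles are assumed to come from the full-restore and collection phases.
func restoreStreamKVs(ctx context.Context, rc *restore.Client, createdTables []*restore.CreatedTable, dataFiles []*backuppb.DataFileInfo) error {
	rules := make(map[int64]*restore.RewriteRules, len(createdTables))
	for _, t := range createdTables {
		// OldTable.Info.ID as the key is an assumption; the diff only shows
		// that the map is indexed by the file's TableId.
		rules[t.OldTable.Info.ID] = t.RewriteRule
	}
	return rc.RestoreKVFiles(ctx, rules, dataFiles)
}
```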

func transferBoolToValue(enable bool) string {
if enable {
return "ON"