br-stream: implement restore command #31821

Merged · 30 commits · Feb 11, 2022
4 changes: 2 additions & 2 deletions br/cmd/br/debug.go
@@ -229,7 +229,7 @@ func newBackupMetaValidateCommand() *cobra.Command {
Name: indexInfo.Name,
}
}
- rules := restore.GetRewriteRules(newTable, table.Info, 0)
+ rules := restore.GetRewriteRules(newTable, table.Info, 0, true)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Info.ID] = int64(tableID)
}
@@ -322,7 +322,7 @@ func encodeBackupMetaCommand() *cobra.Command {
if err := cfg.ParseFromFlags(cmd.Flags()); err != nil {
return errors.Trace(err)
}
- _, s, err := task.GetStorage(ctx, &cfg)
+ _, s, err := task.GetStorage(ctx, cfg.Storage, &cfg)
if err != nil {
return errors.Trace(err)
}
2 changes: 1 addition & 1 deletion br/cmd/br/main.go
@@ -50,7 +50,7 @@ func main() {
NewRestoreCommand(),
NewStreamCommand(),
)
- // Ouputs cmd.Print to stdout.
+ // Outputs cmd.Print to stdout.
rootCmd.SetOut(os.Stdout)

rootCmd.SetArgs(os.Args[1:])
50 changes: 41 additions & 9 deletions br/cmd/br/stream.go
@@ -44,8 +44,9 @@ func NewStreamCommand() *cobra.Command {
newStreamStartCommand(),
newStreamStopCommand(),
newStreamPauseCommand(),
- newStreamResumeComamnd(),
- newStreamStatusComamnd(),
+ newStreamResumeCommand(),
+ newStreamStatusCommand(),
+ newStreamRestoreCommand(),
)
return command
}
@@ -94,7 +95,7 @@ func newStreamPauseCommand() *cobra.Command {
return command
}

- func newStreamResumeComamnd() *cobra.Command {
+ func newStreamResumeCommand() *cobra.Command {
command := &cobra.Command{
Use: "resume",
Short: "resume a stream task",
@@ -108,7 +109,7 @@ func newStreamResumeComamnd() *cobra.Command {
return command
}

- func newStreamStatusComamnd() *cobra.Command {
+ func newStreamStatusCommand() *cobra.Command {
command := &cobra.Command{
Use: "status",
Short: "get status of a stream task",
@@ -122,19 +123,50 @@ return command
return command
}

+ // TODO maybe we should use `br restore stream` rather than `br stream restore`
+ // because the restore and stream tasks have no common flags.
+ func newStreamRestoreCommand() *cobra.Command {
+ command := &cobra.Command{
+ Use: "restore",
+ Short: "restore a stream backup",
+ Args: cobra.NoArgs,
+ RunE: func(command *cobra.Command, _ []string) error {
+ return streamCommand(command, task.StreamRestore)
+ },
+ }
+ task.DefineFilterFlags(command, acceptAllTables)
+ task.DefineStreamRestoreFlags(command.Flags())
+ return command
+ }

func streamCommand(command *cobra.Command, cmdName string) error {
var cfg task.StreamConfig
- if err := cfg.ParseCommonFromFlags(command.Flags()); err != nil {
- command.SilenceUsage = false
+ var err error
+ defer func() {
+ if err != nil {
+ command.SilenceUsage = false
+ }
+ }()
+ if err = cfg.Config.ParseFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}

- if cmdName == task.StreamStart {
- if err := cfg.ParseStreamStartFromFlags(command.Flags()); err != nil {
+ switch cmdName {
+ case task.StreamRestore:
+ if err = cfg.ParseStreamRestoreFromFlags(command.Flags()); err != nil {
return errors.Trace(err)
}
+ case task.StreamStart:
+ if err = cfg.ParseStreamStartFromFlags(command.Flags()); err != nil {
+ return errors.Trace(err)
+ }
+ // TODO use `br restore stream` rather than `br stream restore`
+ fallthrough
+ default:
+ if err = cfg.ParseStreamCommonFromFlags(command.Flags()); err != nil {
+ return errors.Trace(err)
+ }
+ }

ctx := GetDefaultContext()
if cfg.EnableOpenTracing {
var store *appdash.MemoryStore
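The rewritten streamCommand defers SilenceUsage, so usage help is printed only when flag parsing actually fails, and the new switch lets `stream start` consume both its own flags and the common stream flags via fallthrough, while `stream restore` parses only the restore flag set. A minimal standalone sketch of that dispatch shape, using illustrative command names rather than the real task constants:

package main

import "fmt"

// parseFlagsFor mirrors the dispatch used by streamCommand: "restore"
// has its own flag set, while "start" needs its own flags plus the
// common stream flags, hence the fallthrough into the default case.
func parseFlagsFor(cmdName string) {
	switch cmdName {
	case "restore":
		fmt.Println("parse stream-restore flags only")
	case "start":
		fmt.Println("parse stream-start flags")
		fallthrough // start also consumes the common stream flags
	default:
		fmt.Println("parse common stream flags")
	}
}

func main() {
	parseFlagsFor("restore") // restore flags only
	parseFlagsFor("start")   // start flags, then common flags
	parseFlagsFor("stop")    // common flags only
}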
127 changes: 116 additions & 11 deletions br/pkg/restore/client.go
@@ -150,6 +150,10 @@ func (rc *Client) SetStorage(ctx context.Context, backend *backuppb.StorageBacke
return nil
}

+ func (rc *Client) GetDomain() *domain.Domain {
+ return rc.dom
+ }

// GetPDClient returns a pd client.
func (rc *Client) GetPDClient() pd.Client {
return rc.pdClient
@@ -174,13 +178,19 @@ func (rc *Client) Close() {
log.Info("Restore client closed")
}

+ func (rc *Client) InitClients(backend *backuppb.StorageBackend, isRawKvMode bool) {
+ metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
+ importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
+ rc.fileImporter = NewFileImporter(metaClient, importCli, backend, isRawKvMode, rc.rateLimit)
+ }

// InitBackupMeta loads schemas from BackupMeta to initialize RestoreClient.
func (rc *Client) InitBackupMeta(
c context.Context,
backupMeta *backuppb.BackupMeta,
backend *backuppb.StorageBackend,
+ externalStorage storage.ExternalStorage,
reader *metautil.MetaReader) error {

if !backupMeta.IsRawKv {
databases, err := utils.LoadBackupTables(c, reader)
if err != nil {
@@ -203,11 +213,9 @@ func (rc *Client) InitBackupMeta(
rc.ddlJobs = ddlJobs
}
rc.backupMeta = backupMeta
log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))

- metaClient := NewSplitClient(rc.pdClient, rc.tlsConf)
- importCli := NewImportClient(metaClient, rc.tlsConf, rc.keepaliveConf)
- rc.fileImporter = NewFileImporter(metaClient, importCli, backend, rc.backupMeta.IsRawKv, rc.rateLimit)
+ rc.InitClients(backend, backupMeta.IsRawKv)
+ log.Info("load backupmeta", zap.Int("databases", len(rc.databases)), zap.Int("jobs", len(rc.ddlJobs)))
return rc.fileImporter.CheckMultiIngestSupport(c, rc.pdClient)
}

@@ -428,7 +436,7 @@ func (rc *Client) createTable(
table.Info.IsCommonHandle,
newTableInfo.IsCommonHandle)
}
- rules := GetRewriteRules(newTableInfo, table.Info, newTS)
+ rules := GetRewriteRules(newTableInfo, table.Info, newTS, true)
et := CreatedTable{
RewriteRule: rules,
Table: newTableInfo,
@@ -595,8 +603,8 @@ func drainFilesByRange(files []*backuppb.File, supportMulti bool) ([]*backuppb.F
return files[:idx], files[idx:]
}

- // RestoreFiles tries to restore the files.
- func (rc *Client) RestoreFiles(
+ // RestoreSSTFiles tries to restore the files.
+ func (rc *Client) RestoreSSTFiles(
ctx context.Context,
files []*backuppb.File,
rewriteRules *RewriteRules,
@@ -614,7 +622,7 @@ func (rc *Client) RestoreFiles(
log.Debug("start to restore files", zap.Int("files", len(files)))

if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
- span1 := span.Tracer().StartSpan("Client.RestoreFiles", opentracing.ChildOf(span.Context()))
+ span1 := span.Tracer().StartSpan("Client.RestoreSSTFiles", opentracing.ChildOf(span.Context()))
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}
@@ -637,7 +645,7 @@ func (rc *Client) RestoreFiles(
zap.Duration("take", time.Since(fileStart)))
updateCh.Inc()
}()
- return rc.fileImporter.Import(ectx, filesReplica, rewriteRules, rc.cipher)
+ return rc.fileImporter.ImportSSTFiles(ectx, filesReplica, rewriteRules, rc.cipher)
})
}

@@ -678,7 +686,7 @@ func (rc *Client) RestoreRaw(
rc.workerPool.ApplyOnErrorGroup(eg,
func() error {
defer updateCh.Inc()
- return rc.fileImporter.Import(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher)
+ return rc.fileImporter.ImportSSTFiles(ectx, []*backuppb.File{fileReplica}, EmptyRewriteRule(), rc.cipher)
})
}
if err := eg.Wait(); err != nil {
@@ -1198,6 +1206,103 @@ func (rc *Client) PreCheckTableClusterIndex(
return nil
}

+ const (
+ streamBackupMetaPrefix = "v1_backupmeta"
+ )

+ // ReadStreamMetaByTS is used for streaming tasks. It collects all meta files by TS.
+ func (rc *Client) ReadStreamMetaByTS(ctx context.Context, restoreTS uint64) ([]*backuppb.Metadata, error) {
+ streamBackupMetaFiles := make([]*backuppb.Metadata, 0)
+ err := rc.storage.WalkDir(ctx, &storage.WalkOption{}, func(path string, size int64) error {
+ if strings.Contains(path, streamBackupMetaPrefix) {
+ m := &backuppb.Metadata{}
+ b, err := rc.storage.ReadFile(ctx, path)
+ if err != nil {
+ return errors.Trace(err)
+ }
+ err = m.Unmarshal(b)
+ if err != nil {
+ return errors.Trace(err)
+ }
+ // TODO find a way to filter some unnecessary meta files.
+ log.Debug("backup stream collect meta file", zap.String("file", path))
+ streamBackupMetaFiles = append(streamBackupMetaFiles, m)
+ }
+ return nil
+ })
+ if err != nil {
+ return nil, errors.Trace(err)
+ }
+ return streamBackupMetaFiles, nil
+ }

+ // ReadStreamDataFiles is used for streaming tasks. It collects all data files whose MinTs is not larger than the given TS.
+ func (rc *Client) ReadStreamDataFiles(ctx context.Context, metas []*backuppb.Metadata, restoreTS uint64) ([]*backuppb.DataFileInfo, error) {
+ streamBackupDataFiles := make([]*backuppb.DataFileInfo, 0)
+ for _, m := range metas {
+ for _, d := range m.Files {
+ if d.MinTs > restoreTS {
+ continue
+ }
+ streamBackupDataFiles = append(streamBackupDataFiles, d)
+ log.Debug("backup stream collect data file", zap.String("file", d.Path))
+ }
+ }
+ return streamBackupDataFiles, nil
+ }

+ func (rc *Client) RestoreKVFiles(ctx context.Context, rules map[int64]*RewriteRules, files []*backuppb.DataFileInfo) error {
+ var err error
+ start := time.Now()
+ defer func() {
+ elapsed := time.Since(start)
+ if err == nil {
+ log.Info("Restore KV files", zap.Duration("take", elapsed))
+ summary.CollectSuccessUnit("files", len(files), elapsed)
+ }
+ }()

+ log.Debug("start to restore files", zap.Int("files", len(files)))

+ if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+ span1 := span.Tracer().StartSpan("Client.RestoreKVFiles", opentracing.ChildOf(span.Context()))
+ defer span1.Finish()
+ ctx = opentracing.ContextWithSpan(ctx, span1)
+ }

+ eg, ectx := errgroup.WithContext(ctx)
+ for _, file := range files {
+ filesReplica := file
+ // get the rewrite rule for this table id
+ rule, ok := rules[filesReplica.TableId]
+ if !ok {
+ // TODO handle newly created tables
+ // For this version we do not handle tables created after the full backup;
+ // the next version will rewrite and restore meta keys to cover them,
+ // so here we simply skip files that have no matching rule.
+ log.Info("skip file due to table id not matched", zap.String("file", file.Path))
+ continue
+ }
+ rc.workerPool.ApplyOnErrorGroup(eg, func() error {
+ fileStart := time.Now()
+ defer func() {
+ log.Debug("import files done", zap.String("name", file.Path), zap.Duration("take", time.Since(fileStart)))
+ }()
+ return rc.fileImporter.ImportKVFiles(ectx, filesReplica, rule)
+ })
+ }

+ if err = eg.Wait(); err != nil {
+ summary.CollectFailureUnit("file", err)
+ log.Error(
+ "restore files failed",
+ zap.Error(err),
+ )
+ return errors.Trace(err)
+ }
+ return nil
+ }

func transferBoolToValue(enable bool) string {
if enable {
return "ON"
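Taken together, the new client methods form a small pipeline: walk the backup storage for files under the v1_backupmeta prefix, flatten those metadata entries into the data files whose MinTs does not exceed the restore timestamp, then replay the KV files under per-table rewrite rules. A sketch of how a caller might chain them, assuming an already-initialized *restore.Client and a rules map built elsewhere (this wrapper function is illustrative, not part of the diff):

package streamrestore

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/br/pkg/restore"
)

// restoreStreamBackup chains the three new entry points. It assumes the
// client's storage and file importer were set up via SetStorage and
// InitClients, and that rules maps each table id to its rewrite rules.
func restoreStreamBackup(
	ctx context.Context,
	client *restore.Client,
	rules map[int64]*restore.RewriteRules,
	restoreTS uint64,
) error {
	// 1. Collect every metadata file recorded by the stream task.
	metas, err := client.ReadStreamMetaByTS(ctx, restoreTS)
	if err != nil {
		return errors.Trace(err)
	}
	// 2. Keep only data files whose events start at or before restoreTS.
	files, err := client.ReadStreamDataFiles(ctx, metas, restoreTS)
	if err != nil {
		return errors.Trace(err)
	}
	// 3. Replay the KV files concurrently; files with no rewrite rule
	//    for their table id are skipped inside RestoreKVFiles.
	return errors.Trace(client.RestoreKVFiles(ctx, rules, files))
}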