From b0b03a2ba3dacac545b48ba0a7c29cd7250a35ff Mon Sep 17 00:00:00 2001 From: Omar Ramos Date: Fri, 27 Sep 2024 06:46:06 -0700 Subject: [PATCH] Added ability to show extra details during restore process When working with a larger restore using the `restore-dump` command, by default there is only the "Restoring database ..." indicator, which provides minimal insight into how things are progressing. On the other hand, if you choose to add the `--debug` flag, the amount of information can be too much, as the large `INSERT` queries are then also included in the output. The new `--show-details` flag optionally allows users to see more easily how their restore is progressing. Additionally, a `--start-from` flag was added that can be provided with a table name. This allows a user to skip earlier tables and start the import from a later point easily, without having to create a separate copy of their dump folder and manually remove those files. The new `--allow-different-destination` flag primarily helps with simplifying the process of taking a folder that was created for one database, e.g. where the files contain a prefix for "first-database", and allows you to restore into a second database without having to adjust the database prefix on all of those existing files. Also incorporated a check to prevent an issue for custom-generated files where users might accidentally provide an `INSERT` query that is larger than 16777216 bytes. This provides some useful feedback rather than the `pkt` error that was being received previously. 
--- internal/cmd/database/restore.go | 32 ++++-- internal/dumper/dumper.go | 55 ++++++----- internal/dumper/loader.go | 164 ++++++++++++++++++++++++++----- 3 files changed, 190 insertions(+), 61 deletions(-) diff --git a/internal/cmd/database/restore.go b/internal/cmd/database/restore.go index c35410f0..a0ca4a01 100644 --- a/internal/cmd/database/restore.go +++ b/internal/cmd/database/restore.go @@ -18,11 +18,14 @@ import ( ) type restoreFlags struct { - localAddr string - remoteAddr string - dir string - overwrite bool - threads int + localAddr string + remoteAddr string + dir string + overwrite bool + showDetails bool + startFrom string + allowDifferentDestination bool + threads int } // RestoreCmd encapsulates the commands for restore a database @@ -42,7 +45,10 @@ func RestoreCmd(ch *cmdutil.Helper) *cobra.Command { cmd.PersistentFlags().StringVar(&f.dir, "dir", "", "Directory containing the files to be used for the restore (required)") cmd.PersistentFlags().BoolVar(&f.overwrite, "overwrite-tables", false, "If true, will attempt to DROP TABLE before restoring.") - + cmd.PersistentFlags().BoolVar(&f.showDetails, "show-details", false, "If true, will add extra output during the restore process.") + cmd.PersistentFlags().StringVar(&f.startFrom, "start-from", "", + "Table to start from for the restore (useful for restarting from a certain point)") + cmd.PersistentFlags().BoolVar(&f.allowDifferentDestination, "allow-different-destination", false, "If true, will allow you to restore the files to a database with a different name without needing to rename the existing dump's files.") cmd.PersistentFlags().IntVar(&f.threads, "threads", 1, "Number of concurrent threads to use to restore the database.") return cmd } @@ -151,20 +157,30 @@ func restore(ch *cmdutil.Helper, cmd *cobra.Command, flags *restoreFlags, args [ cfg.IntervalMs = 10 * 1000 cfg.Outdir = flags.dir cfg.OverwriteTables = flags.overwrite + cfg.ShowDetails = flags.showDetails + cfg.AllowDifferentDestination 
= flags.allowDifferentDestination + cfg.Database = database // Needs to be passed in to allow for allowDifferentDestination flag to work + cfg.StartFrom = flags.startFrom loader, err := dumper.NewLoader(cfg) if err != nil { return err } + end := func() {} + ch.Printer.Printf("Starting to restore database %s from folder %s\n", printer.BoldBlue(database), printer.BoldBlue(flags.dir)) - end := ch.Printer.PrintProgress("Restoring database ...") + if flags.showDetails { + ch.Printer.Println("Restoring database ...") + } else { + end = ch.Printer.PrintProgress("Restoring database ...\n") + } defer end() start := time.Now() - err = loader.Run(ctx) + err = loader.Run(ctx, ch) if err != nil { return fmt.Errorf("failed to restore database: %s", err) } diff --git a/internal/dumper/dumper.go b/internal/dumper/dumper.go index d651d61e..65d84a02 100644 --- a/internal/dumper/dumper.go +++ b/internal/dumper/dumper.go @@ -22,32 +22,35 @@ const VITESS_GHOST_TABLE_REGEX = "_vt_EVAC_.*|_vt_DROP_.*|_vt_PURGE_.*|_vt_HOLD_ // Config describes the settings to dump from a database. 
type Config struct { - User string - Password string - Address string - ToUser string - ToPassword string - ToAddress string - ToDatabase string - ToEngine string - Database string - DatabaseRegexp string - DatabaseInvertRegexp bool - Shard string - Table string - Outdir string - SessionVars []string - Threads int - ChunksizeInMB int - StmtSize int - Allbytes uint64 - Allrows uint64 - OverwriteTables bool - UseReplica bool - UseRdonly bool - Wheres map[string]string - Selects map[string]map[string]string - Filters map[string]map[string]string + User string + Password string + Address string + ToUser string + ToPassword string + ToAddress string + ToDatabase string + ToEngine string + Database string + DatabaseRegexp string + DatabaseInvertRegexp bool + Shard string + Table string + Outdir string + SessionVars []string + Threads int + ChunksizeInMB int + StmtSize int + Allbytes uint64 + Allrows uint64 + OverwriteTables bool + ShowDetails bool + StartFrom string + AllowDifferentDestination bool + UseReplica bool + UseRdonly bool + Wheres map[string]string + Selects map[string]map[string]string + Filters map[string]map[string]string // Interval in millisecond. IntervalMs int diff --git a/internal/dumper/loader.go b/internal/dumper/loader.go index 3298dcc1..c4c48539 100644 --- a/internal/dumper/loader.go +++ b/internal/dumper/loader.go @@ -2,8 +2,8 @@ package dumper import ( "context" + "errors" "fmt" - "math/rand" "os" "path/filepath" "strings" @@ -11,6 +11,7 @@ import ( "time" "github.com/planetscale/cli/internal/cmdutil" + "github.com/planetscale/cli/internal/printer" "golang.org/x/sync/errgroup" "go.uber.org/zap" @@ -42,54 +43,73 @@ func NewLoader(cfg *Config) (*Loader, error) { } // Run used to start the loader worker. 
-func (l *Loader) Run(ctx context.Context) error { +func (l *Loader) Run(ctx context.Context, ch *cmdutil.Helper) error { pool, err := NewPool(l.log, l.cfg.Threads, l.cfg.Address, l.cfg.User, l.cfg.Password, l.cfg.SessionVars, "") if err != nil { return err } defer pool.Close() - files, err := l.loadFiles(l.cfg.Outdir) + if l.cfg.ShowDetails && l.cfg.AllowDifferentDestination { + ch.Printer.Println("The allow different destination option is enabled for this restore.") + ch.Printer.Printf("Files that do not begin with the provided database name of %s will still be processed without having to rename them first.\n", printer.BoldBlue(l.cfg.Database)) + } + + files, err := l.loadFiles(l.cfg.Outdir, ch, l.cfg.ShowDetails, l.cfg.StartFrom) if err != nil { return err } // database. conn := pool.Get() - if err := l.restoreDatabaseSchema(files.databases, conn); err != nil { + if err := l.restoreDatabaseSchema(files.databases, conn, ch, l.cfg.ShowDetails); err != nil { return err } pool.Put(conn) // tables. 
conn = pool.Get() - if err := l.restoreTableSchema(l.cfg.OverwriteTables, files.schemas, conn); err != nil { + if err := l.restoreTableSchema(l.cfg.OverwriteTables, files.schemas, conn, ch, l.cfg.ShowDetails, l.cfg.StartFrom); err != nil { return err } pool.Put(conn) + // Commenting out to add some predictability to the order in which data files will be processed: // Shuffle the tables - for i := range files.tables { - j := rand.Intn(i + 1) - files.tables[i], files.tables[j] = files.tables[j], files.tables[i] - } + //for i := range files.tables { + // j := rand.Intn(i + 1) + // files.tables[i], files.tables[j] = files.tables[j], files.tables[i] + //} var eg errgroup.Group var bytes uint64 t := time.Now() - for _, table := range files.tables { + numberOfDataFiles := len(files.tables) + + for idx, table := range files.tables { table := table conn := pool.Get() eg.Go(func() error { defer pool.Put(conn) - r, err := l.restoreTable(table, conn) + if l.cfg.ShowDetails { + ch.Printer.Printf("%s: %s in thread %s (File %d of %d)\n", printer.BoldGreen("Started Processing Data File"), printer.BoldBlue(filepath.Base(table)), printer.BoldBlue(conn.ID), (idx + 1), numberOfDataFiles) + } + fileProcessingTimeStart := time.Now() + r, err := l.restoreTable(table, conn, ch, l.cfg.ShowDetails) + if err != nil { return err } + fileProcessingTimeFinish := time.Since(fileProcessingTimeStart) + timeElapsedSofar := time.Since(t) + if l.cfg.ShowDetails { + ch.Printer.Printf("%s: %s in %s with %s elapsed so far (File %d of %d)\n", printer.BoldGreen("Finished Processing Data File"), printer.BoldBlue(filepath.Base(table)), printer.BoldBlue(fileProcessingTimeFinish), printer.BoldBlue(timeElapsedSofar), (idx + 1), numberOfDataFiles) + } + atomic.AddUint64(&bytes, uint64(r)) return nil }) @@ -127,22 +147,42 @@ func (l *Loader) Run(ctx context.Context) error { return nil } -func (l *Loader) loadFiles(dir string) (*Files, error) { +func (l *Loader) loadFiles(dir string, ch *cmdutil.Helper, 
showDetails bool, startFrom string) (*Files, error) { files := &Files{} + if showDetails { + ch.Printer.Println("Collecting files from folder " + printer.BoldBlue(dir)) + } + if err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { if err != nil { return fmt.Errorf("loader.file.walk.error:%+v", err) } if !info.IsDir() { + tbl := tableNameFromFilename(path) switch { case strings.HasSuffix(path, dbSuffix): files.databases = append(files.databases, path) + if showDetails { + ch.Printer.Println("Database file: " + filepath.Base(path)) + } case strings.HasSuffix(path, schemaSuffix): - files.schemas = append(files.schemas, path) + if tbl >= startFrom { + files.schemas = append(files.schemas, path) + if showDetails { + ch.Printer.Println(" |- Table file: " + printer.BoldBlue(filepath.Base(path))) + } + } else { + ch.Printer.Printf("Skipping files associated with the %s table...\n", printer.BoldBlue(tbl)) + } default: if strings.HasSuffix(path, tableSuffix) { - files.tables = append(files.tables, path) + if tbl >= startFrom { + files.tables = append(files.tables, path) + if showDetails { + ch.Printer.Println(" |- Data file: " + printer.BoldBlue(filepath.Base(path))) + } + } } } } @@ -153,7 +193,7 @@ func (l *Loader) loadFiles(dir string) (*Files, error) { return files, nil } -func (l *Loader) restoreDatabaseSchema(dbs []string, conn *Connection) error { +func (l *Loader) restoreDatabaseSchema(dbs []string, conn *Connection, ch *cmdutil.Helper, showDetails bool) error { for _, db := range dbs { base := filepath.Base(db) name := strings.TrimSuffix(base, dbSuffix) @@ -163,6 +203,9 @@ func (l *Loader) restoreDatabaseSchema(dbs []string, conn *Connection) error { return err } + if showDetails { + ch.Printer.Println("Restoring Database: " + base) + } err = conn.Execute(string(data)) if err != nil { return err @@ -174,11 +217,17 @@ func (l *Loader) restoreDatabaseSchema(dbs []string, conn *Connection) error { return nil } -func (l *Loader) 
restoreTableSchema(overwrite bool, tables []string, conn *Connection) error { - for _, table := range tables { +func (l *Loader) restoreTableSchema(overwrite bool, tables []string, conn *Connection, ch *cmdutil.Helper, showDetails bool, startFrom string) error { + if startFrom != "" { + ch.Printer.Printf("Starting from %s table...\n", printer.BoldBlue(startFrom)) + } + + numberOfTables := len(tables) + + for idx, table := range tables { base := filepath.Base(table) name := strings.TrimSuffix(base, schemaSuffix) - db := strings.Split(name, ".")[0] + db := l.databaseNameFromFilename(name) tbl := strings.Split(name, ".")[1] name = fmt.Sprintf("`%v`.`%v`", db, tbl) @@ -188,6 +237,11 @@ func (l *Loader) restoreTableSchema(overwrite bool, tables []string, conn *Conne zap.String("table ", tbl), ) + if tbl < startFrom { + ch.Printer.Printf("Skipping %s table (Table %d of %d)...\n", printer.BoldBlue(tbl), (idx + 1), numberOfTables) + continue + } + err := conn.Execute(fmt.Sprintf("USE `%s`", db)) if err != nil { return err @@ -203,8 +257,8 @@ func (l *Loader) restoreTableSchema(overwrite bool, tables []string, conn *Conne return err } query1 := string(data) - querys := strings.Split(query1, ";\n") - for _, query := range querys { + queries := strings.Split(query1, ";\n") + for _, query := range queries { if !strings.HasPrefix(query, "/*") && query != "" { if overwrite { l.log.Info( @@ -213,12 +267,19 @@ func (l *Loader) restoreTableSchema(overwrite bool, tables []string, conn *Conne zap.String("table ", tbl), ) + if showDetails { + ch.Printer.Println("Dropping Existing Table (if it exists): " + printer.BoldBlue(name)) + } dropQuery := fmt.Sprintf("DROP TABLE IF EXISTS %s", name) err = conn.Execute(dropQuery) if err != nil { return err } } + + if showDetails { + ch.Printer.Printf("Creating Table: %s (Table %d of %d)\n", printer.BoldBlue(name), (idx + 1), numberOfTables) + } err = conn.Execute(query) if err != nil { return err @@ -234,7 +295,7 @@ func (l *Loader) 
restoreTableSchema(overwrite bool, tables []string, conn *Conne return nil } -func (l *Loader) restoreTable(table string, conn *Connection) (int, error) { +func (l *Loader) restoreTable(table string, conn *Connection, ch *cmdutil.Helper, showDetails bool) (int, error) { bytes := 0 part := "0" base := filepath.Base(table) @@ -245,7 +306,7 @@ func (l *Loader) restoreTable(table string, conn *Connection) (int, error) { return 0, fmt.Errorf("expected database.table, but got: %q", name) } - db := splits[0] + db := l.databaseNameFromFilename(splits[0]) tbl := splits[1] if len(splits) > 2 { @@ -259,6 +320,7 @@ func (l *Loader) restoreTable(table string, conn *Connection) (int, error) { zap.String("part", part), zap.Int("thread_conn_id", conn.ID), ) + err := conn.Execute(fmt.Sprintf("USE `%s`", db)) if err != nil { return 0, err @@ -274,14 +336,37 @@ func (l *Loader) restoreTable(table string, conn *Connection) (int, error) { return 0, err } query1 := string(data) - querys := strings.Split(query1, ";\n") + queries := strings.Split(query1, ";\n") + lastQuery := queries[len(queries)-1] + + // Commonly for our files the last entry is non-actionable so we should exclude it automatically: + if strings.HasPrefix(lastQuery, "/*") || lastQuery == "" { + queries = queries[:len(queries)-1] + } + bytes = len(query1) - for _, query := range querys { + queriesInFile := len(queries) + + for idx, query := range queries { if !strings.HasPrefix(query, "/*") && query != "" { - err = conn.Execute(query) - if err != nil { - return 0, err + queryBytes := len(query) + if queryBytes <= 16777216 { + if showDetails { + ch.Printer.Printf(" Processing Query %s out of %s within %s in thread %s\n", printer.BoldBlue((idx + 1)), printer.BoldBlue(queriesInFile), printer.BoldBlue(base), printer.BoldBlue(conn.ID)) + } + + err = conn.Execute(query) + if err != nil { + return 0, err + } + } else { + // Encountering this error should be uncommon for our users. 
+ // However, it may be encountered if users generate files manually to match our expected folder format. + ch.Printer.Printf("%s: Query %s within %s in thread %s is larger than 16777216 bytes. Please reduce query size to avoid pkt error.\n", printer.BoldRed("ERROR"), printer.BoldBlue((idx + 1)), printer.BoldBlue(base), printer.BoldBlue(conn.ID)) + return 0, errors.New("query is larger than 16777216 bytes in size") } + } else { + ch.Printer.Printf(" Skipping Empty Query %s out of %s within %s in thread %s\n", printer.BoldBlue((idx + 1)), printer.BoldBlue(queriesInFile), printer.BoldBlue(base), printer.BoldBlue(conn.ID)) } } l.log.Info( @@ -291,5 +376,30 @@ func (l *Loader) restoreTable(table string, conn *Connection) (int, error) { zap.String("part", part), zap.Int("thread_conn_id", conn.ID), ) + return bytes, nil } + +func (l *Loader) databaseNameFromFilename(filename string) string { + if l.cfg.AllowDifferentDestination { + return l.cfg.Database + } + + return strings.Split(filename, ".")[0] +} + +func tableNameFromFilename(filename string) string { + base := filepath.Base(filename) + name := strings.TrimSuffix(base, dbSuffix) + name = strings.TrimSuffix(name, schemaSuffix) + name = strings.TrimSuffix(name, tableSuffix) + + splits := strings.Split(name, ".") + if len(splits) < 2 { + return "" + } + + tbl := splits[1] + + return tbl +}