Skip to content

Commit

Permalink
Switch WaitGroup usage to ErrGroup
Browse files Browse the repository at this point in the history
In a recent ticket I noticed that there might be cases where `EOF` errors occur, possibly due to network connectivity issues or something else, that weren't causing a full failure of the restore process so it seemed the user was generating backups in these cases that weren't fully complete as the dump process would continue after these were encountered meaning that there might have been gaps in the middle of their dumped dataset.

Switching to the ErrGroup usage here plus adding a few context error checking locations should hopefully help the dump process to stop if needed rather than provide users with a false positive.
  • Loading branch information
orware committed Dec 3, 2024
1 parent f5ed825 commit 53bff35
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions internal/dumper/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ import (
"os"
"regexp"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/planetscale/cli/internal/cmdutil"
"github.com/planetscale/cli/internal/printer"
querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query"
"golang.org/x/sync/errgroup"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -136,8 +136,8 @@ func (d *Dumper) Run(ctx context.Context) error {
}
initPool.Put(conn)

// TODO(fatih): use errgroup
var wg sync.WaitGroup
// Adding the context here helps down below if a query issue is encountered to prevent further processing:
eg, egCtx := errgroup.WithContext(ctx)
for i, database := range databases {
pool, err := NewPool(d.log, d.cfg.Threads/len(databases), d.cfg.Address, d.cfg.User, d.cfg.Password, d.cfg.SessionVars, database)
if err != nil {
Expand All @@ -151,6 +151,11 @@ func (d *Dumper) Run(ctx context.Context) error {
continue
}

// Allows for quicker exit when using Ctrl+C at the Terminal:
if egCtx.Err() != nil {
return egCtx.Err()
}

conn := initPool.Get()
err := d.dumpTableSchema(conn, database, table, views[i])
if err != nil {
Expand All @@ -165,12 +170,13 @@ func (d *Dumper) Run(ctx context.Context) error {
}

conn = pool.Get()
wg.Add(1)
go func(conn *Connection, database string, table string) {
defer func() {
wg.Done()
pool.Put(conn)
}()

eg.Go(func() error {
defer pool.Put(conn)

if egCtx.Err() != nil {
return egCtx.Err()
}

d.log.Info(
"dumping table ...",
Expand All @@ -179,11 +185,13 @@ func (d *Dumper) Run(ctx context.Context) error {
zap.Int("thread_conn_id", conn.ID),
)

err := d.dumpTable(ctx, conn, database, table, d.cfg.UseReplica, d.cfg.UseRdonly)
err := d.dumpTable(egCtx, conn, database, table, d.cfg.UseReplica, d.cfg.UseRdonly)
if err != nil {
d.log.Error("error dumping table", zap.Error(err))
}
}(conn, database, table)

return nil
})
}
}

Expand All @@ -205,8 +213,13 @@ func (d *Dumper) Run(ctx context.Context) error {
}
}()

wg.Wait()
elapsed := time.Since(t)

if err := eg.Wait(); err != nil {
d.log.Error("error dumping", zap.Error(err))
return err
}

d.log.Info(
"dumping all done",
zap.Duration("elapsed_time", elapsed),
Expand Down

0 comments on commit 53bff35

Please sign in to comment.