Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail fast when builtinbackup fails to restore a single file #16856

Merged
merged 5 commits into from
Sep 30, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 47 additions & 21 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ func (be *BuiltinBackupEngine) executeFullBackup(ctx context.Context, params Bac
// Save initial state so we can restore.
replicaStartRequired := false
sourceIsPrimary := false
superReadOnly := true //nolint
readOnly := true //nolint
superReadOnly := true // nolint
readOnly := true // nolint
var replicationPosition replication.Position
semiSyncSource, semiSyncReplica := params.Mysqld.SemiSyncEnabled(ctx)

Expand Down Expand Up @@ -602,39 +602,52 @@ func (be *BuiltinBackupEngine) backupFiles(
// Backup with the provided concurrency.
sema := semaphore.NewWeighted(int64(params.Concurrency))
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error())
bh.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying backing up this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if bh.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q backup", fe.Name)
bh.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if bh.HasErrors() {
params.Logger.Infof("failed to backup files due to error.")
return
}

// Backup the individual file.
name := fmt.Sprintf("%v", i)
bh.RecordError(be.backupFile(ctx, params, bh, fe, name))
err := be.backupFile(ctxCancel, params, bh, fe, name)
if err != nil {
bh.RecordError(acqErr)
cancel()
}
}(i)
}

Expand Down Expand Up @@ -766,7 +779,7 @@ func (bp *backupPipe) HashString() string {
return hex.EncodeToString(bp.crc32.Sum(nil))
}

func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger, restore bool) {
func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger, restore bool) {
messageStr := "restoring "
if !restore {
messageStr = "backing up "
Expand All @@ -775,6 +788,9 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger
defer tick.Stop()
for {
select {
case <-ctx.Done():
logger.Infof("Canceled %s of %q file", messageStr, bp.filename)
return
case <-bp.done:
logger.Infof("Completed %s %q", messageStr, bp.filename)
return
Expand Down Expand Up @@ -817,7 +833,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
}

br := newBackupReader(fe.Name, fi.Size(), timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger, false /*restore*/)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, false /*restore*/)

// Open the destination file for writing, and a buffer.
params.Logger.Infof("Backing up file: %v", fe.Name)
Expand Down Expand Up @@ -1016,43 +1032,53 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
sema := semaphore.NewWeighted(int64(params.Concurrency))
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error())
rec.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying to restore this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if rec.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q restore", fe.Name)
rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if rec.HasErrors() {
params.Logger.Infof("Failed to restore files due to error.")
return
}

fe.ParentPath = createdDir
// And restore the file.
name := fmt.Sprintf("%v", i)
params.Logger.Infof("Copying file %v: %v", name, fe.Name)
err := be.restoreFile(ctx, params, bh, fe, bm, name)
err := be.restoreFile(ctxCancel, params, bh, fe, bm, name)
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name))
cancel()
}
}(i)
}
Expand Down Expand Up @@ -1082,7 +1108,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
}()

br := newBackupReader(name, 0, timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger, true /*restore*/)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger, true)
var reader io.Reader = br

// Open the destination file for writing.
Expand Down
Loading