Skip to content

Commit

Permalink
br: add errch buf for checkpoint (#40166)
Browse files Browse the repository at this point in the history
close #40165
  • Loading branch information
Leavrth authored Jan 4, 2023
1 parent 00604eb commit bf2cc45
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
1 change: 1 addition & 0 deletions br/pkg/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/checkpoint",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/logutil",
"//br/pkg/metautil",
"//br/pkg/rtree",
"//br/pkg/storage",
Expand Down
19 changes: 14 additions & 5 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/rtree"
"github.com/pingcap/tidb/br/pkg/storage"
Expand Down Expand Up @@ -240,7 +241,7 @@ func StartCheckpointRunnerForTest(ctx context.Context, storage storage.ExternalS

appendCh: make(chan *CheckpointMessage),
metaCh: make(chan map[string]*RangeGroups),
errCh: make(chan error),
errCh: make(chan error, 1),
}

runner.startCheckpointLoop(ctx, tick)
Expand All @@ -258,7 +259,7 @@ func StartCheckpointRunner(ctx context.Context, storage storage.ExternalStorage,

appendCh: make(chan *CheckpointMessage),
metaCh: make(chan map[string]*RangeGroups),
errCh: make(chan error),
errCh: make(chan error, 1),
}

runner.startCheckpointLoop(ctx, tickDuration)
Expand Down Expand Up @@ -344,6 +345,14 @@ func (r *CheckpointRunner) startCheckpointRunner(ctx context.Context, wg *sync.W
return errCh
}

func (r *CheckpointRunner) sendError(err error) {
select {
case r.errCh <- err:
default:
log.Error("errCh is blocked", logutil.ShortError(err))
}
}

func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration time.Duration) {
r.wg.Add(1)
checkpointLoop := func(ctx context.Context) {
Expand All @@ -360,14 +369,14 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration
return
case <-ticker.C:
if err := r.flushMeta(ctx, errCh); err != nil {
r.errCh <- err
r.sendError(err)
return
}
case msg, ok := <-r.appendCh:
if !ok {
log.Info("stop checkpoint runner")
if err := r.flushMeta(ctx, errCh); err != nil {
r.errCh <- err
r.sendError(err)
}
// close the channel to flush worker
// and wait it to consumes all the metas
Expand All @@ -386,7 +395,7 @@ func (r *CheckpointRunner) startCheckpointLoop(ctx context.Context, tickDuration
groups.Groups = append(groups.Groups, msg.Group)
case err := <-errCh:
// pass flush worker's error back
r.errCh <- err
r.sendError(err)
return
}
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
defer func() {
// don't reset the gc-safe-point if checkpoint mode is used and backup is not finished
if cfg.UseCheckpoint && !gcSafePointKeeperRemovable {
log.Info("skip removing gc-safepoint keeper for next retry", zap.String("gc-id", sp.ID))
return
}
log.Info("start to remove gc-safepoint keeper")
Expand Down

0 comments on commit bf2cc45

Please sign in to comment.