Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ebs_br: allow temporary TiKV unreachable during starting snapshot backup (#49154) #50444

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ type PdController struct {

// control the pause schedulers goroutine
schedulerPauseCh chan struct{}
// control the ttl of pausing schedulers
SchedulerPauseTTL time.Duration
}

// NewPdController creates a new PdController.
Expand Down Expand Up @@ -445,7 +447,7 @@ func (p *PdController) getStoreInfoWith(

func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
// pause this scheduler with 300 seconds
body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout.Seconds())})
body, err := json.Marshal(pauseSchedulerBody{Delay: int64(p.ttlOfPausing().Seconds())})
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -454,9 +456,11 @@ func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []strin
for _, scheduler := range schedulers {
prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
for _, addr := range p.getAllPDAddrs() {
var resp []byte
_, err = post(ctx, addr, prefix, p.cli, http.MethodPost, body)
if err == nil {
removedSchedulers = append(removedSchedulers, scheduler)
log.Info("Paused scheduler.", zap.String("response", string(resp)), zap.String("on", addr))
break
}
}
Expand Down Expand Up @@ -491,7 +495,7 @@ func (p *PdController) pauseSchedulersAndConfigWith(
}

go func() {
tick := time.NewTicker(pauseTimeout / 3)
tick := time.NewTicker(p.ttlOfPausing() / 3)
defer tick.Stop()

for {
Expand Down Expand Up @@ -637,7 +641,7 @@ func (p *PdController) doUpdatePDScheduleConfig(

func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error {
// pause this scheduler with 300 seconds
prefix := fmt.Sprintf("%s?ttlSecond=%.0f", configPrefix, pauseTimeout.Seconds())
prefix := fmt.Sprintf("%s?ttlSecond=%.0f", configPrefix, p.ttlOfPausing().Seconds())
return p.doUpdatePDScheduleConfig(ctx, cfg, post, prefix)
}

Expand Down Expand Up @@ -1075,6 +1079,13 @@ func (p *PdController) Close() {
}
}

func (p *PdController) ttlOfPausing() time.Duration {
if p.SchedulerPauseTTL > 0 {
return p.SchedulerPauseTTL
}
return pauseTimeout
}

// FetchPDVersion get pd version
func FetchPDVersion(ctx context.Context, tls *common.TLS, pdAddr string) (*semver.Version, error) {
// An example of PD version API.
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/task/operator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
importpath = "github.com/pingcap/tidb/br/pkg/task/operator",
visibility = ["//visibility:public"],
deps = [
"//br/pkg/errors",
"//br/pkg/logutil",
"//br/pkg/pdutil",
"//br/pkg/task",
Expand All @@ -18,6 +19,7 @@ go_library(
"@com_github_spf13_pflag//:pflag",
"@org_golang_google_grpc//keepalive",
"@org_golang_x_sync//errgroup",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
)
115 changes: 85 additions & 30 deletions br/pkg/task/operator/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,21 @@ package operator
import (
"context"
"crypto/tls"
"fmt"
"math/rand"
"os"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/task"
"github.com/pingcap/tidb/br/pkg/utils"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/keepalive"
Expand All @@ -38,13 +43,16 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error)
}

func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWith(f func(ctx context.Context)) {
_ = cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil })
cx.cleanUpWithRetErr(nil, func(ctx context.Context) error { f(ctx); return nil })
}

func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithErr(f func(ctx context.Context) error) error {
func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithRetErr(errOut *error, f func(ctx context.Context) error) {
ctx, cancel := context.WithTimeout(context.Background(), cx.cfg.TTL)
defer cancel()
return f(ctx)
err := f(ctx)
if errOut != nil {
*errOut = multierr.Combine(*errOut, err)
}
}

type AdaptEnvForSnapshotBackupContext struct {
Expand All @@ -58,6 +66,18 @@ type AdaptEnvForSnapshotBackupContext struct {
runGrp *errgroup.Group
}

func (cx *AdaptEnvForSnapshotBackupContext) Close() {
cx.pdMgr.Close()
cx.kvMgr.Close()
}

func (cx *AdaptEnvForSnapshotBackupContext) GetBackOffer(operation string) utils.Backoffer {
state := utils.InitialRetryState(64, 1*time.Second, 10*time.Second)
bo := utils.GiveUpRetryOn(&state, berrors.ErrPossibleInconsistency)
bo = utils.VerboseRetry(bo, logutil.CL(cx).With(zap.String("operation", operation)))
return bo
}

func (cx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) {
logutil.CL(cx).Info("Stage ready.", append(notes, zap.String("component", name))...)
cx.rdGrp.Done()
Expand All @@ -77,6 +97,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
if err != nil {
return errors.Annotate(err, "failed to dial PD")
}
mgr.SchedulerPauseTTL = cfg.TTL
var tconf *tls.Config
if cfg.TLS.IsEnabled() {
tconf, err = cfg.TLS.ToTLSConfig()
Expand All @@ -97,73 +118,107 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
rdGrp: sync.WaitGroup{},
runGrp: eg,
}
defer cx.Close()

cx.rdGrp.Add(3)

eg.Go(func() error { return pauseGCKeeper(cx) })
eg.Go(func() error { return pauseSchedulerKeeper(cx) })
eg.Go(func() error { return pauseImporting(cx) })
go func() {
cx.rdGrp.Wait()
if cfg.OnAllReady != nil {
cfg.OnAllReady()
}
hintAllReady()
}()
defer func() {
if cfg.OnExit != nil {
cfg.OnExit()
}
}()

return eg.Wait()
}

func getCallerName() string {
name, err := os.Hostname()
if err != nil {
name = fmt.Sprintf("UNKNOWN-%d", rand.Int63())
}
return fmt.Sprintf("operator@%sT%d#%d", name, time.Now().Unix(), os.Getpid())
}

func pauseImporting(cx *AdaptEnvForSnapshotBackupContext) error {
denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", cx.kvMgr)
if _, err := denyLightning.DenyAllStores(cx, cx.cfg.TTL); err != nil {
suspendLightning := utils.NewSuspendImporting(getCallerName(), cx.kvMgr)
_, err := utils.WithRetryV2(cx, cx.GetBackOffer("suspend_lightning"), func(_ context.Context) (map[uint64]bool, error) {
return suspendLightning.DenyAllStores(cx, cx.cfg.TTL)
})
if err != nil {
return errors.Trace(err)
}
cx.ReadyL("pause_lightning")
cx.runGrp.Go(func() error {
err := denyLightning.Keeper(cx, cx.cfg.TTL)
cx.runGrp.Go(func() (err error) {
defer cx.cleanUpWithRetErr(&err, func(ctx context.Context) error {
if ctx.Err() != nil {
//nolint: all_revive,revive // There is a false positive on returning in `defer`.
return errors.Annotate(ctx.Err(), "cleaning up timed out")
}
res, err := utils.WithRetryV2(ctx, cx.GetBackOffer("restore_lightning"),
//nolint: all_revive,revive // There is a false positive on returning in `defer`.
func(ctx context.Context) (map[uint64]bool, error) { return suspendLightning.AllowAllStores(ctx) })
if err != nil {
//nolint: all_revive,revive // There is a false positive on returning in `defer`.
return errors.Annotatef(err, "failed to allow all stores")
}
//nolint: all_revive,revive // There is a false positive on returning in `defer`.
return suspendLightning.ConsistentWithPrev(res)
})

err = suspendLightning.Keeper(cx, cx.cfg.TTL)
if errors.Cause(err) != context.Canceled {
logutil.CL(cx).Warn("keeper encounters error.", logutil.ShortError(err))
return err
}
return cx.cleanUpWithErr(func(ctx context.Context) error {
for {
if ctx.Err() != nil {
return errors.Annotate(ctx.Err(), "cleaning up timed out")
}
res, err := denyLightning.AllowAllStores(ctx)
if err != nil {
logutil.CL(ctx).Warn("Failed to restore lightning, will retry.", logutil.ShortError(err))
// Retry for 10 times.
time.Sleep(cx.cfg.TTL / 10)
continue
}
return denyLightning.ConsistentWithPrev(res)
}
})
// Clean up the canceled error.
err = nil
return
})
return nil
}

func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error {
func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) {
// Note: should we remove the service safepoint as soon as this exits?
sp := utils.BRServiceSafePoint{
ID: utils.MakeSafePointID(),
TTL: int64(ctx.cfg.TTL.Seconds()),
BackupTS: ctx.cfg.SafePoint,
TTL: int64(cx.cfg.TTL.Seconds()),
BackupTS: cx.cfg.SafePoint,
}
if sp.BackupTS == 0 {
rts, err := ctx.pdMgr.GetMinResolvedTS(ctx)
rts, err := cx.pdMgr.GetMinResolvedTS(cx)
if err != nil {
return err
}
logutil.CL(ctx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
logutil.CL(cx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
sp.BackupTS = rts
}
err := utils.StartServiceSafePointKeeper(ctx, ctx.pdMgr.GetPDClient(), sp)
err = utils.StartServiceSafePointKeeper(cx, cx.pdMgr.GetPDClient(), sp)
if err != nil {
return err
}
ctx.ReadyL("pause_gc", zap.Object("safepoint", sp))
cx.ReadyL("pause_gc", zap.Object("safepoint", sp))
defer cx.cleanUpWithRetErr(&err, func(ctx context.Context) error {
cancelSP := utils.BRServiceSafePoint{
ID: sp.ID,
TTL: 0,
}
//nolint: all_revive,revive // There is a false positive on returning in `defer`.
return utils.UpdateServiceSafePoint(ctx, cx.pdMgr.GetPDClient(), cancelSP)
})
// Note: in fact we can directly return here.
// But the name `keeper` implies once the function exits,
// the GC should be resume, so let's block here.
<-ctx.Done()
<-cx.Done()
return nil
}

Expand Down
5 changes: 4 additions & 1 deletion br/pkg/task/operator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@ type PauseGcConfig struct {

SafePoint uint64 `json:"safepoint" yaml:"safepoint"`
TTL time.Duration `json:"ttl" yaml:"ttl"`

OnAllReady func() `json:"-" yaml:"-"`
OnExit func() `json:"-" yaml:"-"`
}

func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) {
_ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.")
_ = f.DurationP("ttl", "i", 2*time.Minute, "The time-to-live of the safepoint.")
_ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.")
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ go_test(
],
embed = [":utils"],
flaky = True,
shard_count = 29,
shard_count = 37,
deps = [
"//br/pkg/errors",
"//br/pkg/metautil",
Expand Down
Loading