ebs_br: allow temporary TiKV unreachable during starting snapshot backup (pingcap#49154) (pingcap#50444) (pingcap#37)

close pingcap#49152, close pingcap#49153

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
2 people authored and GitHub Enterprise committed Jan 23, 2024
1 parent 991ee45 commit 8ceaa27
Showing 11 changed files with 420 additions and 36 deletions.
19 changes: 15 additions & 4 deletions br/pkg/pdutil/pd.go
@@ -247,6 +247,8 @@ type PdController struct {
 
     // control the pause schedulers goroutine
     schedulerPauseCh chan struct{}
+    // control the ttl of pausing schedulers
+    SchedulerPauseTTL time.Duration
 }
 
 // NewPdController creates a new PdController.
@@ -442,7 +444,7 @@ func (p *PdController) getStoreInfoWith(
 
 func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []string, post pdHTTPRequest) ([]string, error) {
     // pause this scheduler with 300 seconds
-    body, err := json.Marshal(pauseSchedulerBody{Delay: int64(pauseTimeout.Seconds())})
+    body, err := json.Marshal(pauseSchedulerBody{Delay: int64(p.ttlOfPausing().Seconds())})
     if err != nil {
         return nil, errors.Trace(err)
     }
@@ -451,9 +453,11 @@ func (p *PdController) doPauseSchedulers(ctx context.Context, schedulers []strin
     for _, scheduler := range schedulers {
         prefix := fmt.Sprintf("%s/%s", schedulerPrefix, scheduler)
         for _, addr := range p.getAllPDAddrs() {
-            _, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
+            var resp []byte
+            resp, err = post(ctx, addr, prefix, p.cli, http.MethodPost, bytes.NewBuffer(body))
             if err == nil {
                 removedSchedulers = append(removedSchedulers, scheduler)
+                log.Info("Paused scheduler.", zap.String("response", string(resp)), zap.String("on", addr))
                 break
             }
         }
@@ -488,7 +492,7 @@ func (p *PdController) pauseSchedulersAndConfigWith(
     }
 
     go func() {
-        tick := time.NewTicker(pauseTimeout / 3)
+        tick := time.NewTicker(p.ttlOfPausing() / 3)
         defer tick.Stop()
 
         for {
@@ -634,7 +638,7 @@ func (p *PdController) doUpdatePDScheduleConfig(
 
 func (p *PdController) doPauseConfigs(ctx context.Context, cfg map[string]interface{}, post pdHTTPRequest) error {
     // pause this scheduler with 300 seconds
-    prefix := fmt.Sprintf("%s?ttlSecond=%.0f", configPrefix, pauseTimeout.Seconds())
+    prefix := fmt.Sprintf("%s?ttlSecond=%.0f", configPrefix, p.ttlOfPausing().Seconds())
     return p.doUpdatePDScheduleConfig(ctx, cfg, post, prefix)
 }

@@ -1072,6 +1076,13 @@ func (p *PdController) Close() {
     }
 }
 
+func (p *PdController) ttlOfPausing() time.Duration {
+    if p.SchedulerPauseTTL > 0 {
+        return p.SchedulerPauseTTL
+    }
+    return pauseTimeout
+}
+
 // FetchPDVersion get pd version
 func FetchPDVersion(ctx context.Context, tls *common.TLS, pdAddr string) (*semver.Version, error) {
     // An example of PD version API.
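Note on the pd.go change: SchedulerPauseTTL is an exported field rather than a constructor argument, so existing callers keep the 300-second pauseTimeout default unless they opt in; both the scheduler-pause Delay body and the ttlSecond config query parameter now derive from ttlOfPausing(). A minimal sketch of the intended wiring, mirroring the cmd.go hunk below (a fragment with error handling elided; it assumes PauseGcConfig embeds task.Config, which the hunks here do not show):

mgr, err := dialPD(ctx, &cfg.Config)
if err != nil {
    return errors.Annotate(err, "failed to dial PD")
}
// Any value <= 0 keeps the default: ttlOfPausing() falls back to pauseTimeout.
mgr.SchedulerPauseTTL = cfg.TTL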
2 changes: 2 additions & 0 deletions br/pkg/task/operator/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
     importpath = "github.com/pingcap/tidb/br/pkg/task/operator",
     visibility = ["//visibility:public"],
     deps = [
+        "//br/pkg/errors",
         "//br/pkg/logutil",
         "//br/pkg/pdutil",
         "//br/pkg/task",
@@ -18,6 +19,7 @@ go_library(
         "@com_github_spf13_pflag//:pflag",
         "@org_golang_google_grpc//keepalive",
         "@org_golang_x_sync//errgroup",
+        "@org_uber_go_multierr//:multierr",
         "@org_uber_go_zap//:zap",
     ],
 )
115 changes: 85 additions & 30 deletions br/pkg/task/operator/cmd.go
@@ -5,16 +5,21 @@ package operator
 import (
     "context"
     "crypto/tls"
+    "fmt"
+    "math/rand"
+    "os"
     "strings"
     "sync"
     "time"
 
     "github.com/pingcap/errors"
     "github.com/pingcap/log"
+    berrors "github.com/pingcap/tidb/br/pkg/errors"
     "github.com/pingcap/tidb/br/pkg/logutil"
     "github.com/pingcap/tidb/br/pkg/pdutil"
     "github.com/pingcap/tidb/br/pkg/task"
     "github.com/pingcap/tidb/br/pkg/utils"
+    "go.uber.org/multierr"
     "go.uber.org/zap"
     "golang.org/x/sync/errgroup"
     "google.golang.org/grpc/keepalive"
@@ -38,13 +43,16 @@ func dialPD(ctx context.Context, cfg *task.Config) (*pdutil.PdController, error)
 }
 
 func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWith(f func(ctx context.Context)) {
-    _ = cx.cleanUpWithErr(func(ctx context.Context) error { f(ctx); return nil })
+    cx.cleanUpWithRetErr(nil, func(ctx context.Context) error { f(ctx); return nil })
 }
 
-func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithErr(f func(ctx context.Context) error) error {
+func (cx *AdaptEnvForSnapshotBackupContext) cleanUpWithRetErr(errOut *error, f func(ctx context.Context) error) {
     ctx, cancel := context.WithTimeout(context.Background(), cx.cfg.TTL)
     defer cancel()
-    return f(ctx)
+    err := f(ctx)
+    if errOut != nil {
+        *errOut = multierr.Combine(*errOut, err)
+    }
 }
 
 type AdaptEnvForSnapshotBackupContext struct {
@@ -58,6 +66,18 @@ type AdaptEnvForSnapshotBackupContext struct {
     runGrp *errgroup.Group
 }
 
+func (cx *AdaptEnvForSnapshotBackupContext) Close() {
+    cx.pdMgr.Close()
+    cx.kvMgr.Close()
+}
+
+func (cx *AdaptEnvForSnapshotBackupContext) GetBackOffer(operation string) utils.Backoffer {
+    state := utils.InitialRetryState(64, 1*time.Second, 10*time.Second)
+    bo := utils.GiveUpRetryOn(&state, berrors.ErrPossibleInconsistency)
+    bo = utils.VerboseRetry(bo, logutil.CL(cx).With(zap.String("operation", operation)))
+    return bo
+}
+
 func (cx *AdaptEnvForSnapshotBackupContext) ReadyL(name string, notes ...zap.Field) {
     logutil.CL(cx).Info("Stage ready.", append(notes, zap.String("component", name))...)
     cx.rdGrp.Done()
@@ -77,6 +97,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
     if err != nil {
         return errors.Annotate(err, "failed to dial PD")
     }
+    mgr.SchedulerPauseTTL = cfg.TTL
     var tconf *tls.Config
     if cfg.TLS.IsEnabled() {
         tconf, err = cfg.TLS.ToTLSConfig()
@@ -97,73 +118,107 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error {
         rdGrp:  sync.WaitGroup{},
         runGrp: eg,
     }
+    defer cx.Close()
 
     cx.rdGrp.Add(3)
 
     eg.Go(func() error { return pauseGCKeeper(cx) })
     eg.Go(func() error { return pauseSchedulerKeeper(cx) })
     eg.Go(func() error { return pauseImporting(cx) })
     go func() {
         cx.rdGrp.Wait()
+        if cfg.OnAllReady != nil {
+            cfg.OnAllReady()
+        }
         hintAllReady()
     }()
+    defer func() {
+        if cfg.OnExit != nil {
+            cfg.OnExit()
+        }
+    }()
 
     return eg.Wait()
 }

+func getCallerName() string {
+    name, err := os.Hostname()
+    if err != nil {
+        name = fmt.Sprintf("UNKNOWN-%d", rand.Int63())
+    }
+    return fmt.Sprintf("operator@%sT%d#%d", name, time.Now().Unix(), os.Getpid())
+}

 func pauseImporting(cx *AdaptEnvForSnapshotBackupContext) error {
-    denyLightning := utils.NewSuspendImporting("prepare_for_snapshot_backup", cx.kvMgr)
-    if _, err := denyLightning.DenyAllStores(cx, cx.cfg.TTL); err != nil {
+    suspendLightning := utils.NewSuspendImporting(getCallerName(), cx.kvMgr)
+    _, err := utils.WithRetryV2(cx, cx.GetBackOffer("suspend_lightning"), func(_ context.Context) (map[uint64]bool, error) {
+        return suspendLightning.DenyAllStores(cx, cx.cfg.TTL)
+    })
+    if err != nil {
         return errors.Trace(err)
     }
     cx.ReadyL("pause_lightning")
-    cx.runGrp.Go(func() error {
-        err := denyLightning.Keeper(cx, cx.cfg.TTL)
+    cx.runGrp.Go(func() (err error) {
+        defer cx.cleanUpWithRetErr(&err, func(ctx context.Context) error {
+            if ctx.Err() != nil {
+                //nolint: all_revive,revive // There is a false positive on returning in `defer`.
+                return errors.Annotate(ctx.Err(), "cleaning up timed out")
+            }
+            res, err := utils.WithRetryV2(ctx, cx.GetBackOffer("restore_lightning"),
+                //nolint: all_revive,revive // There is a false positive on returning in `defer`.
+                func(ctx context.Context) (map[uint64]bool, error) { return suspendLightning.AllowAllStores(ctx) })
+            if err != nil {
+                //nolint: all_revive,revive // There is a false positive on returning in `defer`.
+                return errors.Annotatef(err, "failed to allow all stores")
+            }
+            //nolint: all_revive,revive // There is a false positive on returning in `defer`.
+            return suspendLightning.ConsistentWithPrev(res)
+        })
+
+        err = suspendLightning.Keeper(cx, cx.cfg.TTL)
         if errors.Cause(err) != context.Canceled {
             logutil.CL(cx).Warn("keeper encounters error.", logutil.ShortError(err))
             return err
         }
-        return cx.cleanUpWithErr(func(ctx context.Context) error {
-            for {
-                if ctx.Err() != nil {
-                    return errors.Annotate(ctx.Err(), "cleaning up timed out")
-                }
-                res, err := denyLightning.AllowAllStores(ctx)
-                if err != nil {
-                    logutil.CL(ctx).Warn("Failed to restore lightning, will retry.", logutil.ShortError(err))
-                    // Retry for 10 times.
-                    time.Sleep(cx.cfg.TTL / 10)
-                    continue
-                }
-                return denyLightning.ConsistentWithPrev(res)
-            }
-        })
+        // Clean up the canceled error.
+        err = nil
+        return
     })
     return nil
 }

-func pauseGCKeeper(ctx *AdaptEnvForSnapshotBackupContext) error {
+func pauseGCKeeper(cx *AdaptEnvForSnapshotBackupContext) (err error) {
     // Note: should we remove the service safepoint as soon as this exits?
     sp := utils.BRServiceSafePoint{
         ID:       utils.MakeSafePointID(),
-        TTL:      int64(ctx.cfg.TTL.Seconds()),
-        BackupTS: ctx.cfg.SafePoint,
+        TTL:      int64(cx.cfg.TTL.Seconds()),
+        BackupTS: cx.cfg.SafePoint,
     }
     if sp.BackupTS == 0 {
-        rts, err := ctx.pdMgr.GetMinResolvedTS(ctx)
+        rts, err := cx.pdMgr.GetMinResolvedTS(cx)
         if err != nil {
             return err
         }
-        logutil.CL(ctx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
+        logutil.CL(cx).Info("No service safepoint provided, using the minimal resolved TS.", zap.Uint64("min-resolved-ts", rts))
         sp.BackupTS = rts
     }
-    err := utils.StartServiceSafePointKeeper(ctx, ctx.pdMgr.GetPDClient(), sp)
+    err = utils.StartServiceSafePointKeeper(cx, cx.pdMgr.GetPDClient(), sp)
     if err != nil {
         return err
     }
-    ctx.ReadyL("pause_gc", zap.Object("safepoint", sp))
+    cx.ReadyL("pause_gc", zap.Object("safepoint", sp))
+    defer cx.cleanUpWithRetErr(&err, func(ctx context.Context) error {
+        cancelSP := utils.BRServiceSafePoint{
+            ID:  sp.ID,
+            TTL: 0,
+        }
+        //nolint: all_revive,revive // There is a false positive on returning in `defer`.
+        return utils.UpdateServiceSafePoint(ctx, cx.pdMgr.GetPDClient(), cancelSP)
+    })
    // Note: in fact we can directly return here.
    // But the name `keeper` implies once the function exits,
    // the GC should be resume, so let's block here.
-    <-ctx.Done()
+    <-cx.Done()
     return nil
 }

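Note on the cmd.go changes: the "temporary TiKV unreachable" tolerance comes from pairing GetBackOffer with utils.WithRetryV2, so suspending and restoring importing are retried under backoff instead of failing on the first RPC error, while ErrPossibleInconsistency still aborts immediately. A condensed sketch of that composition (all identifiers appear in the hunks above; this is a fragment, not a standalone program):

// Retry up to 64 times, backing off between 1s and 10s per attempt,
state := utils.InitialRetryState(64, 1*time.Second, 10*time.Second)
// but give up at once on errors that imply possible inconsistency.
bo := utils.GiveUpRetryOn(&state, berrors.ErrPossibleInconsistency)
// Log every attempt so flaky stores are visible in the BR log.
bo = utils.VerboseRetry(bo, logutil.CL(cx).With(zap.String("operation", "suspend_lightning")))
// DenyAllStores is re-invoked until it succeeds or the backoffer gives up.
_, err := utils.WithRetryV2(cx, bo, func(_ context.Context) (map[uint64]bool, error) {
    return suspendLightning.DenyAllStores(cx, cx.cfg.TTL)
})

The cleanup side gets the same treatment: cleanUpWithRetErr runs the restore callback under its own TTL-bounded context and folds any failure into the goroutine's named return via multierr.Combine, so a failed AllowAllStores or safepoint removal is reported instead of silently dropped.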
5 changes: 4 additions & 1 deletion br/pkg/task/operator/config.go
@@ -14,10 +14,13 @@ type PauseGcConfig struct {
 
     SafePoint uint64        `json:"safepoint" yaml:"safepoint"`
     TTL       time.Duration `json:"ttl" yaml:"ttl"`
+
+    OnAllReady func() `json:"-" yaml:"-"`
+    OnExit     func() `json:"-" yaml:"-"`
 }
 
 func DefineFlagsForPrepareSnapBackup(f *pflag.FlagSet) {
-    _ = f.DurationP("ttl", "i", 5*time.Minute, "The time-to-live of the safepoint.")
+    _ = f.DurationP("ttl", "i", 2*time.Minute, "The time-to-live of the safepoint.")
     _ = f.Uint64P("safepoint", "t", 0, "The GC safepoint to be kept.")
 }

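Note on the config.go change: OnAllReady and OnExit are in-process hooks, excluded from serialization by the json:"-"/yaml:"-" tags; the former fires once all three stages (GC, schedulers, importing) report ready, the latter when AdaptEnvForSnapshotBackup returns. A hypothetical caller might wire them like this (the log messages are illustrative, not from the commit):

cfg := operator.PauseGcConfig{ /* Config, SafePoint, TTL, ... */ }
cfg.OnAllReady = func() {
    log.Info("cluster is ready for taking EBS snapshots")
}
cfg.OnExit = func() {
    log.Info("pausing finished; GC and schedulers may resume")
}
if err := operator.AdaptEnvForSnapshotBackup(ctx, &cfg); err != nil {
    log.Error("failed to adapt env for snapshot backup", zap.Error(err))
}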
2 changes: 1 addition & 1 deletion br/pkg/utils/BUILD.bazel
@@ -87,7 +87,7 @@ go_test(
     ],
     embed = [":utils"],
     flaky = True,
-    shard_count = 29,
+    shard_count = 37,
     deps = [
         "//br/pkg/errors",
         "//br/pkg/metautil",
(6 of the 11 changed files are not shown above.)
