Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#1433
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
joccau authored and ti-chi-bot committed Sep 17, 2021
1 parent 1eb2415 commit 4b2d5d8
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 17 deletions.
2 changes: 1 addition & 1 deletion pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func (bc *Client) BackupRange(
zap.Uint32("concurrency", req.Concurrency))

var allStores []*metapb.Store
allStores, err = conn.GetAllTiKVStores(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
allStores, err = conn.GetAllTiKVStoresWithRetry(ctx, bc.mgr.GetPDClient(), conn.SkipTiFlash)
if err != nil {
return errors.Trace(err)
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/conn/conn.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

backuppb "github.com/pingcap/kvproto/pkg/backup"
"github.com/pingcap/kvproto/pkg/metapb"
Expand All @@ -30,6 +32,7 @@ import (
"github.com/pingcap/br/pkg/glue"
"github.com/pingcap/br/pkg/logutil"
"github.com/pingcap/br/pkg/pdutil"
"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/br/pkg/version"
)

Expand Down Expand Up @@ -163,6 +166,32 @@ func GetAllTiKVStores(
return stores[:j], nil
}

// GetAllTiKVStoresWithRetry calls GetAllTiKVStores under utils.WithRetry,
// using the backoffer returned by utils.NewPDReqBackoffer.
//
// It returns the stores produced by the most recent attempt together with the
// (traced) error reported by utils.WithRetry.
func GetAllTiKVStoresWithRetry(
	ctx context.Context,
	pdClient pd.Client,
	storeBehavior StoreBehavior,
) ([]*metapb.Store, error) {
	stores := make([]*metapb.Store, 0)

	// fetchStores is a single attempt; every attempt overwrites stores.
	fetchStores := func() error {
		var fetchErr error
		stores, fetchErr = GetAllTiKVStores(ctx, pdClient, storeBehavior)
		// When the failpoint is enabled with a true value, the attempt's
		// error is replaced by a gRPC Unknown status error.
		failpoint.Inject("hint-GetAllTiKVStores-error", func(val failpoint.Value) {
			if val.(bool) {
				fetchErr = status.Error(codes.Unknown, "Retryable error")
			}
		})

		return errors.Trace(fetchErr)
	}

	retryErr := utils.WithRetry(ctx, fetchStores, utils.NewPDReqBackoffer())

	return stores, errors.Trace(retryErr)
}

// NewMgr creates a new Mgr.
//
// Domain is optional for Backup, set `needDomain` to false to disable
Expand Down
54 changes: 54 additions & 0 deletions pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,60 @@ func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]
return append([]*metapb.Store{}, fpdc.stores...), nil
}

// TestCheckStoresAlive feeds GetAllTiKVStoresWithRetry a fake PD client that
// holds two stores labelled engine=tiflash and two labelled engine=tikv, and
// asserts that with SkipTiFlash only the last two (tikv) stores come back.
func (s *testClientSuite) TestCheckStoresAlive(c *C) {
	// engineLabel builds the single "engine" label carried by each fixture store.
	engineLabel := func(engine string) []*metapb.StoreLabel {
		return []*metapb.StoreLabel{
			{Key: "engine", Value: engine},
		}
	}

	stores := []*metapb.Store{
		{Id: 1, State: metapb.StoreState_Up, Labels: engineLabel("tiflash")},
		{Id: 2, State: metapb.StoreState_Offline, Labels: engineLabel("tiflash")},
		{Id: 3, State: metapb.StoreState_Up, Labels: engineLabel("tikv")},
		{Id: 4, State: metapb.StoreState_Offline, Labels: engineLabel("tikv")},
	}

	pdCli := fakePDClient{stores: stores}

	kvStores, err := GetAllTiKVStoresWithRetry(s.ctx, pdCli, SkipTiFlash)
	c.Assert(err, IsNil)
	c.Assert(len(kvStores), Equals, 2)
	c.Assert(kvStores, DeepEquals, stores[2:])
}

func (s *testClientSuite) TestGetAllTiKVStores(c *C) {
testCases := []struct {
stores []*metapb.Store
Expand Down
4 changes: 2 additions & 2 deletions pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (rc *Client) ResetTS(ctx context.Context, pdAddrs []string) error {
idx := i % len(pdAddrs)
i++
return pdutil.ResetTS(ctx, pdAddrs[idx], restoreTS, rc.tlsConf)
}, newPDReqBackoffer())
}, utils.NewPDReqBackoffer())
}

// GetPlacementRules return the current placement rules.
Expand All @@ -307,7 +307,7 @@ func (rc *Client) GetPlacementRules(ctx context.Context, pdAddrs []string) ([]pl
i++
placementRules, err = pdutil.GetPlacementRules(ctx, pdAddrs[idx], rc.tlsConf)
return errors.Trace(err)
}, newPDReqBackoffer())
}, utils.NewPDReqBackoffer())
return placementRules, errors.Trace(errRetry)
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/restore/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,9 +324,19 @@ func (importer *FileImporter) Import(
}
downloadMetas = append(downloadMetas, downloadMeta)
}
<<<<<<< HEAD

return nil
}, newDownloadSSTBackoffer())
=======
failpoint.Inject("restore-storage-error", func(val failpoint.Value) {
msg := val.(string)
log.Debug("failpoint restore-storage-error injected.", zap.String("msg", msg))
e = errors.Annotate(e, msg)
})
return errors.Trace(e)
}, utils.NewDownloadSSTBackoffer())
>>>>>>> 54a83770 (BR: retry for PD request error and TiKV IO error (#27803) (#1433))
if errDownload != nil {
for _, e := range multierr.Errors(errDownload) {
switch errors.Cause(e) { // nolint:errorlint
Expand Down Expand Up @@ -421,7 +431,7 @@ func (importer *FileImporter) Import(
}

return nil
}, newImportSSTBackoffer())
}, utils.NewImportSSTBackoffer())
return errors.Trace(err)
}

Expand Down
17 changes: 10 additions & 7 deletions pkg/restore/backoff.go → pkg/utils/backoff.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore
package utils

import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"

"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/utils"
)

const (
Expand All @@ -36,24 +36,26 @@ type importerBackoffer struct {
}

// NewBackoffer creates a new controller regulating a truncated exponential backoff.
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) utils.Backoffer {
func NewBackoffer(attempt int, delayTime, maxDelayTime time.Duration) Backoffer {
	// Fill in the backoffer field by field; fields not set here keep their zero value.
	bo := new(importerBackoffer)
	bo.attempt = attempt
	bo.delayTime = delayTime
	bo.maxDelayTime = maxDelayTime
	return bo
}

func newImportSSTBackoffer() utils.Backoffer {
// NewImportSSTBackoffer returns the Backoffer used for ImportSST. It is built
// by NewBackoffer from importSSTRetryTimes, importSSTWaitInterval and
// importSSTMaxWaitInterval.
func NewImportSSTBackoffer() Backoffer {
	return NewBackoffer(
		importSSTRetryTimes,
		importSSTWaitInterval,
		importSSTMaxWaitInterval,
	)
}

func newDownloadSSTBackoffer() utils.Backoffer {
// NewDownloadSSTBackoffer returns the Backoffer used for DownloadSST. It is
// built by NewBackoffer from downloadSSTRetryTimes, downloadSSTWaitInterval
// and downloadSSTMaxWaitInterval.
func NewDownloadSSTBackoffer() Backoffer {
	return NewBackoffer(
		downloadSSTRetryTimes,
		downloadSSTWaitInterval,
		downloadSSTMaxWaitInterval,
	)
}

func (bo *importerBackoffer) NextBackoff(err error) time.Duration {
if utils.MessageIsRetryableStorageError(err.Error()) {
if MessageIsRetryableStorageError(err.Error()) {
bo.delayTime = 2 * bo.delayTime
bo.attempt--
} else {
Expand Down Expand Up @@ -94,7 +96,8 @@ type pdReqBackoffer struct {
maxDelayTime time.Duration
}

func newPDReqBackoffer() utils.Backoffer {
// NewPDReqBackoffer create a BackOffer for pd request
func NewPDReqBackoffer() Backoffer {
return &pdReqBackoffer{
attempt: resetTSRetryTime,
delayTime: resetTSWaitInterval,
Expand Down
11 changes: 5 additions & 6 deletions pkg/restore/backoff_test.go → pkg/utils/backoff_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright 2020 PingCAP, Inc. Licensed under Apache-2.0.

package restore_test
package utils_test

import (
"context"
Expand All @@ -14,7 +14,6 @@ import (

berrors "github.com/pingcap/br/pkg/errors"
"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/restore"
"github.com/pingcap/br/pkg/utils"
)

Expand All @@ -36,7 +35,7 @@ func (s *testBackofferSuite) TearDownSuite(c *C) {

func (s *testBackofferSuite) TestBackoffWithSuccess(c *C) {
var counter int
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
switch counter {
Expand All @@ -55,7 +54,7 @@ func (s *testBackofferSuite) TestBackoffWithSuccess(c *C) {

func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) {
var counter int
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
gRPCError := status.Error(codes.Unavailable, "transport is closing")
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
Expand Down Expand Up @@ -83,7 +82,7 @@ func (s *testBackofferSuite) TestBackoffWithFatalError(c *C) {
func (s *testBackofferSuite) TestBackoffWithFatalRawGRPCError(c *C) {
var counter int
canceledError := status.Error(codes.Canceled, "context canceled")
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return canceledError // nolint:wrapcheck
Expand All @@ -96,7 +95,7 @@ func (s *testBackofferSuite) TestBackoffWithFatalRawGRPCError(c *C) {

func (s *testBackofferSuite) TestBackoffWithRetryableError(c *C) {
var counter int
backoffer := restore.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
backoffer := utils.NewBackoffer(10, time.Nanosecond, time.Nanosecond)
err := utils.WithRetry(context.Background(), func() error {
defer func() { counter++ }()
return berrors.ErrKVEpochNotMatch
Expand Down
1 change: 1 addition & 0 deletions pkg/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var retryableServerError = []string{
"connection closed before message completed",
"body write aborted",
"error during dispatch",
"put object timeout",
}

// RetryableFunc presents a retryable operation.
Expand Down

0 comments on commit 4b2d5d8

Please sign in to comment.