Skip to content

Commit

Permalink
pkg/: add retry for etcd put/delete operation (pingcap#528)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Mar 17, 2020
1 parent d089776 commit 478e6c6
Show file tree
Hide file tree
Showing 9 changed files with 59 additions and 20 deletions.
45 changes: 42 additions & 3 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/pingcap/errors"
"go.etcd.io/etcd/clientv3"
v3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"

tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/retry"
)

const (
Expand All @@ -33,6 +37,32 @@ const (
DefaultRequestTimeout = 10 * time.Second
)

var etcdDefaultTxnRetryParam = retry.Params{
RetryCount: 5,
FirstRetryDuration: time.Second,
BackoffStrategy: retry.Stable,
IsRetryableFn: func(retryTime int, err error) bool {
switch err {
// Etcd ResourceExhausted errors, may recover after some time
case v3rpc.ErrNoSpace, v3rpc.ErrTooManyRequests:
return true
// Etcd Unavailable errors, may be available after some time
// https://github.com/etcd-io/etcd/pull/9934/files#diff-6d8785d0c9eaf96bc3e2b29c36493c04R162-R167
// ErrStopped:
// one of the etcd nodes stopped from failure injection
// ErrNotCapable:
// capability check has not been done (in the beginning)
case v3rpc.ErrNoLeader, v3rpc.ErrLeaderChanged, v3rpc.ErrNotCapable, v3rpc.ErrStopped, v3rpc.ErrTimeout,
v3rpc.ErrTimeoutDueToLeaderFail, v3rpc.ErrGRPCTimeoutDueToConnectionLost, v3rpc.ErrUnhealthy:
return true
default:
return false
}
},
}

var etcdDefaultTxnStrategy = retry.FiniteRetryStrategy{}

// CreateClient creates an etcd client with some default config items.
func CreateClient(endpoints []string) (*clientv3.Client, error) {
return clientv3.New(clientv3.Config{
Expand All @@ -55,15 +85,24 @@ func AddMember(client *clientv3.Client, peerAddrs []string) (*clientv3.MemberAdd
return client.MemberAdd(ctx, peerAddrs)
}

// DoOpsInOneTxn do multiple etcd operations in one txn.
func DoOpsInOneTxn(cli *clientv3.Client, ops ...clientv3.Op) (*clientv3.TxnResponse, int64, error) {
// DoOpsInOneTxnWithRetry do multiple etcd operations in one txn.
// TODO: add unit test to test encountered an retryable error first but then recovered
func DoOpsInOneTxnWithRetry(cli *clientv3.Client, ops ...clientv3.Op) (*clientv3.TxnResponse, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout)
defer cancel()
tctx := tcontext.NewContext(ctx, log.L())
ret, _, err := etcdDefaultTxnStrategy.Apply(tctx, etcdDefaultTxnRetryParam, func(t *tcontext.Context) (ret interface{}, err error) {
resp, err := cli.Txn(ctx).Then(ops...).Commit()
if err != nil {
return nil, err
}
return resp, nil
})

resp, err := cli.Txn(ctx).Then(ops...).Commit()
if err != nil {
return nil, 0, err
}
resp := ret.(*clientv3.TxnResponse)
return resp, resp.Header.Revision, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/ha/bound.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func PutSourceBound(cli *clientv3.Client, bounds ...SourceBound) (int64, error)
}
ops = append(ops, op)
}
_, rev, err := etcdutil.DoOpsInOneTxn(cli, ops...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}

Expand All @@ -102,7 +102,7 @@ func DeleteSourceBound(cli *clientv3.Client, workers ...string) (int64, error) {
for _, worker := range workers {
ops = append(ops, deleteSourceBoundOp(worker))
}
_, rev, err := etcdutil.DoOpsInOneTxn(cli, ops...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}

Expand Down Expand Up @@ -159,7 +159,7 @@ func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, con
}

for retryCnt := 1; retryCnt <= retryNum; retryCnt++ {
txnResp, rev2, err2 := etcdutil.DoOpsInOneTxn(cli, clientv3.OpGet(common.UpstreamBoundWorkerKeyAdapter.Encode(worker)),
txnResp, rev2, err2 := etcdutil.DoOpsInOneTxnWithRetry(cli, clientv3.OpGet(common.UpstreamBoundWorkerKeyAdapter.Encode(worker)),
clientv3.OpGet(common.UpstreamConfigKeyAdapter.Encode(bound.Source)))
if err2 != nil {
return bound, cfg, 0, err2
Expand Down
6 changes: 3 additions & 3 deletions pkg/ha/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func PutRelayStageSourceBound(cli *clientv3.Client, stage Stage, bound SourceBou
ops := make([]clientv3.Op, 0, 2)
ops = append(ops, ops1...)
ops = append(ops, op2)
_, rev, err := etcdutil.DoOpsInOneTxn(cli, ops...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}

Expand All @@ -48,7 +48,7 @@ func DeleteSourceCfgRelayStageSourceBound(cli *clientv3.Client, source, worker s
sourceCfgOp := deleteSourceCfgOp(source)
relayStageOp := deleteRelayStageOp(source)
sourceBoundOp := deleteSourceBoundOp(worker)
_, rev, err := etcdutil.DoOpsInOneTxn(cli, sourceCfgOp, relayStageOp, sourceBoundOp)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, sourceCfgOp, relayStageOp, sourceBoundOp)
return rev, err
}

Expand Down Expand Up @@ -94,6 +94,6 @@ func opSubTaskCfgStage(cli *clientv3.Client, evType mvccpb.Event_EventType,
ops := make([]clientv3.Op, 0, len(ops1)+len(ops2))
ops = append(ops, ops1...)
ops = append(ops, ops2...)
_, rev, err := etcdutil.DoOpsInOneTxn(cli, ops...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}
4 changes: 2 additions & 2 deletions pkg/ha/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func PutSourceCfg(cli *clientv3.Client, cfg config.SourceConfig) (int64, error)
return 0, err
}
key := common.UpstreamConfigKeyAdapter.Encode(cfg.SourceID)
_, rev, err := etcdutil.DoOpsInOneTxn(cli, clientv3.OpPut(key, value))
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, clientv3.OpPut(key, value))
return rev, err
}

Expand Down Expand Up @@ -105,7 +105,7 @@ func ClearTestInfoOperation(cli *clientv3.Client) error {
clearBound := clientv3.OpDelete(common.UpstreamBoundWorkerKeyAdapter.Path(), clientv3.WithPrefix())
clearRelayStage := clientv3.OpDelete(common.StageRelayKeyAdapter.Path(), clientv3.WithPrefix())
clearSubTaskStage := clientv3.OpDelete(common.StageSubTaskKeyAdapter.Path(), clientv3.WithPrefix())
_, _, err := etcdutil.DoOpsInOneTxn(cli, clearSource, clearSubTask, clearWorkerInfo, clearBound,
_, _, err := etcdutil.DoOpsInOneTxnWithRetry(cli, clearSource, clearSubTask, clearWorkerInfo, clearBound,
clearWorkerKeepAlive, clearRelayStage, clearSubTaskStage)
return err
}
8 changes: 4 additions & 4 deletions pkg/ha/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func PutRelayStage(cli *clientv3.Client, stages ...Stage) (int64, error) {
if err != nil {
return 0, err
}
_, rev, err := etcdutil.DoOpsInOneTxn(cli, ops...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}

Expand All @@ -106,7 +106,7 @@ func PutSubTaskStage(cli *clientv3.Client, stages ...Stage) (int64, error) {
if err != nil {
return 0, err
}
_, rev, err := etcdutil.DoOpsInOneTxn(cli, ops...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}

Expand Down Expand Up @@ -221,7 +221,7 @@ func GetSubTaskStageConfig(cli *clientv3.Client, source string) (map[string]Stag
stm = make(map[string]Stage)
scm = make(map[string]config.SubTaskConfig)
)
txnResp, rev, err := etcdutil.DoOpsInOneTxn(cli, clientv3.OpGet(common.StageSubTaskKeyAdapter.Encode(source), clientv3.WithPrefix()),
txnResp, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, clientv3.OpGet(common.StageSubTaskKeyAdapter.Encode(source), clientv3.WithPrefix()),
clientv3.OpGet(common.UpstreamSubTaskKeyAdapter.Encode(source), clientv3.WithPrefix()))
if err != nil {
return stm, scm, 0, err
Expand Down Expand Up @@ -262,7 +262,7 @@ func WatchSubTaskStage(ctx context.Context, cli *clientv3.Client,
// DeleteSubTaskStage deletes the subtask stage.
func DeleteSubTaskStage(cli *clientv3.Client, stages ...Stage) (int64, error) {
ops := deleteSubTaskStageOp(stages...)
_, rev, err := etcdutil.DoOpsInOneTxn(cli, ops...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ha/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func PutSubTaskCfg(cli *clientv3.Client, cfgs ...config.SubTaskConfig) (int64, e
if err != nil {
return 0, err
}
_, rev, err := etcdutil.DoOpsInOneTxn(cli, ops...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, ops...)
return rev, err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/ha/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func PutWorkerInfo(cli *clientv3.Client, info WorkerInfo) (int64, error) {
return 0, err
}
key := common.WorkerRegisterKeyAdapter.Encode(info.Name)
_, rev, err := etcdutil.DoOpsInOneTxn(cli, clientv3.OpPut(key, value))
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, clientv3.OpPut(key, value))
return rev, err
}

Expand Down Expand Up @@ -96,6 +96,6 @@ func GetAllWorkerInfo(cli *clientv3.Client) (map[string]WorkerInfo, int64, error

// DeleteWorkerInfo deletes the specified DM-worker information.
func DeleteWorkerInfo(cli *clientv3.Client, worker string) (int64, error) {
_, rev, err := etcdutil.DoOpsInOneTxn(cli, clientv3.OpDelete(common.WorkerRegisterKeyAdapter.Encode(worker)))
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, clientv3.OpDelete(common.WorkerRegisterKeyAdapter.Encode(worker)))
return rev, err
}
2 changes: 1 addition & 1 deletion pkg/shardddl/pessimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func DeleteOperations(cli *clientv3.Client, ops ...Operation) (int64, error) {
for _, op := range ops {
opsDel = append(opsDel, deleteOperationOp(op))
}
_, rev, err := etcdutil.DoOpsInOneTxn(cli, opsDel...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, opsDel...)
return rev, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/shardddl/pessimism/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ func DeleteInfosOperations(cli *clientv3.Client, infos []Info, ops []Operation)
for _, op := range ops {
opsDel = append(opsDel, deleteOperationOp(op))
}
_, rev, err := etcdutil.DoOpsInOneTxn(cli, opsDel...)
_, rev, err := etcdutil.DoOpsInOneTxnWithRetry(cli, opsDel...)
return rev, err
}

0 comments on commit 478e6c6

Please sign in to comment.