Skip to content

Commit

Permalink
.*/: handle etcd retryable error when using etcd client's `Get with r…
Browse files Browse the repository at this point in the history
…evision` (pingcap#518)

* Reset get error in watcher like we did in pingcap#499.
* Get relative etcd info in one etcd transaction to avoid using get with rev.
  • Loading branch information
lichunzhu authored Mar 9, 2020
1 parent 8ff04d2 commit a711a1c
Show file tree
Hide file tree
Showing 19 changed files with 457 additions and 193 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ ErrWorkerRelayIsPurging,[code=40068:class=dm-worker:scope=internal:level=high],"
ErrWorkerHostPortNotValid,[code=40069:class=dm-worker:scope=internal:level=high],"host:port '%s' not valid"
ErrWorkerSourceNotMatch,[code=40072:class=dm-worker:scope=internal:level=high],"source of request does not match with source in worker"
ErrWorkerFailToGetSubtaskConfigFromEtcd,[code=40073:class=dm-worker:scope=internal:level=medium],"there is no relative subtask config for task %s in etcd"
ErrWorkerFailToGetSourceConfigFromEtcd,[code=40074:class=dm-worker:scope=internal:level=medium],"there is no relative source config for source %s in etcd"
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium],"parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium],"config toml transform"
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium],"'%s' is an invalid flag"
Expand Down
2 changes: 1 addition & 1 deletion dm/master/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ func (s *Scheduler) GetExpectSubTaskStage(task, source string) ha.Stage {
// recoverSourceCfgs recovers history source configs and expectant relay stages from etcd.
func (s *Scheduler) recoverSources(cli *clientv3.Client) error {
// get all source configs.
cfgM, _, err := ha.GetAllSourceCfg(cli)
cfgM, _, err := ha.GetSourceCfg(cli, "", 0)
if err != nil {
return err
}
Expand Down
31 changes: 21 additions & 10 deletions dm/master/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,16 +473,17 @@ func (t *testScheduler) TestScheduler(c *C) {

func (t *testScheduler) sourceCfgNotExist(c *C, s *Scheduler, source string) {
c.Assert(s.GetSourceCfgByID(source), IsNil)
cfg, _, err := ha.GetSourceCfg(etcdTestCli, source, 0)
scm, _, err := ha.GetSourceCfg(etcdTestCli, source, 0)
c.Assert(err, IsNil)
c.Assert(cfg, DeepEquals, sourceCfgEmpty)
c.Assert(scm, HasLen, 0)
}

func (t *testScheduler) sourceCfgExist(c *C, s *Scheduler, expectCfg config.SourceConfig) {
cfgP := s.GetSourceCfgByID(expectCfg.SourceID)
c.Assert(cfgP, DeepEquals, &expectCfg)
cfgV, _, err := ha.GetSourceCfg(etcdTestCli, expectCfg.SourceID, 0)
scm, _, err := ha.GetSourceCfg(etcdTestCli, expectCfg.SourceID, 0)
c.Assert(err, IsNil)
cfgV := scm[expectCfg.SourceID]
c.Assert(cfgV, DeepEquals, expectCfg)
}

Expand Down Expand Up @@ -551,15 +552,15 @@ func (t *testScheduler) workerFree(c *C, s *Scheduler, worker string) {
func (t *testScheduler) workerBound(c *C, s *Scheduler, bound ha.SourceBound) {
w := s.GetWorkerByName(bound.Worker)
c.Assert(w, NotNil)
c.Assert(w.Bound(), DeepEquals, bound)
boundDeepEqualExcludeRev(c, w.Bound(), bound)
c.Assert(w.Stage(), Equals, WorkerBound)
wm, _, err := ha.GetAllWorkerInfo(etcdTestCli)
c.Assert(err, IsNil)
_, ok := wm[bound.Worker]
c.Assert(ok, IsTrue)
sbm, _, err := ha.GetSourceBound(etcdTestCli, bound.Worker)
c.Assert(err, IsNil)
c.Assert(sbm[bound.Worker], DeepEquals, bound)
boundDeepEqualExcludeRev(c, sbm[bound.Worker], bound)
}

func (t *testScheduler) sourceBounds(c *C, s *Scheduler, expectBounds, expectUnbounds []string) {
Expand All @@ -578,38 +579,48 @@ func (t *testScheduler) sourceBounds(c *C, s *Scheduler, expectBounds, expectUnb
c.Assert(sToB[source], NotNil)
c.Assert(s.GetWorkerBySource(source), NotNil)
c.Assert(s.GetWorkerBySource(source).Stage(), Equals, WorkerBound)
c.Assert(sToB[source], DeepEquals, s.GetWorkerBySource(source).Bound())
boundDeepEqualExcludeRev(c, sToB[source], s.GetWorkerBySource(source).Bound())
}

for _, source := range expectUnbounds {
c.Assert(s.GetWorkerBySource(source), IsNil)
}
}

func boundDeepEqualExcludeRev(c *C, bound, expectBound ha.SourceBound) {
expectBound.Revision = bound.Revision
c.Assert(bound, DeepEquals, expectBound)
}

func stageDeepEqualExcludeRev(c *C, stage, expectStage ha.Stage) {
expectStage.Revision = stage.Revision
c.Assert(stage, DeepEquals, expectStage)
}

func (t *testScheduler) relayStageMatch(c *C, s *Scheduler, source string, expectStage pb.Stage) {
stage := ha.NewRelayStage(expectStage, source)
c.Assert(s.GetExpectRelayStage(source), DeepEquals, stage)
stageDeepEqualExcludeRev(c, s.GetExpectRelayStage(source), stage)

eStage, _, err := ha.GetRelayStage(etcdTestCli, source)
c.Assert(err, IsNil)
switch expectStage {
case pb.Stage_Running, pb.Stage_Paused:
c.Assert(eStage, DeepEquals, stage)
stageDeepEqualExcludeRev(c, eStage, stage)
default:
c.Assert(eStage, DeepEquals, stageEmpty)
}
}

func (t *testScheduler) subTaskStageMatch(c *C, s *Scheduler, task, source string, expectStage pb.Stage) {
stage := ha.NewSubTaskStage(expectStage, source, task)
c.Assert(s.GetExpectSubTaskStage(task, source), DeepEquals, stage)
stageDeepEqualExcludeRev(c, s.GetExpectSubTaskStage(task, source), stage)

eStageM, _, err := ha.GetSubTaskStage(etcdTestCli, source, task)
c.Assert(err, IsNil)
switch expectStage {
case pb.Stage_Running, pb.Stage_Paused:
c.Assert(eStageM, HasLen, 1)
c.Assert(eStageM[task], DeepEquals, stage)
stageDeepEqualExcludeRev(c, eStageM[task], stage)
default:
c.Assert(eStageM, HasLen, 0)
}
Expand Down
16 changes: 10 additions & 6 deletions dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -989,10 +989,9 @@ func (t *testMaster) TestOperateSource(c *check.C) {
Result: true,
Source: sourceID,
}})
cfg, _, err := ha.GetSourceCfg(etcdTestCli, sourceID, 0)
scm, _, err := ha.GetSourceCfg(etcdTestCli, sourceID, 0)
c.Assert(err, check.IsNil)
var emptySourceCfg config.SourceConfig
c.Assert(cfg, check.DeepEquals, emptySourceCfg)
c.Assert(scm, check.HasLen, 0)
cancel()
}

Expand Down Expand Up @@ -1043,15 +1042,20 @@ func (t *testMaster) TestOfflineWorker(c *check.C) {
}
}

func stageDeepEqualExcludeRev(c *check.C, stage, expectStage ha.Stage) {
expectStage.Revision = stage.Revision
c.Assert(stage, check.DeepEquals, expectStage)
}

func (t *testMaster) relayStageMatch(c *check.C, s *scheduler.Scheduler, source string, expectStage pb.Stage) {
stage := ha.NewRelayStage(expectStage, source)
c.Assert(s.GetExpectRelayStage(source), check.DeepEquals, stage)
stageDeepEqualExcludeRev(c, s.GetExpectRelayStage(source), stage)

eStage, _, err := ha.GetRelayStage(etcdTestCli, source)
c.Assert(err, check.IsNil)
switch expectStage {
case pb.Stage_Running, pb.Stage_Paused:
c.Assert(eStage, check.DeepEquals, stage)
stageDeepEqualExcludeRev(c, eStage, stage)
}
}

Expand All @@ -1064,7 +1068,7 @@ func (t *testMaster) subTaskStageMatch(c *check.C, s *scheduler.Scheduler, task,
switch expectStage {
case pb.Stage_Running, pb.Stage_Paused:
c.Assert(eStageM, check.HasLen, 1)
c.Assert(eStageM[task], check.DeepEquals, stage)
stageDeepEqualExcludeRev(c, eStageM[task], stage)
default:
c.Assert(eStageM, check.HasLen, 0)
}
Expand Down
39 changes: 21 additions & 18 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,20 +101,21 @@ func (s *Server) Start() error {
return err
}

bsm, revBound, err := ha.GetSourceBound(s.etcdClient, s.cfg.Name)
bound, sourceCfg, revBound, err := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err != nil {
// TODO: need retry
return err
}
if bound, ok := bsm[s.cfg.Name]; ok {
if !bound.IsEmpty() {
log.L().Warn("worker has been assigned source before keepalive")
err = s.operateSourceBound(bound)
err = s.startWorker(&sourceCfg)
s.setSourceStatus(bound.Source, err, true)
if err != nil {
log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
zap.String("source", bound.Source), zap.Error(err))
zap.Stringer("bound", bound), zap.Error(err))
}
}

s.wg.Add(1)
go func() {
defer s.wg.Done()
Expand Down Expand Up @@ -187,25 +188,25 @@ func (s *Server) observeSourceBound(ctx context.Context, etcdCli *clientv3.Clien
case <-ctx.Done():
return nil
case <-time.After(500 * time.Millisecond):
bsm, rev1, err1 := ha.GetSourceBound(s.etcdClient, s.cfg.Name)
bound, cfg, rev1, err1 := ha.GetSourceBoundConfig(s.etcdClient, s.cfg.Name)
if err1 != nil {
log.L().Error("get source bound from etcd failed, will retry later", zap.Error(err1), zap.Int("retryNum", retryNum))
break
}
rev = rev1
if bound, ok := bsm[s.cfg.Name]; ok {
if bound.IsEmpty() {
s.stopWorker("")
} else {
if w := s.getWorker(true); w != nil && w.cfg.SourceID == bound.Source {
continue
}
s.stopWorker("")
err1 = s.operateSourceBound(bound)
err1 = s.startWorker(&cfg)
s.setSourceStatus(bound.Source, err1, true)
if err1 != nil {
log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
zap.String("source", bound.Source), zap.Error(err1))
zap.Stringer("bound", bound), zap.Error(err1))
}
} else {
s.stopWorker("")
}
}
retryNum++
Expand Down Expand Up @@ -345,7 +346,10 @@ func (s *Server) handleSourceBound(ctx context.Context, boundCh chan ha.SourceBo
// record the reason for operating source bound
// TODO: add better metrics
log.L().Error("fail to operate sourceBound on worker", zap.String("worker", s.cfg.Name),
zap.String("source", bound.Source), zap.Error(err))
zap.Stringer("bound", bound), zap.Error(err))
if etcdutil.IsRetryableError(err) {
return err
}
}
case err := <-errCh:
// TODO: Deal with err
Expand All @@ -361,11 +365,15 @@ func (s *Server) operateSourceBound(bound ha.SourceBound) error {
if bound.IsDeleted {
return s.stopWorker(bound.Source)
}
sourceCfg, _, err := ha.GetSourceCfg(s.etcdClient, bound.Source, bound.Revision)
scm, _, err := ha.GetSourceCfg(s.etcdClient, bound.Source, bound.Revision)
if err != nil {
// TODO: need retry
return err
}
sourceCfg, ok := scm[bound.Source]
if !ok {
return terror.ErrWorkerFailToGetSourceConfigFromEtcd.Generate(bound.Source)
}
return s.startWorker(&sourceCfg)
}

Expand Down Expand Up @@ -699,12 +707,7 @@ func (s *Server) startWorker(cfg *config.SourceConfig) error {

// we get the newest subtask stages directly which will omit the subtask stage PUT/DELETE event
// because triggering these events is useless now
subTaskStages, revSubTask, err := ha.GetSubTaskStage(s.etcdClient, cfg.SourceID, "")
if err != nil {
// TODO: need retry
return err
}
subTaskCfgm, _, err := ha.GetSubTaskCfg(s.etcdClient, cfg.SourceID, "", revSubTask)
subTaskStages, subTaskCfgm, revSubTask, err := ha.GetSubTaskStageConfig(s.etcdClient, cfg.SourceID)
if err != nil {
// TODO: need retry
return err
Expand Down
7 changes: 7 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ func (t *testServer) TestServer(c *C) {
}), IsTrue)
dir := c.MkDir()

t.testOperateSourceBoundWithoutConfigInEtcd(c, s)

t.testOperateWorker(c, s, dir, true)

// check worker would retry connecting master rather than stop worker directly.
Expand Down Expand Up @@ -325,6 +327,11 @@ func (t *testServer) createClient(c *C, addr string) pb.WorkerClient {
return pb.NewWorkerClient(conn)
}

func (t *testServer) testOperateSourceBoundWithoutConfigInEtcd(c *C, s *Server) {
err := s.operateSourceBound(ha.NewSourceBound("sourceWithoutConfigInEtcd", s.cfg.Name))
c.Assert(terror.ErrWorkerFailToGetSourceConfigFromEtcd.Equal(err), IsTrue)
}

func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool) {
// load sourceCfg
sourceCfg := loadSourceConfigWithoutPassword(c)
Expand Down
13 changes: 5 additions & 8 deletions dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,7 @@ func (w *Worker) QueryError(name string) []*pb.SubTaskError {
}

func (w *Worker) resetSubtaskStage(etcdCli *clientv3.Client) (int64, error) {
subTaskStages, revSubTask, err := ha.GetSubTaskStage(etcdCli, w.cfg.SourceID, "")
if err != nil {
return 0, err
}
subTaskCfgm, _, err := ha.GetSubTaskCfg(etcdCli, w.cfg.SourceID, "", revSubTask)
subTaskStages, subTaskCfgm, revSubTask, err := ha.GetSubTaskStageConfig(etcdCli, w.cfg.SourceID)
if err != nil {
return 0, err
}
Expand Down Expand Up @@ -402,6 +398,9 @@ func (w *Worker) handleSubTaskStage(ctx context.Context, stageCh chan ha.Stage,
if err != nil {
// TODO: add better metrics
log.L().Error("fail to operate subtask stage", zap.Stringer("stage", stage), zap.Error(err))
if etcdutil.IsRetryableError(err) {
return err
}
}
case err := <-errCh:
// TODO: deal with err
Expand Down Expand Up @@ -483,9 +482,7 @@ func (w *Worker) observeRelayStage(ctx context.Context, etcdCli *clientv3.Client
break
}
rev = rev1
var emptyStage ha.Stage
// emptyStage means this stage has been deleted and relay has been stopped
if emptyStage == stage {
if stage.IsEmpty() {
stage.IsDeleted = true
}
err1 = w.operateRelayStage(ctx, stage)
Expand Down
6 changes: 3 additions & 3 deletions pkg/etcdutil/etcdutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ func AddMember(client *clientv3.Client, peerAddrs []string) (*clientv3.MemberAdd
}

// DoOpsInOneTxn do multiple etcd operations in one txn.
func DoOpsInOneTxn(cli *clientv3.Client, ops ...clientv3.Op) (int64, error) {
func DoOpsInOneTxn(cli *clientv3.Client, ops ...clientv3.Op) (*clientv3.TxnResponse, int64, error) {
ctx, cancel := context.WithTimeout(cli.Ctx(), DefaultRequestTimeout)
defer cancel()

resp, err := cli.Txn(ctx).Then(ops...).Commit()
if err != nil {
return 0, err
return nil, 0, err
}
return resp.Header.Revision, nil
return resp, resp.Header.Revision, nil
}

// IsRetryableError check whether error is retryable error for etcd to build again
Expand Down
Loading

0 comments on commit a711a1c

Please sign in to comment.