Skip to content

Commit

Permalink
ddl: handle cluster state retrieval failure (#53816)
Browse files (browse the repository at this point in the history)
close #53808
  • Loading branch information
tangenta authored Jun 6, 2024
1 parent 793530a commit b8c0865
Showing 1 changed file with 25 additions and 5 deletions.
30 changes: 25 additions & 5 deletions pkg/ddl/job_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,29 @@ func (d *ddl) startLocalWorkerLoop() {
}

// startDispatchLoop keeps the dispatcher running by calling startDispatch
// again each time it returns.
//
// The loop ends in two situations:
//   - startDispatch returns exactly context.Canceled;
//   - s.schCtx is done while waiting for the next retry.
//
// Any other return value is logged as a warning and retried after a fixed
// delay.
func (s *jobScheduler) startDispatchLoop() {
	// retryDelay is the pause between a failed dispatch and the next attempt.
	const retryDelay = 3 * time.Second
	for {
		dispatchErr := s.startDispatch()
		// The comparison is by identity, so only the bare sentinel matches here.
		if context.Canceled == dispatchErr {
			logutil.DDLLogger().Info("startDispatchLoop quit due to context canceled")
			return
		}
		logutil.DDLLogger().Warn("startDispatchLoop failed, retrying",
			zap.Error(dispatchErr))

		// Wait out the retry delay, but give up immediately if the scheduler
		// context finishes first.
		select {
		case <-s.schCtx.Done():
		case <-time.After(retryDelay):
			continue
		}
		logutil.DDLLogger().Info("startDispatchLoop quit due to context done")
		return
	}
}

func (s *jobScheduler) startDispatch() error {
sessCtx, err := s.sessPool.Get()
if err != nil {
logutil.DDLLogger().Fatal("dispatch loop get session failed, it should not happen, please try restart TiDB", zap.Error(err))
return errors.Trace(err)
}
defer s.sessPool.Put(sessCtx)
se := sess.NewSession(sessCtx)
Expand All @@ -363,15 +383,15 @@ func (s *jobScheduler) startDispatchLoop() {
notifyDDLJobByEtcdCh = s.etcdCli.Watch(s.schCtx, addingDDLJobNotifyKey)
}
if err := s.checkAndUpdateClusterState(true); err != nil {
logutil.DDLLogger().Fatal("dispatch loop get cluster state failed, it should not happen, please try restart TiDB", zap.Error(err))
return errors.Trace(err)
}
ticker := time.NewTicker(dispatchLoopWaitingDuration)
defer ticker.Stop()
// TODO move waitSchemaSyncedController out of ddlCtx.
s.clearOnceMap()
for {
if s.schCtx.Err() != nil {
return
if err := s.schCtx.Err(); err != nil {
return err
}
failpoint.Inject("ownerResignAfterDispatchLoopCheck", func() {
if ingest.ResignOwnerForTest.Load() {
Expand All @@ -393,7 +413,7 @@ func (s *jobScheduler) startDispatchLoop() {
continue
}
case <-s.schCtx.Done():
return
return s.schCtx.Err()
}
if err := s.checkAndUpdateClusterState(false); err != nil {
continue
Expand Down

0 comments on commit b8c0865

Please sign in to comment.