
Commit

This is an automated cherry-pick of pingcap#4035
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Ehco1996 authored and ti-chi-bot committed Feb 16, 2022
1 parent 520e2b0 commit 8866cbd
Showing 26 changed files with 979 additions and 261 deletions.
2 changes: 2 additions & 0 deletions dm/dm/unit/unit.go
@@ -48,6 +48,8 @@ type Unit interface {
// Close shuts down the process and closes the unit; after that, Process can not be called to resume
// The implementation should not block for a long time.
Close()
// Kill shuts down the process and closes the unit without graceful shutdown.
Kill()
// Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned.
// The implementation should not block for a long time.
Pause()
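
For readers skimming the diff: a minimal sketch of how a unit implementation might distinguish the two calls, assuming a hypothetical dummyUnit type (none of the names below come from the DM codebase). Close drains in-flight work; Kill cancels and returns immediately.

// Hypothetical sketch only; not the actual DM implementation.
package example

import (
	"context"
	"sync"
)

type dummyUnit struct {
	cancel context.CancelFunc // cancels the context passed to Process
	wg     sync.WaitGroup     // tracks in-flight work started by Process
}

// Close is the graceful path: cancel, then wait for in-flight work to drain.
func (u *dummyUnit) Close() {
	u.cancel()
	u.wg.Wait()
}

// Kill is the non-graceful path: cancel and return without waiting.
func (u *dummyUnit) Kill() {
	u.cancel()
}
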
2 changes: 1 addition & 1 deletion dm/dm/worker/hub_test.go
@@ -20,5 +20,5 @@ import (
func (t *testServer) testConidtionHub(c *C, s *Server) {
// test condition hub
c.Assert(GetConditionHub(), NotNil)
c.Assert(GetConditionHub().w, DeepEquals, s.getWorker(true))
c.Assert(GetConditionHub().w, DeepEquals, s.getSourceWorker(true))
}
3 changes: 2 additions & 1 deletion dm/dm/worker/join.go
@@ -108,7 +108,8 @@ func (s *Server) KeepAlive() {
failpoint.Label("bypass")

// TODO: report the error.
err := s.stopWorker("", true)
// when keepalive is lost, stop the worker without graceful shutdown. this is to fix https://github.com/pingcap/tiflow/issues/3737
err := s.stopSourceWorker("", true, false)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return // return if failed to stop the worker.
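
The hunk above switches the lost-keepalive path to a non-graceful stop. A hedged sketch of that pattern, with a hypothetical onLeaseLost helper standing in for the real KeepAlive loop; only the three-argument stop signature mirrors this commit.

// Hypothetical sketch of the "lost keepalive => non-graceful stop" pattern.
package example

import "log"

// stopFunc mirrors the shape of stopSourceWorker(sourceID, needLock, graceful).
type stopFunc func(sourceID string, needLock, graceful bool) error

// onLeaseLost would be called when the etcd keepalive lease can no longer be
// renewed. The worker is stopped with graceful=false so a stuck subtask cannot
// block the shutdown while the scheduler may already be rebinding the source.
func onLeaseLost(stop stopFunc) {
	if err := stop("", true, false); err != nil {
		log.Println("fail to stop worker:", err)
	}
}
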
62 changes: 36 additions & 26 deletions dm/dm/worker/server.go
@@ -341,6 +341,13 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
}
rev = rev1
if relaySource == nil {
<<<<<<< HEAD
=======
if w := s.getSourceWorker(true); w != nil && w.startedRelayBySourceCfg {
break
}
log.L().Info("didn't found relay config after etcd retryable error. Will stop relay now")
>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035))
err = s.disableRelay("")
if err != nil {
log.L().Error("fail to disableRelay after etcd retryable error", zap.Error(err))
@@ -351,7 +358,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
s.Lock()
defer s.Unlock()

if w := s.getWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID {
if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID {
// we may face both relay config and subtask bound changed in a compaction error, so here
// we check if observeSourceBound has started a worker
// TODO: add a test for this situation
@@ -362,7 +369,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
}
return nil
}
err = s.stopWorker("", false)
err = s.stopSourceWorker("", false, true)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return err // return if failed to stop the worker.
@@ -437,7 +444,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
s.Lock()
defer s.Unlock()

if w := s.getWorker(false); w != nil && w.cfg.SourceID == bound.Source {
if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == bound.Source {
// we may face both relay config and subtask bound changed in a compaction error, so here
// we check if observeRelayConfig has started a worker
// TODO: add a test for this situation
@@ -448,7 +455,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}
return nil
}
err = s.stopWorker("", false)
err = s.stopSourceWorker("", false, true)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return err // return if failed to stop the worker.
@@ -475,30 +482,33 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}

func (s *Server) doClose() {
s.cancel()
// close server in advance, stop receiving source bound and relay bound
s.wg.Wait()

s.Lock()
defer s.Unlock()

if s.closed.Load() {
return
}
// close worker and wait for return
if w := s.getWorker(false); w != nil {
w.Close()
// stop server in advance, stop receiving source bound and relay bound
s.cancel()
s.wg.Wait()
// stop worker and wait for return (we already lock the whole Server, so no need to use a lock to get the source worker)
if w := s.getSourceWorker(false); w != nil {
w.Stop(true)
}
s.closed.Store(true)
}

// Close closes the RPC server; this function can be called multiple times.
func (s *Server) Close() {
s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing
s.stopKeepAlive()
s.doClose()
if s.etcdClient != nil {
s.etcdClient.Close()
}
}

// if needLock is false, we should make sure Server has been locked in caller.
func (s *Server) getWorker(needLock bool) *SourceWorker {
func (s *Server) getSourceWorker(needLock bool) *SourceWorker {
if needLock {
s.Lock()
defer s.Unlock()
@@ -531,7 +541,7 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) {
defer s.Unlock()
}
// now setSourceStatus will be concurrently called. skip setting a source status if worker has been closed
if s.getWorker(false) == nil && source != "" {
if s.getSourceWorker(false) == nil && source != "" {
return
}
s.sourceStatus = pb.SourceStatus{
@@ -549,12 +559,12 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) {

// if sourceID is set to "", worker will be closed directly
// if sourceID is not "", we will check sourceID with w.cfg.SourceID.
func (s *Server) stopWorker(sourceID string, needLock bool) error {
func (s *Server) stopSourceWorker(sourceID string, needLock, graceful bool) error {
if needLock {
s.Lock()
defer s.Unlock()
}
w := s.getWorker(false)
w := s.getSourceWorker(false)
if w == nil {
log.L().Warn("worker has not been started, no need to stop", zap.String("source", sourceID))
return nil // no need to stop because not started yet
@@ -565,7 +575,7 @@ func (s *Server) stopWorker(sourceID string, needLock bool) error {
s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL)
s.setWorker(nil, false)
s.setSourceStatus("", nil, false)
w.Close()
w.Stop(graceful)
return nil
}

@@ -680,7 +690,7 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b
func (s *Server) disableHandleSubtasks(source string) error {
s.Lock()
defer s.Unlock()
w := s.getWorker(false)
w := s.getSourceWorker(false)
if w == nil {
log.L().Warn("worker has already stopped before DisableHandleSubtasks", zap.String("source", source))
return nil
@@ -689,7 +699,7 @@ func (s *Server) disableHandleSubtasks(source string) error {
var err error
if !w.relayEnabled.Load() {
log.L().Info("relay is not enabled after disabling subtask, so stop worker")
err = s.stopWorker(source, false)
err = s.stopSourceWorker(source, false, true)
}
return err
}
@@ -734,7 +744,7 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro
func (s *Server) disableRelay(source string) error {
s.Lock()
defer s.Unlock()
w := s.getWorker(false)
w := s.getSourceWorker(false)
if w == nil {
log.L().Warn("worker has already stopped before DisableRelay", zap.Any("relaySource", source))
return nil
@@ -744,7 +754,7 @@ func (s *Server) disableRelay(source string) error {
var err error
if !w.subTaskEnabled.Load() {
log.L().Info("subtask is not enabled after disabling relay, so stop worker")
err = s.stopWorker(source, false)
err = s.stopSourceWorker(source, false, true)
}
return err
}
@@ -760,7 +770,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
SourceStatus: &sourceStatus,
}

w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call QueryStatus, because no mysql source is being handled in the worker")
resp.Result = false
@@ -782,7 +792,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
// PurgeRelay implements WorkerServer.PurgeRelay.
func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error) {
log.L().Info("", zap.String("request", "PurgeRelay"), zap.Stringer("payload", req))
w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call StartSubTask, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
@@ -799,7 +809,7 @@ func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb
func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (*pb.CommonWorkerResponse, error) {
log.L().Info("", zap.String("request", "OperateSchema"), zap.Stringer("payload", req))

w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call OperateSchema, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
@@ -826,7 +836,7 @@ func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Sou
defer s.Unlock()
}

if w := s.getWorker(false); w != nil {
if w := s.getSourceWorker(false); w != nil {
if w.cfg.SourceID == cfg.SourceID {
log.L().Info("mysql source is being handled", zap.String("sourceID", s.worker.cfg.SourceID))
return w, nil
@@ -917,7 +927,7 @@ func getMinLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig) (
func (s *Server) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) (*pb.CommonWorkerResponse, error) {
log.L().Info("", zap.String("request", "HandleError"), zap.Stringer("payload", req))

w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call HandleError, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
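
Stepping back from the individual hunks: doClose now takes the server lock first, checks the closed flag, cancels the background watchers, waits for them, and stops the owned worker gracefully. A rough, self-contained sketch of that close-once ordering (the server struct below is illustrative, not the actual Server type):

// Hedged sketch of the close-once pattern adopted in doClose; assumes Go 1.19+
// for sync/atomic's Bool.
package example

import (
	"sync"
	"sync/atomic"
)

type server struct {
	sync.Mutex
	closed atomic.Bool
	cancel func()              // cancels background source-bound/relay watchers
	wg     sync.WaitGroup      // waits for those watchers to exit
	stop   func(graceful bool) // stands in for SourceWorker.Stop
}

func (s *server) doClose() {
	s.Lock()
	defer s.Unlock()
	if s.closed.Load() {
		return // already closed; Close may be called multiple times
	}
	// stop receiving source bound and relay bound before stopping the worker
	s.cancel()
	s.wg.Wait()
	if s.stop != nil {
		s.stop(true) // graceful stop of the owned source worker
	}
	s.closed.Store(true)
}
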
33 changes: 20 additions & 13 deletions dm/dm/worker/server_test.go
@@ -209,7 +209,7 @@ func (t *testServer) TestServer(c *C) {
_, err = ha.DeleteSubTaskStage(s.etcdClient, ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name))
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil
return s.getSourceWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil
}), IsTrue)

dupServer := NewServer(cfg)
@@ -338,13 +338,13 @@ func (t *testServer) TestHandleSourceBoundAfterError(c *C) {
_, err = ha.PutSourceCfg(etcdCli, sourceCfg)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
return s.getSourceWorker(true) != nil
}), IsTrue)

_, err = ha.DeleteSourceBound(etcdCli, s.cfg.Name)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) == nil
return s.getSourceWorker(true) == nil
}), IsTrue)
}

@@ -414,19 +414,19 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
}()
// step 4.1: the source bound has been deleted, so the running worker should be stopped
c.Assert(utils.WaitSomething(20, 100*time.Millisecond, func() bool {
return s.getWorker(true) == nil
return s.getSourceWorker(true) == nil
}), IsTrue)
// step 4.2: put a new source bound, source should be started
_, err = ha.PutSourceBound(etcdCli, sourceBound)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
return s.getSourceWorker(true) != nil
}), IsTrue)
cfg2 := s.getWorker(true).cfg
cfg2 := s.getSourceWorker(true).cfg
c.Assert(cfg2, DeepEquals, sourceCfg)
cancel1()
wg.Wait()
c.Assert(s.stopWorker(sourceCfg.SourceID, true), IsNil)
c.Assert(s.stopSourceWorker(sourceCfg.SourceID, true, true), IsNil)
// step 5: start observeSourceBound from compacted revision again, should start worker
ctx2, cancel2 := context.WithCancel(ctx)
wg.Add(1)
@@ -435,9 +435,9 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
c.Assert(s.observeSourceBound(ctx2, startRev), IsNil)
}()
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
return s.getSourceWorker(true) != nil
}), IsTrue)
cfg2 = s.getWorker(true).cfg
cfg2 = s.getSourceWorker(true).cfg
c.Assert(cfg2, DeepEquals, sourceCfg)
cancel2()
wg.Wait()
@@ -481,13 +481,13 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool)
c.Assert(err, IsNil)
// worker should be started and without error
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
w := s.getWorker(true)
w := s.getSourceWorker(true)
return w != nil && !w.closed.Load()
}), IsTrue)
c.Assert(s.getSourceStatus(true).Result, IsNil)
} else {
// worker should be started before stopped
w := s.getWorker(true)
w := s.getSourceWorker(true)
c.Assert(w, NotNil)
c.Assert(w.closed.Load(), IsFalse)
_, err := ha.DeleteRelayConfig(s.etcdClient, w.name)
@@ -496,7 +496,7 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool)
c.Assert(err, IsNil)
// worker should be closed and without error
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
currentWorker := s.getWorker(true)
currentWorker := s.getSourceWorker(true)
return currentWorker == nil && w.closed.Load()
}), IsTrue)
c.Assert(s.getSourceStatus(true).Result, IsNil)
@@ -507,7 +507,7 @@ func (t *testServer) testRetryConnectMaster(c *C, s *Server, etcd *embed.Etcd, d
etcd.Close()
time.Sleep(6 * time.Second)
// When the worker server fails to keep alive with etcd, the server should close its worker
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.getSourceWorker(true), IsNil)
c.Assert(s.getSourceStatus(true).Result, IsNil)
ETCD, err := createMockETCD(dir, "http://"+hostName)
c.Assert(err, IsNil)
@@ -551,8 +551,15 @@ func (t *testServer) testSubTaskRecover(c *C, s *Server, dir string) {

func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, etcd *embed.Etcd) {
etcd.Close()
<<<<<<< HEAD
time.Sleep(retryConnectSleepTime + time.Duration(defaultKeepAliveTTL+3)*time.Second)
c.Assert(s.getWorker(true), IsNil)
=======
c.Assert(utils.WaitSomething(int(defaultKeepAliveTTL+3), time.Second, func() bool {
return s.getSourceWorker(true) == nil
}), IsTrue)
c.Assert(s.getSourceWorker(true), IsNil)
>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035))
}

func (t *testServer) TestGetMinLocInAllSubTasks(c *C) {
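
Many of the assertions in this test file poll with utils.WaitSomething until the worker appears or disappears. A hedged sketch of such a polling helper, with the signature inferred from the call sites above rather than copied from the dm utils package:

// Hypothetical sketch of a WaitSomething-style polling helper.
package example

import "time"

// waitSomething polls fn up to backoff times, sleeping wait between attempts,
// and reports whether fn ever returned true.
func waitSomething(backoff int, wait time.Duration, fn func() bool) bool {
	for i := 0; i < backoff; i++ {
		if fn() {
			return true
		}
		time.Sleep(wait)
	}
	return false
}
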
10 changes: 7 additions & 3 deletions dm/dm/worker/source_worker.go
@@ -189,7 +189,7 @@ func (w *SourceWorker) Start() {
}

// Close stops working and releases resources.
func (w *SourceWorker) Close() {
func (w *SourceWorker) Stop(graceful bool) {
if w.closed.Load() {
w.l.Warn("already closed")
return
@@ -204,8 +204,12 @@ func (w *SourceWorker) Close() {
w.Lock()
defer w.Unlock()

// close all sub tasks
w.subTaskHolder.closeAllSubTasks()
// close or kill all subtasks
if graceful {
w.subTaskHolder.closeAllSubTasks()
} else {
w.subTaskHolder.killAllSubTasks()
}

if w.relayHolder != nil {
w.relayHolder.Close()
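
The Stop(graceful) change above dispatches to either closeAllSubTasks or killAllSubTasks on the subtask holder. A hypothetical sketch of what that dispatch could look like, using only the Close/Kill names introduced in dm/dm/unit/unit.go; the holder type and field names are illustrative, not the DM subTaskHolder implementation.

// Hypothetical sketch: graceful vs. non-graceful teardown of subtasks.
package example

type unit interface {
	Close() // graceful: waits for in-flight work
	Kill()  // non-graceful: returns without waiting
}

type holder struct {
	subtasks []unit
}

// closeAllSubTasks is the graceful path used by Stop(true).
func (h *holder) closeAllSubTasks() {
	for _, st := range h.subtasks {
		st.Close()
	}
}

// killAllSubTasks is the non-graceful path used by Stop(false).
func (h *holder) killAllSubTasks() {
	for _, st := range h.subtasks {
		st.Kill()
	}
}
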
(diff for the remaining changed files not shown)
