Skip to content

Commit

Permalink
unit(dm): add Kill func for unit (pingcap#4035)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 authored Feb 16, 2022
1 parent 744a26e commit e7b0aae
Show file tree
Hide file tree
Showing 26 changed files with 614 additions and 402 deletions.
2 changes: 2 additions & 0 deletions dm/dm/unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Unit interface {
// Close shuts down the process and closes the unit; after that, Process cannot be called to resume.
// The implementation should not block for a long time.
Close()
// Kill shuts down the process and closes the unit immediately, without a graceful shutdown.
Kill()
// Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned.
// The implementation should not block for a long time.
Pause()
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ import (
func (t *testServer) testConidtionHub(c *C, s *Server) {
// test condition hub
c.Assert(GetConditionHub(), NotNil)
c.Assert(GetConditionHub().w, DeepEquals, s.getWorker(true))
c.Assert(GetConditionHub().w, DeepEquals, s.getSourceWorker(true))
}
3 changes: 2 additions & 1 deletion dm/dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func (s *Server) KeepAlive() {
failpoint.Label("bypass")

// TODO: report the error.
err := s.stopWorker("", true)
// When keepalive is lost, stop the worker without a graceful shutdown. This fixes https://github.com/pingcap/tiflow/issues/3737
err := s.stopSourceWorker("", true, false)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return // return if failed to stop the worker.
Expand Down
57 changes: 30 additions & 27 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
}
rev = rev1
if relaySource == nil {
if w := s.getWorker(true); w != nil && w.startedRelayBySourceCfg {
if w := s.getSourceWorker(true); w != nil && w.startedRelayBySourceCfg {
break
}
log.L().Info("didn't found relay config after etcd retryable error. Will stop relay now")
Expand All @@ -314,7 +314,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
s.Lock()
defer s.Unlock()

if w := s.getWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID {
if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID {
// we may face both relay config and subtask bound changed in a compaction error, so here
// we check if observeSourceBound has started a worker
// TODO: add a test for this situation
Expand All @@ -325,7 +325,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
}
return nil
}
err = s.stopWorker("", false)
err = s.stopSourceWorker("", false, true)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return err // return if failed to stop the worker.
Expand Down Expand Up @@ -403,7 +403,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
s.Lock()
defer s.Unlock()

if w := s.getWorker(false); w != nil && w.cfg.SourceID == bound.Source {
if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == bound.Source {
// we may face both relay config and subtask bound changed in a compaction error, so here
// we check if observeRelayConfig has started a worker
// TODO: add a test for this situation
Expand All @@ -414,7 +414,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}
return nil
}
err = s.stopWorker("", false)
err = s.stopSourceWorker("", false, true)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return err // return if failed to stop the worker.
Expand All @@ -441,30 +441,33 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}

func (s *Server) doClose() {
s.cancel()
// close server in advance, stop receiving source bound and relay bound
s.wg.Wait()

s.Lock()
defer s.Unlock()

if s.closed.Load() {
return
}
// close worker and wait for return
if w := s.getWorker(false); w != nil {
w.Close()
// stop server in advance, stop receiving source bound and relay bound
s.cancel()
s.wg.Wait()
// stop worker and wait for it to return (the whole Server is already locked, so there is no need to lock again to get the source worker)
if w := s.getSourceWorker(false); w != nil {
w.Stop(true)
}
s.closed.Store(true)
}

// Close closes the RPC server; this function can be called multiple times.
func (s *Server) Close() {
s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing
s.stopKeepAlive()
s.doClose()
if s.etcdClient != nil {
s.etcdClient.Close()
}
}

// if needLock is false, we should make sure Server has been locked in caller.
func (s *Server) getWorker(needLock bool) *SourceWorker {
func (s *Server) getSourceWorker(needLock bool) *SourceWorker {
if needLock {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -497,7 +500,7 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) {
defer s.Unlock()
}
// now setSourceStatus will be concurrently called. skip setting a source status if worker has been closed
if s.getWorker(false) == nil && source != "" {
if s.getSourceWorker(false) == nil && source != "" {
return
}
s.sourceStatus = pb.SourceStatus{
Expand All @@ -515,12 +518,12 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) {

// if sourceID is set to "", worker will be closed directly
// if sourceID is not "", we will check sourceID with w.cfg.SourceID.
func (s *Server) stopWorker(sourceID string, needLock bool) error {
func (s *Server) stopSourceWorker(sourceID string, needLock, graceful bool) error {
if needLock {
s.Lock()
defer s.Unlock()
}
w := s.getWorker(false)
w := s.getSourceWorker(false)
if w == nil {
log.L().Warn("worker has not been started, no need to stop", zap.String("source", sourceID))
return nil // no need to stop because not started yet
Expand All @@ -531,7 +534,7 @@ func (s *Server) stopWorker(sourceID string, needLock bool) error {
s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL)
s.setWorker(nil, false)
s.setSourceStatus("", nil, false)
w.Close()
w.Stop(graceful)
return nil
}

Expand Down Expand Up @@ -659,7 +662,7 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b
func (s *Server) disableHandleSubtasks(source string) error {
s.Lock()
defer s.Unlock()
w := s.getWorker(false)
w := s.getSourceWorker(false)
if w == nil {
log.L().Warn("worker has already stopped before DisableHandleSubtasks", zap.String("source", source))
return nil
Expand All @@ -676,7 +679,7 @@ func (s *Server) disableHandleSubtasks(source string) error {
var err error
if !w.relayEnabled.Load() {
log.L().Info("relay is not enabled after disabling subtask, so stop worker")
err = s.stopWorker(source, false)
err = s.stopSourceWorker(source, false, true)
}
return err
}
Expand Down Expand Up @@ -721,7 +724,7 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro
func (s *Server) disableRelay(source string) error {
s.Lock()
defer s.Unlock()
w := s.getWorker(false)
w := s.getSourceWorker(false)
if w == nil {
log.L().Warn("worker has already stopped before DisableRelay", zap.Any("relaySource", source))
return nil
Expand All @@ -731,7 +734,7 @@ func (s *Server) disableRelay(source string) error {
var err error
if !w.subTaskEnabled.Load() {
log.L().Info("subtask is not enabled after disabling relay, so stop worker")
err = s.stopWorker(source, false)
err = s.stopSourceWorker(source, false, true)
}
return err
}
Expand All @@ -747,7 +750,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
SourceStatus: &sourceStatus,
}

w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call QueryStatus, because no mysql source is being handled in the worker")
resp.Result = false
Expand All @@ -769,7 +772,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
// PurgeRelay implements WorkerServer.PurgeRelay.
func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error) {
log.L().Info("", zap.String("request", "PurgeRelay"), zap.Stringer("payload", req))
w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call StartSubTask, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
Expand All @@ -786,7 +789,7 @@ func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb
func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (*pb.CommonWorkerResponse, error) {
log.L().Info("", zap.String("request", "OperateSchema"), zap.Stringer("payload", req))

w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call OperateSchema, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
Expand All @@ -813,7 +816,7 @@ func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Sou
defer s.Unlock()
}

if w := s.getWorker(false); w != nil {
if w := s.getSourceWorker(false); w != nil {
if w.cfg.SourceID == cfg.SourceID {
log.L().Info("mysql source is being handled", zap.String("sourceID", s.worker.cfg.SourceID))
return w, nil
Expand Down Expand Up @@ -904,7 +907,7 @@ func getMinLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig) (
func (s *Server) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) (*pb.CommonWorkerResponse, error) {
log.L().Info("", zap.String("request", "HandleError"), zap.Stringer("payload", req))

w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call HandleError, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
Expand Down
30 changes: 15 additions & 15 deletions dm/dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (t *testServer) TestServer(c *C) {
_, err = ha.DeleteSubTaskStage(s.etcdClient, ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name))
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil
return s.getSourceWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil
}), IsTrue)

dupServer := NewServer(cfg)
Expand Down Expand Up @@ -337,13 +337,13 @@ func (t *testServer) TestHandleSourceBoundAfterError(c *C) {
_, err = ha.PutSourceCfg(etcdCli, sourceCfg)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
return s.getSourceWorker(true) != nil
}), IsTrue)

_, err = ha.DeleteSourceBound(etcdCli, s.cfg.Name)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) == nil
return s.getSourceWorker(true) == nil
}), IsTrue)
}

Expand Down Expand Up @@ -413,19 +413,19 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
}()
// step 4.1: should stop the running worker, source bound has been deleted, should stop this worker
c.Assert(utils.WaitSomething(20, 100*time.Millisecond, func() bool {
return s.getWorker(true) == nil
return s.getSourceWorker(true) == nil
}), IsTrue)
// step 4.2: put a new source bound, source should be started
_, err = ha.PutSourceBound(etcdCli, sourceBound)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
return s.getSourceWorker(true) != nil
}), IsTrue)
cfg2 := s.getWorker(true).cfg
cfg2 := s.getSourceWorker(true).cfg
c.Assert(cfg2, DeepEquals, sourceCfg)
cancel1()
wg.Wait()
c.Assert(s.stopWorker(sourceCfg.SourceID, true), IsNil)
c.Assert(s.stopSourceWorker(sourceCfg.SourceID, true, true), IsNil)
// step 5: start observeSourceBound from compacted revision again, should start worker
ctx2, cancel2 := context.WithCancel(ctx)
wg.Add(1)
Expand All @@ -434,9 +434,9 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
c.Assert(s.observeSourceBound(ctx2, startRev), IsNil)
}()
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
return s.getSourceWorker(true) != nil
}), IsTrue)
cfg2 = s.getWorker(true).cfg
cfg2 = s.getSourceWorker(true).cfg
c.Assert(cfg2, DeepEquals, sourceCfg)
cancel2()
wg.Wait()
Expand Down Expand Up @@ -480,13 +480,13 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool)
c.Assert(err, IsNil)
// worker should be started and without error
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
w := s.getWorker(true)
w := s.getSourceWorker(true)
return w != nil && !w.closed.Load()
}), IsTrue)
c.Assert(s.getSourceStatus(true).Result, IsNil)
} else {
// worker should be started before stopped
w := s.getWorker(true)
w := s.getSourceWorker(true)
c.Assert(w, NotNil)
c.Assert(w.closed.Load(), IsFalse)
_, err := ha.DeleteRelayConfig(s.etcdClient, w.name)
Expand All @@ -495,7 +495,7 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool)
c.Assert(err, IsNil)
// worker should be closed and without error
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
currentWorker := s.getWorker(true)
currentWorker := s.getSourceWorker(true)
return currentWorker == nil && w.closed.Load()
}), IsTrue)
c.Assert(s.getSourceStatus(true).Result, IsNil)
Expand All @@ -506,7 +506,7 @@ func (t *testServer) testRetryConnectMaster(c *C, s *Server, etcd *embed.Etcd, d
etcd.Close()
time.Sleep(6 * time.Second)
// When the worker server fails to keepalive with etcd, the server should close its worker
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.getSourceWorker(true), IsNil)
c.Assert(s.getSourceStatus(true).Result, IsNil)
ETCD, err := createMockETCD(dir, "http://"+hostName)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -551,9 +551,9 @@ func (t *testServer) testSubTaskRecover(c *C, s *Server, dir string) {
func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, etcd *embed.Etcd) {
etcd.Close()
c.Assert(utils.WaitSomething(int(defaultKeepAliveTTL+3), time.Second, func() bool {
return s.getWorker(true) == nil
return s.getSourceWorker(true) == nil
}), IsTrue)
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.getSourceWorker(true), IsNil)
}

func (t *testServer) TestGetMinLocInAllSubTasks(c *C) {
Expand Down
10 changes: 7 additions & 3 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (w *SourceWorker) Start() {
}

// Stop stops working and releases resources. If graceful is true, all subtasks are closed; otherwise they are killed.
func (w *SourceWorker) Close() {
func (w *SourceWorker) Stop(graceful bool) {
if w.closed.Load() {
w.l.Warn("already closed")
return
Expand All @@ -218,8 +218,12 @@ func (w *SourceWorker) Close() {
w.Lock()
defer w.Unlock()

// close all sub tasks
w.subTaskHolder.closeAllSubTasks()
// close or kill all subtasks
if graceful {
w.subTaskHolder.closeAllSubTasks()
} else {
w.subTaskHolder.killAllSubTasks()
}

if w.relayHolder != nil {
w.relayHolder.Close()
Expand Down
Loading

0 comments on commit e7b0aae

Please sign in to comment.