Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unit(dm): add Kill func for unit #4035

Merged
merged 63 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
c7c58bd
change interface
Ehco1996 Dec 23, 2021
b2b8a77
rename
Ehco1996 Dec 23, 2021
273cdbb
adjust close
Ehco1996 Dec 23, 2021
c7f833c
graceful stop worker when lost ha
Ehco1996 Dec 23, 2021
bb57be6
no graceful end in syncer
Ehco1996 Dec 24, 2021
5acafed
save work
Ehco1996 Dec 29, 2021
0e68f24
fix comment
Ehco1996 Dec 30, 2021
6081bce
address comment and trigger test
Ehco1996 Jan 4, 2022
1ec3c56
rollback checkpoint before close schematracker
Ehco1996 Jan 12, 2022
63e87ca
use runctx to cancel dml worker
Ehco1996 Jan 12, 2022
5798b60
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 13, 2022
d743f1d
address comment
Ehco1996 Jan 14, 2022
9b9a558
address some comments
Ehco1996 Jan 17, 2022
5d88e4b
savework: refine syncer ctx
Ehco1996 Jan 17, 2022
5ca9276
refine syncer.RUN
Ehco1996 Jan 19, 2022
b63daa7
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 19, 2022
1302a75
revert test
Ehco1996 Jan 19, 2022
d792a12
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 19, 2022
a813189
make test steable
Ehco1996 Jan 19, 2022
922faa0
fix ha test lib
Ehco1996 Jan 20, 2022
9ffaf25
fix lazy_init_tracker
Ehco1996 Jan 20, 2022
9c2ea48
fix clean up data
Ehco1996 Jan 20, 2022
937cdcc
fix event ctx
Ehco1996 Jan 20, 2022
57ba616
fix syncer.done
Ehco1996 Jan 20, 2022
adb7901
fix kill
Ehco1996 Jan 20, 2022
a5acf28
addresss comment
Ehco1996 Jan 20, 2022
d9f42f6
add log for debug
Ehco1996 Jan 20, 2022
31727e0
fix test case
Ehco1996 Jan 20, 2022
1c59142
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 21, 2022
22a7e72
fix comments
Ehco1996 Jan 21, 2022
e01e2ea
rename func about sourceWorker
Ehco1996 Jan 21, 2022
776a2a2
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 21, 2022
06d4363
fix comments
Ehco1996 Jan 21, 2022
7a39654
merge one ctx
Ehco1996 Jan 25, 2022
a73aa0b
rename ctx
Ehco1996 Jan 25, 2022
705c228
address comments
Ehco1996 Jan 26, 2022
e881437
fix shell && revert some logic of stop worker
Ehco1996 Jan 28, 2022
a06c5e8
add ui for waitBeforeRunExit
Ehco1996 Jan 28, 2022
1e51bec
address comment
Ehco1996 Feb 8, 2022
63a235e
fix lint
Ehco1996 Feb 8, 2022
fb5f21f
ungraceful exit the dml pipeline
Ehco1996 Feb 8, 2022
f612399
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 8, 2022
54f1edc
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 8, 2022
b851b62
fix merge master
Ehco1996 Feb 8, 2022
6e9d1ad
fix datarace
Ehco1996 Feb 8, 2022
7aca258
add syncer ctx to sync pipeline
Ehco1996 Feb 8, 2022
0f5e220
fix ut panic
Ehco1996 Feb 8, 2022
60fc3c1
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 10, 2022
cd7903c
address comments
Ehco1996 Feb 10, 2022
05ca26b
revert ungraceful close job pipeline
Ehco1996 Feb 11, 2022
9204bbf
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 11, 2022
4fcad7d
fix datarace
Ehco1996 Feb 11, 2022
ddc3ea5
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 13, 2022
c7c1e82
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 14, 2022
d6484a6
move init checkpoint worker to init && revert a kill
Ehco1996 Feb 14, 2022
270925c
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 14, 2022
76180db
fix check graceful stop
Ehco1996 Feb 14, 2022
b15a9b2
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 15, 2022
4809d42
address comments
Ehco1996 Feb 15, 2022
f7df28a
remove sleep
Ehco1996 Feb 16, 2022
bdf8ac8
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 16, 2022
603bf19
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 16, 2022
311e86f
Merge branch 'master' into graceful-close-unit
ti-chi-bot Feb 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dm/dm/unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Unit interface {
// Close shuts down the process and closes the unit, after that can not call Process to resume
// The implementation should not block for a long time.
Close()
// Kill shuts down the process and closes the unit without graceful.
Kill()
// Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned.
// The implementation should not block for a long time.
Pause()
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ import (
func (t *testServer) testConidtionHub(c *C, s *Server) {
// test condition hub
c.Assert(GetConditionHub(), NotNil)
c.Assert(GetConditionHub().w, DeepEquals, s.getWorker(true))
c.Assert(GetConditionHub().w, DeepEquals, s.getSourceWorker(true))
}
3 changes: 2 additions & 1 deletion dm/dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func (s *Server) KeepAlive() {
failpoint.Label("bypass")

// TODO: report the error.
err := s.stopWorker("", true)
// when lost keepalive, stop the worker without graceful. this is to fix https://github.com/pingcap/tiflow/issues/3737
err := s.stopSourceWorker("", true, false)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return // return if failed to stop the worker.
Expand Down
57 changes: 30 additions & 27 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
}
rev = rev1
if relaySource == nil {
if w := s.getWorker(true); w != nil && w.startedRelayBySourceCfg {
if w := s.getSourceWorker(true); w != nil && w.startedRelayBySourceCfg {
break
}
log.L().Info("didn't found relay config after etcd retryable error. Will stop relay now")
Expand All @@ -314,7 +314,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
s.Lock()
defer s.Unlock()

if w := s.getWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID {
if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID {
// we may face both relay config and subtask bound changed in a compaction error, so here
// we check if observeSourceBound has started a worker
// TODO: add a test for this situation
Expand All @@ -325,7 +325,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
}
return nil
}
err = s.stopWorker("", false)
err = s.stopSourceWorker("", false, true)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return err // return if failed to stop the worker.
Expand Down Expand Up @@ -403,7 +403,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
s.Lock()
defer s.Unlock()

if w := s.getWorker(false); w != nil && w.cfg.SourceID == bound.Source {
if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == bound.Source {
// we may face both relay config and subtask bound changed in a compaction error, so here
// we check if observeRelayConfig has started a worker
// TODO: add a test for this situation
Expand All @@ -414,7 +414,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}
return nil
}
err = s.stopWorker("", false)
err = s.stopSourceWorker("", false, true)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return err // return if failed to stop the worker.
Expand All @@ -441,30 +441,33 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}

func (s *Server) doClose() {
s.cancel()
// close server in advance, stop receiving source bound and relay bound
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
s.wg.Wait()
lance6716 marked this conversation as resolved.
Show resolved Hide resolved

s.Lock()
defer s.Unlock()

if s.closed.Load() {
return
}
// close worker and wait for return
if w := s.getWorker(false); w != nil {
w.Close()
// stop server in advance, stop receiving source bound and relay bound
s.cancel()
s.wg.Wait()
// stop worker and wait for return(we already lock the whole Sever, so no need use lock to get source worker)
if w := s.getSourceWorker(false); w != nil {
w.Stop(true)
}
s.closed.Store(true)
}

// Close close the RPC server, this function can be called multiple times.
func (s *Server) Close() {
s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing
s.stopKeepAlive()
s.doClose()
if s.etcdClient != nil {
s.etcdClient.Close()
}
}

// if needLock is false, we should make sure Server has been locked in caller.
func (s *Server) getWorker(needLock bool) *SourceWorker {
func (s *Server) getSourceWorker(needLock bool) *SourceWorker {
if needLock {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -497,7 +500,7 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) {
defer s.Unlock()
}
// now setSourceStatus will be concurrently called. skip setting a source status if worker has been closed
if s.getWorker(false) == nil && source != "" {
if s.getSourceWorker(false) == nil && source != "" {
return
}
s.sourceStatus = pb.SourceStatus{
Expand All @@ -515,12 +518,12 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) {

// if sourceID is set to "", worker will be closed directly
// if sourceID is not "", we will check sourceID with w.cfg.SourceID.
func (s *Server) stopWorker(sourceID string, needLock bool) error {
func (s *Server) stopSourceWorker(sourceID string, needLock, graceful bool) error {
if needLock {
s.Lock()
defer s.Unlock()
}
w := s.getWorker(false)
w := s.getSourceWorker(false)
if w == nil {
log.L().Warn("worker has not been started, no need to stop", zap.String("source", sourceID))
return nil // no need to stop because not started yet
Expand All @@ -531,7 +534,7 @@ func (s *Server) stopWorker(sourceID string, needLock bool) error {
s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL)
s.setWorker(nil, false)
s.setSourceStatus("", nil, false)
w.Close()
w.Stop(graceful)
return nil
}

Expand Down Expand Up @@ -659,7 +662,7 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b
func (s *Server) disableHandleSubtasks(source string) error {
s.Lock()
defer s.Unlock()
w := s.getWorker(false)
w := s.getSourceWorker(false)
if w == nil {
log.L().Warn("worker has already stopped before DisableHandleSubtasks", zap.String("source", source))
return nil
Expand All @@ -676,7 +679,7 @@ func (s *Server) disableHandleSubtasks(source string) error {
var err error
if !w.relayEnabled.Load() {
log.L().Info("relay is not enabled after disabling subtask, so stop worker")
err = s.stopWorker(source, false)
err = s.stopSourceWorker(source, false, true)
}
return err
}
Expand Down Expand Up @@ -721,7 +724,7 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro
func (s *Server) disableRelay(source string) error {
s.Lock()
defer s.Unlock()
w := s.getWorker(false)
w := s.getSourceWorker(false)
if w == nil {
log.L().Warn("worker has already stopped before DisableRelay", zap.Any("relaySource", source))
return nil
Expand All @@ -731,7 +734,7 @@ func (s *Server) disableRelay(source string) error {
var err error
if !w.subTaskEnabled.Load() {
log.L().Info("subtask is not enabled after disabling relay, so stop worker")
err = s.stopWorker(source, false)
err = s.stopSourceWorker(source, false, true)
}
return err
}
Expand All @@ -747,7 +750,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
SourceStatus: &sourceStatus,
}

w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call QueryStatus, because no mysql source is being handled in the worker")
resp.Result = false
Expand All @@ -769,7 +772,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (*
// PurgeRelay implements WorkerServer.PurgeRelay.
func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error) {
log.L().Info("", zap.String("request", "PurgeRelay"), zap.Stringer("payload", req))
w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call StartSubTask, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
Expand All @@ -786,7 +789,7 @@ func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb
func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (*pb.CommonWorkerResponse, error) {
log.L().Info("", zap.String("request", "OperateSchema"), zap.Stringer("payload", req))

w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call OperateSchema, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
Expand All @@ -813,7 +816,7 @@ func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Sou
defer s.Unlock()
}

if w := s.getWorker(false); w != nil {
if w := s.getSourceWorker(false); w != nil {
if w.cfg.SourceID == cfg.SourceID {
log.L().Info("mysql source is being handled", zap.String("sourceID", s.worker.cfg.SourceID))
return w, nil
Expand Down Expand Up @@ -904,7 +907,7 @@ func getMinLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig) (
func (s *Server) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) (*pb.CommonWorkerResponse, error) {
log.L().Info("", zap.String("request", "HandleError"), zap.Stringer("payload", req))

w := s.getWorker(true)
w := s.getSourceWorker(true)
if w == nil {
log.L().Warn("fail to call HandleError, because no mysql source is being handled in the worker")
return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil
Expand Down
30 changes: 15 additions & 15 deletions dm/dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func (t *testServer) TestServer(c *C) {
_, err = ha.DeleteSubTaskStage(s.etcdClient, ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name))
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil
return s.getSourceWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil
}), IsTrue)

dupServer := NewServer(cfg)
Expand Down Expand Up @@ -337,13 +337,13 @@ func (t *testServer) TestHandleSourceBoundAfterError(c *C) {
_, err = ha.PutSourceCfg(etcdCli, sourceCfg)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
return s.getSourceWorker(true) != nil
}), IsTrue)

_, err = ha.DeleteSourceBound(etcdCli, s.cfg.Name)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) == nil
return s.getSourceWorker(true) == nil
}), IsTrue)
}

Expand Down Expand Up @@ -413,19 +413,19 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
}()
// step 4.1: should stop the running worker, source bound has been deleted, should stop this worker
c.Assert(utils.WaitSomething(20, 100*time.Millisecond, func() bool {
return s.getWorker(true) == nil
return s.getSourceWorker(true) == nil
}), IsTrue)
// step 4.2: put a new source bound, source should be started
_, err = ha.PutSourceBound(etcdCli, sourceBound)
c.Assert(err, IsNil)
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
return s.getSourceWorker(true) != nil
}), IsTrue)
cfg2 := s.getWorker(true).cfg
cfg2 := s.getSourceWorker(true).cfg
c.Assert(cfg2, DeepEquals, sourceCfg)
cancel1()
wg.Wait()
c.Assert(s.stopWorker(sourceCfg.SourceID, true), IsNil)
c.Assert(s.stopSourceWorker(sourceCfg.SourceID, true, true), IsNil)
// step 5: start observeSourceBound from compacted revision again, should start worker
ctx2, cancel2 := context.WithCancel(ctx)
wg.Add(1)
Expand All @@ -434,9 +434,9 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
c.Assert(s.observeSourceBound(ctx2, startRev), IsNil)
}()
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
return s.getWorker(true) != nil
return s.getSourceWorker(true) != nil
}), IsTrue)
cfg2 = s.getWorker(true).cfg
cfg2 = s.getSourceWorker(true).cfg
c.Assert(cfg2, DeepEquals, sourceCfg)
cancel2()
wg.Wait()
Expand Down Expand Up @@ -480,13 +480,13 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool)
c.Assert(err, IsNil)
// worker should be started and without error
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
w := s.getWorker(true)
w := s.getSourceWorker(true)
return w != nil && !w.closed.Load()
}), IsTrue)
c.Assert(s.getSourceStatus(true).Result, IsNil)
} else {
// worker should be started before stopped
w := s.getWorker(true)
w := s.getSourceWorker(true)
c.Assert(w, NotNil)
c.Assert(w.closed.Load(), IsFalse)
_, err := ha.DeleteRelayConfig(s.etcdClient, w.name)
Expand All @@ -495,7 +495,7 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool)
c.Assert(err, IsNil)
// worker should be closed and without error
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool {
currentWorker := s.getWorker(true)
currentWorker := s.getSourceWorker(true)
return currentWorker == nil && w.closed.Load()
}), IsTrue)
c.Assert(s.getSourceStatus(true).Result, IsNil)
Expand All @@ -506,7 +506,7 @@ func (t *testServer) testRetryConnectMaster(c *C, s *Server, etcd *embed.Etcd, d
etcd.Close()
time.Sleep(6 * time.Second)
// When worker server fail to keepalive with etcd, server should close its worker
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.getSourceWorker(true), IsNil)
c.Assert(s.getSourceStatus(true).Result, IsNil)
ETCD, err := createMockETCD(dir, "http://"+hostName)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -551,9 +551,9 @@ func (t *testServer) testSubTaskRecover(c *C, s *Server, dir string) {
func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, etcd *embed.Etcd) {
etcd.Close()
c.Assert(utils.WaitSomething(int(defaultKeepAliveTTL+3), time.Second, func() bool {
return s.getWorker(true) == nil
return s.getSourceWorker(true) == nil
}), IsTrue)
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.getSourceWorker(true), IsNil)
}

func (t *testServer) TestGetMinLocInAllSubTasks(c *C) {
Expand Down
10 changes: 7 additions & 3 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (w *SourceWorker) Start() {
}

// Close stops working and releases resources.
func (w *SourceWorker) Close() {
func (w *SourceWorker) Stop(graceful bool) {
if w.closed.Load() {
w.l.Warn("already closed")
return
Expand All @@ -218,8 +218,12 @@ func (w *SourceWorker) Close() {
w.Lock()
defer w.Unlock()

// close all sub tasks
w.subTaskHolder.closeAllSubTasks()
// close or kill all subtasks
if graceful {
w.subTaskHolder.closeAllSubTasks()
} else {
w.subTaskHolder.killAllSubTasks()
}

if w.relayHolder != nil {
w.relayHolder.Close()
Expand Down
Loading