Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unit(dm): add Kill func for unit #4035

Merged
merged 63 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
c7c58bd
change interface
Ehco1996 Dec 23, 2021
b2b8a77
rename
Ehco1996 Dec 23, 2021
273cdbb
adjust close
Ehco1996 Dec 23, 2021
c7f833c
graceful stop worker when lost ha
Ehco1996 Dec 23, 2021
bb57be6
no graceful end in syncer
Ehco1996 Dec 24, 2021
5acafed
save work
Ehco1996 Dec 29, 2021
0e68f24
fix comment
Ehco1996 Dec 30, 2021
6081bce
address comment and trigger test
Ehco1996 Jan 4, 2022
1ec3c56
rollback checkpoint before close schematracker
Ehco1996 Jan 12, 2022
63e87ca
use runctx to cancel dml worker
Ehco1996 Jan 12, 2022
5798b60
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 13, 2022
d743f1d
address comment
Ehco1996 Jan 14, 2022
9b9a558
address some comments
Ehco1996 Jan 17, 2022
5d88e4b
savework: refine syncer ctx
Ehco1996 Jan 17, 2022
5ca9276
refine syncer.RUN
Ehco1996 Jan 19, 2022
b63daa7
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 19, 2022
1302a75
revert test
Ehco1996 Jan 19, 2022
d792a12
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 19, 2022
a813189
make test steable
Ehco1996 Jan 19, 2022
922faa0
fix ha test lib
Ehco1996 Jan 20, 2022
9ffaf25
fix lazy_init_tracker
Ehco1996 Jan 20, 2022
9c2ea48
fix clean up data
Ehco1996 Jan 20, 2022
937cdcc
fix event ctx
Ehco1996 Jan 20, 2022
57ba616
fix syncer.done
Ehco1996 Jan 20, 2022
adb7901
fix kill
Ehco1996 Jan 20, 2022
a5acf28
addresss comment
Ehco1996 Jan 20, 2022
d9f42f6
add log for debug
Ehco1996 Jan 20, 2022
31727e0
fix test case
Ehco1996 Jan 20, 2022
1c59142
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 21, 2022
22a7e72
fix comments
Ehco1996 Jan 21, 2022
e01e2ea
rename func about sourceWorker
Ehco1996 Jan 21, 2022
776a2a2
Merge branch 'master' into graceful-close-unit
Ehco1996 Jan 21, 2022
06d4363
fix comments
Ehco1996 Jan 21, 2022
7a39654
merge one ctx
Ehco1996 Jan 25, 2022
a73aa0b
rename ctx
Ehco1996 Jan 25, 2022
705c228
address comments
Ehco1996 Jan 26, 2022
e881437
fix shell && revert some logic of stop worker
Ehco1996 Jan 28, 2022
a06c5e8
add ui for waitBeforeRunExit
Ehco1996 Jan 28, 2022
1e51bec
address comment
Ehco1996 Feb 8, 2022
63a235e
fix lint
Ehco1996 Feb 8, 2022
fb5f21f
ungraceful exit the dml pipeline
Ehco1996 Feb 8, 2022
f612399
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 8, 2022
54f1edc
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 8, 2022
b851b62
fix merge master
Ehco1996 Feb 8, 2022
6e9d1ad
fix datarace
Ehco1996 Feb 8, 2022
7aca258
add syncer ctx to sync pipeline
Ehco1996 Feb 8, 2022
0f5e220
fix ut panic
Ehco1996 Feb 8, 2022
60fc3c1
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 10, 2022
cd7903c
address comments
Ehco1996 Feb 10, 2022
05ca26b
revert ungraceful close job pipeline
Ehco1996 Feb 11, 2022
9204bbf
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 11, 2022
4fcad7d
fix datarace
Ehco1996 Feb 11, 2022
ddc3ea5
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 13, 2022
c7c1e82
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 14, 2022
d6484a6
move init checkpoint worker to init && revert a kill
Ehco1996 Feb 14, 2022
270925c
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 14, 2022
76180db
fix check graceful stop
Ehco1996 Feb 14, 2022
b15a9b2
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 15, 2022
4809d42
address comments
Ehco1996 Feb 15, 2022
f7df28a
remove sleep
Ehco1996 Feb 16, 2022
bdf8ac8
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 16, 2022
603bf19
Merge branch 'master' into graceful-close-unit
Ehco1996 Feb 16, 2022
311e86f
Merge branch 'master' into graceful-close-unit
ti-chi-bot Feb 16, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/cmd/dm-syncer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func main() {
}

// 5. close the syncer
sync.Close()
sync.Close(true)
log.L().Info("dm-syncer exit")

// 6. flush log
Expand Down
3 changes: 2 additions & 1 deletion dm/dm/unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ type Unit interface {
Process(ctx context.Context, pr chan pb.ProcessResult)
// Close shuts down the process and closes the unit; after that, Process can not be called to resume.
// The implementation should not block for a long time.
Close()
// if graceful is true, unit is allowed to wait for its internal logic to complete, otherwise it should be closed immediately
Close(graceful bool)
// Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned.
// The implementation should not block for a long time.
Pause()
Expand Down
3 changes: 2 additions & 1 deletion dm/dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func (s *Server) KeepAlive() {
failpoint.Label("bypass")

// TODO: report the error.
err := s.stopWorker("", true)
// when keepalive is lost, stop the worker ungracefully. this is to fix https://github.com/pingcap/tiflow/issues/3737
err := s.stopWorker("", true, false)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return // return if failed to stop the worker.
Expand Down
14 changes: 7 additions & 7 deletions dm/dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error {
}
return nil
}
err = s.stopWorker("", false)
err = s.stopWorker("", false, true)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return err // return if failed to stop the worker.
Expand Down Expand Up @@ -414,7 +414,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error {
}
return nil
}
err = s.stopWorker("", false)
err = s.stopWorker("", false, true)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return err // return if failed to stop the worker.
Expand Down Expand Up @@ -452,7 +452,7 @@ func (s *Server) doClose() {
}
// close worker and wait for return
if w := s.getWorker(false); w != nil {
w.Close()
w.Close(true)
}
s.closed.Store(true)
}
Expand Down Expand Up @@ -515,7 +515,7 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) {

// if sourceID is set to "", worker will be closed directly
// if sourceID is not "", we will check sourceID with w.cfg.SourceID.
func (s *Server) stopWorker(sourceID string, needLock bool) error {
func (s *Server) stopWorker(sourceID string, needLock, graceful bool) error {
if needLock {
s.Lock()
defer s.Unlock()
Expand All @@ -531,7 +531,7 @@ func (s *Server) stopWorker(sourceID string, needLock bool) error {
s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL)
s.setWorker(nil, false)
s.setSourceStatus("", nil, false)
w.Close()
w.Close(graceful)
return nil
}

Expand Down Expand Up @@ -676,7 +676,7 @@ func (s *Server) disableHandleSubtasks(source string) error {
var err error
if !w.relayEnabled.Load() {
log.L().Info("relay is not enabled after disabling subtask, so stop worker")
err = s.stopWorker(source, false)
err = s.stopWorker(source, false, true)
}
return err
}
Expand Down Expand Up @@ -731,7 +731,7 @@ func (s *Server) disableRelay(source string) error {
var err error
if !w.subTaskEnabled.Load() {
log.L().Info("subtask is not enabled after disabling relay, so stop worker")
err = s.stopWorker(source, false)
err = s.stopWorker(source, false, true)
}
return err
}
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) {
c.Assert(cfg2, DeepEquals, sourceCfg)
cancel1()
wg.Wait()
c.Assert(s.stopWorker(sourceCfg.SourceID, true), IsNil)
c.Assert(s.stopWorker(sourceCfg.SourceID, true, true), IsNil)
// step 5: start observeSourceBound from compacted revision again, should start worker
ctx2, cancel2 := context.WithCancel(ctx)
wg.Add(1)
Expand Down
12 changes: 6 additions & 6 deletions dm/dm/worker/source_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func NewSourceWorker(
if err != nil { // when err != nil, `w` will become nil in this func, so we pass `w` in defer.
// release resources, NOTE: we need to refactor New/Init/Start/Close for components later.
w2.cancel()
w2.subTaskHolder.closeAllSubTasks()
w2.subTaskHolder.closeAllSubTasks(true)
}
}(w)

Expand Down Expand Up @@ -196,7 +196,7 @@ func (w *SourceWorker) Start() {
}

// Close stops working and releases resources.
func (w *SourceWorker) Close() {
func (w *SourceWorker) Close(graceful bool) {
if w.closed.Load() {
w.l.Warn("already closed")
return
Expand All @@ -212,7 +212,7 @@ func (w *SourceWorker) Close() {
defer w.Unlock()

// close all sub tasks
w.subTaskHolder.closeAllSubTasks()
w.subTaskHolder.closeAllSubTasks(graceful)

if w.relayHolder != nil {
w.relayHolder.Close()
Expand Down Expand Up @@ -464,7 +464,7 @@ func (w *SourceWorker) EnableHandleSubtasks() error {
// "for range" of a map will use same value address, so we'd better not pass value address to other function
clone := subTaskCfg
if err2 := w.StartSubTask(&clone, expectStage.Expect, false); err2 != nil {
w.subTaskHolder.closeAllSubTasks()
w.subTaskHolder.closeAllSubTasks(true)
return err2
}
}
Expand Down Expand Up @@ -497,7 +497,7 @@ func (w *SourceWorker) DisableHandleSubtasks() {
defer w.Unlock()

// close all sub tasks
w.subTaskHolder.closeAllSubTasks()
w.subTaskHolder.closeAllSubTasks(true)
w.l.Info("handling subtask enabled")
}

Expand Down Expand Up @@ -604,7 +604,7 @@ func (w *SourceWorker) OperateSubTask(name string, op pb.TaskOp) error {
switch op {
case pb.TaskOp_Stop:
w.l.Info("stop sub task", zap.String("task", name))
st.Close()
st.Close(true)
w.subTaskHolder.removeSubTask(name)
case pb.TaskOp_Pause:
w.l.Info("pause sub task", zap.String("task", name))
Expand Down
14 changes: 7 additions & 7 deletions dm/dm/worker/source_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ func (t *testServer) testWorker(c *C) {
c.Assert(w.GetUnitAndSourceStatusJSON("", nil), HasLen, emptyWorkerStatusInfoJSONLength)

// close twice
w.Close()
w.Close(true)
c.Assert(w.closed.Load(), IsTrue)
c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0)
w.Close()
w.Close(true)
c.Assert(w.closed.Load(), IsTrue)
c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0)
c.Assert(w.closed.Load(), IsTrue)
Expand Down Expand Up @@ -294,7 +294,7 @@ func (t *testWorkerFunctionalities) TestWorkerFunctionalities(c *C) {
// start worker
w, err := NewSourceWorker(sourceCfg, etcdCli, "", "")
c.Assert(err, IsNil)
defer w.Close()
defer w.Close(true)
go func() {
w.Start()
}()
Expand Down Expand Up @@ -467,7 +467,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
c.Assert(err, IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defer w.Close()
defer w.Close(true)
go func() {
w.Start()
}()
Expand Down Expand Up @@ -530,7 +530,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
c.Assert(status[0].Stage, Equals, pb.Stage_Running)
cancel1()
wg.Wait()
w.subTaskHolder.closeAllSubTasks()
w.subTaskHolder.closeAllSubTasks(true)
// step 5: restart observe and start from startRev, this subtask should be added
ctx2, cancel2 := context.WithCancel(ctx)
wg.Add(1)
Expand All @@ -548,7 +548,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
c.Assert(status, HasLen, 1)
c.Assert(status[0].Name, Equals, subtaskCfg.Name)
c.Assert(status[0].Stage, Equals, pb.Stage_Running)
w.Close()
w.Close(true)
cancel2()
wg.Wait()
}
Expand Down Expand Up @@ -586,7 +586,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
c.Assert(err, IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
defer w.Close()
defer w.Close(true)
go func() {
c.Assert(w.EnableRelay(false), IsNil)
w.Start()
Expand Down
20 changes: 13 additions & 7 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (st *SubTask) initUnits(relay relay.Process) error {
var needCloseUnits []unit.Unit
defer func() {
for _, u := range needCloseUnits {
u.Close()
u.Close(true)
}

st.initialized.Store(initializeUnitSuccess)
Expand Down Expand Up @@ -311,7 +311,7 @@ func (st *SubTask) fetchResultAndUpdateStage(pr chan pb.ProcessResult) {

switch stage {
case pb.Stage_Finished:
cu.Close()
cu.Close(true)
nu := st.getNextUnit()
if nu == nil {
// Now, when finished, it only stops the process
Expand Down Expand Up @@ -358,7 +358,13 @@ func (st *SubTask) PrevUnit() unit.Unit {
}

// closeUnits closes all un-closed units (current unit and all the subsequent units).
func (st *SubTask) closeUnits() {
func (st *SubTask) closeUnits(graceful bool) {
// when not graceful, we want the syncer to exit immediately, so we call u.Close(false) before calling cancel
// note that we only implement ungraceful close for sync unit
if !graceful && st.CurrUnit().Type() == pb.UnitType_Sync {
st.l.Info("closing syncer without graceful", zap.String("task", st.cfg.Name))
st.CurrUnit().Close(false)
}
st.cancel()
st.resultWg.Wait()

Expand All @@ -379,8 +385,8 @@ func (st *SubTask) closeUnits() {

for i := cui; i < len(st.units); i++ {
u := st.units[i]
st.l.Info("closing unit process", zap.Stringer("unit", cu.Type()))
u.Close()
st.l.Info("closing unit process", zap.Stringer("unit", cu.Type()), zap.Bool("graceful", graceful))
u.Close(graceful)
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -477,14 +483,14 @@ func (st *SubTask) Result() *pb.ProcessResult {
}

// Close stops the sub task.
func (st *SubTask) Close() {
func (st *SubTask) Close(graceful bool) {
st.l.Info("closing")
if !st.setStageIfNotIn([]pb.Stage{pb.Stage_Stopped, pb.Stage_Stopping, pb.Stage_Finished}, pb.Stage_Stopping) {
st.l.Info("subTask is already closed, no need to close")
return
}

st.closeUnits() // close all un-closed units
st.closeUnits(graceful) // close all un-closed units
updateTaskMetric(st.cfg.Name, st.cfg.SourceID, pb.Stage_Stopped, st.workerName)
}

Expand Down
6 changes: 3 additions & 3 deletions dm/dm/worker/subtask_holder.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (h *subTaskHolder) resetAllSubTasks(relay relay.Process) {
defer h.mu.Unlock()
for _, st := range h.subTasks {
stage := st.Stage()
st.Close()
st.Close(true)
// TODO: make a st.reset
st.ctx, st.cancel = context.WithCancel(context.Background())
st.cfg.UseRelay = relay != nil
Expand All @@ -64,11 +64,11 @@ func (h *subTaskHolder) resetAllSubTasks(relay relay.Process) {
}

// closeAllSubTasks closes all subtask instances.
func (h *subTaskHolder) closeAllSubTasks() {
func (h *subTaskHolder) closeAllSubTasks(graceful bool) {
h.mu.Lock()
defer h.mu.Unlock()
for _, st := range h.subTasks {
st.Close()
st.Close(graceful)
}
h.subTasks = make(map[string]*SubTask)
}
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/worker/subtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (m *MockUnit) Process(ctx context.Context, pr chan pb.ProcessResult) {
}
}

func (m *MockUnit) Close() {}
func (m *MockUnit) Close(graceful bool) {}

func (m MockUnit) Pause() {}

Expand Down Expand Up @@ -512,7 +512,7 @@ func (t *testSubTask) TestSubtaskFastQuit(c *C) {
return st.Stage() == pb.Stage_Running
}), IsTrue)
// test Close
st.Close()
st.Close(true)
select {
case <-time.After(500 * time.Millisecond):
c.Fatal("fail to stop subtask in 0.5s when stuck into unitTransWaitCondition")
Expand Down
2 changes: 1 addition & 1 deletion dm/dumpling/dumpling.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (m *Dumpling) Process(ctx context.Context, pr chan pb.ProcessResult) {
}

// Close implements Unit.Close.
func (m *Dumpling) Close() {
func (m *Dumpling) Close(graceful bool) {
if m.closed.Load() {
return
}
Expand Down
2 changes: 1 addition & 1 deletion dm/dumpling/dumpling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (t *testDumplingSuite) TestCallStatus(c *C) {
dumpling, _ := export.NewDumper(ctx, dumpConf)
m.core = dumpling

m.Close()
m.Close(true)
s = m.Status(nil).(*pb.DumpStatus)
c.Assert(s.CompletedTables, Equals, float64(0))
c.Assert(s.FinishedBytes, Equals, float64(0))
Expand Down
2 changes: 1 addition & 1 deletion dm/loader/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (l *LightningLoader) IsFreshTask(ctx context.Context) (bool, error) {
}

// Close does graceful shutdown.
func (l *LightningLoader) Close() {
func (l *LightningLoader) Close(graceful bool) {
l.Pause()
l.checkPointList.Close()
l.closed.Store(true)
Expand Down
2 changes: 1 addition & 1 deletion dm/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ func (l *Loader) loadFinishedSize() {
}

// Close does graceful shutdown.
func (l *Loader) Close() {
func (l *Loader) Close(graceful bool) {
l.Lock()
defer l.Unlock()
if l.isClosed() {
Expand Down
Loading