diff --git a/dm/dm/unit/unit.go b/dm/dm/unit/unit.go index 4451a652e86..363d17bf2cc 100644 --- a/dm/dm/unit/unit.go +++ b/dm/dm/unit/unit.go @@ -48,6 +48,8 @@ type Unit interface { // Close shuts down the process and closes the unit, after that can not call Process to resume // The implementation should not block for a long time. Close() + // Kill shuts down the process and closes the unit without waiting for a graceful shutdown. + Kill() // Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned. // The implementation should not block for a long time. Pause() diff --git a/dm/dm/worker/hub_test.go b/dm/dm/worker/hub_test.go index d76697b1e6a..ca20fd437c3 100644 --- a/dm/dm/worker/hub_test.go +++ b/dm/dm/worker/hub_test.go @@ -20,5 +20,5 @@ import ( func (t *testServer) testConidtionHub(c *C, s *Server) { // test condition hub c.Assert(GetConditionHub(), NotNil) - c.Assert(GetConditionHub().w, DeepEquals, s.getWorker(true)) + c.Assert(GetConditionHub().w, DeepEquals, s.getSourceWorker(true)) } diff --git a/dm/dm/worker/join.go b/dm/dm/worker/join.go index ece56ec1a4d..a50c47d44fe 100644 --- a/dm/dm/worker/join.go +++ b/dm/dm/worker/join.go @@ -108,7 +108,8 @@ func (s *Server) KeepAlive() { failpoint.Label("bypass") // TODO: report the error. - err := s.stopWorker("", true) + // when keepalive is lost, stop the worker without graceful shutdown. this is to fix https://github.com/pingcap/tiflow/issues/3737 + err := s.stopSourceWorker("", true, false) if err != nil { log.L().Error("fail to stop worker", zap.Error(err)) return // return if failed to stop the worker. diff --git a/dm/dm/worker/server.go b/dm/dm/worker/server.go index 3ce68d01b18..6cfcbd481bc 100644 --- a/dm/dm/worker/server.go +++ b/dm/dm/worker/server.go @@ -300,7 +300,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error { } rev = rev1 if relaySource == nil { - if w := s.getWorker(true); w != nil && w.startedRelayBySourceCfg { + if w := s.getSourceWorker(true); w != nil && w.startedRelayBySourceCfg { break } log.L().Info("didn't found relay config after etcd retryable error. Will stop relay now") @@ -314,7 +314,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error { s.Lock() defer s.Unlock() - if w := s.getWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID { + if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID { // we may face both relay config and subtask bound changed in a compaction error, so here // we check if observeSourceBound has started a worker // TODO: add a test for this situation @@ -325,7 +325,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error { } return nil } - err = s.stopWorker("", false) + err = s.stopSourceWorker("", false, true) if err != nil { log.L().Error("fail to stop worker", zap.Error(err)) return err // return if failed to stop the worker.
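Editor's note, not part of the patch: the hunks above add an ungraceful Kill next to the graceful Close on the Unit interface and thread a graceful flag from Server.stopSourceWorker down to the worker. The standalone Go sketch below only illustrates that pattern; the names demoUnit and demoWorker are invented for illustration and do not exist in DM.

package main

import "fmt"

// unit mirrors the shape of dm's Unit interface after this change:
// Close waits for in-flight work, Kill does not.
type unit interface {
	Close() // graceful: flush and clean up before returning
	Kill()  // ungraceful: cancel and return as soon as possible
}

type demoUnit struct{ name string }

func (u *demoUnit) Close() { fmt.Println(u.name, "closed gracefully") }
func (u *demoUnit) Kill()  { fmt.Println(u.name, "killed without waiting") }

// demoWorker mirrors the idea behind SourceWorker.Stop(graceful bool):
// one entry point, and the flag decides which unit method is called.
type demoWorker struct{ units []unit }

func (w *demoWorker) Stop(graceful bool) {
	for _, u := range w.units {
		if graceful {
			u.Close()
		} else {
			u.Kill()
		}
	}
}

func main() {
	w := &demoWorker{units: []unit{&demoUnit{"dump"}, &demoUnit{"sync"}}}
	w.Stop(true)  // e.g. a scheduler-driven stop
	w.Stop(false) // e.g. keepalive lost, as in the join.go hunk above
}

In the patch itself the graceful flag is true for scheduler-driven stops (disableRelay, disableHandleSubtasks, the observe loops) and false only when the keepalive lease is lost.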
@@ -403,7 +403,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { s.Lock() defer s.Unlock() - if w := s.getWorker(false); w != nil && w.cfg.SourceID == bound.Source { + if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == bound.Source { // we may face both relay config and subtask bound changed in a compaction error, so here // we check if observeRelayConfig has started a worker // TODO: add a test for this situation @@ -414,7 +414,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { } return nil } - err = s.stopWorker("", false) + err = s.stopSourceWorker("", false, true) if err != nil { log.L().Error("fail to stop worker", zap.Error(err)) return err // return if failed to stop the worker. @@ -441,30 +441,33 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { } func (s *Server) doClose() { - s.cancel() - // close server in advance, stop receiving source bound and relay bound - s.wg.Wait() - s.Lock() defer s.Unlock() + if s.closed.Load() { return } - // close worker and wait for return - if w := s.getWorker(false); w != nil { - w.Close() + // stop server in advance, stop receiving source bound and relay bound + s.cancel() + s.wg.Wait() + // stop the worker and wait for it to return (the whole Server is already locked, so no need to take the lock when getting the source worker) + if w := s.getSourceWorker(false); w != nil { + w.Stop(true) } s.closed.Store(true) } // Close close the RPC server, this function can be called multiple times. func (s *Server) Close() { + s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing s.stopKeepAlive() - s.doClose() + if s.etcdClient != nil { + s.etcdClient.Close() + } } // if needLock is false, we should make sure Server has been locked in caller. -func (s *Server) getWorker(needLock bool) *SourceWorker { +func (s *Server) getSourceWorker(needLock bool) *SourceWorker { if needLock { s.Lock() defer s.Unlock() } @@ -497,7 +500,7 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) { defer s.Unlock() } // now setSourceStatus will be concurrently called. skip setting a source status if worker has been closed - if s.getWorker(false) == nil && source != "" { + if s.getSourceWorker(false) == nil && source != "" { return } s.sourceStatus = pb.SourceStatus{ @@ -515,12 +518,12 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) { // if sourceID is set to "", worker will be closed directly // if sourceID is not "", we will check sourceID with w.cfg.SourceID.
-func (s *Server) stopWorker(sourceID string, needLock bool) error { +func (s *Server) stopSourceWorker(sourceID string, needLock, graceful bool) error { if needLock { s.Lock() defer s.Unlock() } - w := s.getWorker(false) + w := s.getSourceWorker(false) if w == nil { log.L().Warn("worker has not been started, no need to stop", zap.String("source", sourceID)) return nil // no need to stop because not started yet @@ -531,7 +534,7 @@ func (s *Server) stopWorker(sourceID string, needLock bool) error { s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL) s.setWorker(nil, false) s.setSourceStatus("", nil, false) - w.Close() + w.Stop(graceful) return nil } @@ -659,7 +662,7 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b func (s *Server) disableHandleSubtasks(source string) error { s.Lock() defer s.Unlock() - w := s.getWorker(false) + w := s.getSourceWorker(false) if w == nil { log.L().Warn("worker has already stopped before DisableHandleSubtasks", zap.String("source", source)) return nil @@ -676,7 +679,7 @@ func (s *Server) disableHandleSubtasks(source string) error { var err error if !w.relayEnabled.Load() { log.L().Info("relay is not enabled after disabling subtask, so stop worker") - err = s.stopWorker(source, false) + err = s.stopSourceWorker(source, false, true) } return err } @@ -721,7 +724,7 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro func (s *Server) disableRelay(source string) error { s.Lock() defer s.Unlock() - w := s.getWorker(false) + w := s.getSourceWorker(false) if w == nil { log.L().Warn("worker has already stopped before DisableRelay", zap.Any("relaySource", source)) return nil @@ -731,7 +734,7 @@ func (s *Server) disableRelay(source string) error { var err error if !w.subTaskEnabled.Load() { log.L().Info("subtask is not enabled after disabling relay, so stop worker") - err = s.stopWorker(source, false) + err = s.stopSourceWorker(source, false, true) } return err } @@ -747,7 +750,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (* SourceStatus: &sourceStatus, } - w := s.getWorker(true) + w := s.getSourceWorker(true) if w == nil { log.L().Warn("fail to call QueryStatus, because no mysql source is being handled in the worker") resp.Result = false @@ -769,7 +772,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (* // PurgeRelay implements WorkerServer.PurgeRelay. 
func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error) { log.L().Info("", zap.String("request", "PurgeRelay"), zap.Stringer("payload", req)) - w := s.getWorker(true) + w := s.getSourceWorker(true) if w == nil { log.L().Warn("fail to call StartSubTask, because no mysql source is being handled in the worker") return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil @@ -786,7 +789,7 @@ func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (*pb.CommonWorkerResponse, error) { log.L().Info("", zap.String("request", "OperateSchema"), zap.Stringer("payload", req)) - w := s.getWorker(true) + w := s.getSourceWorker(true) if w == nil { log.L().Warn("fail to call OperateSchema, because no mysql source is being handled in the worker") return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil @@ -813,7 +816,7 @@ func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Sou defer s.Unlock() } - if w := s.getWorker(false); w != nil { + if w := s.getSourceWorker(false); w != nil { if w.cfg.SourceID == cfg.SourceID { log.L().Info("mysql source is being handled", zap.String("sourceID", s.worker.cfg.SourceID)) return w, nil @@ -904,7 +907,7 @@ func getMinLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig) ( func (s *Server) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) (*pb.CommonWorkerResponse, error) { log.L().Info("", zap.String("request", "HandleError"), zap.Stringer("payload", req)) - w := s.getWorker(true) + w := s.getSourceWorker(true) if w == nil { log.L().Warn("fail to call HandleError, because no mysql source is being handled in the worker") return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil diff --git a/dm/dm/worker/server_test.go b/dm/dm/worker/server_test.go index 6220ef3f45c..984240f3846 100644 --- a/dm/dm/worker/server_test.go +++ b/dm/dm/worker/server_test.go @@ -208,7 +208,7 @@ func (t *testServer) TestServer(c *C) { _, err = ha.DeleteSubTaskStage(s.etcdClient, ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name)) c.Assert(err, IsNil) c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil + return s.getSourceWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil }), IsTrue) dupServer := NewServer(cfg) @@ -337,13 +337,13 @@ func (t *testServer) TestHandleSourceBoundAfterError(c *C) { _, err = ha.PutSourceCfg(etcdCli, sourceCfg) c.Assert(err, IsNil) c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true) != nil + return s.getSourceWorker(true) != nil }), IsTrue) _, err = ha.DeleteSourceBound(etcdCli, s.cfg.Name) c.Assert(err, IsNil) c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true) == nil + return s.getSourceWorker(true) == nil }), IsTrue) } @@ -413,19 +413,19 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) { }() // step 4.1: should stop the running worker, source bound has been deleted, should stop this worker c.Assert(utils.WaitSomething(20, 100*time.Millisecond, func() bool { - return s.getWorker(true) == nil + return s.getSourceWorker(true) == nil }), IsTrue) // step 4.2: put a new source bound, source should be started _, err = ha.PutSourceBound(etcdCli, sourceBound) c.Assert(err, IsNil) 
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true) != nil + return s.getSourceWorker(true) != nil }), IsTrue) - cfg2 := s.getWorker(true).cfg + cfg2 := s.getSourceWorker(true).cfg c.Assert(cfg2, DeepEquals, sourceCfg) cancel1() wg.Wait() - c.Assert(s.stopWorker(sourceCfg.SourceID, true), IsNil) + c.Assert(s.stopSourceWorker(sourceCfg.SourceID, true, true), IsNil) // step 5: start observeSourceBound from compacted revision again, should start worker ctx2, cancel2 := context.WithCancel(ctx) wg.Add(1) @@ -434,9 +434,9 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) { c.Assert(s.observeSourceBound(ctx2, startRev), IsNil) }() c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true) != nil + return s.getSourceWorker(true) != nil }), IsTrue) - cfg2 = s.getWorker(true).cfg + cfg2 = s.getSourceWorker(true).cfg c.Assert(cfg2, DeepEquals, sourceCfg) cancel2() wg.Wait() @@ -480,13 +480,13 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool) c.Assert(err, IsNil) // worker should be started and without error c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - w := s.getWorker(true) + w := s.getSourceWorker(true) return w != nil && !w.closed.Load() }), IsTrue) c.Assert(s.getSourceStatus(true).Result, IsNil) } else { // worker should be started before stopped - w := s.getWorker(true) + w := s.getSourceWorker(true) c.Assert(w, NotNil) c.Assert(w.closed.Load(), IsFalse) _, err := ha.DeleteRelayConfig(s.etcdClient, w.name) @@ -495,7 +495,7 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool) c.Assert(err, IsNil) // worker should be closed and without error c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - currentWorker := s.getWorker(true) + currentWorker := s.getSourceWorker(true) return currentWorker == nil && w.closed.Load() }), IsTrue) c.Assert(s.getSourceStatus(true).Result, IsNil) @@ -506,7 +506,7 @@ func (t *testServer) testRetryConnectMaster(c *C, s *Server, etcd *embed.Etcd, d etcd.Close() time.Sleep(6 * time.Second) // When worker server fail to keepalive with etcd, server should close its worker - c.Assert(s.getWorker(true), IsNil) + c.Assert(s.getSourceWorker(true), IsNil) c.Assert(s.getSourceStatus(true).Result, IsNil) ETCD, err := createMockETCD(dir, "http://"+hostName) c.Assert(err, IsNil) @@ -551,9 +551,9 @@ func (t *testServer) testSubTaskRecover(c *C, s *Server, dir string) { func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, etcd *embed.Etcd) { etcd.Close() c.Assert(utils.WaitSomething(int(defaultKeepAliveTTL+3), time.Second, func() bool { - return s.getWorker(true) == nil + return s.getSourceWorker(true) == nil }), IsTrue) - c.Assert(s.getWorker(true), IsNil) + c.Assert(s.getSourceWorker(true), IsNil) } func (t *testServer) TestGetMinLocInAllSubTasks(c *C) { diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go index 851a3245317..5c85300590f 100644 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -203,7 +203,7 @@ func (w *SourceWorker) Start() { } // Close stops working and releases resources. 
-func (w *SourceWorker) Close() { +func (w *SourceWorker) Stop(graceful bool) { if w.closed.Load() { w.l.Warn("already closed") return @@ -218,8 +218,12 @@ func (w *SourceWorker) Close() { w.Lock() defer w.Unlock() - // close all sub tasks - w.subTaskHolder.closeAllSubTasks() + // close or kill all subtasks + if graceful { + w.subTaskHolder.closeAllSubTasks() + } else { + w.subTaskHolder.killAllSubTasks() + } if w.relayHolder != nil { w.relayHolder.Close() diff --git a/dm/dm/worker/source_worker_test.go b/dm/dm/worker/source_worker_test.go index 1ae4eaa476a..a9ec7be9691 100644 --- a/dm/dm/worker/source_worker_test.go +++ b/dm/dm/worker/source_worker_test.go @@ -89,11 +89,11 @@ func (t *testServer) testWorker(c *C) { c.Assert(err, IsNil) c.Assert(w.GetUnitAndSourceStatusJSON("", nil), HasLen, emptyWorkerStatusInfoJSONLength) - // close twice - w.Close() + // stop twice + w.Stop(true) c.Assert(w.closed.Load(), IsTrue) c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0) - w.Close() + w.Stop(true) c.Assert(w.closed.Load(), IsTrue) c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0) c.Assert(w.closed.Load(), IsTrue) @@ -198,11 +198,11 @@ func (t *testServer2) TestTaskAutoResume(c *C) { c.Assert(err, IsNil) subtaskCfg.Mode = "full" subtaskCfg.Timezone = "UTC" - c.Assert(s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running, pb.Stage_Stopped, true), IsNil) + c.Assert(s.getSourceWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running, pb.Stage_Stopped, true), IsNil) // check task in paused state c.Assert(utils.WaitSomething(100, 100*time.Millisecond, func() bool { - subtaskStatus, _, _ := s.getWorker(true).QueryStatus(context.Background(), taskName) + subtaskStatus, _, _ := s.getSourceWorker(true).QueryStatus(context.Background(), taskName) for _, st := range subtaskStatus { if st.Name == taskName && st.Stage == pb.Stage_Paused { return true @@ -213,7 +213,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) { //nolint:errcheck failpoint.Disable("github.com/pingcap/tiflow/dm/dumpling/dumpUnitProcessWithError") - rtsc, ok := s.getWorker(true).taskStatusChecker.(*realTaskStatusChecker) + rtsc, ok := s.getSourceWorker(true).taskStatusChecker.(*realTaskStatusChecker) c.Assert(ok, IsTrue) defer func() { // close multiple time @@ -223,7 +223,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) { // check task will be auto resumed c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool { - sts, _, _ := s.getWorker(true).QueryStatus(context.Background(), taskName) + sts, _, _ := s.getSourceWorker(true).QueryStatus(context.Background(), taskName) for _, st := range sts { if st.Name == taskName && st.Stage == pb.Stage_Running { return true @@ -295,7 +295,7 @@ func (t *testWorkerFunctionalities) TestWorkerFunctionalities(c *C) { // start worker w, err := NewSourceWorker(sourceCfg, etcdCli, "", "") c.Assert(err, IsNil) - defer w.Close() + defer w.Stop(true) go func() { w.Start() }() @@ -467,7 +467,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - defer w.Close() + defer w.Stop(true) go func() { w.Start() }() @@ -546,7 +546,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { c.Assert(status, HasLen, 1) c.Assert(status[0].Name, Equals, subtaskCfg.Name) c.Assert(status[0].Stage, Equals, pb.Stage_Running) - w.Close() + w.Stop(true) cancel2() wg.Wait() } @@ -585,7 +585,7 @@ func (t *testWorkerEtcdCompact) TestWatchValidatorStageEtcdCompact(c *C) { 
c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - defer w.Close() + defer w.Stop(true) go func() { w.Start() }() @@ -706,7 +706,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) { c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - defer w.Close() + defer w.Stop(true) go func() { c.Assert(w.EnableRelay(false), IsNil) w.Start() diff --git a/dm/dm/worker/subtask.go b/dm/dm/worker/subtask.go index 84621226eb5..772723ff3f9 100644 --- a/dm/dm/worker/subtask.go +++ b/dm/dm/worker/subtask.go @@ -428,6 +428,16 @@ func (st *SubTask) closeUnits() { u := st.units[i] st.l.Info("closing unit process", zap.Stringer("unit", cu.Type())) u.Close() + st.l.Info("closing unit done", zap.Stringer("unit", cu.Type())) + } +} + +func (st *SubTask) killCurrentUnit() { + if st.CurrUnit() != nil { + ut := st.CurrUnit().Type() + st.l.Info("kill unit", zap.String("task", st.cfg.Name), zap.Stringer("unit", ut)) + st.CurrUnit().Kill() + st.l.Info("kill unit done", zap.String("task", st.cfg.Name), zap.Stringer("unit", ut)) } } @@ -530,7 +540,18 @@ func (st *SubTask) Close() { st.l.Info("subTask is already closed, no need to close") return } + st.closeUnits() // close all un-closed units + updateTaskMetric(st.cfg.Name, st.cfg.SourceID, pb.Stage_Stopped, st.workerName) +} +// Kill kill running unit and stop the sub task. +func (st *SubTask) Kill() { + st.l.Info("killing") + if !st.setStageIfNotIn([]pb.Stage{pb.Stage_Stopped, pb.Stage_Stopping, pb.Stage_Finished}, pb.Stage_Stopping) { + st.l.Info("subTask is already closed, no need to close") + return + } + st.killCurrentUnit() st.closeUnits() // close all un-closed units cfg := st.getCfg() diff --git a/dm/dm/worker/subtask_holder.go b/dm/dm/worker/subtask_holder.go index 27f512d62ed..a723bf5b41c 100644 --- a/dm/dm/worker/subtask_holder.go +++ b/dm/dm/worker/subtask_holder.go @@ -73,6 +73,16 @@ func (h *subTaskHolder) closeAllSubTasks() { h.subTasks = make(map[string]*SubTask) } +// killAllSubTasks kill and stop all subtask instances. +func (h *subTaskHolder) killAllSubTasks() { + h.mu.Lock() + defer h.mu.Unlock() + for _, st := range h.subTasks { + st.Kill() + } + h.subTasks = make(map[string]*SubTask) +} + // findSubTask finds subtask instance by name. func (h *subTaskHolder) findSubTask(name string) *SubTask { h.mu.RLock() diff --git a/dm/dm/worker/subtask_test.go b/dm/dm/worker/subtask_test.go index 7e4d142dd43..8fd1b6eb166 100644 --- a/dm/dm/worker/subtask_test.go +++ b/dm/dm/worker/subtask_test.go @@ -118,6 +118,8 @@ func (m *MockUnit) Process(ctx context.Context, pr chan pb.ProcessResult) { func (m *MockUnit) Close() {} +func (m *MockUnit) Kill() {} + func (m MockUnit) Pause() {} func (m *MockUnit) Resume(ctx context.Context, pr chan pb.ProcessResult) { m.Process(ctx, pr) } diff --git a/dm/dumpling/dumpling.go b/dm/dumpling/dumpling.go index 51fb5d7bd88..b50f9139b39 100644 --- a/dm/dumpling/dumpling.go +++ b/dm/dumpling/dumpling.go @@ -175,6 +175,12 @@ func (m *Dumpling) Close() { m.closed.Store(true) } +// Kill implements Unit.Kill. +func (m *Dumpling) Kill() { + // TODO: implement kill + m.Close() +} + // Pause implements Unit.Pause. func (m *Dumpling) Pause() { if m.closed.Load() { diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index 08cc1bc5df1..3ed0004ec3d 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -334,6 +334,12 @@ func (l *LightningLoader) Close() { l.closed.Store(true) } +// Kill does ungraceful shutdown. 
+func (l *LightningLoader) Kill() { + // TODO: implement kill + l.Close() +} + // Pause pauses the process, and it can be resumed later // should cancel context from external. func (l *LightningLoader) Pause() { diff --git a/dm/loader/loader.go b/dm/loader/loader.go index 3ecc6492ce8..ebbc5416b53 100644 --- a/dm/loader/loader.go +++ b/dm/loader/loader.go @@ -804,6 +804,12 @@ func (l *Loader) Close() { l.closed.Store(true) } +// Kill kill the loader without graceful. +func (l *Loader) Kill() { + // TODO: implement kill + l.Close() +} + // stopLoad stops loading, now it used by Close and Pause // maybe we can refine the workflow more clear. func (l *Loader) stopLoad() { diff --git a/dm/syncer/dml_worker.go b/dm/syncer/dml_worker.go index 016a418969a..381cd55b19b 100644 --- a/dm/syncer/dml_worker.go +++ b/dm/syncer/dml_worker.go @@ -36,7 +36,7 @@ type DMLWorker struct { chanSize int multipleRows bool toDBConns []*dbconn.DBConn - tctx *tcontext.Context + syncCtx *tcontext.Context logger log.Logger // for metrics @@ -75,7 +75,7 @@ func dmlWorkerWrap(inCh chan *job, syncer *Syncer) chan *job { fatalFunc: syncer.fatalFunc, lagFunc: syncer.updateReplicationJobTS, updateJobMetricsFunc: syncer.updateJobMetrics, - tctx: syncer.tctx, + syncCtx: syncer.syncCtx, // this ctx can be used to cancel all the workers toDBConns: syncer.toDBConns, inCh: inCh, flushCh: make(chan *job), @@ -112,7 +112,6 @@ func (w *DMLWorker) run() { for i := 0; i < w.workerCount; i++ { queueBucketMapping[i] = queueBucketName(i) } - for j := range w.inCh { metrics.QueueSizeGauge.WithLabelValues(w.task, "dml_worker_input", w.source).Set(float64(len(w.inCh))) switch j.tp { @@ -244,7 +243,7 @@ func (w *DMLWorker) executeBatchJobs(queueID int, jobs []*job) { time.Sleep(time.Duration(t) * time.Second) }) // use background context to execute sqls as much as possible - ctx, cancel := w.tctx.WithTimeout(maxDMLExecutionDuration) + ctx, cancel := w.syncCtx.WithTimeout(maxDMLExecutionDuration) defer cancel() affect, err = db.ExecuteSQL(ctx, queries, args...) failpoint.Inject("SafeModeExit", func(val failpoint.Value) { diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 41291504414..de05b2132a8 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -121,7 +121,17 @@ const ( type Syncer struct { sync.RWMutex - tctx *tcontext.Context + tctx *tcontext.Context // this ctx only used for logger. + + // this ctx derives from a background ctx and was initialized in s.Run, it is used for some background tasks in s.Run + // when this ctx cancelled, syncer will shutdown all background running jobs (except the syncDML and syncDDL) and not wait transaction end. + runCtx *tcontext.Context + runCancel context.CancelFunc + // this ctx only used for syncDML and syncDDL and only cancelled when ungraceful stop. 
+ syncCtx *tcontext.Context + syncCancel context.CancelFunc + // control all goroutines that started in S.Run + runWg sync.WaitGroup cfg *config.SubTaskConfig syncCfg replication.BinlogSyncerConfig @@ -135,7 +145,6 @@ type Syncer struct { binlogType BinlogType streamerController *StreamerController - wg sync.WaitGroup // counts goroutines jobWg sync.WaitGroup // counts ddl/flush/asyncFlush job in-flight in s.dmlJobCh and s.ddlJobCh schemaTracker *schema.Tracker @@ -188,8 +197,6 @@ type Syncer struct { filteredUpdate atomic.Int64 filteredDelete atomic.Int64 - done chan struct{} - checkpoint CheckPoint checkpointFlushWorker *checkpointFlushWorker onlineDDL onlineddl.OnlinePlugin @@ -253,7 +260,6 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay rel syncer.binlogSizeCount.Store(0) syncer.lastCount.Store(0) syncer.count.Store(0) - syncer.done = nil syncer.handleJobFunc = syncer.handleJob syncer.cli = etcdClient @@ -279,11 +285,9 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay rel } func (s *Syncer) newJobChans() { - s.closeJobChans() chanSize := calculateChanSize(s.cfg.QueueSize, s.cfg.WorkerCount, s.cfg.Compact) s.dmlJobCh = make(chan *job, chanSize) s.ddlJobCh = make(chan *job, s.cfg.QueueSize) - s.checkpointFlushWorker.input = make(chan *checkpointFlushTask, 16) s.jobsClosed.Store(false) } @@ -295,7 +299,6 @@ func (s *Syncer) closeJobChans() { } close(s.dmlJobCh) close(s.ddlJobCh) - close(s.checkpointFlushWorker.input) s.jobsClosed.Store(true) } @@ -431,13 +434,6 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } } } - s.checkpointFlushWorker = &checkpointFlushWorker{ - input: nil, // will be created in s.reset() - cp: s.checkpoint, - execError: &s.execError, - afterFlushFn: s.afterFlushCheckpoint, - updateJobMetricsFn: s.updateJobMetrics, - } // when Init syncer, set active relay log info if s.cfg.Meta == nil || s.cfg.Meta.BinLogName != binlog.FakeBinlogName { @@ -447,7 +443,6 @@ func (s *Syncer) Init(ctx context.Context) (err error) { } rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog}) } - s.reset() return nil } @@ -565,6 +560,13 @@ func (s *Syncer) reset() { } // create new job chans s.newJobChans() + s.checkpointFlushWorker = &checkpointFlushWorker{ + input: make(chan *checkpointFlushTask, 16), + cp: s.checkpoint, + execError: &s.execError, + afterFlushFn: s.afterFlushCheckpoint, + updateJobMetricsFn: s.updateJobMetrics, + } s.execError.Store(nil) s.setErrLocation(nil, nil, false) @@ -572,7 +574,6 @@ func (s *Syncer) reset() { s.waitXIDJob.Store(int64(noWait)) s.isTransactionEnd = true s.flushSeq = 0 - switch s.cfg.ShardMode { case config.ShardPessimistic: // every time start to re-sync from resume, we reset status to make it like a fresh syncing @@ -639,7 +640,6 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { s.Unlock() return } - s.done = make(chan struct{}) s.Unlock() runFatalChan := make(chan *pb.ProcessError, s.cfg.WorkerCount+1) @@ -678,8 +678,6 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { // cancel goroutines created in s.Run cancel() } - s.closeJobChans() // Run returned, all jobs sent, we can close s.jobs - s.wg.Wait() // wait for sync goroutine to return close(runFatalChan) // Run returned, all potential fatal sent to s.runFatalChan wg.Wait() // wait for receive all fatal from s.runFatalChan @@ -701,14 +699,6 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { default: } - // 
try to rollback checkpoints, if they already flushed, no effect - prePos := s.checkpoint.GlobalPoint() - s.checkpoint.Rollback(s.schemaTracker) - currPos := s.checkpoint.GlobalPoint() - if binlog.CompareLocation(prePos, currPos, s.cfg.EnableGTID) != 0 { - s.tctx.L().Warn("something wrong with rollback global checkpoint", zap.Stringer("previous position", prePos), zap.Stringer("current position", currPos)) - } - pr <- pb.ProcessResult{ IsCanceled: isCanceled, Errors: errs, @@ -978,7 +968,7 @@ func (s *Syncer) addJob(job *job) { s.dmlJobCh <- job failpoint.Inject("checkCheckpointInMiddleOfTransaction", func() { s.tctx.L().Info("receive dml job", zap.Any("dml job", job)) - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) }) case gc: s.dmlJobCh <- job @@ -1159,7 +1149,7 @@ func (s *Syncer) flushCheckPoints() error { snapshotInfo, exceptTables, shardMetaSQLs, shardMetaArgs := s.createCheckpointSnapshot(true) if snapshotInfo == nil { - log.L().Info("checkpoint has no change, skip sync flush checkpoint") + s.tctx.L().Info("checkpoint has no change, skip sync flush checkpoint") return nil } @@ -1195,7 +1185,7 @@ func (s *Syncer) flushCheckPointsAsync(asyncFlushJob *job) { snapshotInfo, exceptTables, shardMetaSQLs, shardMetaArgs := s.createCheckpointSnapshot(false) if snapshotInfo == nil { - log.L().Info("checkpoint has no change, skip async flush checkpoint", zap.Int64("job seq", asyncFlushJob.flushSeq)) + s.tctx.L().Info("checkpoint has no change, skip async flush checkpoint", zap.Int64("job seq", asyncFlushJob.flushSeq)) return } @@ -1287,8 +1277,8 @@ func (s *Syncer) logAndClearFilteredStatistics() { } // DDL synced one by one, so we only need to process one DDL at a time. -func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn.DBConn, ddlJobChan chan *job) { - defer s.wg.Done() +func (s *Syncer) syncDDL(queueBucket string, db *dbconn.DBConn, ddlJobChan chan *job) { + defer s.runWg.Done() var err error for { @@ -1312,12 +1302,12 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn. shardPessimistOp = s.pessimist.PendingOperation() if shardPessimistOp != nil && !shardPessimistOp.Exec { ignore = true - tctx.L().Info("ignore shard DDLs in pessimistic shard mode", zap.Strings("ddls", ddlJob.ddls)) + s.tctx.L().Info("ignore shard DDLs in pessimistic shard mode", zap.Strings("ddls", ddlJob.ddls)) } case config.ShardOptimistic: if len(ddlJob.ddls) == 0 { ignore = true - tctx.L().Info("ignore shard DDLs in optimistic mode", zap.Stringer("info", s.optimist.PendingInfo())) + s.tctx.L().Info("ignore shard DDLs in optimistic mode", zap.Stringer("info", s.optimist.PendingInfo())) } } @@ -1329,9 +1319,9 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn. if !ignore { var affected int - affected, err = db.ExecuteSQLWithIgnore(tctx, errorutil.IsIgnorableMySQLDDLError, ddlJob.ddls) + affected, err = db.ExecuteSQLWithIgnore(s.syncCtx, errorutil.IsIgnorableMySQLDDLError, ddlJob.ddls) if err != nil { - err = s.handleSpecialDDLError(tctx, err, ddlJob.ddls, affected, db) + err = s.handleSpecialDDLError(s.syncCtx, err, ddlJob.ddls, affected, db) err = terror.WithScope(err, terror.ScopeDownstream) } } @@ -1365,7 +1355,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn. switch { case shardInfo == nil: // no need to do the shard DDL handle for `CREATE DATABASE/TABLE` now. 
- tctx.L().Warn("skip shard DDL handle in pessimistic shard mode", zap.Strings("ddl", ddlJob.ddls)) + s.tctx.L().Warn("skip shard DDL handle in pessimistic shard mode", zap.Strings("ddl", ddlJob.ddls)) case shardPessimistOp == nil: err = terror.ErrWorkerDDLLockOpNotFound.Generate(shardInfo) default: @@ -1378,7 +1368,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn. // no need to do the shard DDL handle for `DROP DATABASE/TABLE` now. // but for `CREATE DATABASE` and `ALTER DATABASE` we execute it to the downstream directly without `shardInfo`. if ignore { // actually ignored. - tctx.L().Warn("skip shard DDL handle in optimistic shard mode", zap.Strings("ddl", ddlJob.ddls)) + s.tctx.L().Warn("skip shard DDL handle in optimistic shard mode", zap.Strings("ddl", ddlJob.ddls)) } case s.optimist.PendingOperation() == nil: err = terror.ErrWorkerDDLLockOpNotFound.Generate(shardInfo) @@ -1437,7 +1427,7 @@ func (s *Syncer) fatalFunc(job *job, err error) { // DML synced with causality. func (s *Syncer) syncDML() { - defer s.wg.Done() + defer s.runWg.Done() dmlJobCh := s.dmlJobCh if s.cfg.Compact { @@ -1451,47 +1441,109 @@ func (s *Syncer) syncDML() { } } -// Run starts running for sync, we should guarantee it can rerun when paused. -func (s *Syncer) Run(ctx context.Context) (err error) { - runCtx, runCancel := context.WithCancel(context.Background()) - defer runCancel() - tctx := s.tctx.WithContext(runCtx) +func (s *Syncer) waitBeforeRunExit(ctx context.Context) { + defer s.runWg.Done() + failpoint.Inject("checkCheckpointInMiddleOfTransaction", func() { + s.tctx.L().Info("incr maxPauseOrStopWaitTime time ") + maxPauseOrStopWaitTime = time.Minute * 10 + }) - defer func() { - if s.done != nil { - close(s.done) + select { + case <-ctx.Done(): // hijack the root context from s.Run to wait for the transaction to end. + s.tctx.L().Info("received subtask's done, try graceful stop") + s.waitTransactionLock.Lock() + if s.isTransactionEnd { + s.waitXIDJob.Store(int64(waitComplete)) + s.waitTransactionLock.Unlock() + s.tctx.L().Info("the last job is transaction end, done directly") + s.runCancel() + return } - }() + s.waitXIDJob.Store(int64(waiting)) + s.waitTransactionLock.Unlock() + select { + case <-s.runCtx.Ctx.Done(): + s.tctx.L().Info("syncer run exit so runCtx done") + case <-time.After(maxPauseOrStopWaitTime): + // TODO: maxPauseOrStopWaitTime should also count the time of waiting waitTransactionLock + s.tctx.L().Info("wait transaction end timeout, exit now") + s.runCancel() + } + case <-s.runCtx.Ctx.Done(): // when no graceful stop, run ctx will canceled first. + s.tctx.L().Info("received ungraceful exit ctx, exit now") + } +} - go func() { - <-ctx.Done() +func (s *Syncer) updateTSOffsetCronJob(ctx context.Context) { + defer s.runWg.Done() + // temporarily hard code there. if this metrics works well add this to config file. 
+ ticker := time.NewTicker(time.Minute * 10) + defer ticker.Stop() + for { select { - case <-runCtx.Done(): - default: - tctx.L().Info("received subtask's done") - - s.waitTransactionLock.Lock() - if s.isTransactionEnd { - s.waitXIDJob.Store(int64(waitComplete)) - tctx.L().Info("the last job is transaction end, done directly") - runCancel() - s.waitTransactionLock.Unlock() - return + case <-ticker.C: + if utErr := s.updateTSOffset(ctx); utErr != nil { + s.tctx.L().Error("get server unix ts err", zap.Error(utErr)) } - s.waitXIDJob.Store(int64(waiting)) - s.waitTransactionLock.Unlock() + case <-ctx.Done(): + return + } + } +} - select { - case <-runCtx.Done(): - tctx.L().Info("received syncer's done") - case <-time.After(maxPauseOrStopWaitTime): - tctx.L().Info("wait transaction end timeout") - runCancel() - } +func (s *Syncer) updateLagCronJob(ctx context.Context) { + defer s.runWg.Done() + // temporarily hard code there. if this metrics works well add this to config file. + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.updateReplicationLagMetric() + case <-ctx.Done(): + return } + } +} + +func (s *Syncer) updateTSOffset(ctx context.Context) error { + t1 := time.Now() + ts, tsErr := s.fromDB.GetServerUnixTS(ctx) + rtt := time.Since(t1).Seconds() + if tsErr == nil { + s.tsOffset.Store(time.Now().Unix() - ts - int64(rtt/2)) + } + return tsErr +} + +// Run starts running for sync, we should guarantee it can rerun when paused. +func (s *Syncer) Run(ctx context.Context) (err error) { + runCtx, runCancel := context.WithCancel(context.Background()) + s.runCtx, s.runCancel = tcontext.NewContext(runCtx, s.tctx.L()), runCancel + syncCtx, syncCancel := context.WithCancel(context.Background()) + s.syncCtx, s.syncCancel = tcontext.NewContext(syncCtx, s.tctx.L()), syncCancel + defer func() { + s.runCancel() + s.closeJobChans() + s.checkpointFlushWorker.Close() + s.runWg.Wait() + // s.syncCancel won't be called when normal exit, this call just to follow the best practice of use context. + s.syncCancel() + }() + + // we should start this goroutine as soon as possible, because it's the only goroutine that cancel syncer.Run + s.runWg.Add(1) + go func() { + s.waitBeforeRunExit(ctx) }() - fresh, err := s.IsFreshTask(runCtx) + // before sync run, we get the ts offset from upstream first + if utErr := s.updateTSOffset(ctx); utErr != nil { + return utErr + } + + // some initialization that can't be put in Syncer.Init + fresh, err := s.IsFreshTask(s.runCtx.Ctx) if err != nil { return err } @@ -1505,7 +1557,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.tctx.L().Error("failed to get task cli args", zap.Error(err)) } if s.cliArgs != nil && s.cliArgs.StartTime != "" { - err = s.setGlobalPointByTime(tctx, s.cliArgs.StartTime) + err = s.setGlobalPointByTime(s.runCtx, s.cliArgs.StartTime) if terror.ErrConfigStartTimeTooLate.Equal(err) { return err } @@ -1522,19 +1574,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } - // start flush checkpoints worker. 
- s.wg.Add(1) - go func() { - defer s.wg.Done() - s.checkpointFlushWorker.Run(s.tctx) - }() - var ( flushCheckpoint bool delLoadTask bool cleanDumpFile = s.cfg.CleanDumpFile ) - flushCheckpoint, err = s.adjustGlobalPointGTID(tctx) + flushCheckpoint, err = s.adjustGlobalPointGTID(s.runCtx) if err != nil { return err } @@ -1543,11 +1588,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) { flushCheckpoint = true err = s.loadTableStructureFromDump(ctx) if err != nil { - tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err)) + s.tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err)) cleanDumpFile = false } if s.cfg.ShardMode == config.ShardOptimistic { - s.flushOptimisticTableInfos(tctx) + s.flushOptimisticTableInfos(s.runCtx) } } @@ -1555,62 +1600,44 @@ func (s *Syncer) Run(ctx context.Context) (err error) { cleanDumpFile = false } + s.runWg.Add(1) + go s.syncDML() + s.runWg.Add(1) + go func() { + defer s.runWg.Done() + // also need to use a different ctx. checkpointFlushWorker worker will be closed in the first defer + s.checkpointFlushWorker.Run(s.tctx) + }() + s.runWg.Add(1) + go s.syncDDL(adminQueueName, s.ddlDBConn, s.ddlJobCh) + s.runWg.Add(1) + go s.updateLagCronJob(s.runCtx.Ctx) + s.runWg.Add(1) + go s.updateTSOffsetCronJob(s.runCtx.Ctx) if flushCheckpoint { if err = s.flushCheckPoints(); err != nil { - tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) + s.tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) return err } } if delLoadTask { if err = s.delLoadTask(); err != nil { - tctx.L().Warn("error when del load task in etcd", zap.Error(err)) + s.tctx.L().Warn("error when del load task in etcd", zap.Error(err)) } } if cleanDumpFile { - tctx.L().Info("try to remove all dump files") + s.tctx.L().Info("try to remove all dump files") if err = os.RemoveAll(s.cfg.Dir); err != nil { - tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err)) + s.tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err)) } } failpoint.Inject("AdjustGTIDExit", func() { - tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit")) + s.tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit")) s.streamerController.Close() utils.OsExit(1) }) - updateTSOffset := func() error { - t1 := time.Now() - ts, tsErr := s.fromDB.GetServerUnixTS(runCtx) - rtt := time.Since(t1).Seconds() - if tsErr == nil { - s.tsOffset.Store(time.Now().Unix() - ts - int64(rtt/2)) - } - return tsErr - } - // before sync run, we get the tsoffset from upstream first - if utErr := updateTSOffset(); utErr != nil { - return utErr - } - // start background task to get/update current ts offset between dm and upstream - s.wg.Add(1) - go func() { - defer s.wg.Done() - // temporarily hard code there. if this metrics works well add this to config file. 
- updateTicker := time.NewTicker(time.Minute * 10) - defer updateTicker.Stop() - for { - select { - case <-updateTicker.C: - if utErr := updateTSOffset(); utErr != nil { - s.tctx.L().Error("get server unix ts err", zap.Error(utErr)) - } - case <-runCtx.Done(): - return - } - } - }() - // startLocation is the start location for current received event // currentLocation is the end location for current received event (End_log_pos in `show binlog events` for mysql) // lastLocation is the end location for last received (ROTATE / QUERY / XID) event @@ -1624,37 +1651,15 @@ func (s *Syncer) Run(ctx context.Context) (err error) { currentGTID string ) - tctx.L().Info("replicate binlog from checkpoint", zap.Stringer("checkpoint", lastLocation)) + s.tctx.L().Info("replicate binlog from checkpoint", zap.Stringer("checkpoint", lastLocation)) if s.streamerController.IsClosed() { s.locations.reset(lastLocation) - err = s.streamerController.Start(tctx, lastLocation) + err = s.streamerController.Start(s.runCtx, lastLocation) if err != nil { return terror.Annotate(err, "fail to restart streamer controller") } } - - s.wg.Add(1) - go s.syncDML() - - s.wg.Add(1) - go s.syncDDL(tctx, adminQueueName, s.ddlDBConn, s.ddlJobCh) - - s.wg.Add(1) - go func() { - defer s.wg.Done() - updateLagTicker := time.NewTicker(time.Millisecond * 100) - defer updateLagTicker.Stop() - for { - select { - case <-updateLagTicker.C: - s.updateReplicationLagMetric() - case <-runCtx.Done(): - return - } - } - }() - // syncing progress with sharding DDL group // 1. use the global streamer to sync regular binlog events // 2. sharding DDL synced for some sharding groups @@ -1672,13 +1677,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) { traceSource = fmt.Sprintf("%s.syncer.%s", s.cfg.SourceID, s.cfg.Name) ) + // this is second defer func in syncer.Run so in this time checkpointFlushWorker are still running defer func() { if err1 := recover(); err1 != nil { failpoint.Inject("ExitAfterSaveOnlineDDL", func() { - tctx.L().Info("force panic") + s.tctx.L().Info("force panic") panic("ExitAfterSaveOnlineDDL") }) - tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("stack")) + s.tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("stack")) err = terror.ErrSyncerUnitPanic.Generate(err1) } @@ -1695,13 +1701,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // flush all jobs before exit if err2 = s.flushJobs(); err2 != nil { - tctx.L().Warn("failed to flush jobs when exit task", zap.Error(err2)) + s.tctx.L().Warn("failed to flush jobs when exit task", zap.Error(err2)) } // if any execute error, flush safemode exit point if err2 = s.execError.Load(); err2 != nil && (terror.ErrDBExecuteFailed.Equal(err2) || terror.ErrDBUnExpect.Equal(err2)) { if err2 = s.checkpoint.FlushSafeModeExitPoint(s.tctx); err2 != nil { - tctx.L().Warn("failed to flush safe mode checkpoints when exit task", zap.Error(err2)) + s.tctx.L().Warn("failed to flush safe mode checkpoints when exit task", zap.Error(err2)) } } }() @@ -1720,7 +1726,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // if we start syncer at an early position, database must bear a period of inconsistent state, // it's eventual consistency. 
s.safeMode = sm.NewSafeMode() - s.enableSafeModeInitializationPhase(tctx) + s.enableSafeModeInitializationPhase(s.runCtx) closeShardingResync := func() error { if shardingReSync == nil { @@ -1744,7 +1750,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.isReplacingOrInjectingErr = currentLocation.Suffix != 0 s.locations.reset(currentLocation) - err3 := s.streamerController.RedirectStreamer(tctx, currentLocation) + err3 := s.streamerController.RedirectStreamer(s.tctx, currentLocation) if err3 != nil { return err3 } @@ -1759,7 +1765,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } for i := 0; i < n; { - e, err1 := s.getEvent(tctx, currentLocation) + e, err1 := s.getEvent(s.runCtx, currentLocation) if err1 != nil { return err } @@ -1780,7 +1786,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { i++ } } - log.L().Info("discard event already consumed", zap.Int("count", n), + s.tctx.L().Info("discard event already consumed", zap.Int("count", n), zap.Any("cur_loc", currentLocation)) return nil } @@ -1810,13 +1816,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // if suffix>0, we are replacing error s.isReplacingOrInjectingErr = currentLocation.Suffix != 0 s.locations.reset(shardingReSync.currLocation) - err = s.streamerController.RedirectStreamer(tctx, shardingReSync.currLocation) + err = s.streamerController.RedirectStreamer(s.runCtx, shardingReSync.currLocation) if err != nil { return err } failpoint.Inject("ReSyncExit", func() { - tctx.L().Warn("exit triggered", zap.String("failpoint", "ReSyncExit")) + s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ReSyncExit")) utils.OsExit(1) }) } @@ -1824,7 +1830,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { var e *replication.BinlogEvent startTime := time.Now() - e, err = s.getEvent(tctx, currentLocation) + e, err = s.getEvent(s.runCtx, currentLocation) failpoint.Inject("SafeModeExit", func(val failpoint.Value) { if intVal, ok := val.(int); ok && intVal == 1 { @@ -1842,30 +1848,30 @@ func (s *Syncer) Run(ctx context.Context) (err error) { }) switch { case err == context.Canceled: - tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation)) + s.tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation)) return nil case err == context.DeadlineExceeded: - tctx.L().Info("deadline exceeded when fetching binlog event") + s.tctx.L().Info("deadline exceeded when fetching binlog event") continue case isDuplicateServerIDError(err): // if the server id is already used, need to use a new server id - tctx.L().Info("server id is already used by another slave, will change to a new server id and get event again") - err1 := s.streamerController.UpdateServerIDAndResetReplication(tctx, lastLocation) + s.tctx.L().Info("server id is already used by another slave, will change to a new server id and get event again") + err1 := s.streamerController.UpdateServerIDAndResetReplication(s.tctx, lastLocation) if err1 != nil { return err1 } continue case err == relay.ErrorMaybeDuplicateEvent: - tctx.L().Warn("read binlog met a truncated file, will skip events that has been consumed") + s.tctx.L().Warn("read binlog met a truncated file, will skip events that has been consumed") err = maybeSkipNRowsEvent(eventIndex) if err == nil { continue } - log.L().Warn("skip duplicate rows event failed", zap.Error(err)) + s.tctx.L().Warn("skip duplicate rows event failed", zap.Error(err)) } if err != nil { - 
tctx.L().Error("fail to fetch binlog", log.ShortError(err)) + s.tctx.L().Error("fail to fetch binlog", log.ShortError(err)) if isConnectionRefusedError(err) { return err @@ -1873,11 +1879,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if s.streamerController.CanRetry(err) { // GlobalPoint is the last finished transaction location - err = s.streamerController.ResetReplicationSyncer(tctx, s.checkpoint.GlobalPoint()) + err = s.streamerController.ResetReplicationSyncer(s.tctx, s.checkpoint.GlobalPoint()) if err != nil { return err } - log.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint())) + s.tctx.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint())) if err = maybeSkipNRowsEvent(eventIndex); err != nil { return err } @@ -1887,7 +1893,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // try to re-sync in gtid mode if tryReSync && s.cfg.EnableGTID && utils.IsErrBinlogPurged(err) && s.cfg.AutoFixGTID { time.Sleep(retryTimeout) - err = s.reSyncBinlog(*tctx, lastLocation) + err = s.reSyncBinlog(*s.runCtx, lastLocation) if err != nil { return err } @@ -1900,7 +1906,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { failpoint.Inject("IgnoreSomeTypeEvent", func(val failpoint.Value) { if e.Header.EventType.String() == val.(string) { - tctx.L().Debug("IgnoreSomeTypeEvent", zap.Reflect("event", e)) + s.tctx.L().Debug("IgnoreSomeTypeEvent", zap.Reflect("event", e)) failpoint.Continue() } }) @@ -1914,7 +1920,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { metrics.BinlogPosGauge.WithLabelValues("syncer", s.cfg.Name, s.cfg.SourceID).Set(float64(e.Header.LogPos)) index, err := binlog.GetFilenameIndex(lastLocation.Position.Name) if err != nil { - tctx.L().Warn("fail to get index number of binlog file, may because only specify GTID and hasn't saved according binlog position", log.ShortError(err)) + s.tctx.L().Warn("fail to get index number of binlog file, may because only specify GTID and hasn't saved according binlog position", log.ShortError(err)) } else { metrics.BinlogFileGauge.WithLabelValues("syncer", s.cfg.Name, s.cfg.SourceID).Set(float64(index)) } @@ -1923,7 +1929,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { failpoint.Inject("ProcessBinlogSlowDown", nil) - tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header)) + s.tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header)) // support QueryEvent and RowsEvent // we calculate startLocation and endLocation(currentLocation) for Query event here @@ -1970,7 +1976,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } else if op == pb.ErrorOp_Skip { queryEvent := ev.(*replication.QueryEvent) ec := eventContext{ - tctx: tctx, + tctx: s.tctx, header: e.Header, startLocation: &startLocation, currentLocation: ¤tLocation, @@ -1979,7 +1985,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { var sourceTbls map[string]map[string]struct{} sourceTbls, err = s.trackOriginDDL(queryEvent, ec) if err != nil { - tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", queryEvent.Query)) + s.tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", queryEvent.Query)) } s.saveGlobalPoint(currentLocation) @@ -1993,9 +1999,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } err = s.flushJobs() if err != nil { - tctx.L().Warn("failed to flush jobs when handle-error skip", zap.Error(err)) + 
s.tctx.L().Warn("failed to flush jobs when handle-error skip", zap.Error(err)) } else { - tctx.L().Info("flush jobs when handle-error skip") + s.tctx.L().Info("flush jobs when handle-error skip") } } // skip the current event @@ -2009,7 +2015,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { s.locations.reset(currentLocation) if !s.errOperatorHolder.IsInject(startLocation) { // replace operator need redirect to currentLocation - if err = s.streamerController.RedirectStreamer(tctx, currentLocation); err != nil { + if err = s.streamerController.RedirectStreamer(s.runCtx, currentLocation); err != nil { return err } } @@ -2028,17 +2034,17 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // 2. push forward and replicate some sqls after safeModeExitPoint to downstream // 3. quit because of network error, fail to flush global checkpoint and new safeModeExitPoint to downstream // 4. restart again, quit safe mode at safeModeExitPoint, but some sqls after this location have already been replicated to the downstream - if err = s.checkpoint.FlushSafeModeExitPoint(s.tctx); err != nil { + if err = s.checkpoint.FlushSafeModeExitPoint(s.runCtx); err != nil { return err } - if err = s.safeMode.Add(tctx, -1); err != nil { + if err = s.safeMode.Add(s.runCtx, -1); err != nil { return err } } } ec := eventContext{ - tctx: tctx, + tctx: s.runCtx, header: e.Header, startLocation: &startLocation, currentLocation: ¤tLocation, @@ -2079,7 +2085,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // only need compare binlog position? lastLocation = shardingReSync.currLocation if binlog.CompareLocation(shardingReSync.currLocation, shardingReSync.latestLocation, s.cfg.EnableGTID) >= 0 { - tctx.L().Info("re-replicate shard group was completed", zap.String("event", "XID"), zap.Stringer("re-shard", shardingReSync)) + s.tctx.L().Info("re-replicate shard group was completed", zap.String("event", "XID"), zap.Stringer("re-shard", shardingReSync)) err = closeShardingResync() if err != nil { return terror.Annotatef(err, "shard group current location %s", shardingReSync.currLocation) @@ -2094,7 +2100,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return terror.Annotatef(err, "fail to record GTID %v", ev.GSet) } - tctx.L().Debug("", zap.String("event", "XID"), zap.Stringer("last location", lastLocation), log.WrapStringerField("location", currentLocation)) + s.tctx.L().Debug("", zap.String("event", "XID"), zap.Stringer("last location", lastLocation), log.WrapStringerField("location", currentLocation)) lastLocation.Position.Pos = e.Header.LogPos // update lastPos err = lastLocation.SetGTID(ev.GSet) if err != nil { @@ -2107,7 +2113,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if e.Header.EventType == replication.HEARTBEAT_EVENT { // flush checkpoint even if there are no real binlog events if s.checkpoint.CheckGlobalPoint() { - tctx.L().Info("meet heartbeat event and then flush jobs") + s.tctx.L().Info("meet heartbeat event and then flush jobs") err2 = s.flushJobs() } } @@ -2123,6 +2129,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } if waitXIDStatus(s.waitXIDJob.Load()) == waitComplete { + // already wait until XID event, we can stop sync now, s.runcancel will be called in defer func return nil } } @@ -2508,7 +2515,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o qec.p, err = event.GetParserForStatusVars(ev.StatusVars) if err != nil { - log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err)) 
+ s.tctx.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err)) } stmt, err := parseOneStmt(qec) @@ -3119,7 +3126,7 @@ func (s *Syncer) trackOriginDDL(ev *replication.QueryEvent, ec eventContext) (ma } qec.p, err = event.GetParserForStatusVars(ev.StatusVars) if err != nil { - log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err)) + s.tctx.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err)) } stmt, err := parseOneStmt(qec) if err != nil { @@ -3386,45 +3393,47 @@ func (s *Syncer) Close() { if s.isClosed() { return } - s.stopSync() s.closeDBs() - s.checkpoint.Close() - if err := s.schemaTracker.Close(); err != nil { s.tctx.L().Error("fail to close schema tracker", log.ShortError(err)) } - if s.sgk != nil { s.sgk.Close() } - s.closeOnlineDDL() - // when closing syncer by `stop-task`, remove active relay log from hub s.removeActiveRelayLog() - metrics.RemoveLabelValuesWithTaskInMetrics(s.cfg.Name) - + s.runWg.Wait() s.closed.Store(true) } -// stopSync stops syncing, now it used by Close and Pause -// maybe we can refine the workflow more clear. -func (s *Syncer) stopSync() { - if s.done != nil { - <-s.done // wait Run to return - } - s.closeJobChans() - s.wg.Wait() // wait job workers to return +// Kill stops the syncer without graceful shutdown. +func (s *Syncer) Kill() { + s.tctx.L().Warn("kill syncer without graceful") + s.runCancel() + s.syncCancel() + s.Close() +} +// stopSync stops the stream and rolls back the checkpoint; it is now used by Close() and Pause(). +func (s *Syncer) stopSync() { // before re-write workflow for s.syncer, simply close it // when resuming, re-create s.syncer if s.streamerController != nil { s.streamerController.Close() } + + // try to rollback checkpoints; if they are already flushed, this has no effect. this must be called before closing schemaTracker + prePos := s.checkpoint.GlobalPoint() + s.checkpoint.Rollback(s.schemaTracker) + currPos := s.checkpoint.GlobalPoint() + if binlog.CompareLocation(prePos, currPos, s.cfg.EnableGTID) != 0 { + s.tctx.L().Warn("something wrong with rollback global checkpoint", zap.Stringer("previous position", prePos), zap.Stringer("current position", currPos)) + } } func (s *Syncer) closeOnlineDDL() { diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index c555fb766c6..34b29e94266 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tiflow/dm/syncer/dbconn" "github.com/pingcap/tiflow/pkg/errorutil" "github.com/pingcap/tiflow/pkg/sqlmodel" + "github.com/stretchr/testify/require" sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/go-mysql-org/go-mysql/mysql" @@ -103,12 +104,16 @@ type testSyncerSuite struct { } type MockStreamer struct { - events []*replication.BinlogEvent - idx uint32 + events []*replication.BinlogEvent + idx uint32 + pending bool } func (m *MockStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { if int(m.idx) >= len(m.events) { + if m.pending { + <-ctx.Done() + } return nil, context.Canceled } e := m.events[m.idx] @@ -122,7 +127,7 @@ type MockStreamProducer struct { } func (mp *MockStreamProducer) generateStreamer(location binlog.Location) (reader.Streamer, error) { if location.Position.Pos == 4 { - return &MockStreamer{mp.events, 0}, nil + return &MockStreamer{mp.events, 0, false}, nil } bytesLen := 0 idx := uint32(0) @@ -133,32 +138,11 @@ func (mp *MockStreamProducer) generateStreamer(location binlog.Location) (reader break } } - return &MockStreamer{mp.events, 
idx}, nil + return &MockStreamer{mp.events, idx, false}, nil } func (s *testSyncerSuite) SetUpSuite(c *C) { - loaderDir, err := os.MkdirTemp("", "loader") - c.Assert(err, IsNil) - loaderCfg := config.LoaderConfig{ - Dir: loaderDir, - } - s.cfg = &config.SubTaskConfig{ - From: config.GetDBConfigForTest(), - To: config.GetDBConfigForTest(), - ServerID: 101, - MetaSchema: "test", - Name: "syncer_ut", - ShadowTableRules: []string{config.DefaultShadowTableRules}, - TrashTableRules: []string{config.DefaultTrashTableRules}, - Mode: config.ModeIncrement, - Flavor: "mysql", - LoaderConfig: loaderCfg, - } - s.cfg.Experimental.AsyncCheckpointFlush = true - s.cfg.From.Adjust() - s.cfg.To.Adjust() - - s.cfg.UseRelay = false + s.cfg = genDefaultSubTaskConfig4Test() s.resetEventsGenerator(c) c.Assert(log.InitLogger(&log.Config{}), IsNil) } @@ -237,7 +221,7 @@ func (s *testSyncerSuite) TearDownSuite(c *C) { os.RemoveAll(s.cfg.Dir) } -func (s *testSyncerSuite) mockGetServerUnixTS(mock sqlmock.Sqlmock) { +func mockGetServerUnixTS(mock sqlmock.Sqlmock) { ts := time.Now().Unix() rows := sqlmock.NewRows([]string{"UNIX_TIMESTAMP()"}).AddRow(strconv.FormatInt(ts, 10)) mock.ExpectQuery("SELECT UNIX_TIMESTAMP()").WillReturnRows(rows) @@ -742,14 +726,14 @@ func (s *testSyncerSuite) TestcheckpointID(c *C) { func (s *testSyncerSuite) TestRun(c *C) { // 1. run syncer with column mapping - // 2. execute some sqls which will trigger casuality + // 2. execute some sqls which will trigger causality // 3. check the generated jobs // 4. update config, add route rules, and update syncer - // 5. execute somes sqls and then check jobs generated + // 5. execute some sqls and then check jobs generated db, mock, err := sqlmock.New() c.Assert(err, IsNil) - s.mockGetServerUnixTS(mock) + mockGetServerUnixTS(mock) dbConn, err := db.Conn(context.Background()) c.Assert(err, IsNil) checkPointDB, checkPointMock, err := sqlmock.New() @@ -803,7 +787,7 @@ func (s *testSyncerSuite) TestRun(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_1`").WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("t_1", "create table t_1(id int primary key, name varchar(24), KEY `index1` (`name`))")) - s.mockGetServerUnixTS(mock) + mockGetServerUnixTS(mock) mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). 
AddRow("t_2", "create table t_2(id int primary key, name varchar(24))")) @@ -1041,7 +1025,7 @@ func (s *testSyncerSuite) TestRun(c *C) { func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - s.mockGetServerUnixTS(mock) + mockGetServerUnixTS(mock) dbConn, err := db.Conn(context.Background()) c.Assert(err, IsNil) @@ -1744,3 +1728,84 @@ func (s *testSyncerSuite) TestExecuteSQLSWithIgnore(c *C) { c.Assert(mock.ExpectationsWereMet(), IsNil) } + +func genDefaultSubTaskConfig4Test() *config.SubTaskConfig { + loaderDir, err := os.MkdirTemp("", "loader") + if err != nil { + panic(err) // no happen + } + + loaderCfg := config.LoaderConfig{ + Dir: loaderDir, + } + cfg := &config.SubTaskConfig{ + From: config.GetDBConfigForTest(), + To: config.GetDBConfigForTest(), + ServerID: 101, + MetaSchema: "test", + Name: "syncer_ut", + ShadowTableRules: []string{config.DefaultShadowTableRules}, + TrashTableRules: []string{config.DefaultTrashTableRules}, + Mode: config.ModeIncrement, + Flavor: "mysql", + LoaderConfig: loaderCfg, + UseRelay: false, + } + cfg.Experimental.AsyncCheckpointFlush = true + cfg.From.Adjust() + cfg.To.Adjust() + return cfg +} + +func TestWaitBeforeRunExit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cfg := genDefaultSubTaskConfig4Test() + cfg.WorkerCount = 0 + syncer := NewSyncer(cfg, nil, nil) + + db, mock, err := sqlmock.New() + require.NoError(t, err) + mockGetServerUnixTS(mock) + + syncer.fromDB = &dbconn.UpStreamConn{BaseDB: conn.NewBaseDB(db)} + syncer.reset() + require.NoError(t, syncer.genRouter()) + + mockStreamerProducer := &MockStreamProducer{} + mockStreamer, err := mockStreamerProducer.generateStreamer(binlog.NewLocation("")) + require.NoError(t, err) + // let getEvent pending until ctx.Done() + mockStreamer.(*MockStreamer).pending = true + syncer.streamerController = &StreamerController{ + streamerProducer: mockStreamerProducer, streamer: mockStreamer, closed: false, + } + + wg := &sync.WaitGroup{} + errCh := make(chan error, 1) + wg.Add(1) + go func() { + defer wg.Done() + errCh <- syncer.Run(ctx) + }() + time.Sleep(time.Second) // wait s.Run start + + // test s.Run will not exit unit caller cancel ctx or call s.runCancel + cancel() // this will make s.Run exit + wg.Wait() + require.Nil(t, <-errCh) + require.Equal(t, 0, len(errCh)) + require.NotNil(t, syncer.runCtx) + require.NotNil(t, syncer.runCancel) + + // test syncer wait time not more than maxPauseOrStopWaitTime + oldMaxPauseOrStopWaitTime := maxPauseOrStopWaitTime + maxPauseOrStopWaitTime = time.Second + ctx2, cancel := context.WithCancel(context.Background()) + cancel() + runCtx, runCancel := context.WithCancel(context.Background()) + syncer.runCtx, syncer.runCancel = tcontext.NewContext(runCtx, syncer.tctx.L()), runCancel + syncer.runWg.Add(1) + syncer.waitBeforeRunExit(ctx2) + require.Equal(t, context.Canceled, syncer.runCtx.Ctx.Err()) + maxPauseOrStopWaitTime = oldMaxPauseOrStopWaitTime +} diff --git a/dm/tests/_utils/ha_cases_lib.sh b/dm/tests/_utils/ha_cases_lib.sh index d9fb774a90b..6a0a2896ed8 100644 --- a/dm/tests/_utils/ha_cases_lib.sh +++ b/dm/tests/_utils/ha_cases_lib.sh @@ -145,6 +145,7 @@ function start_multi_tasks_cluster() { } function cleanup() { + cleanup_process $* cleanup_data $ha_test cleanup_data $ha_test2 echo "clean source table" @@ -154,7 +155,6 @@ function cleanup() { $(mysql -h127.0.0.1 -p123456 -P${i} -uroot -e "drop database if exists ha_test2;") sleep 1 done - cleanup_process $* } function 
isolate_master() { diff --git a/dm/tests/all_mode/run.sh b/dm/tests/all_mode/run.sh index 3d43bc83262..e3f23df4231 100755 --- a/dm/tests/all_mode/run.sh +++ b/dm/tests/all_mode/run.sh @@ -142,8 +142,8 @@ function test_query_timeout() { run_sql_tidb 'SHOW PROCESSLIST;' check_rows_equal 1 - cleanup_data all_mode cleanup_process + cleanup_data all_mode export GO_FAILPOINTS='' echo "[$(date)] <<<<<< finish test_query_timeout >>>>>>" @@ -210,8 +210,8 @@ function test_stop_task_before_checkpoint() { "stop-task test" \ "\"result\": true" 3 - cleanup_data all_mode cleanup_process + cleanup_data all_mode export GO_FAILPOINTS='' echo "[$(date)] <<<<<< finish test_stop_task_before_checkpoint >>>>>>" @@ -270,8 +270,8 @@ function test_fail_job_between_event() { "\"result\": true" 3 check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - cleanup_data all_mode cleanup_process + cleanup_data all_mode export GO_FAILPOINTS='' echo "[$(date)] <<<<<< finish test_fail_job_between_event >>>>>>" @@ -317,8 +317,8 @@ function test_expression_filter() { "query-status test" \ "\"result\": true" 3 - cleanup_data all_mode cleanup_process + cleanup_data all_mode echo "[$(date)] <<<<<< finish test_expression_filter >>>>>>" } diff --git a/dm/tests/checkpoint_transaction/conf/dm-worker1.toml b/dm/tests/checkpoint_transaction/conf/dm-worker1.toml index 7a72ea72bf8..3d99321d632 100644 --- a/dm/tests/checkpoint_transaction/conf/dm-worker1.toml +++ b/dm/tests/checkpoint_transaction/conf/dm-worker1.toml @@ -1,2 +1,4 @@ -name = "worker1" join = "127.0.0.1:8261" +keepalive-ttl = 1 +name = "worker1" +worker-addr = "127.0.0.1:8262" diff --git a/dm/tests/checkpoint_transaction/run.sh b/dm/tests/checkpoint_transaction/run.sh index 5771945384a..e3834469245 100755 --- a/dm/tests/checkpoint_transaction/run.sh +++ b/dm/tests/checkpoint_transaction/run.sh @@ -6,6 +6,31 @@ cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME +function check_worker_ungraceful_stop_with_retry() { + for ((k = 0; k < 10; k++)); do + sleep 1 + echo "start check_worker_ungraceful_stop_with_retry times: $k" + + num=$(grep "kill unit" $WORK_DIR/worker1/log/dm-worker.log | wc -l) + if [ $num -lt 1 ]; then + continue + fi + num=$(grep "kill syncer without graceful" $WORK_DIR/worker1/log/dm-worker.log | wc -l) + if [ $num -lt 1 ]; then + continue + fi + num=$(grep "received ungraceful exit ctx, exit now" $WORK_DIR/worker1/log/dm-worker.log | wc -l) + if [ $num -lt 1 ]; then + continue + fi + echo "check_worker_ungraceful_stop_with_retry success after retry: $k" + return 0 + done + + echo "check_worker_ungraceful_stop_with_retry failed after retry" + exit 1 +} + function run() { export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/syncer/checkCheckpointInMiddleOfTransaction=return" @@ -28,6 +53,33 @@ function run() { # check diff check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + # test ungraceful stop, worker will not wait transaction finish + run_sql_file $cur/data/db1.increment1.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + sleep 2 + # kill dm-master 1 to make worker lost keep alive while a transaction is not finished + echo "kill dm-master1" + kill_dm_master + check_master_port_offline 1 + sleep 1 # wait worker lost keep alive ttl is 1 second + + # check dm-worker will exit quickly without waiting for the transaction to finish + check_worker_ungraceful_stop_with_retry + + # test data in tidb less than source + dataCountSource=$(mysql -uroot -h$MYSQL_HOST1 -P$MYSQL_PORT1 -p$MYSQL_PASSWORD1 -se 
"select count(1) from checkpoint_transaction.t1") + dataCountInTiDB=$(mysql -uroot -h127.0.0.1 -P4000 -se "select count(1) from checkpoint_transaction.t1") + echo "after ungraceful exit data in source count: $dataCountSource data in tidb count: $dataCountInTiDB" + if [ "$dataCountInTiDB" -lt "$dataCountSource" ]; then + echo "ungraceful stop test success" + else + echo "ungraceful stop test failed" + exit 1 + fi + + # start dm-master again task will be resume, and data will be synced + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml run_sql_file $cur/data/db1.increment1.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 # wait transaction start # you can see why sleep in https://github.com/pingcap/dm/pull/1928#issuecomment-895820239 @@ -41,8 +93,13 @@ function run() { "\"stage\": \"Paused\"" 1 # check the point is the middle of checkpoint num=$(grep "not receive xid job yet" $WORK_DIR/worker1/log/dm-worker.log | wc -l) - [[ $num -gt 0 ]] - sed -e '/not receive xid job yet/d' $WORK_DIR/worker1/log/dm-worker.log >$WORK_DIR/worker1/log/dm-worker.log + + if [ "$num" -gt 0 ]; then + echo "graceful pause test success" + else + echo "graceful pause test failed" + exit 1 + fi echo "start check pause diff" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml @@ -55,6 +112,16 @@ function run() { "query-status test" \ "\"stage\": \"Running\"" 1 + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + rm -rf $WORK_DIR/worker1 + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 1 + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 # wait transaction start # you can see why sleep in https://github.com/pingcap/dm/pull/1928#issuecomment-895820239 @@ -65,7 +132,12 @@ function run() { "\"result\": true" 2 # check the point is the middle of checkpoint num=$(grep "not receive xid job yet" $WORK_DIR/worker1/log/dm-worker.log | wc -l) - [[ $num -gt 0 ]] + if [ "$num" -gt 0 ]; then + echo "graceful stop test success" + else + echo "graceful stop test failed" + exit 1 + fi echo "start check stop diff" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml diff --git a/dm/tests/expression_filter/run.sh b/dm/tests/expression_filter/run.sh index 01b559ccba9..faa7d1d43db 100755 --- a/dm/tests/expression_filter/run.sh +++ b/dm/tests/expression_filter/run.sh @@ -53,8 +53,8 @@ function complex_behaviour() { update_num=$(grep -o '"number of filtered update"=[0-9]\+' $WORK_DIR/worker1/log/dm-worker.log | grep -o '[0-9]\+' | awk '{n += $1}; END{print n}') [ $update_num -eq 3 ] - cleanup_data expr_filter cleanup_process $* + cleanup_data expr_filter } function run() { diff --git a/dm/tests/full_mode/run.sh b/dm/tests/full_mode/run.sh index 80fd8975c52..b74e7599a03 100755 --- a/dm/tests/full_mode/run.sh +++ b/dm/tests/full_mode/run.sh @@ -55,8 +55,8 @@ function fail_acquire_global_lock() { "\"stage\": \"Paused\"" 2 \ "you need (at least one of) the RELOAD privilege(s) for this operation" 2 - cleanup_data full_mode cleanup_process $* + cleanup_data full_mode } function escape_schema() { @@ -113,8 +113,8 @@ function escape_schema() { check_metric $WORKER1_PORT 
'dumpling_dump_finished_tables' 3 0 3 check_metric $WORKER2_PORT 'dumpling_dump_finished_tables' 3 0 3 - cleanup_data full/mode cleanup_process $* + cleanup_data full/mode } function empty_data() { @@ -138,8 +138,8 @@ function empty_data() { check_log_contains $WORK_DIR/worker1/log/dm-worker.log "progress=\"100.00 %\"" check_log_contains $WORK_DIR/worker2/log/dm-worker.log "progress=\"100.00 %\"" - cleanup_data full_mode cleanup_process $* + cleanup_data full_mode } function run() { diff --git a/dm/tests/load_interrupt/run.sh b/dm/tests/load_interrupt/run.sh index 79f8c79c44c..e0e96aebb9c 100755 --- a/dm/tests/load_interrupt/run.sh +++ b/dm/tests/load_interrupt/run.sh @@ -60,8 +60,8 @@ function test_save_checkpoint_failed() { ls $WORK_DIR/worker1/dumped_data.test echo "test_save_checkpoint_failed SUCCESS!" - cleanup_data load_interrupt cleanup_process $* + cleanup_data load_interrupt } function run() { diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh index f9beb3a4c7f..b359bd0c606 100755 --- a/dm/tests/new_relay/run.sh +++ b/dm/tests/new_relay/run.sh @@ -10,82 +10,10 @@ SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt" API_VERSION="v1alpha1" -function test_cant_dail_upstream() { - cleanup_data $TEST_NAME - cleanup_process - - run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml - check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT - run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT - - cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml - dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - - kill_dm_worker - - export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/pkg/conn/failDBPing=return()" - run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT - - # make sure DM-worker doesn't exit - sleep 2 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status -s $SOURCE_ID1" \ - "injected error" 1 - - export GO_FAILPOINTS="" - cleanup_process - cleanup_data $TEST_NAME -} - -function test_cant_dail_downstream() { - cleanup_data $TEST_NAME - cleanup_process - - run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml - check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT - run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT - - cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml - dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 - - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "start-relay -s $SOURCE_ID1 worker1" \ - "\"result\": true" 2 - dmctl_start_task_standalone $cur/conf/dm-task.yaml "--remove-meta" - - kill_dm_worker - # kill tidb - pkill -hup tidb-server 2>/dev/null || true - wait_process_exit tidb-server - - run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT - - # make sure DM-worker doesn't exit - sleep 2 - run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "query-status -s $SOURCE_ID1" \ - "\"relayCatchUpMaster\": true" 1 \ - "dial tcp 127.0.0.1:4000: connect: connection refused" 1 - - # restart tidb - run_tidb_server 4000 $TIDB_PASSWORD - sleep 2 - 
- cleanup_process - cleanup_data $TEST_NAME -} - function test_restart_relay_status() { - cleanup_data $TEST_NAME cleanup_process + cleanup_data $TEST_NAME + export GO_FAILPOINTS="" run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT @@ -157,11 +85,83 @@ function test_restart_relay_status() { "list-member --worker" \ "relay" 1 \ "bound" 2 + + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_restart_relay_status passed" } -function test_kill_dump_connection() { +function test_cant_dail_upstream() { + cleanup_process cleanup_data $TEST_NAME + export GO_FAILPOINTS="" + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" \ + "\"result\": true" 2 + + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + + export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/pkg/conn/failDBPing=return()" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # make sure DM-worker doesn't exit + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "injected error" 1 + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_cant_dail_upstream passed" +} + +function test_cant_dail_downstream() { + cleanup_process + cleanup_data $TEST_NAME + export GO_FAILPOINTS="" + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" \ + "\"result\": true" 2 + dmctl_start_task_standalone $cur/conf/dm-task.yaml "--remove-meta" + + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + # kill tidb + pkill -hup tidb-server 2>/dev/null || true + wait_process_exit tidb-server + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "\"relayCatchUpMaster\": true" 1 \ + "dial tcp 127.0.0.1:4000: connect: connection refused" 1 + + # restart tidb + run_tidb_server 4000 $TIDB_PASSWORD + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_cant_dail_downstream passed" +} + +function test_kill_dump_connection() { cleanup_process + cleanup_data $TEST_NAME run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' @@ -193,16 +193,12 @@ function test_kill_dump_connection() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s 
$SOURCE_ID1" \ "\"relayCatchUpMaster\": true" 1 + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_kill_dump_connection passed" +} +function test_relay_operations() { cleanup_process cleanup_data $TEST_NAME -} - -function run() { - test_restart_relay_status - test_cant_dail_downstream - test_cant_dail_upstream - export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/ReportRelayLogSpaceInBackground=return(1)" run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 @@ -234,7 +230,7 @@ function run() { "\"worker\": \"worker1\"" 1 \ "\"worker\": \"worker2\"" 1 - # worker1 and worker2 has one realy job and worker3 have none. + # worker1 and worker2 has one relay job and worker3 have none. check_metric $WORKER1_PORT "dm_relay_binlog_file{node=\"relay\"}" 3 0 2 check_metric $WORKER1_PORT "dm_relay_exit_with_error_count" 3 -1 1 check_metric $WORKER2_PORT "dm_relay_binlog_file{node=\"relay\"}" 3 0 2 @@ -246,7 +242,7 @@ function run() { run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - # relay task tranfer to worker1 with no error. + # relay task transfer to worker1 with no error. check_metric $WORKER1_PORT "dm_relay_data_corruption" 3 -1 1 check_metric $WORKER1_PORT "dm_relay_read_error_count" 3 -1 1 check_metric $WORKER1_PORT "dm_relay_write_error_count" 3 -1 1 @@ -254,8 +250,9 @@ function run() { check_metric $WORKER1_PORT 'dm_relay_space{type="available"}' 5 0 9223372036854775807 # subtask is preferred to scheduled to another relay worker - pkill -hup -f dm-worker1.toml 2>/dev/null || true - wait_pattern_exit dm-worker1.toml + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 # worker1 is down, worker2 has running relay and sync unit run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s $SOURCE_ID1" \ @@ -296,11 +293,12 @@ function run() { [ "$new_relay_log_count_1" -eq 1 ] [ "$new_relay_log_count_2" -eq 1 ] - pkill -hup -f dm-worker1.toml 2>/dev/null || true - wait_pattern_exit dm-worker1.toml - pkill -hup -f dm-worker2.toml 2>/dev/null || true - wait_pattern_exit dm-worker2.toml - + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + echo "kill dm-worker2" + ps aux | grep dm-worker2 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 # if all relay workers are offline, relay-not-enabled worker should continue to sync run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s $SOURCE_ID1" \ @@ -322,8 +320,7 @@ function run() { # destroy cluster cleanup_process $* - rm -rf $WORK_DIR - mkdir $WORK_DIR + cleanup_data $TEST_NAME # insert new data run_sql_file $cur/data/db1.increment5.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 @@ -352,7 +349,14 @@ function run() { "\"result\": true" 2 check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_relay_operations passed" +} +function run() { + test_relay_operations + test_cant_dail_upstream + test_restart_relay_status + test_cant_dail_downstream test_kill_dump_connection } diff --git a/dm/tests/safe_mode/run.sh b/dm/tests/safe_mode/run.sh index 644967cd306..ed1d8ff74ff 100755 --- a/dm/tests/safe_mode/run.sh +++ b/dm/tests/safe_mode/run.sh @@ -55,8 +55,8 @@ function consistency_none() { check_log_contain_with_retry "\[\"enable safe-mode for safe mode exit point, will exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] 
\[location=\"position: ($name2, $pos2), gtid-set: $gtid2\"\]" $WORK_DIR/worker2/log/dm-worker.log run_sql_source2 "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'" - cleanup_data safe_mode_target cleanup_process $* + cleanup_data safe_mode_target } function check_exit_safe_binlog() { @@ -182,8 +182,8 @@ function safe_mode_recover() { echo "finish running run safe mode recover case $i" ((i += 1)) - cleanup_data safe_mode_target cleanup_process $* + cleanup_data safe_mode_target done } diff --git a/dm/tests/start_task/run.sh b/dm/tests/start_task/run.sh index c725983852f..49b3a5519f2 100644 --- a/dm/tests/start_task/run.sh +++ b/dm/tests/start_task/run.sh @@ -58,8 +58,8 @@ function lazy_init_tracker() { check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t50' 1 check_log_not_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t51' - cleanup_data start_task cleanup_process + cleanup_data start_task } function start_task_by_time() {
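The core of the ungraceful-stop path is the bounded wait that TestWaitBeforeRunExit exercises: when the worker loses its keepalive lease and calls Kill, the syncer waits at most maxPauseOrStopWaitTime for in-flight work before force-cancelling its run context. Below is a minimal, self-contained Go sketch of that timeout-then-cancel pattern; the names waitBeforeExit and maxGracefulWait are hypothetical stand-ins modeled on the PR's waitBeforeRunExit and maxPauseOrStopWaitTime, and the sketch is an illustration of the idea, not the actual DM implementation.

// Sketch only: give in-flight work a bounded grace period after the caller
// cancels, then force-cancel the run context so Run can return quickly.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// maxGracefulWait plays the role maxPauseOrStopWaitTime plays in the PR:
// the longest we wait for running jobs before exiting ungracefully.
var maxGracefulWait = time.Second

// waitBeforeExit blocks until the workers tracked by wg finish or the grace
// period expires; on timeout it calls runCancel to force an ungraceful exit.
func waitBeforeExit(callerCtx context.Context, wg *sync.WaitGroup, runCancel context.CancelFunc) {
	<-callerCtx.Done() // the caller asked us to pause or stop

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done: // workers drained in time: graceful exit
	case <-time.After(maxGracefulWait):
		runCancel() // grace period exceeded: exit without graceful cleanup
	}
}

func main() {
	callerCtx, callerCancel := context.WithCancel(context.Background())
	runCtx, runCancel := context.WithCancel(context.Background())

	var wg sync.WaitGroup
	wg.Add(1) // simulate one job that never finishes, like a hung transaction
	callerCancel()

	start := time.Now()
	waitBeforeExit(callerCtx, &wg, runCancel)
	fmt.Printf("returned after %v, run context err: %v\n", time.Since(start), runCtx.Err())
}

Run against an always-busy worker, this returns after roughly one second with the run context reporting context.Canceled, which mirrors the assertion TestWaitBeforeRunExit makes on syncer.runCtx.Ctx.Err().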
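The test changes also add a pending flag to MockStreamer so GetEvent can block on ctx.Done(), simulating an upstream that never delivers another event; that is what keeps Run parked inside a read while the contexts are cancelled. The following rough, standalone Go illustration shows the same idea with hypothetical stub names (stubStreamer, event); it is not the test code itself.

// Sketch only: a streamer stub whose GetEvent blocks until the context is
// cancelled once it runs out of events, like MockStreamer with pending=true.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

type event struct{ pos uint32 }

type stubStreamer struct {
	events  []event
	idx     int
	pending bool // when true and no events remain, block until ctx is done
}

func (s *stubStreamer) GetEvent(ctx context.Context) (event, error) {
	if s.idx >= len(s.events) {
		if s.pending {
			<-ctx.Done() // hold the reader, like waiting for binlog that never arrives
		}
		return event{}, context.Canceled
	}
	e := s.events[s.idx]
	s.idx++
	return e, nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	s := &stubStreamer{pending: true}

	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel() // cancellation is the only way the pending read returns
	}()

	_, err := s.GetEvent(ctx)
	fmt.Println("GetEvent unblocked by cancellation:", errors.Is(err, context.Canceled))
}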