From 8866cbdccf9555e6888f549983b7afc84e4cd00a Mon Sep 17 00:00:00 2001 From: Ehco Date: Wed, 16 Feb 2022 15:11:39 +0800 Subject: [PATCH] This is an automated cherry-pick of #4035 Signed-off-by: ti-chi-bot --- dm/dm/unit/unit.go | 2 + dm/dm/worker/hub_test.go | 2 +- dm/dm/worker/join.go | 3 +- dm/dm/worker/server.go | 62 +- dm/dm/worker/server_test.go | 33 +- dm/dm/worker/source_worker.go | 10 +- dm/dm/worker/source_worker_test.go | 149 ++++- dm/dm/worker/subtask.go | 21 + dm/dm/worker/subtask_holder.go | 10 + dm/dm/worker/subtask_test.go | 2 + dm/dumpling/dumpling.go | 6 + dm/loader/lightning.go | 6 + dm/loader/loader.go | 6 + dm/syncer/dml_worker.go | 26 +- dm/syncer/syncer.go | 560 +++++++++++++----- dm/syncer/syncer_test.go | 114 +++- dm/tests/_utils/ha_cases_lib.sh | 2 +- dm/tests/all_mode/run.sh | 8 +- .../conf/dm-worker1.toml | 4 +- dm/tests/checkpoint_transaction/run.sh | 78 ++- dm/tests/expression_filter/run.sh | 2 +- dm/tests/full_mode/run.sh | 6 +- dm/tests/load_interrupt/run.sh | 2 +- dm/tests/new_relay/run.sh | 120 +++- dm/tests/safe_mode/run.sh | 4 +- dm/tests/start_task/run.sh | 2 +- 26 files changed, 979 insertions(+), 261 deletions(-) diff --git a/dm/dm/unit/unit.go b/dm/dm/unit/unit.go index 4451a652e86..363d17bf2cc 100644 --- a/dm/dm/unit/unit.go +++ b/dm/dm/unit/unit.go @@ -48,6 +48,8 @@ type Unit interface { // Close shuts down the process and closes the unit, after that can not call Process to resume // The implementation should not block for a long time. Close() + // Kill shuts down the process and closes the unit without graceful. + Kill() // Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned. // The implementation should not block for a long time. Pause() diff --git a/dm/dm/worker/hub_test.go b/dm/dm/worker/hub_test.go index d76697b1e6a..ca20fd437c3 100644 --- a/dm/dm/worker/hub_test.go +++ b/dm/dm/worker/hub_test.go @@ -20,5 +20,5 @@ import ( func (t *testServer) testConidtionHub(c *C, s *Server) { // test condition hub c.Assert(GetConditionHub(), NotNil) - c.Assert(GetConditionHub().w, DeepEquals, s.getWorker(true)) + c.Assert(GetConditionHub().w, DeepEquals, s.getSourceWorker(true)) } diff --git a/dm/dm/worker/join.go b/dm/dm/worker/join.go index ece56ec1a4d..a50c47d44fe 100644 --- a/dm/dm/worker/join.go +++ b/dm/dm/worker/join.go @@ -108,7 +108,8 @@ func (s *Server) KeepAlive() { failpoint.Label("bypass") // TODO: report the error. - err := s.stopWorker("", true) + // when lost keepalive, stop the worker without graceful. this is to fix https://github.com/pingcap/tiflow/issues/3737 + err := s.stopSourceWorker("", true, false) if err != nil { log.L().Error("fail to stop worker", zap.Error(err)) return // return if failed to stop the worker. diff --git a/dm/dm/worker/server.go b/dm/dm/worker/server.go index b3f9b0ce51e..7f373f2085e 100644 --- a/dm/dm/worker/server.go +++ b/dm/dm/worker/server.go @@ -341,6 +341,13 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error { } rev = rev1 if relaySource == nil { +<<<<<<< HEAD +======= + if w := s.getSourceWorker(true); w != nil && w.startedRelayBySourceCfg { + break + } + log.L().Info("didn't found relay config after etcd retryable error. 
Will stop relay now") +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) err = s.disableRelay("") if err != nil { log.L().Error("fail to disableRelay after etcd retryable error", zap.Error(err)) @@ -351,7 +358,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error { s.Lock() defer s.Unlock() - if w := s.getWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID { + if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == relaySource.SourceID { // we may face both relay config and subtask bound changed in a compaction error, so here // we check if observeSourceBound has started a worker // TODO: add a test for this situation @@ -362,7 +369,7 @@ func (s *Server) observeRelayConfig(ctx context.Context, rev int64) error { } return nil } - err = s.stopWorker("", false) + err = s.stopSourceWorker("", false, true) if err != nil { log.L().Error("fail to stop worker", zap.Error(err)) return err // return if failed to stop the worker. @@ -437,7 +444,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { s.Lock() defer s.Unlock() - if w := s.getWorker(false); w != nil && w.cfg.SourceID == bound.Source { + if w := s.getSourceWorker(false); w != nil && w.cfg.SourceID == bound.Source { // we may face both relay config and subtask bound changed in a compaction error, so here // we check if observeRelayConfig has started a worker // TODO: add a test for this situation @@ -448,7 +455,7 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { } return nil } - err = s.stopWorker("", false) + err = s.stopSourceWorker("", false, true) if err != nil { log.L().Error("fail to stop worker", zap.Error(err)) return err // return if failed to stop the worker. @@ -475,30 +482,33 @@ func (s *Server) observeSourceBound(ctx context.Context, rev int64) error { } func (s *Server) doClose() { - s.cancel() - // close server in advance, stop receiving source bound and relay bound - s.wg.Wait() - s.Lock() defer s.Unlock() + if s.closed.Load() { return } - // close worker and wait for return - if w := s.getWorker(false); w != nil { - w.Close() + // stop server in advance, stop receiving source bound and relay bound + s.cancel() + s.wg.Wait() + // stop worker and wait for return(we already lock the whole Sever, so no need use lock to get source worker) + if w := s.getSourceWorker(false); w != nil { + w.Stop(true) } s.closed.Store(true) } // Close close the RPC server, this function can be called multiple times. func (s *Server) Close() { + s.doClose() // we should stop current sync first, otherwise master may schedule task on new worker while we are closing s.stopKeepAlive() - s.doClose() + if s.etcdClient != nil { + s.etcdClient.Close() + } } // if needLock is false, we should make sure Server has been locked in caller. -func (s *Server) getWorker(needLock bool) *SourceWorker { +func (s *Server) getSourceWorker(needLock bool) *SourceWorker { if needLock { s.Lock() defer s.Unlock() @@ -531,7 +541,7 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) { defer s.Unlock() } // now setSourceStatus will be concurrently called. 
skip setting a source status if worker has been closed - if s.getWorker(false) == nil && source != "" { + if s.getSourceWorker(false) == nil && source != "" { return } s.sourceStatus = pb.SourceStatus{ @@ -549,12 +559,12 @@ func (s *Server) setSourceStatus(source string, err error, needLock bool) { // if sourceID is set to "", worker will be closed directly // if sourceID is not "", we will check sourceID with w.cfg.SourceID. -func (s *Server) stopWorker(sourceID string, needLock bool) error { +func (s *Server) stopSourceWorker(sourceID string, needLock, graceful bool) error { if needLock { s.Lock() defer s.Unlock() } - w := s.getWorker(false) + w := s.getSourceWorker(false) if w == nil { log.L().Warn("worker has not been started, no need to stop", zap.String("source", sourceID)) return nil // no need to stop because not started yet @@ -565,7 +575,7 @@ func (s *Server) stopWorker(sourceID string, needLock bool) error { s.UpdateKeepAliveTTL(s.cfg.KeepAliveTTL) s.setWorker(nil, false) s.setSourceStatus("", nil, false) - w.Close() + w.Stop(graceful) return nil } @@ -680,7 +690,7 @@ func (s *Server) enableHandleSubtasks(sourceCfg *config.SourceConfig, needLock b func (s *Server) disableHandleSubtasks(source string) error { s.Lock() defer s.Unlock() - w := s.getWorker(false) + w := s.getSourceWorker(false) if w == nil { log.L().Warn("worker has already stopped before DisableHandleSubtasks", zap.String("source", source)) return nil @@ -689,7 +699,7 @@ func (s *Server) disableHandleSubtasks(source string) error { var err error if !w.relayEnabled.Load() { log.L().Info("relay is not enabled after disabling subtask, so stop worker") - err = s.stopWorker(source, false) + err = s.stopSourceWorker(source, false, true) } return err } @@ -734,7 +744,7 @@ func (s *Server) enableRelay(sourceCfg *config.SourceConfig, needLock bool) erro func (s *Server) disableRelay(source string) error { s.Lock() defer s.Unlock() - w := s.getWorker(false) + w := s.getSourceWorker(false) if w == nil { log.L().Warn("worker has already stopped before DisableRelay", zap.Any("relaySource", source)) return nil @@ -744,7 +754,7 @@ func (s *Server) disableRelay(source string) error { var err error if !w.subTaskEnabled.Load() { log.L().Info("subtask is not enabled after disabling relay, so stop worker") - err = s.stopWorker(source, false) + err = s.stopSourceWorker(source, false, true) } return err } @@ -760,7 +770,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (* SourceStatus: &sourceStatus, } - w := s.getWorker(true) + w := s.getSourceWorker(true) if w == nil { log.L().Warn("fail to call QueryStatus, because no mysql source is being handled in the worker") resp.Result = false @@ -782,7 +792,7 @@ func (s *Server) QueryStatus(ctx context.Context, req *pb.QueryStatusRequest) (* // PurgeRelay implements WorkerServer.PurgeRelay. 
func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb.CommonWorkerResponse, error) { log.L().Info("", zap.String("request", "PurgeRelay"), zap.Stringer("payload", req)) - w := s.getWorker(true) + w := s.getSourceWorker(true) if w == nil { log.L().Warn("fail to call StartSubTask, because no mysql source is being handled in the worker") return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil @@ -799,7 +809,7 @@ func (s *Server) PurgeRelay(ctx context.Context, req *pb.PurgeRelayRequest) (*pb func (s *Server) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchemaRequest) (*pb.CommonWorkerResponse, error) { log.L().Info("", zap.String("request", "OperateSchema"), zap.Stringer("payload", req)) - w := s.getWorker(true) + w := s.getSourceWorker(true) if w == nil { log.L().Warn("fail to call OperateSchema, because no mysql source is being handled in the worker") return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil @@ -826,7 +836,7 @@ func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Sou defer s.Unlock() } - if w := s.getWorker(false); w != nil { + if w := s.getSourceWorker(false); w != nil { if w.cfg.SourceID == cfg.SourceID { log.L().Info("mysql source is being handled", zap.String("sourceID", s.worker.cfg.SourceID)) return w, nil @@ -917,7 +927,7 @@ func getMinLocForSubTask(ctx context.Context, subTaskCfg config.SubTaskConfig) ( func (s *Server) HandleError(ctx context.Context, req *pb.HandleWorkerErrorRequest) (*pb.CommonWorkerResponse, error) { log.L().Info("", zap.String("request", "HandleError"), zap.Stringer("payload", req)) - w := s.getWorker(true) + w := s.getSourceWorker(true) if w == nil { log.L().Warn("fail to call HandleError, because no mysql source is being handled in the worker") return makeCommonWorkerResponse(terror.ErrWorkerNoStart.Generate()), nil diff --git a/dm/dm/worker/server_test.go b/dm/dm/worker/server_test.go index c9458ece493..c5bca5f09d7 100644 --- a/dm/dm/worker/server_test.go +++ b/dm/dm/worker/server_test.go @@ -209,7 +209,7 @@ func (t *testServer) TestServer(c *C) { _, err = ha.DeleteSubTaskStage(s.etcdClient, ha.NewSubTaskStage(pb.Stage_Stopped, sourceCfg.SourceID, subtaskCfg.Name)) c.Assert(err, IsNil) c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil + return s.getSourceWorker(true).subTaskHolder.findSubTask(subtaskCfg.Name) == nil }), IsTrue) dupServer := NewServer(cfg) @@ -338,13 +338,13 @@ func (t *testServer) TestHandleSourceBoundAfterError(c *C) { _, err = ha.PutSourceCfg(etcdCli, sourceCfg) c.Assert(err, IsNil) c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true) != nil + return s.getSourceWorker(true) != nil }), IsTrue) _, err = ha.DeleteSourceBound(etcdCli, s.cfg.Name) c.Assert(err, IsNil) c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true) == nil + return s.getSourceWorker(true) == nil }), IsTrue) } @@ -414,19 +414,19 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) { }() // step 4.1: should stop the running worker, source bound has been deleted, should stop this worker c.Assert(utils.WaitSomething(20, 100*time.Millisecond, func() bool { - return s.getWorker(true) == nil + return s.getSourceWorker(true) == nil }), IsTrue) // step 4.2: put a new source bound, source should be started _, err = ha.PutSourceBound(etcdCli, sourceBound) c.Assert(err, IsNil) 
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true) != nil + return s.getSourceWorker(true) != nil }), IsTrue) - cfg2 := s.getWorker(true).cfg + cfg2 := s.getSourceWorker(true).cfg c.Assert(cfg2, DeepEquals, sourceCfg) cancel1() wg.Wait() - c.Assert(s.stopWorker(sourceCfg.SourceID, true), IsNil) + c.Assert(s.stopSourceWorker(sourceCfg.SourceID, true, true), IsNil) // step 5: start observeSourceBound from compacted revision again, should start worker ctx2, cancel2 := context.WithCancel(ctx) wg.Add(1) @@ -435,9 +435,9 @@ func (t *testServer) TestWatchSourceBoundEtcdCompact(c *C) { c.Assert(s.observeSourceBound(ctx2, startRev), IsNil) }() c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - return s.getWorker(true) != nil + return s.getSourceWorker(true) != nil }), IsTrue) - cfg2 = s.getWorker(true).cfg + cfg2 = s.getSourceWorker(true).cfg c.Assert(cfg2, DeepEquals, sourceCfg) cancel2() wg.Wait() @@ -481,13 +481,13 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool) c.Assert(err, IsNil) // worker should be started and without error c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - w := s.getWorker(true) + w := s.getSourceWorker(true) return w != nil && !w.closed.Load() }), IsTrue) c.Assert(s.getSourceStatus(true).Result, IsNil) } else { // worker should be started before stopped - w := s.getWorker(true) + w := s.getSourceWorker(true) c.Assert(w, NotNil) c.Assert(w.closed.Load(), IsFalse) _, err := ha.DeleteRelayConfig(s.etcdClient, w.name) @@ -496,7 +496,7 @@ func (t *testServer) testOperateWorker(c *C, s *Server, dir string, start bool) c.Assert(err, IsNil) // worker should be closed and without error c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { - currentWorker := s.getWorker(true) + currentWorker := s.getSourceWorker(true) return currentWorker == nil && w.closed.Load() }), IsTrue) c.Assert(s.getSourceStatus(true).Result, IsNil) @@ -507,7 +507,7 @@ func (t *testServer) testRetryConnectMaster(c *C, s *Server, etcd *embed.Etcd, d etcd.Close() time.Sleep(6 * time.Second) // When worker server fail to keepalive with etcd, server should close its worker - c.Assert(s.getWorker(true), IsNil) + c.Assert(s.getSourceWorker(true), IsNil) c.Assert(s.getSourceStatus(true).Result, IsNil) ETCD, err := createMockETCD(dir, "http://"+hostName) c.Assert(err, IsNil) @@ -551,8 +551,15 @@ func (t *testServer) testSubTaskRecover(c *C, s *Server, dir string) { func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, etcd *embed.Etcd) { etcd.Close() +<<<<<<< HEAD time.Sleep(retryConnectSleepTime + time.Duration(defaultKeepAliveTTL+3)*time.Second) c.Assert(s.getWorker(true), IsNil) +======= + c.Assert(utils.WaitSomething(int(defaultKeepAliveTTL+3), time.Second, func() bool { + return s.getSourceWorker(true) == nil + }), IsTrue) + c.Assert(s.getSourceWorker(true), IsNil) +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) } func (t *testServer) TestGetMinLocInAllSubTasks(c *C) { diff --git a/dm/dm/worker/source_worker.go b/dm/dm/worker/source_worker.go index ffd31441ad6..cbd633c1507 100644 --- a/dm/dm/worker/source_worker.go +++ b/dm/dm/worker/source_worker.go @@ -189,7 +189,7 @@ func (w *SourceWorker) Start() { } // Close stops working and releases resources. 
-func (w *SourceWorker) Close() { +func (w *SourceWorker) Stop(graceful bool) { if w.closed.Load() { w.l.Warn("already closed") return @@ -204,8 +204,12 @@ func (w *SourceWorker) Close() { w.Lock() defer w.Unlock() - // close all sub tasks - w.subTaskHolder.closeAllSubTasks() + // close or kill all subtasks + if graceful { + w.subTaskHolder.closeAllSubTasks() + } else { + w.subTaskHolder.killAllSubTasks() + } if w.relayHolder != nil { w.relayHolder.Close() diff --git a/dm/dm/worker/source_worker_test.go b/dm/dm/worker/source_worker_test.go index 2b234f7b924..2d8eeb33a8c 100644 --- a/dm/dm/worker/source_worker_test.go +++ b/dm/dm/worker/source_worker_test.go @@ -88,11 +88,11 @@ func (t *testServer) testWorker(c *C) { c.Assert(err, IsNil) c.Assert(w.GetUnitAndSourceStatusJSON("", nil), HasLen, emptyWorkerStatusInfoJSONLength) - // close twice - w.Close() + // stop twice + w.Stop(true) c.Assert(w.closed.Load(), IsTrue) c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0) - w.Close() + w.Stop(true) c.Assert(w.closed.Load(), IsTrue) c.Assert(w.subTaskHolder.getAllSubTasks(), HasLen, 0) c.Assert(w.closed.Load(), IsTrue) @@ -197,11 +197,15 @@ func (t *testServer2) TestTaskAutoResume(c *C) { c.Assert(err, IsNil) subtaskCfg.Mode = "full" subtaskCfg.Timezone = "UTC" +<<<<<<< HEAD c.Assert(s.getWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running, true), IsNil) +======= + c.Assert(s.getSourceWorker(true).StartSubTask(&subtaskCfg, pb.Stage_Running, pb.Stage_Stopped, true), IsNil) +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) // check task in paused state c.Assert(utils.WaitSomething(100, 100*time.Millisecond, func() bool { - subtaskStatus, _, _ := s.getWorker(true).QueryStatus(context.Background(), taskName) + subtaskStatus, _, _ := s.getSourceWorker(true).QueryStatus(context.Background(), taskName) for _, st := range subtaskStatus { if st.Name == taskName && st.Stage == pb.Stage_Paused { return true @@ -212,7 +216,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) { //nolint:errcheck failpoint.Disable("github.com/pingcap/tiflow/dm/dumpling/dumpUnitProcessWithError") - rtsc, ok := s.getWorker(true).taskStatusChecker.(*realTaskStatusChecker) + rtsc, ok := s.getSourceWorker(true).taskStatusChecker.(*realTaskStatusChecker) c.Assert(ok, IsTrue) defer func() { // close multiple time @@ -222,7 +226,7 @@ func (t *testServer2) TestTaskAutoResume(c *C) { // check task will be auto resumed c.Assert(utils.WaitSomething(10, 100*time.Millisecond, func() bool { - sts, _, _ := s.getWorker(true).QueryStatus(context.Background(), taskName) + sts, _, _ := s.getSourceWorker(true).QueryStatus(context.Background(), taskName) for _, st := range sts { if st.Name == taskName && st.Stage == pb.Stage_Running { return true @@ -294,7 +298,7 @@ func (t *testWorkerFunctionalities) TestWorkerFunctionalities(c *C) { // start worker w, err := NewSourceWorker(sourceCfg, etcdCli, "") c.Assert(err, IsNil) - defer w.Close() + defer w.Stop(true) go func() { w.Start() }() @@ -467,7 +471,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - defer w.Close() + defer w.Stop(true) go func() { w.Start() }() @@ -548,11 +552,136 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) { c.Assert(status, HasLen, 1) c.Assert(status[0].Name, Equals, subtaskCfg.Name) c.Assert(status[0].Stage, Equals, pb.Stage_Running) - w.Close() + w.Stop(true) cancel2() wg.Wait() } +<<<<<<< HEAD +======= +func (t 
*testWorkerEtcdCompact) TestWatchValidatorStageEtcdCompact(c *C) { + var ( + masterAddr = tempurl.Alloc()[len("http://"):] + keepAliveTTL = int64(1) + startRev = int64(1) + ) + + etcdDir := c.MkDir() + ETCD, err := createMockETCD(etcdDir, "http://"+masterAddr) + c.Assert(err, IsNil) + defer ETCD.Close() + cfg := NewConfig() + c.Assert(cfg.Parse([]string{"-config=./dm-worker.toml"}), IsNil) + cfg.Join = masterAddr + cfg.KeepAliveTTL = keepAliveTTL + cfg.RelayKeepAliveTTL = keepAliveTTL + + etcdCli, err := clientv3.New(clientv3.Config{ + Endpoints: GetJoinURLs(cfg.Join), + DialTimeout: dialTimeout, + DialKeepAliveTime: keepaliveTime, + DialKeepAliveTimeout: keepaliveTimeout, + }) + c.Assert(err, IsNil) + sourceCfg := loadSourceConfigWithoutPassword(c) + sourceCfg.From = config.GetDBConfigForTest() + sourceCfg.EnableRelay = false + + // + // step 1: start worker + w, err := NewSourceWorker(sourceCfg, etcdCli, "", "") + c.Assert(err, IsNil) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + defer w.Stop(true) + go func() { + w.Start() + }() + c.Assert(utils.WaitSomething(50, 100*time.Millisecond, func() bool { + return !w.closed.Load() + }), IsTrue) + + // + // step 2: Put a subtask config and subtask stage to this source, then delete it + subtaskCfg := config.SubTaskConfig{} + err = subtaskCfg.DecodeFile(subtaskSampleFile, true) + c.Assert(err, IsNil) + subtaskCfg.MydumperPath = mydumperPath + subtaskCfg.ValidatorCfg = config.ValidatorConfig{Mode: config.ValidationNone} + + // increase revision + _, err = etcdCli.Put(context.Background(), "/dummy-key", "value") + c.Assert(err, IsNil) + rev, err := ha.PutSubTaskCfgStage(etcdCli, []config.SubTaskConfig{subtaskCfg}, []ha.Stage{ha.NewSubTaskStage(pb.Stage_Running, sourceCfg.SourceID, subtaskCfg.Name)}, nil) + c.Assert(err, IsNil) + + // + // step 2.1: start a subtask manually + c.Assert(w.StartSubTask(&subtaskCfg, pb.Stage_Running, pb.Stage_Stopped, true), IsNil) + + // + // step 3: trigger etcd compaction and check whether we can receive it through watcher + _, err = etcdCli.Compact(ctx, rev) + c.Assert(err, IsNil) + subTaskStageCh := make(chan ha.Stage, 10) + subTaskErrCh := make(chan error, 10) + ctxForWatch, cancelFunc := context.WithCancel(ctx) + ha.WatchValidatorStage(ctxForWatch, etcdCli, sourceCfg.SourceID, startRev, subTaskStageCh, subTaskErrCh) + select { + case err = <-subTaskErrCh: + c.Assert(err, Equals, etcdErrCompacted) + case <-time.After(300 * time.Millisecond): + c.Fatal("fail to get etcd error compacted") + } + cancelFunc() + + // + // step 4: watch subtask stage from startRev + subTask := w.subTaskHolder.findSubTask(subtaskCfg.Name) + getValidator := func() *syncer.DataValidator { + subTask.RLock() + defer subTask.RUnlock() + return subTask.validator + } + c.Assert(subTask, NotNil) + c.Assert(getValidator(), IsNil) + var wg sync.WaitGroup + ctx1, cancel1 := context.WithCancel(ctx) + wg.Add(1) + go func() { + defer wg.Done() + c.Assert(w.observeValidatorStage(ctx1, startRev), IsNil) + }() + time.Sleep(time.Second) + + subtaskCfg.ValidatorCfg = config.ValidatorConfig{Mode: config.ValidationFast} + unitBakup := subTask.units[len(subTask.units)-1] + subTask.units[len(subTask.units)-1] = &syncer.Syncer{} // validator need a Syncer, not a mocked unit + validatorStage := ha.NewValidatorStage(pb.Stage_Running, subtaskCfg.SourceID, subtaskCfg.Name) + _, err = ha.PutSubTaskCfgStage(etcdCli, []config.SubTaskConfig{subtaskCfg}, nil, []ha.Stage{validatorStage}) + c.Assert(err, IsNil) + + // validator created + 
c.Assert(utils.WaitSomething(30, 100*time.Millisecond, func() bool { + return getValidator() != nil + }), IsTrue) + + subTask.units[len(subTask.units)-1] = unitBakup // restore unit + cancel1() + wg.Wait() + + // test operate validator + err = w.operateValidatorStage(ha.Stage{IsDeleted: true}) + c.Assert(err, IsNil) + err = w.operateValidatorStage(ha.Stage{Expect: pb.Stage_Running, Task: "not-exist"}) + c.Assert(err, IsNil) + err = w.operateValidatorStage(ha.Stage{Expect: pb.Stage_Running, Task: subtaskCfg.Name}) + c.Assert(err, ErrorMatches, ".*failed to get subtask config.*") + err = w.operateValidatorStage(ha.Stage{Expect: pb.Stage_Running, Source: subtaskCfg.SourceID, Task: subtaskCfg.Name}) + c.Assert(err, IsNil) +} + +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) { var ( masterAddr = tempurl.Alloc()[len("http://"):] @@ -586,7 +715,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) { c.Assert(err, IsNil) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - defer w.Close() + defer w.Stop(true) go func() { c.Assert(w.EnableRelay(), IsNil) w.Start() diff --git a/dm/dm/worker/subtask.go b/dm/dm/worker/subtask.go index c7a28751e2e..e464bbf4bd1 100644 --- a/dm/dm/worker/subtask.go +++ b/dm/dm/worker/subtask.go @@ -386,6 +386,16 @@ func (st *SubTask) closeUnits() { u := st.units[i] st.l.Info("closing unit process", zap.Stringer("unit", cu.Type())) u.Close() + st.l.Info("closing unit done", zap.Stringer("unit", cu.Type())) + } +} + +func (st *SubTask) killCurrentUnit() { + if st.CurrUnit() != nil { + ut := st.CurrUnit().Type() + st.l.Info("kill unit", zap.String("task", st.cfg.Name), zap.Stringer("unit", ut)) + st.CurrUnit().Kill() + st.l.Info("kill unit done", zap.String("task", st.cfg.Name), zap.Stringer("unit", ut)) } } @@ -488,7 +498,18 @@ func (st *SubTask) Close() { st.l.Info("subTask is already closed, no need to close") return } + st.closeUnits() // close all un-closed units + updateTaskMetric(st.cfg.Name, st.cfg.SourceID, pb.Stage_Stopped, st.workerName) +} +// Kill kill running unit and stop the sub task. +func (st *SubTask) Kill() { + st.l.Info("killing") + if !st.setStageIfNotIn([]pb.Stage{pb.Stage_Stopped, pb.Stage_Stopping, pb.Stage_Finished}, pb.Stage_Stopping) { + st.l.Info("subTask is already closed, no need to close") + return + } + st.killCurrentUnit() st.closeUnits() // close all un-closed units updateTaskMetric(st.cfg.Name, st.cfg.SourceID, pb.Stage_Stopped, st.workerName) } diff --git a/dm/dm/worker/subtask_holder.go b/dm/dm/worker/subtask_holder.go index e90aa9fd98e..a319a501d2e 100644 --- a/dm/dm/worker/subtask_holder.go +++ b/dm/dm/worker/subtask_holder.go @@ -73,6 +73,16 @@ func (h *subTaskHolder) closeAllSubTasks() { h.subTasks = make(map[string]*SubTask) } +// killAllSubTasks kill and stop all subtask instances. +func (h *subTaskHolder) killAllSubTasks() { + h.mu.Lock() + defer h.mu.Unlock() + for _, st := range h.subTasks { + st.Kill() + } + h.subTasks = make(map[string]*SubTask) +} + // findSubTask finds subtask instance by name. 
func (h *subTaskHolder) findSubTask(name string) *SubTask { h.mu.RLock() diff --git a/dm/dm/worker/subtask_test.go b/dm/dm/worker/subtask_test.go index 88d9e7efd5c..d98ffd504e1 100644 --- a/dm/dm/worker/subtask_test.go +++ b/dm/dm/worker/subtask_test.go @@ -118,6 +118,8 @@ func (m *MockUnit) Process(ctx context.Context, pr chan pb.ProcessResult) { func (m *MockUnit) Close() {} +func (m *MockUnit) Kill() {} + func (m MockUnit) Pause() {} func (m *MockUnit) Resume(ctx context.Context, pr chan pb.ProcessResult) { m.Process(ctx, pr) } diff --git a/dm/dumpling/dumpling.go b/dm/dumpling/dumpling.go index cab39480b0a..3306444aaf7 100644 --- a/dm/dumpling/dumpling.go +++ b/dm/dumpling/dumpling.go @@ -168,6 +168,12 @@ func (m *Dumpling) Close() { m.closed.Store(true) } +// Kill implements Unit.Kill. +func (m *Dumpling) Kill() { + // TODO: implement kill + m.Close() +} + // Pause implements Unit.Pause. func (m *Dumpling) Pause() { if m.closed.Load() { diff --git a/dm/loader/lightning.go b/dm/loader/lightning.go index f92278f4c41..8076d74af83 100644 --- a/dm/loader/lightning.go +++ b/dm/loader/lightning.go @@ -256,6 +256,12 @@ func (l *LightningLoader) Close() { l.closed.Store(true) } +// Kill does ungraceful shutdown. +func (l *LightningLoader) Kill() { + // TODO: implement kill + l.Close() +} + // Pause pauses the process, and it can be resumed later // should cancel context from external. func (l *LightningLoader) Pause() { diff --git a/dm/loader/loader.go b/dm/loader/loader.go index 3731a943984..f84136060fa 100644 --- a/dm/loader/loader.go +++ b/dm/loader/loader.go @@ -801,6 +801,12 @@ func (l *Loader) Close() { l.closed.Store(true) } +// Kill kill the loader without graceful. +func (l *Loader) Kill() { + // TODO: implement kill + l.Close() +} + // stopLoad stops loading, now it used by Close and Pause // maybe we can refine the workflow more clear. func (l *Loader) stopLoad() { diff --git a/dm/syncer/dml_worker.go b/dm/syncer/dml_worker.go index 175df8ce0a6..7270b4e0480 100644 --- a/dm/syncer/dml_worker.go +++ b/dm/syncer/dml_worker.go @@ -37,8 +37,12 @@ type DMLWorker struct { chanSize int multipleRows bool toDBConns []*dbconn.DBConn +<<<<<<< HEAD tctx *tcontext.Context wg sync.WaitGroup // counts conflict/flush jobs in all DML job channels. 
+======= + syncCtx *tcontext.Context +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) logger log.Logger // for metrics @@ -65,6 +69,7 @@ func dmlWorkerWrap(inCh chan *job, syncer *Syncer) chan *job { chanSize /= 2 } dmlWorker := &DMLWorker{ +<<<<<<< HEAD batch: syncer.cfg.Batch, workerCount: syncer.cfg.WorkerCount, chanSize: chanSize, @@ -81,6 +86,24 @@ func dmlWorkerWrap(inCh chan *job, syncer *Syncer) chan *job { toDBConns: syncer.toDBConns, inCh: inCh, flushCh: make(chan *job), +======= + batch: syncer.cfg.Batch, + workerCount: syncer.cfg.WorkerCount, + chanSize: chanSize, + multipleRows: syncer.cfg.MultipleRows, + task: syncer.cfg.Name, + source: syncer.cfg.SourceID, + worker: syncer.cfg.WorkerName, + logger: syncer.tctx.Logger.WithFields(zap.String("component", "dml_worker")), + successFunc: syncer.successFunc, + fatalFunc: syncer.fatalFunc, + lagFunc: syncer.updateReplicationJobTS, + updateJobMetricsFunc: syncer.updateJobMetrics, + syncCtx: syncer.syncCtx, // this ctx can be used to cancel all the workers + toDBConns: syncer.toDBConns, + inCh: inCh, + flushCh: make(chan *job), +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) } go func() { @@ -114,7 +137,6 @@ func (w *DMLWorker) run() { for i := 0; i < w.workerCount; i++ { queueBucketMapping[i] = queueBucketName(i) } - for j := range w.inCh { metrics.QueueSizeGauge.WithLabelValues(w.task, "dml_worker_input", w.source).Set(float64(len(w.inCh))) if j.tp == flush || j.tp == conflict { @@ -230,7 +252,7 @@ func (w *DMLWorker) executeBatchJobs(queueID int, jobs []*job) { time.Sleep(time.Duration(t) * time.Second) }) // use background context to execute sqls as much as possible - ctx, cancel := w.tctx.WithTimeout(maxDMLExecutionDuration) + ctx, cancel := w.syncCtx.WithTimeout(maxDMLExecutionDuration) defer cancel() affect, err = db.ExecuteSQL(ctx, queries, args...) failpoint.Inject("SafeModeExit", func(val failpoint.Value) { diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 5a360875123..d83cffe300c 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -121,7 +121,17 @@ const ( type Syncer struct { sync.RWMutex - tctx *tcontext.Context + tctx *tcontext.Context // this ctx only used for logger. + + // this ctx derives from a background ctx and was initialized in s.Run, it is used for some background tasks in s.Run + // when this ctx cancelled, syncer will shutdown all background running jobs (except the syncDML and syncDDL) and not wait transaction end. + runCtx *tcontext.Context + runCancel context.CancelFunc + // this ctx only used for syncDML and syncDDL and only cancelled when ungraceful stop. 
+ syncCtx *tcontext.Context + syncCancel context.CancelFunc + // control all goroutines that started in S.Run + runWg sync.WaitGroup cfg *config.SubTaskConfig syncCfg replication.BinlogSyncerConfig @@ -135,8 +145,12 @@ type Syncer struct { streamerController *StreamerController enableRelay bool +<<<<<<< HEAD wg sync.WaitGroup // counts goroutines jobWg sync.WaitGroup // counts ddl/flush job in-flight in s.dmlJobCh and s.ddlJobCh +======= + jobWg sync.WaitGroup // counts ddl/flush/asyncFlush job in-flight in s.dmlJobCh and s.ddlJobCh +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) schemaTracker *schema.Tracker @@ -187,10 +201,16 @@ type Syncer struct { filteredUpdate atomic.Int64 filteredDelete atomic.Int64 +<<<<<<< HEAD done chan struct{} checkpoint CheckPoint onlineDDL onlineddl.OnlinePlugin +======= + checkpoint CheckPoint + checkpointFlushWorker *checkpointFlushWorker + onlineDDL onlineddl.OnlinePlugin +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) // record process error rather than log.Fatal runFatalChan chan *pb.ProcessError @@ -246,9 +266,13 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier syncer.binlogSizeCount.Store(0) syncer.lastCount.Store(0) syncer.count.Store(0) +<<<<<<< HEAD syncer.done = nil syncer.addJobFunc = syncer.addJob syncer.enableRelay = cfg.UseRelay +======= + syncer.handleJobFunc = syncer.handleJob +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) syncer.cli = etcdClient syncer.checkpoint = NewRemoteCheckPoint(syncer.tctx, cfg, syncer.checkpointID()) @@ -272,7 +296,6 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier } func (s *Syncer) newJobChans() { - s.closeJobChans() chanSize := calculateChanSize(s.cfg.QueueSize, s.cfg.WorkerCount, s.cfg.Compact) s.dmlJobCh = make(chan *job, chanSize) s.ddlJobCh = make(chan *job, s.cfg.QueueSize) @@ -420,8 +443,11 @@ func (s *Syncer) Init(ctx context.Context) (err error) { if err != nil { return err } +<<<<<<< HEAD rollbackHolder.Add(fr.FuncRollback{Name: "remove-active-realylog", Fn: s.removeActiveRelayLog}) +======= +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) s.reset() return nil } @@ -539,13 +565,24 @@ func (s *Syncer) reset() { } // create new job chans s.newJobChans() + s.checkpointFlushWorker = &checkpointFlushWorker{ + input: make(chan *checkpointFlushTask, 16), + cp: s.checkpoint, + execError: &s.execError, + afterFlushFn: s.afterFlushCheckpoint, + updateJobMetricsFn: s.updateJobMetrics, + } s.execError.Store(nil) s.setErrLocation(nil, nil, false) s.isReplacingErr = false s.waitXIDJob.Store(int64(noWait)) s.isTransactionEnd = true +<<<<<<< HEAD +======= + s.flushSeq = 0 +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) switch s.cfg.ShardMode { case config.ShardPessimistic: // every time start to re-sync from resume, we reset status to make it like a fresh syncing @@ -612,7 +649,6 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { s.Unlock() return } - s.done = make(chan struct{}) s.Unlock() runFatalChan := make(chan *pb.ProcessError, s.cfg.WorkerCount+1) @@ -651,8 +687,6 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { // cancel goroutines created in s.Run cancel() } - s.closeJobChans() // Run returned, all jobs sent, we can close s.jobs - s.wg.Wait() // wait for sync goroutine to return close(runFatalChan) // Run returned, all potential fatal sent to s.runFatalChan wg.Wait() // wait for receive all fatal from s.runFatalChan @@ 
-674,14 +708,6 @@ func (s *Syncer) Process(ctx context.Context, pr chan pb.ProcessResult) { default: } - // try to rollback checkpoints, if they already flushed, no effect - prePos := s.checkpoint.GlobalPoint() - s.checkpoint.Rollback(s.schemaTracker) - currPos := s.checkpoint.GlobalPoint() - if binlog.CompareLocation(prePos, currPos, s.cfg.EnableGTID) != 0 { - s.tctx.L().Warn("something wrong with rollback global checkpoint", zap.Stringer("previous position", prePos), zap.Stringer("current position", currPos)) - } - pr <- pb.ProcessResult{ IsCanceled: isCanceled, Errors: errs, @@ -958,7 +984,7 @@ func (s *Syncer) addJob(job *job) error { s.isTransactionEnd = false failpoint.Inject("checkCheckpointInMiddleOfTransaction", func() { s.tctx.L().Info("receive dml job", zap.Any("dml job", job)) - time.Sleep(100 * time.Millisecond) + time.Sleep(500 * time.Millisecond) }) } @@ -1076,6 +1102,69 @@ func (s *Syncer) flushCheckPoints() error { return nil } +<<<<<<< HEAD +======= + snapshotInfo, exceptTables, shardMetaSQLs, shardMetaArgs := s.createCheckpointSnapshot(true) + + if snapshotInfo == nil { + s.tctx.L().Info("checkpoint has no change, skip sync flush checkpoint") + return nil + } + + syncFlushErrCh := make(chan error, 1) + + task := &checkpointFlushTask{ + snapshotInfo: snapshotInfo, + exceptTables: exceptTables, + shardMetaSQLs: shardMetaSQLs, + shardMetaArgs: shardMetaArgs, + asyncflushJob: nil, + syncFlushErrCh: syncFlushErrCh, + } + s.checkpointFlushWorker.Add(task) + + return <-syncFlushErrCh +} + +// flushCheckPointsAsync asynchronous flushes checkpoint. +func (s *Syncer) flushCheckPointsAsync(asyncFlushJob *job) { + err := s.execError.Load() + // TODO: for now, if any error occurred (including user canceled), checkpoint won't be updated. But if we have put + // optimistic shard info, DM-master may resolved the optimistic lock and let other worker execute DDL. So after this + // worker resume, it can not execute the DML/DDL in old binlog because of downstream table structure mismatching. + // We should find a way to (compensating) implement a transaction containing interaction with both etcd and SQL. + if err != nil && (terror.ErrDBExecuteFailed.Equal(err) || terror.ErrDBUnExpect.Equal(err)) { + s.tctx.L().Warn("error detected when executing SQL job, skip async flush checkpoints", + zap.Stringer("checkpoint", s.checkpoint), + zap.Error(err)) + return + } + + snapshotInfo, exceptTables, shardMetaSQLs, shardMetaArgs := s.createCheckpointSnapshot(false) + + if snapshotInfo == nil { + s.tctx.L().Info("checkpoint has no change, skip async flush checkpoint", zap.Int64("job seq", asyncFlushJob.flushSeq)) + return + } + + task := &checkpointFlushTask{ + snapshotInfo: snapshotInfo, + exceptTables: exceptTables, + shardMetaSQLs: shardMetaSQLs, + shardMetaArgs: shardMetaArgs, + asyncflushJob: asyncFlushJob, + syncFlushErrCh: nil, + } + s.checkpointFlushWorker.Add(task) +} + +func (s *Syncer) createCheckpointSnapshot(isSyncFlush bool) (*SnapshotInfo, []*filter.Table, []string, [][]interface{}) { + snapshotInfo := s.checkpoint.Snapshot(isSyncFlush) + if snapshotInfo == nil { + return nil, nil, nil, nil + } + +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) var ( exceptTableIDs map[string]bool exceptTables []*filter.Table @@ -1125,8 +1214,8 @@ func (s *Syncer) flushCheckPoints() error { } // DDL synced one by one, so we only need to process one DDL at a time. 
-func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn.DBConn, ddlJobChan chan *job) { - defer s.wg.Done() +func (s *Syncer) syncDDL(queueBucket string, db *dbconn.DBConn, ddlJobChan chan *job) { + defer s.runWg.Done() var err error for { @@ -1150,12 +1239,12 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn. shardPessimistOp = s.pessimist.PendingOperation() if shardPessimistOp != nil && !shardPessimistOp.Exec { ignore = true - tctx.L().Info("ignore shard DDLs in pessimistic shard mode", zap.Strings("ddls", ddlJob.ddls)) + s.tctx.L().Info("ignore shard DDLs in pessimistic shard mode", zap.Strings("ddls", ddlJob.ddls)) } case config.ShardOptimistic: if len(ddlJob.ddls) == 0 { ignore = true - tctx.L().Info("ignore shard DDLs in optimistic mode", zap.Stringer("info", s.optimist.PendingInfo())) + s.tctx.L().Info("ignore shard DDLs in optimistic mode", zap.Stringer("info", s.optimist.PendingInfo())) } } @@ -1167,9 +1256,9 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn. if !ignore { var affected int - affected, err = db.ExecuteSQLWithIgnore(tctx, errorutil.IsIgnorableMySQLDDLError, ddlJob.ddls) + affected, err = db.ExecuteSQLWithIgnore(s.syncCtx, errorutil.IsIgnorableMySQLDDLError, ddlJob.ddls) if err != nil { - err = s.handleSpecialDDLError(tctx, err, ddlJob.ddls, affected, db) + err = s.handleSpecialDDLError(s.syncCtx, err, ddlJob.ddls, affected, db) err = terror.WithScope(err, terror.ScopeDownstream) } } @@ -1203,7 +1292,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn. switch { case shardInfo == nil: // no need to do the shard DDL handle for `CREATE DATABASE/TABLE` now. - tctx.L().Warn("skip shard DDL handle in pessimistic shard mode", zap.Strings("ddl", ddlJob.ddls)) + s.tctx.L().Warn("skip shard DDL handle in pessimistic shard mode", zap.Strings("ddl", ddlJob.ddls)) case shardPessimistOp == nil: err = terror.ErrWorkerDDLLockOpNotFound.Generate(shardInfo) default: @@ -1216,7 +1305,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *dbconn. // no need to do the shard DDL handle for `DROP DATABASE/TABLE` now. // but for `CREATE DATABASE` and `ALTER DATABASE` we execute it to the downstream directly without `shardInfo`. if ignore { // actually ignored. - tctx.L().Warn("skip shard DDL handle in optimistic shard mode", zap.Strings("ddl", ddlJob.ddls)) + s.tctx.L().Warn("skip shard DDL handle in optimistic shard mode", zap.Strings("ddl", ddlJob.ddls)) } case s.optimist.PendingOperation() == nil: err = terror.ErrWorkerDDLLockOpNotFound.Generate(shardInfo) @@ -1277,7 +1366,7 @@ func (s *Syncer) fatalFunc(job *job, err error) { // DML synced with causality. func (s *Syncer) syncDML() { - defer s.wg.Done() + defer s.runWg.Done() dmlJobCh := s.dmlJobCh if s.cfg.Compact { @@ -1291,51 +1380,140 @@ func (s *Syncer) syncDML() { } } -// Run starts running for sync, we should guarantee it can rerun when paused. 
-func (s *Syncer) Run(ctx context.Context) (err error) { - runCtx, runCancel := context.WithCancel(context.Background()) - defer runCancel() - tctx := s.tctx.WithContext(runCtx) +func (s *Syncer) waitBeforeRunExit(ctx context.Context) { + defer s.runWg.Done() + failpoint.Inject("checkCheckpointInMiddleOfTransaction", func() { + s.tctx.L().Info("incr maxPauseOrStopWaitTime time ") + maxPauseOrStopWaitTime = time.Minute * 10 + }) - defer func() { - if s.done != nil { - close(s.done) + select { + case <-ctx.Done(): // hijack the root context from s.Run to wait for the transaction to end. + s.tctx.L().Info("received subtask's done, try graceful stop") + s.waitTransactionLock.Lock() + if s.isTransactionEnd { + s.waitXIDJob.Store(int64(waitComplete)) + s.waitTransactionLock.Unlock() + s.tctx.L().Info("the last job is transaction end, done directly") + s.runCancel() + return } - }() + s.waitXIDJob.Store(int64(waiting)) + s.waitTransactionLock.Unlock() + select { + case <-s.runCtx.Ctx.Done(): + s.tctx.L().Info("syncer run exit so runCtx done") + case <-time.After(maxPauseOrStopWaitTime): + // TODO: maxPauseOrStopWaitTime should also count the time of waiting waitTransactionLock + s.tctx.L().Info("wait transaction end timeout, exit now") + s.runCancel() + } + case <-s.runCtx.Ctx.Done(): // when no graceful stop, run ctx will canceled first. + s.tctx.L().Info("received ungraceful exit ctx, exit now") + } +} - go func() { - <-ctx.Done() +func (s *Syncer) updateTSOffsetCronJob(ctx context.Context) { + defer s.runWg.Done() + // temporarily hard code there. if this metrics works well add this to config file. + ticker := time.NewTicker(time.Minute * 10) + defer ticker.Stop() + for { select { - case <-runCtx.Done(): - default: - tctx.L().Info("received subtask's done") - - s.waitTransactionLock.Lock() - if s.isTransactionEnd { - s.waitXIDJob.Store(int64(waitComplete)) - tctx.L().Info("the last job is transaction end, done directly") - runCancel() - s.waitTransactionLock.Unlock() - return + case <-ticker.C: + if utErr := s.updateTSOffset(ctx); utErr != nil { + s.tctx.L().Error("get server unix ts err", zap.Error(utErr)) } - s.waitXIDJob.Store(int64(waiting)) - s.waitTransactionLock.Unlock() + case <-ctx.Done(): + return + } + } +} - select { - case <-runCtx.Done(): - tctx.L().Info("received syncer's done") - case <-time.After(maxPauseOrStopWaitTime): - tctx.L().Info("wait transaction end timeout") - runCancel() - } +func (s *Syncer) updateLagCronJob(ctx context.Context) { + defer s.runWg.Done() + // temporarily hard code there. if this metrics works well add this to config file. + ticker := time.NewTicker(time.Millisecond * 100) + defer ticker.Stop() + for { + select { + case <-ticker.C: + s.updateReplicationLagMetric() + case <-ctx.Done(): + return } + } +} + +func (s *Syncer) updateTSOffset(ctx context.Context) error { + t1 := time.Now() + ts, tsErr := s.fromDB.GetServerUnixTS(ctx) + rtt := time.Since(t1).Seconds() + if tsErr == nil { + s.tsOffset.Store(time.Now().Unix() - ts - int64(rtt/2)) + } + return tsErr +} + +// Run starts running for sync, we should guarantee it can rerun when paused. 
+func (s *Syncer) Run(ctx context.Context) (err error) { + runCtx, runCancel := context.WithCancel(context.Background()) + s.runCtx, s.runCancel = tcontext.NewContext(runCtx, s.tctx.L()), runCancel + syncCtx, syncCancel := context.WithCancel(context.Background()) + s.syncCtx, s.syncCancel = tcontext.NewContext(syncCtx, s.tctx.L()), syncCancel + defer func() { + s.runCancel() + s.closeJobChans() + s.checkpointFlushWorker.Close() + s.runWg.Wait() + // s.syncCancel won't be called when normal exit, this call just to follow the best practice of use context. + s.syncCancel() + }() + + // we should start this goroutine as soon as possible, because it's the only goroutine that cancel syncer.Run + s.runWg.Add(1) + go func() { + s.waitBeforeRunExit(ctx) }() +<<<<<<< HEAD // some initialization that can't be put in Syncer.Init fresh, err := s.IsFreshTask(runCtx) if err != nil { return err } else if fresh { +======= + // before sync run, we get the ts offset from upstream first + if utErr := s.updateTSOffset(ctx); utErr != nil { + return utErr + } + + // some initialization that can't be put in Syncer.Init + fresh, err := s.IsFreshTask(s.runCtx.Ctx) + if err != nil { + return err + } + + // task command line arguments have the highest priority + // dm-syncer and other usage may not have a etcdCli, so we check it first + skipLoadMeta := false + if s.cli != nil { + s.cliArgs, err = ha.GetTaskCliArgs(s.cli, s.cfg.Name, s.cfg.SourceID) + if err != nil { + s.tctx.L().Error("failed to get task cli args", zap.Error(err)) + } + if s.cliArgs != nil && s.cliArgs.StartTime != "" { + err = s.setGlobalPointByTime(s.runCtx, s.cliArgs.StartTime) + if terror.ErrConfigStartTimeTooLate.Equal(err) { + return err + } + skipLoadMeta = err == nil + } + } + + // some initialization that can't be put in Syncer.Init + if fresh && !skipLoadMeta { +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) // for fresh task, we try to load checkpoints from meta (file or config item) err = s.checkpoint.LoadMeta() if err != nil { @@ -1348,7 +1526,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { delLoadTask bool cleanDumpFile = s.cfg.CleanDumpFile ) - flushCheckpoint, err = s.adjustGlobalPointGTID(tctx) + flushCheckpoint, err = s.adjustGlobalPointGTID(s.runCtx) if err != nil { return err } @@ -1357,69 +1535,65 @@ func (s *Syncer) Run(ctx context.Context) (err error) { flushCheckpoint = true err = s.loadTableStructureFromDump(ctx) if err != nil { - tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err)) + s.tctx.L().Warn("error happened when load table structure from dump files", zap.Error(err)) cleanDumpFile = false } +<<<<<<< HEAD } else { +======= + if s.cfg.ShardMode == config.ShardOptimistic { + s.flushOptimisticTableInfos(s.runCtx) + } + } + + if s.cfg.Mode == config.ModeIncrement || !fresh { +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) cleanDumpFile = false } + s.runWg.Add(1) + go s.syncDML() + s.runWg.Add(1) + go func() { + defer s.runWg.Done() + // also need to use a different ctx. 
checkpointFlushWorker worker will be closed in the first defer + s.checkpointFlushWorker.Run(s.tctx) + }() + s.runWg.Add(1) + go s.syncDDL(adminQueueName, s.ddlDBConn, s.ddlJobCh) + s.runWg.Add(1) + go s.updateLagCronJob(s.runCtx.Ctx) + s.runWg.Add(1) + go s.updateTSOffsetCronJob(s.runCtx.Ctx) if flushCheckpoint { if err = s.flushCheckPoints(); err != nil { - tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) + s.tctx.L().Warn("fail to flush checkpoints when starting task", zap.Error(err)) return err } } if delLoadTask { if err = s.delLoadTask(); err != nil { - tctx.L().Warn("error when del load task in etcd", zap.Error(err)) + s.tctx.L().Warn("error when del load task in etcd", zap.Error(err)) } } if cleanDumpFile { - tctx.L().Info("try to remove all dump files") + s.tctx.L().Info("try to remove all dump files") if err = os.RemoveAll(s.cfg.Dir); err != nil { - tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err)) + s.tctx.L().Warn("error when remove loaded dump folder", zap.String("data folder", s.cfg.Dir), zap.Error(err)) } } failpoint.Inject("AdjustGTIDExit", func() { +<<<<<<< HEAD tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit")) s.streamerController.Close(tctx) +======= + s.tctx.L().Warn("exit triggered", zap.String("failpoint", "AdjustGTIDExit")) + s.streamerController.Close() +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) utils.OsExit(1) }) - updateTSOffset := func() error { - t1 := time.Now() - ts, tsErr := s.fromDB.GetServerUnixTS(runCtx) - rtt := time.Since(t1).Seconds() - if tsErr == nil { - s.tsOffset.Store(time.Now().Unix() - ts - int64(rtt/2)) - } - return tsErr - } - // before sync run, we get the tsoffset from upstream first - if utErr := updateTSOffset(); utErr != nil { - return utErr - } - // start background task to get/update current ts offset between dm and upstream - s.wg.Add(1) - go func() { - defer s.wg.Done() - // temporarily hard code there. if this metrics works well add this to config file. 
- updateTicker := time.NewTicker(time.Minute * 10) - defer updateTicker.Stop() - for { - select { - case <-updateTicker.C: - if utErr := updateTSOffset(); utErr != nil { - s.tctx.L().Error("get server unix ts err", zap.Error(utErr)) - } - case <-runCtx.Done(): - return - } - } - }() - // startLocation is the start location for current received event // currentLocation is the end location for current received event (End_log_pos in `show binlog events` for mysql) // lastLocation is the end location for last received (ROTATE / QUERY / XID) event @@ -1431,36 +1605,19 @@ func (s *Syncer) Run(ctx context.Context) (err error) { startLocation = s.checkpoint.GlobalPoint() lastLocation = s.checkpoint.GlobalPoint() ) - tctx.L().Info("replicate binlog from checkpoint", zap.Stringer("checkpoint", lastLocation)) + s.tctx.L().Info("replicate binlog from checkpoint", zap.Stringer("checkpoint", lastLocation)) if s.streamerController.IsClosed() { +<<<<<<< HEAD err = s.streamerController.Start(tctx, lastLocation) +======= + s.locations.reset(lastLocation) + err = s.streamerController.Start(s.runCtx, lastLocation) +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) if err != nil { return terror.Annotate(err, "fail to restart streamer controller") } } - - s.wg.Add(1) - go s.syncDML() - - s.wg.Add(1) - go s.syncDDL(tctx, adminQueueName, s.ddlDBConn, s.ddlJobCh) - - s.wg.Add(1) - go func() { - defer s.wg.Done() - updateLagTicker := time.NewTicker(time.Millisecond * 100) - defer updateLagTicker.Stop() - for { - select { - case <-updateLagTicker.C: - s.updateReplicationLagMetric() - case <-runCtx.Done(): - return - } - } - }() - // syncing progress with sharding DDL group // 1. use the global streamer to sync regular binlog events // 2. sharding DDL synced for some sharding groups @@ -1478,13 +1635,14 @@ func (s *Syncer) Run(ctx context.Context) (err error) { traceSource = fmt.Sprintf("%s.syncer.%s", s.cfg.SourceID, s.cfg.Name) ) + // this is second defer func in syncer.Run so in this time checkpointFlushWorker are still running defer func() { if err1 := recover(); err1 != nil { failpoint.Inject("ExitAfterSaveOnlineDDL", func() { - tctx.L().Info("force panic") + s.tctx.L().Info("force panic") panic("ExitAfterSaveOnlineDDL") }) - tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("stack")) + s.tctx.L().Error("panic log", zap.Reflect("error message", err1), zap.Stack("stack")) err = terror.ErrSyncerUnitPanic.Generate(err1) } @@ -1501,13 +1659,13 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // flush all jobs before exit if err2 = s.flushJobs(); err2 != nil { - tctx.L().Warn("failed to flush jobs when exit task", zap.Error(err2)) + s.tctx.L().Warn("failed to flush jobs when exit task", zap.Error(err2)) } // if any execute error, flush safemode exit point if err2 = s.execError.Load(); err2 != nil && (terror.ErrDBExecuteFailed.Equal(err2) || terror.ErrDBUnExpect.Equal(err2)) { if err2 = s.checkpoint.FlushSafeModeExitPoint(s.tctx); err2 != nil { - tctx.L().Warn("failed to flush safe mode checkpoints when exit task", zap.Error(err2)) + s.tctx.L().Warn("failed to flush safe mode checkpoints when exit task", zap.Error(err2)) } } }() @@ -1526,7 +1684,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // if we start syncer at an early position, database must bear a period of inconsistent state, // it's eventual consistency. 
s.safeMode = sm.NewSafeMode() - s.enableSafeModeInitializationPhase(tctx) + s.enableSafeModeInitializationPhase(s.runCtx) closeShardingResync := func() error { if shardingReSync == nil { @@ -1549,7 +1707,12 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // if suffix>0, we are replacing error s.isReplacingErr = currentLocation.Suffix != 0 +<<<<<<< HEAD err3 := s.streamerController.RedirectStreamer(tctx, currentLocation) +======= + s.locations.reset(currentLocation) + err3 := s.streamerController.RedirectStreamer(s.tctx, currentLocation) +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) if err3 != nil { return err3 } @@ -1558,8 +1721,44 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return nil } +<<<<<<< HEAD inFinerRetry := false // in release branch, we only use eventIndex to test a bug +======= + maybeSkipNRowsEvent := func(n int) error { + if n == 0 { + return nil + } + + for i := 0; i < n; { + e, err1 := s.getEvent(s.runCtx, currentLocation) + if err1 != nil { + return err + } + switch e.Event.(type) { + case *replication.GTIDEvent, *replication.MariadbGTIDEvent: + gtidStr, err2 := event.GetGTIDStr(e) + if err2 != nil { + return err2 + } + if currentGTID != gtidStr { + s.tctx.L().Error("after recover GTID-based replication, the first GTID is not same as broken one. May meet duplicate entry or corrupt data if table has no PK/UK.", + zap.String("last GTID", currentGTID), + zap.String("GTID after reset", gtidStr), + ) + return nil + } + case *replication.RowsEvent: + i++ + } + } + s.tctx.L().Info("discard event already consumed", zap.Int("count", n), + zap.Any("cur_loc", currentLocation)) + return nil + } + + // eventIndex is the rows event index in this transaction, it's used to avoiding read duplicate event in gtid mode +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) eventIndex := 0 for { if s.execError.Load() != nil { @@ -1579,14 +1778,20 @@ func (s *Syncer) Run(ctx context.Context) (err error) { currentLocation = shardingReSync.currLocation // if suffix>0, we are replacing error +<<<<<<< HEAD s.isReplacingErr = currentLocation.Suffix != 0 err = s.streamerController.RedirectStreamer(tctx, shardingReSync.currLocation) +======= + s.isReplacingOrInjectingErr = currentLocation.Suffix != 0 + s.locations.reset(shardingReSync.currLocation) + err = s.streamerController.RedirectStreamer(s.runCtx, shardingReSync.currLocation) +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) if err != nil { return err } failpoint.Inject("ReSyncExit", func() { - tctx.L().Warn("exit triggered", zap.String("failpoint", "ReSyncExit")) + s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ReSyncExit")) utils.OsExit(1) }) } @@ -1594,7 +1799,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { var e *replication.BinlogEvent startTime := time.Now() - e, err = s.getEvent(tctx, currentLocation) + e, err = s.getEvent(s.runCtx, currentLocation) failpoint.Inject("SafeModeExit", func(val failpoint.Value) { if intVal, ok := val.(int); ok && intVal == 1 { @@ -1612,29 +1817,40 @@ func (s *Syncer) Run(ctx context.Context) (err error) { }) switch { case err == context.Canceled: - tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation)) + s.tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last location", lastLocation)) return nil case err == context.DeadlineExceeded: - tctx.L().Info("deadline exceeded when fetching binlog event") + s.tctx.L().Info("deadline 
exceeded when fetching binlog event") continue case isDuplicateServerIDError(err): // if the server id is already used, need to use a new server id - tctx.L().Info("server id is already used by another slave, will change to a new server id and get event again") - err1 := s.streamerController.UpdateServerIDAndResetReplication(tctx, lastLocation) + s.tctx.L().Info("server id is already used by another slave, will change to a new server id and get event again") + err1 := s.streamerController.UpdateServerIDAndResetReplication(s.tctx, lastLocation) if err1 != nil { return err1 } continue +<<<<<<< HEAD +======= + case err == relay.ErrorMaybeDuplicateEvent: + s.tctx.L().Warn("read binlog met a truncated file, will skip events that has been consumed") + err = maybeSkipNRowsEvent(eventIndex) + if err == nil { + continue + } + s.tctx.L().Warn("skip duplicate rows event failed", zap.Error(err)) +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) } if err != nil { - tctx.L().Error("fail to fetch binlog", log.ShortError(err)) + s.tctx.L().Error("fail to fetch binlog", log.ShortError(err)) if isConnectionRefusedError(err) { return err } if s.streamerController.CanRetry(err) { +<<<<<<< HEAD // lastLocation is the last finished GTID err = s.streamerController.ResetReplicationSyncer(tctx, s.checkpoint.GlobalPoint()) if err != nil { @@ -1642,13 +1858,24 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } log.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint())) inFinerRetry = true +======= + // GlobalPoint is the last finished transaction location + err = s.streamerController.ResetReplicationSyncer(s.tctx, s.checkpoint.GlobalPoint()) + if err != nil { + return err + } + s.tctx.L().Info("reset replication binlog puller", zap.Any("pos", s.checkpoint.GlobalPoint())) + if err = maybeSkipNRowsEvent(eventIndex); err != nil { + return err + } +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) continue } // try to re-sync in gtid mode if tryReSync && s.cfg.EnableGTID && utils.IsErrBinlogPurged(err) && s.cfg.AutoFixGTID { time.Sleep(retryTimeout) - err = s.reSyncBinlog(*tctx, lastLocation) + err = s.reSyncBinlog(*s.runCtx, lastLocation) if err != nil { return err } @@ -1661,7 +1888,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { failpoint.Inject("IgnoreSomeTypeEvent", func(val failpoint.Value) { if e.Header.EventType.String() == val.(string) { - tctx.L().Debug("IgnoreSomeTypeEvent", zap.Reflect("event", e)) + s.tctx.L().Debug("IgnoreSomeTypeEvent", zap.Reflect("event", e)) failpoint.Continue() } }) @@ -1675,7 +1902,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { metrics.BinlogPosGauge.WithLabelValues("syncer", s.cfg.Name, s.cfg.SourceID).Set(float64(e.Header.LogPos)) index, err := binlog.GetFilenameIndex(lastLocation.Position.Name) if err != nil { - tctx.L().Warn("fail to get index number of binlog file, may because only specify GTID and hasn't saved according binlog position", log.ShortError(err)) + s.tctx.L().Warn("fail to get index number of binlog file, may because only specify GTID and hasn't saved according binlog position", log.ShortError(err)) } else { metrics.BinlogFileGauge.WithLabelValues("syncer", s.cfg.Name, s.cfg.SourceID).Set(float64(index)) } @@ -1684,7 +1911,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { failpoint.Inject("ProcessBinlogSlowDown", nil) - tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header)) + s.tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header)) 
// TODO: support all event // we calculate startLocation and endLocation(currentLocation) for Query event here @@ -1727,7 +1954,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { currentLocation = startLocation } else if op == pb.ErrorOp_Skip { ec := eventContext{ - tctx: tctx, + tctx: s.tctx, header: e.Header, startLocation: &startLocation, currentLocation: ¤tLocation, @@ -1736,7 +1963,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) { var sourceTbls map[string]map[string]struct{} sourceTbls, err = s.trackOriginDDL(ev, ec) if err != nil { +<<<<<<< HEAD tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", ev.Query)) +======= + s.tctx.L().Warn("failed to track query when handle-error skip", zap.Error(err), zap.ByteString("sql", queryEvent.Query)) +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) } s.saveGlobalPoint(currentLocation) @@ -1750,9 +1981,9 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } err = s.flushJobs() if err != nil { - tctx.L().Warn("failed to flush jobs when handle-error skip", zap.Error(err)) + s.tctx.L().Warn("failed to flush jobs when handle-error skip", zap.Error(err)) } else { - tctx.L().Info("flush jobs when handle-error skip") + s.tctx.L().Info("flush jobs when handle-error skip") } } // skip the current event @@ -1763,10 +1994,20 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // also redirect stream to next event if currentLocation.Suffix > 0 && e.Header.EventSize > 0 { currentLocation.Suffix = 0 +<<<<<<< HEAD s.isReplacingErr = false err = s.streamerController.RedirectStreamer(tctx, currentLocation) if err != nil { return err +======= + s.isReplacingOrInjectingErr = false + s.locations.reset(currentLocation) + if !s.errOperatorHolder.IsInject(startLocation) { + // replace operator need redirect to currentLocation + if err = s.streamerController.RedirectStreamer(s.runCtx, currentLocation); err != nil { + return err + } +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) } } } @@ -1783,17 +2024,17 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // 2. push forward and replicate some sqls after safeModeExitPoint to downstream // 3. quit because of network error, fail to flush global checkpoint and new safeModeExitPoint to downstream // 4. restart again, quit safe mode at safeModeExitPoint, but some sqls after this location have already been replicated to the downstream - if err = s.checkpoint.FlushSafeModeExitPoint(s.tctx); err != nil { + if err = s.checkpoint.FlushSafeModeExitPoint(s.runCtx); err != nil { return err } - if err = s.safeMode.Add(tctx, -1); err != nil { + if err = s.safeMode.Add(s.runCtx, -1); err != nil { return err } } } ec := eventContext{ - tctx: tctx, + tctx: s.runCtx, header: e.Header, startLocation: &startLocation, currentLocation: ¤tLocation, @@ -1836,7 +2077,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // only need compare binlog position? 
lastLocation = shardingReSync.currLocation if binlog.CompareLocation(shardingReSync.currLocation, shardingReSync.latestLocation, s.cfg.EnableGTID) >= 0 { - tctx.L().Info("re-replicate shard group was completed", zap.String("event", "XID"), zap.Stringer("re-shard", shardingReSync)) + s.tctx.L().Info("re-replicate shard group was completed", zap.String("event", "XID"), zap.Stringer("re-shard", shardingReSync)) err = closeShardingResync() if err != nil { return terror.Annotatef(err, "shard group current location %s", shardingReSync.currLocation) @@ -1851,7 +2092,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { return terror.Annotatef(err, "fail to record GTID %v", ev.GSet) } - tctx.L().Debug("", zap.String("event", "XID"), zap.Stringer("last location", lastLocation), log.WrapStringerField("location", currentLocation)) + s.tctx.L().Debug("", zap.String("event", "XID"), zap.Stringer("last location", lastLocation), log.WrapStringerField("location", currentLocation)) lastLocation.Position.Pos = e.Header.LogPos // update lastPos err = lastLocation.SetGTID(ev.GSet) if err != nil { @@ -1864,7 +2105,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { if e.Header.EventType == replication.HEARTBEAT_EVENT { // flush checkpoint even if there are no real binlog events if s.checkpoint.CheckGlobalPoint() { - tctx.L().Info("meet heartbeat event and then flush jobs") + s.tctx.L().Info("meet heartbeat event and then flush jobs") err2 = s.flushJobs() } } @@ -1875,6 +2116,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) { } } if waitXIDStatus(s.waitXIDJob.Load()) == waitComplete { + // we have already waited for the XID event, so we can stop the sync now; s.runCancel will be called in the deferred func return nil } } @@ -2209,7 +2451,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o qec.p, err = event.GetParserForStatusVars(ev.StatusVars) if err != nil { - log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err)) + s.tctx.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err)) } stmt, err := parseOneStmt(qec) @@ -2811,7 +3053,7 @@ func (s *Syncer) trackOriginDDL(ev *replication.QueryEvent, ec eventContext) (ma } qec.p, err = event.GetParserForStatusVars(ev.StatusVars) if err != nil { - log.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err)) + s.tctx.L().Warn("found error when get sql_mode from binlog status_vars", zap.Error(err)) } stmt, err := parseOneStmt(qec) if err != nil { @@ -3068,45 +3310,47 @@ func (s *Syncer) Close() { if s.isClosed() { return } - s.stopSync() s.closeDBs() - s.checkpoint.Close() - if err := s.schemaTracker.Close(); err != nil { s.tctx.L().Error("fail to close schema tracker", log.ShortError(err)) } - if s.sgk != nil { s.sgk.Close() } - s.closeOnlineDDL() - // when closing syncer by `stop-task`, remove active relay log from hub s.removeActiveRelayLog() - metrics.RemoveLabelValuesWithTaskInMetrics(s.cfg.Name) - + s.runWg.Wait() s.closed.Store(true) } -// stopSync stops syncing, now it used by Close and Pause -// maybe we can refine the workflow more clear. -func (s *Syncer) stopSync() { - if s.done != nil { - <-s.done // wait Run to return - } - s.closeJobChans() - s.wg.Wait() // wait job workers to return +// Kill kills the syncer without graceful shutdown. +func (s *Syncer) Kill() { + s.tctx.L().Warn("kill syncer without graceful") + s.runCancel() + s.syncCancel() + s.Close() +} +// stopSync stops the stream and rolls back the checkpoint; now it is used by Close() and Pause().
+func (s *Syncer) stopSync() { // before re-write workflow for s.syncer, simply close it // when resuming, re-create s.syncer if s.streamerController != nil { s.streamerController.Close(s.tctx) } + + // try to rollback checkpoints, if they already flushed, no effect, this operation should call before close schemaTracker + prePos := s.checkpoint.GlobalPoint() + s.checkpoint.Rollback(s.schemaTracker) + currPos := s.checkpoint.GlobalPoint() + if binlog.CompareLocation(prePos, currPos, s.cfg.EnableGTID) != 0 { + s.tctx.L().Warn("something wrong with rollback global checkpoint", zap.Stringer("previous position", prePos), zap.Stringer("current position", currPos)) + } } func (s *Syncer) closeOnlineDDL() { diff --git a/dm/syncer/syncer_test.go b/dm/syncer/syncer_test.go index b924a935546..b0a0a2eba88 100644 --- a/dm/syncer/syncer_test.go +++ b/dm/syncer/syncer_test.go @@ -41,6 +41,11 @@ import ( "github.com/pingcap/tiflow/dm/pkg/utils" "github.com/pingcap/tiflow/dm/syncer/dbconn" "github.com/pingcap/tiflow/pkg/errorutil" +<<<<<<< HEAD +======= + "github.com/pingcap/tiflow/pkg/sqlmodel" + "github.com/stretchr/testify/require" +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) sqlmock "github.com/DATA-DOG/go-sqlmock" "github.com/go-mysql-org/go-mysql/mysql" @@ -97,12 +102,16 @@ type testSyncerSuite struct { } type MockStreamer struct { - events []*replication.BinlogEvent - idx uint32 + events []*replication.BinlogEvent + idx uint32 + pending bool } func (m *MockStreamer) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { if int(m.idx) >= len(m.events) { + if m.pending { + <-ctx.Done() + } return nil, context.Canceled } e := m.events[m.idx] @@ -116,7 +125,7 @@ type MockStreamProducer struct { func (mp *MockStreamProducer) generateStreamer(location binlog.Location) (streamer2.Streamer, error) { if location.Position.Pos == 4 { - return &MockStreamer{mp.events, 0}, nil + return &MockStreamer{mp.events, 0, false}, nil } bytesLen := 0 idx := uint32(0) @@ -127,10 +136,11 @@ func (mp *MockStreamProducer) generateStreamer(location binlog.Location) (stream break } } - return &MockStreamer{mp.events, idx}, nil + return &MockStreamer{mp.events, idx, false}, nil } func (s *testSyncerSuite) SetUpSuite(c *C) { +<<<<<<< HEAD loaderDir, err := os.MkdirTemp("", "loader") c.Assert(err, IsNil) loaderCfg := config.LoaderConfig{ @@ -152,6 +162,9 @@ func (s *testSyncerSuite) SetUpSuite(c *C) { s.cfg.To.Adjust() s.cfg.UseRelay = false +======= + s.cfg = genDefaultSubTaskConfig4Test() +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) s.resetEventsGenerator(c) c.Assert(log.InitLogger(&log.Config{}), IsNil) } @@ -230,7 +243,7 @@ func (s *testSyncerSuite) TearDownSuite(c *C) { os.RemoveAll(s.cfg.Dir) } -func (s *testSyncerSuite) mockGetServerUnixTS(mock sqlmock.Sqlmock) { +func mockGetServerUnixTS(mock sqlmock.Sqlmock) { ts := time.Now().Unix() rows := sqlmock.NewRows([]string{"UNIX_TIMESTAMP()"}).AddRow(strconv.FormatInt(ts, 10)) mock.ExpectQuery("SELECT UNIX_TIMESTAMP()").WillReturnRows(rows) @@ -731,14 +744,14 @@ func (s *testSyncerSuite) TestcheckpointID(c *C) { func (s *testSyncerSuite) TestRun(c *C) { // 1. run syncer with column mapping - // 2. execute some sqls which will trigger casuality + // 2. execute some sqls which will trigger causality // 3. check the generated jobs // 4. update config, add route rules, and update syncer - // 5. execute somes sqls and then check jobs generated + // 5. 
execute some sqls and then check jobs generated db, mock, err := sqlmock.New() c.Assert(err, IsNil) - s.mockGetServerUnixTS(mock) + mockGetServerUnixTS(mock) dbConn, err := db.Conn(context.Background()) c.Assert(err, IsNil) checkPointDB, checkPointMock, err := sqlmock.New() @@ -792,7 +805,7 @@ func (s *testSyncerSuite) TestRun(c *C) { mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_1`").WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("t_1", "create table t_1(id int primary key, name varchar(24), KEY `index1` (`name`))")) - s.mockGetServerUnixTS(mock) + mockGetServerUnixTS(mock) mock.ExpectQuery("SHOW CREATE TABLE " + "`test_1`.`t_2`").WillReturnRows( sqlmock.NewRows([]string{"Table", "Create Table"}). AddRow("t_2", "create table t_2(id int primary key, name varchar(24))")) @@ -1015,7 +1028,7 @@ func (s *testSyncerSuite) TestRun(c *C) { func (s *testSyncerSuite) TestExitSafeModeByConfig(c *C) { db, mock, err := sqlmock.New() c.Assert(err, IsNil) - s.mockGetServerUnixTS(mock) + mockGetServerUnixTS(mock) dbConn, err := db.Conn(context.Background()) c.Assert(err, IsNil) @@ -1660,3 +1673,84 @@ func (s *testSyncerSuite) TestExecuteSQLSWithIgnore(c *C) { c.Assert(mock.ExpectationsWereMet(), IsNil) } + +func genDefaultSubTaskConfig4Test() *config.SubTaskConfig { + loaderDir, err := os.MkdirTemp("", "loader") + if err != nil { + panic(err) // no happen + } + + loaderCfg := config.LoaderConfig{ + Dir: loaderDir, + } + cfg := &config.SubTaskConfig{ + From: config.GetDBConfigForTest(), + To: config.GetDBConfigForTest(), + ServerID: 101, + MetaSchema: "test", + Name: "syncer_ut", + ShadowTableRules: []string{config.DefaultShadowTableRules}, + TrashTableRules: []string{config.DefaultTrashTableRules}, + Mode: config.ModeIncrement, + Flavor: "mysql", + LoaderConfig: loaderCfg, + UseRelay: false, + } + cfg.Experimental.AsyncCheckpointFlush = true + cfg.From.Adjust() + cfg.To.Adjust() + return cfg +} + +func TestWaitBeforeRunExit(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cfg := genDefaultSubTaskConfig4Test() + cfg.WorkerCount = 0 + syncer := NewSyncer(cfg, nil, nil) + + db, mock, err := sqlmock.New() + require.NoError(t, err) + mockGetServerUnixTS(mock) + + syncer.fromDB = &dbconn.UpStreamConn{BaseDB: conn.NewBaseDB(db)} + syncer.reset() + require.NoError(t, syncer.genRouter()) + + mockStreamerProducer := &MockStreamProducer{} + mockStreamer, err := mockStreamerProducer.generateStreamer(binlog.NewLocation("")) + require.NoError(t, err) + // let getEvent pending until ctx.Done() + mockStreamer.(*MockStreamer).pending = true + syncer.streamerController = &StreamerController{ + streamerProducer: mockStreamerProducer, streamer: mockStreamer, closed: false, + } + + wg := &sync.WaitGroup{} + errCh := make(chan error, 1) + wg.Add(1) + go func() { + defer wg.Done() + errCh <- syncer.Run(ctx) + }() + time.Sleep(time.Second) // wait s.Run start + + // test s.Run will not exit unit caller cancel ctx or call s.runCancel + cancel() // this will make s.Run exit + wg.Wait() + require.Nil(t, <-errCh) + require.Equal(t, 0, len(errCh)) + require.NotNil(t, syncer.runCtx) + require.NotNil(t, syncer.runCancel) + + // test syncer wait time not more than maxPauseOrStopWaitTime + oldMaxPauseOrStopWaitTime := maxPauseOrStopWaitTime + maxPauseOrStopWaitTime = time.Second + ctx2, cancel := context.WithCancel(context.Background()) + cancel() + runCtx, runCancel := context.WithCancel(context.Background()) + syncer.runCtx, syncer.runCancel = tcontext.NewContext(runCtx, 
syncer.tctx.L()), runCancel + syncer.runWg.Add(1) + syncer.waitBeforeRunExit(ctx2) + require.Equal(t, context.Canceled, syncer.runCtx.Ctx.Err()) + maxPauseOrStopWaitTime = oldMaxPauseOrStopWaitTime +} diff --git a/dm/tests/_utils/ha_cases_lib.sh b/dm/tests/_utils/ha_cases_lib.sh index d9fb774a90b..6a0a2896ed8 100644 --- a/dm/tests/_utils/ha_cases_lib.sh +++ b/dm/tests/_utils/ha_cases_lib.sh @@ -145,6 +145,7 @@ function start_multi_tasks_cluster() { } function cleanup() { + cleanup_process $* cleanup_data $ha_test cleanup_data $ha_test2 echo "clean source table" @@ -154,7 +155,6 @@ function cleanup() { $(mysql -h127.0.0.1 -p123456 -P${i} -uroot -e "drop database if exists ha_test2;") sleep 1 done - cleanup_process $* } function isolate_master() { diff --git a/dm/tests/all_mode/run.sh b/dm/tests/all_mode/run.sh index c3d87bcf207..340d2527a6c 100755 --- a/dm/tests/all_mode/run.sh +++ b/dm/tests/all_mode/run.sh @@ -141,8 +141,8 @@ function test_query_timeout() { run_sql_tidb 'SHOW PROCESSLIST;' check_rows_equal 1 - cleanup_data all_mode cleanup_process + cleanup_data all_mode export GO_FAILPOINTS='' echo "[$(date)] <<<<<< finish test_query_timeout >>>>>>" @@ -207,8 +207,8 @@ function test_stop_task_before_checkpoint() { "stop-task test" \ "\"result\": true" 3 - cleanup_data all_mode cleanup_process + cleanup_data all_mode export GO_FAILPOINTS='' echo "[$(date)] <<<<<< finish test_stop_task_before_checkpoint >>>>>>" @@ -267,8 +267,8 @@ function test_fail_job_between_event() { "\"result\": true" 3 check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - cleanup_data all_mode cleanup_process + cleanup_data all_mode export GO_FAILPOINTS='' echo "[$(date)] <<<<<< finish test_fail_job_between_event >>>>>>" @@ -314,8 +314,8 @@ function test_expression_filter() { "query-status test" \ "\"result\": true" 3 - cleanup_data all_mode cleanup_process + cleanup_data all_mode echo "[$(date)] <<<<<< finish test_expression_filter >>>>>>" } diff --git a/dm/tests/checkpoint_transaction/conf/dm-worker1.toml b/dm/tests/checkpoint_transaction/conf/dm-worker1.toml index 7a72ea72bf8..3d99321d632 100644 --- a/dm/tests/checkpoint_transaction/conf/dm-worker1.toml +++ b/dm/tests/checkpoint_transaction/conf/dm-worker1.toml @@ -1,2 +1,4 @@ -name = "worker1" join = "127.0.0.1:8261" +keepalive-ttl = 1 +name = "worker1" +worker-addr = "127.0.0.1:8262" diff --git a/dm/tests/checkpoint_transaction/run.sh b/dm/tests/checkpoint_transaction/run.sh index 5771945384a..e3834469245 100755 --- a/dm/tests/checkpoint_transaction/run.sh +++ b/dm/tests/checkpoint_transaction/run.sh @@ -6,6 +6,31 @@ cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) source $cur/../_utils/test_prepare WORK_DIR=$TEST_DIR/$TEST_NAME +function check_worker_ungraceful_stop_with_retry() { + for ((k = 0; k < 10; k++)); do + sleep 1 + echo "start check_worker_ungraceful_stop_with_retry times: $k" + + num=$(grep "kill unit" $WORK_DIR/worker1/log/dm-worker.log | wc -l) + if [ $num -lt 1 ]; then + continue + fi + num=$(grep "kill syncer without graceful" $WORK_DIR/worker1/log/dm-worker.log | wc -l) + if [ $num -lt 1 ]; then + continue + fi + num=$(grep "received ungraceful exit ctx, exit now" $WORK_DIR/worker1/log/dm-worker.log | wc -l) + if [ $num -lt 1 ]; then + continue + fi + echo "check_worker_ungraceful_stop_with_retry success after retry: $k" + return 0 + done + + echo "check_worker_ungraceful_stop_with_retry failed after retry" + exit 1 +} + function run() { export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/syncer/checkCheckpointInMiddleOfTransaction=return" @@ 
-28,6 +53,33 @@ function run() { # check diff check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + # test ungraceful stop, worker will not wait transaction finish + run_sql_file $cur/data/db1.increment1.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + sleep 2 + # kill dm-master 1 to make worker lost keep alive while a transaction is not finished + echo "kill dm-master1" + kill_dm_master + check_master_port_offline 1 + sleep 1 # wait worker lost keep alive ttl is 1 second + + # check dm-worker will exit quickly without waiting for the transaction to finish + check_worker_ungraceful_stop_with_retry + + # test data in tidb less than source + dataCountSource=$(mysql -uroot -h$MYSQL_HOST1 -P$MYSQL_PORT1 -p$MYSQL_PASSWORD1 -se "select count(1) from checkpoint_transaction.t1") + dataCountInTiDB=$(mysql -uroot -h127.0.0.1 -P4000 -se "select count(1) from checkpoint_transaction.t1") + echo "after ungraceful exit data in source count: $dataCountSource data in tidb count: $dataCountInTiDB" + if [ "$dataCountInTiDB" -lt "$dataCountSource" ]; then + echo "ungraceful stop test success" + else + echo "ungraceful stop test failed" + exit 1 + fi + + # start dm-master again task will be resume, and data will be synced + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml run_sql_file $cur/data/db1.increment1.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 # wait transaction start # you can see why sleep in https://github.com/pingcap/dm/pull/1928#issuecomment-895820239 @@ -41,8 +93,13 @@ function run() { "\"stage\": \"Paused\"" 1 # check the point is the middle of checkpoint num=$(grep "not receive xid job yet" $WORK_DIR/worker1/log/dm-worker.log | wc -l) - [[ $num -gt 0 ]] - sed -e '/not receive xid job yet/d' $WORK_DIR/worker1/log/dm-worker.log >$WORK_DIR/worker1/log/dm-worker.log + + if [ "$num" -gt 0 ]; then + echo "graceful pause test success" + else + echo "graceful pause test failed" + exit 1 + fi echo "start check pause diff" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml @@ -55,6 +112,16 @@ function run() { "query-status test" \ "\"stage\": \"Running\"" 1 + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + rm -rf $WORK_DIR/worker1 + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + "\"stage\": \"Running\"" 1 + run_sql_file $cur/data/db1.increment2.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 # wait transaction start # you can see why sleep in https://github.com/pingcap/dm/pull/1928#issuecomment-895820239 @@ -65,7 +132,12 @@ function run() { "\"result\": true" 2 # check the point is the middle of checkpoint num=$(grep "not receive xid job yet" $WORK_DIR/worker1/log/dm-worker.log | wc -l) - [[ $num -gt 0 ]] + if [ "$num" -gt 0 ]; then + echo "graceful stop test success" + else + echo "graceful stop test failed" + exit 1 + fi echo "start check stop diff" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml diff --git a/dm/tests/expression_filter/run.sh b/dm/tests/expression_filter/run.sh index 01b559ccba9..faa7d1d43db 100755 --- a/dm/tests/expression_filter/run.sh +++ b/dm/tests/expression_filter/run.sh @@ -53,8 +53,8 @@ function complex_behaviour() { update_num=$(grep -o '"number of filtered 
update"=[0-9]\+' $WORK_DIR/worker1/log/dm-worker.log | grep -o '[0-9]\+' | awk '{n += $1}; END{print n}') [ $update_num -eq 3 ] - cleanup_data expr_filter cleanup_process $* + cleanup_data expr_filter } function run() { diff --git a/dm/tests/full_mode/run.sh b/dm/tests/full_mode/run.sh index afb89800720..657c9ac3689 100755 --- a/dm/tests/full_mode/run.sh +++ b/dm/tests/full_mode/run.sh @@ -59,8 +59,8 @@ function fail_acquire_global_lock() { "\"stage\": \"Paused\"" 3 \ "you need (at least one of) the RELOAD privilege(s) for this operation" 2 - cleanup_data full_mode cleanup_process $* + cleanup_data full_mode } function escape_schema() { @@ -117,8 +117,8 @@ function escape_schema() { check_metric $WORKER1_PORT 'dumpling_dump_finished_tables' 3 0 3 check_metric $WORKER2_PORT 'dumpling_dump_finished_tables' 3 0 3 - cleanup_data full/mode cleanup_process $* + cleanup_data full/mode } function empty_data() { @@ -141,8 +141,8 @@ function empty_data() { check_log_contains $WORK_DIR/worker1/log/dm-worker.log "progress=\"100.00 %\"" check_log_contains $WORK_DIR/worker2/log/dm-worker.log "progress=\"100.00 %\"" - cleanup_data full_mode cleanup_process $* + cleanup_data full_mode } function run() { diff --git a/dm/tests/load_interrupt/run.sh b/dm/tests/load_interrupt/run.sh index 79f8c79c44c..e0e96aebb9c 100755 --- a/dm/tests/load_interrupt/run.sh +++ b/dm/tests/load_interrupt/run.sh @@ -60,8 +60,8 @@ function test_save_checkpoint_failed() { ls $WORK_DIR/worker1/dumped_data.test echo "test_save_checkpoint_failed SUCCESS!" - cleanup_data load_interrupt cleanup_process $* + cleanup_data load_interrupt } function run() { diff --git a/dm/tests/new_relay/run.sh b/dm/tests/new_relay/run.sh index 42f0fda61ce..c0a9ca20778 100755 --- a/dm/tests/new_relay/run.sh +++ b/dm/tests/new_relay/run.sh @@ -10,6 +10,7 @@ SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt" API_VERSION="v1alpha1" +<<<<<<< HEAD function test_cant_dail_upstream() { cleanup_data $TEST_NAME cleanup_process @@ -83,9 +84,12 @@ function test_cant_dail_downstream() { cleanup_data $TEST_NAME } +======= +>>>>>>> e7b0aae47 (unit(dm): add Kill func for unit (#4035)) function test_restart_relay_status() { - cleanup_data $TEST_NAME cleanup_process + cleanup_data $TEST_NAME + export GO_FAILPOINTS="" run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT @@ -157,11 +161,83 @@ function test_restart_relay_status() { "list-member --worker" \ "relay" 1 \ "bound" 2 + + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_restart_relay_status passed" } -function test_kill_dump_connection() { +function test_cant_dail_upstream() { + cleanup_process cleanup_data $TEST_NAME + export GO_FAILPOINTS="" + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" \ + "\"result\": true" 2 + + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + + export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/pkg/conn/failDBPing=return()" + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT 
$cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + # make sure DM-worker doesn't exit + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "injected error" 1 + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_cant_dail_upstream passed" +} + +function test_cant_dail_downstream() { cleanup_process + cleanup_data $TEST_NAME + export GO_FAILPOINTS="" + + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-relay -s $SOURCE_ID1 worker1" \ + "\"result\": true" 2 + dmctl_start_task_standalone $cur/conf/dm-task.yaml "--remove-meta" + + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + # kill tidb + pkill -hup tidb-server 2>/dev/null || true + wait_process_exit tidb-server + + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status -s $SOURCE_ID1" \ + "\"relayCatchUpMaster\": true" 1 \ + "dial tcp 127.0.0.1:4000: connect: connection refused" 1 + + # restart tidb + run_tidb_server 4000 $TIDB_PASSWORD + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_cant_dail_downstream passed" +} + +function test_kill_dump_connection() { + cleanup_process + cleanup_data $TEST_NAME run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' @@ -193,16 +269,12 @@ function test_kill_dump_connection() { run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s $SOURCE_ID1" \ "\"relayCatchUpMaster\": true" 1 + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_kill_dump_connection passed" +} +function test_relay_operations() { cleanup_process cleanup_data $TEST_NAME -} - -function run() { - test_restart_relay_status - test_cant_dail_downstream - test_cant_dail_upstream - export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/relay/ReportRelayLogSpaceInBackground=return(1)" run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 @@ -234,7 +306,7 @@ function run() { "\"worker\": \"worker1\"" 1 \ "\"worker\": \"worker2\"" 1 - # worker1 and worker2 has one realy job and worker3 have none. + # worker1 and worker2 has one relay job and worker3 have none. check_metric $WORKER1_PORT "dm_relay_binlog_file{node=\"relay\"}" 3 0 2 check_metric $WORKER1_PORT "dm_relay_exit_with_error_count" 3 -1 1 check_metric $WORKER2_PORT "dm_relay_binlog_file{node=\"relay\"}" 3 0 2 @@ -246,7 +318,7 @@ function run() { run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 - # relay task tranfer to worker1 with no error. + # relay task transfer to worker1 with no error. 
check_metric $WORKER1_PORT "dm_relay_data_corruption" 3 -1 1 check_metric $WORKER1_PORT "dm_relay_read_error_count" 3 -1 1 check_metric $WORKER1_PORT "dm_relay_write_error_count" 3 -1 1 @@ -254,8 +326,9 @@ function run() { check_metric $WORKER1_PORT 'dm_relay_space{type="available"}' 5 0 9223372036854775807 # subtask is preferred to scheduled to another relay worker - pkill -hup -f dm-worker1.toml 2>/dev/null || true - wait_pattern_exit dm-worker1.toml + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 # worker1 is down, worker2 has running relay and sync unit run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s $SOURCE_ID1" \ @@ -295,11 +368,12 @@ function run() { [ "$new_relay_log_count_1" -eq 1 ] [ "$new_relay_log_count_2" -eq 1 ] - pkill -hup -f dm-worker1.toml 2>/dev/null || true - wait_pattern_exit dm-worker1.toml - pkill -hup -f dm-worker2.toml 2>/dev/null || true - wait_pattern_exit dm-worker2.toml - + echo "kill dm-worker1" + ps aux | grep dm-worker1 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 + echo "kill dm-worker2" + ps aux | grep dm-worker2 | awk '{print $2}' | xargs kill || true + check_port_offline $WORKER1_PORT 20 # if all relay workers are offline, relay-not-enabled worker should continue to sync run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status -s $SOURCE_ID1" \ @@ -321,8 +395,7 @@ function run() { # destroy cluster cleanup_process $* - rm -rf $WORK_DIR - mkdir $WORK_DIR + cleanup_data $TEST_NAME # insert new data run_sql_file $cur/data/db1.increment5.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 @@ -351,7 +424,14 @@ function run() { "\"result\": true" 2 check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + echo ">>>>>>>>>>>>>>>>>>>>>>>>>>test test_relay_operations passed" +} +function run() { + test_relay_operations + test_cant_dail_upstream + test_restart_relay_status + test_cant_dail_downstream test_kill_dump_connection } diff --git a/dm/tests/safe_mode/run.sh b/dm/tests/safe_mode/run.sh index 4d10cc0914b..dd516ff7fdc 100755 --- a/dm/tests/safe_mode/run.sh +++ b/dm/tests/safe_mode/run.sh @@ -54,8 +54,8 @@ function consistency_none() { check_log_contain_with_retry "\[\"enable safe-mode for safe mode exit point, will exit at\"\] \[task=test\] \[unit=\"binlog replication\"\] \[location=\"position: ($name2, $pos2), gtid-set: $gtid2\"\]" $WORK_DIR/worker2/log/dm-worker.log run_sql_source2 "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'" - cleanup_data safe_mode_target cleanup_process $* + cleanup_data safe_mode_target } function check_exit_safe_binlog() { @@ -181,8 +181,8 @@ function safe_mode_recover() { echo "finish running run safe mode recover case $i" ((i += 1)) - cleanup_data safe_mode_target cleanup_process $* + cleanup_data safe_mode_target done } diff --git a/dm/tests/start_task/run.sh b/dm/tests/start_task/run.sh index 90e51cb24f7..5f67c350b00 100644 --- a/dm/tests/start_task/run.sh +++ b/dm/tests/start_task/run.sh @@ -58,8 +58,8 @@ function lazy_init_tracker() { check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t50' 1 check_log_not_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t51' - cleanup_data start_task cleanup_process + cleanup_data start_task } function run() {