diff --git a/dm/master/coordinator/coordinator.go b/dm/master/coordinator/coordinator.go index 842ae0254e..167997366a 100644 --- a/dm/master/coordinator/coordinator.go +++ b/dm/master/coordinator/coordinator.go @@ -409,18 +409,26 @@ func (c *Coordinator) tryRestartMysqlTask() { for hasTaskToSchedule { select { case source := <-c.waitingTask: + log.L().Info("will schedule source", zap.String("source", source)) if cfg, ok := c.sourceConfigs[source]; ok { ret := false if w, ok := c.upstreams[source]; ok { // Try start mysql task at the same worker. c.mu.RUnlock() + log.L().Info("try start mysql task at the same worker", zap.String("worker", w.Address())) ret = c.restartMysqlTask(w, &cfg) c.mu.RLock() } else { c.mu.RUnlock() - w, _ := c.AcquireWorkerForSource(source) - if w != nil { - ret = c.restartMysqlTask(w, &cfg) + w, err := c.AcquireWorkerForSource(source) + if err != nil { + log.L().Error("acquire worker for source", zap.String("source", source), zap.Error(err)) + } else { + if w != nil { + ret = c.restartMysqlTask(w, &cfg) + } else { + log.L().Info("acquire worker for source get nil worker") + } } c.mu.RLock() } @@ -472,9 +480,9 @@ func (c *Coordinator) restartMysqlTask(w *Worker, cfg *config.MysqlConfig) bool } } } else { - log.L().Warn("restartMysqlTask failed", zap.Error(err)) // Error means there is something wrong about network, set worker to close. // remove sourceID from upstreams. So the source would be schedule in other worker. + log.L().Warn("operate mysql worker", zap.Error(err), zap.Stringer("request", req)) delete(c.upstreams, cfg.SourceID) delete(c.workerToSource, w.Address()) w.SetStatus(WorkerClosed) diff --git a/dm/worker/server.go b/dm/worker/server.go index ad5038aa28..5ce5bd1d7e 100644 --- a/dm/worker/server.go +++ b/dm/worker/server.go @@ -23,10 +23,14 @@ import ( "github.com/pingcap/dm/dm/common" "github.com/pingcap/dm/dm/config" "github.com/pingcap/dm/dm/pb" + "github.com/pingcap/dm/pkg/binlog" + tcontext "github.com/pingcap/dm/pkg/context" "github.com/pingcap/dm/pkg/log" "github.com/pingcap/dm/pkg/terror" + "github.com/pingcap/dm/syncer" "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go/sync2" "github.com/soheilhy/cmux" "go.etcd.io/etcd/clientv3" @@ -35,11 +39,12 @@ import ( ) var ( - cmuxReadTimeout = 10 * time.Second - dialTimeout = 3 * time.Second - keepaliveTimeout = 3 * time.Second - keepaliveTime = 3 * time.Second - retryConnectSleepTime = 2 * time.Second + cmuxReadTimeout = 10 * time.Second + dialTimeout = 3 * time.Second + keepaliveTimeout = 3 * time.Second + keepaliveTime = 3 * time.Second + retryConnectSleepTime = 2 * time.Second + getMinPosForSubTaskFunc = getMinPosForSubTask ) // Server accepts RPC requests @@ -653,46 +658,70 @@ func (s *Server) startWorker(cfg *config.MysqlConfig) error { if w := s.getWorker(false); w != nil { if w.cfg.SourceID == cfg.SourceID { // This mysql task has started. It may be a repeated request. Just return true + log.L().Info("This mysql task has started. It may be a repeated request. Just return true", zap.String("sourceID", s.worker.cfg.SourceID)) return nil } return terror.ErrWorkerAlreadyStart.Generate() } - w, err := NewWorker(cfg) - if err != nil { - return err - } - s.setWorker(w, false) - go func() { - w.Start() - }() - ectx, cancel := context.WithTimeout(s.etcdClient.Ctx(), time.Second*3) - defer cancel() + subTaskCfgs := make([]*config.SubTaskConfig, 0, 3) + + ectx, ecancel := context.WithTimeout(s.etcdClient.Ctx(), time.Second*3) + defer ecancel() key := common.UpstreamSubTaskKeyAdapter.Encode(cfg.SourceID) resp, err := s.etcdClient.KV.Get(ectx, key, clientv3.WithPrefix()) if err != nil { return err } for _, kv := range resp.Kvs { - infos, err := common.UpstreamSubTaskKeyAdapter.Decode(string(kv.Key)) - if err != nil { - log.L().Error("decode upstream subtask key from etcd failed", zap.Error(err)) - return err - } - taskName := infos[1] task := string(kv.Value) - cfg := config.NewSubTaskConfig() - if err = cfg.Decode(task); err != nil { + subTaskcfg := config.NewSubTaskConfig() + if err = subTaskcfg.Decode(task); err != nil { return err } - cfg.LogLevel = s.cfg.LogLevel - cfg.LogFile = s.cfg.LogFile + subTaskcfg.LogLevel = s.cfg.LogLevel + subTaskcfg.LogFile = s.cfg.LogFile + + subTaskCfgs = append(subTaskCfgs, subTaskcfg) + } - if err = w.StartSubTask(cfg); err != nil { + dctx, dcancel := context.WithTimeout(s.etcdClient.Ctx(), time.Duration(len(subTaskCfgs))*3*time.Second) + defer dcancel() + minPos, err := getMinPosInAllSubTasks(dctx, subTaskCfgs) + if err != nil { + return err + } + + // TODO: support GTID + // don't contain GTID information in checkpoint table, just set it to empty + if minPos != nil { + cfg.RelayBinLogName = binlog.AdjustPosition(*minPos).Name + cfg.RelayBinlogGTID = "" + } + + log.L().Info("start workers", zap.Reflect("subTasks", subTaskCfgs)) + + w, err := NewWorker(cfg) + if err != nil { + return err + } + s.setWorker(w, false) + go func() { + w.Start() + }() + + // FIXME: worker's closed will be set to false in Start. + // when start sub task, will check the `closed`, if closed is true, will ignore start subTask + // just sleep and make test success, will refine this later + time.Sleep(1 * time.Second) + + for _, subTaskCfg := range subTaskCfgs { + if err = w.StartSubTask(subTaskCfg); err != nil { return err } - log.L().Info("load subtask successful", zap.String("sourceID", cfg.SourceID), zap.String("task", taskName)) + log.L().Info("load subtask successful", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name)) } + return nil } @@ -753,3 +782,50 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse { } return resp } + +// all subTask in subTaskCfgs should have same source +// this function return the min position in all subtasks, used for relay's position +func getMinPosInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minPos *mysql.Position, err error) { + for _, subTaskCfg := range subTaskCfgs { + pos, err := getMinPosForSubTaskFunc(ctx, subTaskCfg) + if err != nil { + return nil, err + } + + if pos == nil { + continue + } + + if minPos == nil { + minPos = pos + } else { + if minPos.Compare(*pos) >= 1 { + minPos = pos + } + } + } + + return minPos, nil +} + +func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) { + if subTaskCfg.Mode == config.ModeFull { + return nil, nil + } + + tctx := tcontext.NewContext(ctx, log.L()) + checkpoint := syncer.NewRemoteCheckPoint(tctx, subTaskCfg, subTaskCfg.SourceID) + err = checkpoint.Init(tctx) + if err != nil { + return nil, errors.Annotate(err, "get min position from checkpoint") + } + defer checkpoint.Close() + + err = checkpoint.Load(tctx, nil) + if err != nil { + return nil, errors.Annotate(err, "get min position from checkpoint") + } + + pos := checkpoint.GlobalPoint() + return &pos, nil +} diff --git a/dm/worker/server_test.go b/dm/worker/server_test.go index a2cbbdedbd..cb62229b3b 100644 --- a/dm/worker/server_test.go +++ b/dm/worker/server_test.go @@ -24,6 +24,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/pd/pkg/tempurl" + "github.com/siddontang/go-mysql/mysql" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" "google.golang.org/grpc" @@ -47,6 +48,12 @@ var _ = Suite(&testServer{}) func (t *testServer) SetUpSuite(c *C) { err := log.InitLogger(&log.Config{}) c.Assert(err, IsNil) + + getMinPosForSubTaskFunc = getFakePosForSubTask +} + +func (t *testServer) TearDownSuite(c *C) { + getMinPosForSubTaskFunc = getMinPosForSubTask } func createMockETCD(dir string, host string) (*embed.Etcd, error) { @@ -107,6 +114,11 @@ func (t *testServer) TestServer(c *C) { // check worker would retry connecting master rather than stop worker directly. ETCD = t.testRetryConnectMaster(c, s, ETCD, etcdDir, hostName) + mysqlCfg := &config.MysqlConfig{} + c.Assert(mysqlCfg.LoadFromFile("./dm-mysql.toml"), IsNil) + err = s.startWorker(mysqlCfg) + c.Assert(err, IsNil) + // test condition hub t.testConidtionHub(c, s) @@ -323,3 +335,40 @@ func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, ETCD *embed. c.Assert(s.getWorker(true), IsNil) c.Assert(s.retryConnectMaster.Get(), IsFalse) } + +func (t *testServer) TestGetMinPosInAllSubTasks(c *C) { + subTaskCfg := []*config.SubTaskConfig{ + { + Name: "test2", + }, { + Name: "test3", + }, { + Name: "test1", + }, + } + minPos, err := getMinPosInAllSubTasks(context.Background(), subTaskCfg) + c.Assert(err, IsNil) + c.Assert(minPos.Name, Equals, "mysql-binlog.00001") + c.Assert(minPos.Pos, Equals, uint32(12)) +} + +func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) { + switch subTaskCfg.Name { + case "test1": + return &mysql.Position{ + Name: "mysql-binlog.00001", + Pos: 123, + }, nil + case "test2": + return &mysql.Position{ + Name: "mysql-binlog.00001", + Pos: 12, + }, nil + case "test3": + return &mysql.Position{ + Name: "mysql-binlog.00003", + }, nil + default: + return nil, nil + } +} diff --git a/dm/worker/worker.go b/dm/worker/worker.go index c96b155c66..2dc597bf4d 100644 --- a/dm/worker/worker.go +++ b/dm/worker/worker.go @@ -109,7 +109,7 @@ func NewWorker(cfg *config.MysqlConfig) (w *Worker, err error) { InitConditionHub(w) - w.l.Info("initialized") + w.l.Info("initialized", zap.Stringer("cfg", cfg)) return w, nil } diff --git a/dm/worker/worker_test.go b/dm/worker/worker_test.go index c6828fb390..7d4a962e23 100644 --- a/dm/worker/worker_test.go +++ b/dm/worker/worker_test.go @@ -118,6 +118,8 @@ func (t *testServer) TestTaskAutoResume(c *C) { defer failpoint.Disable("github.com/pingcap/dm/mydumper/dumpUnitProcessWithError") c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/worker/mockCreateUnitsDumpOnly", `return(true)`), IsNil) defer failpoint.Disable("github.com/pingcap/dm/dm/worker/mockCreateUnitsDumpOnly") + c.Assert(failpoint.Enable("github.com/pingcap/dm/loader/ignoreLoadCheckpointErr", `return()`), IsNil) + defer failpoint.Disable("github.com/pingcap/dm/loader/ignoreLoadCheckpointErr") s := NewServer(cfg) diff --git a/loader/loader.go b/loader/loader.go index 9d42150202..c8cdd62aa4 100644 --- a/loader/loader.go +++ b/loader/loader.go @@ -411,6 +411,10 @@ func (l *Loader) Init(ctx context.Context) (err error) { tctx := l.logCtx.WithContext(ctx) checkpoint, err := newRemoteCheckPoint(tctx, l.cfg, l.checkpointID()) + failpoint.Inject("ignoreLoadCheckpointErr", func(_ failpoint.Value) { + l.logCtx.L().Info("", zap.String("failpoint", "ignoreLoadCheckpointErr")) + err = nil + }) if err != nil { return err } diff --git a/relay/relay.go b/relay/relay.go index 8e2b41a306..d846427dca 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -389,7 +389,7 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo // TODO: try auto fix GTID, and can support auto switching between upstream server later. cfg := r.cfg.From r.tctx.L().Error("the requested binlog files have purged in the master server or the master server have switched, currently DM do no support to handle this error", - zap.String("db host", cfg.Host), zap.Int("db port", cfg.Port), log.ShortError(err)) + zap.String("db host", cfg.Host), zap.Int("db port", cfg.Port), zap.Stringer("last pos", lastPos), log.ShortError(err)) // log the status for debug pos, gs, err2 := utils.GetMasterStatus(r.db, r.cfg.Flavor) if err2 == nil { diff --git a/syncer/checkpoint.go b/syncer/checkpoint.go index e3e566d5a5..63dbf455ef 100644 --- a/syncer/checkpoint.go +++ b/syncer/checkpoint.go @@ -607,11 +607,14 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.T if err = json.Unmarshal(tiBytes, &ti); err != nil { return terror.ErrSchemaTrackerInvalidJSON.Delegate(err, cpSchema, cpTable) } - if err = schemaTracker.CreateSchemaIfNotExists(cpSchema); err != nil { - return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, cpSchema) - } - if err = schemaTracker.CreateTableIfNotExists(cpSchema, cpTable, &ti); err != nil { - return terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, cpSchema, cpTable) + + if schemaTracker != nil { + if err = schemaTracker.CreateSchemaIfNotExists(cpSchema); err != nil { + return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, cpSchema) + } + if err = schemaTracker.CreateTableIfNotExists(cpSchema, cpTable, &ti); err != nil { + return terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, cpSchema, cpTable) + } } mSchema, ok := cp.points[cpSchema] diff --git a/tests/_utils/run_dm_ctl_with_retry b/tests/_utils/run_dm_ctl_with_retry index 12fd7fab5b..11b6a48bd3 100755 --- a/tests/_utils/run_dm_ctl_with_retry +++ b/tests/_utils/run_dm_ctl_with_retry @@ -20,10 +20,10 @@ pid=$$ echo "dmctl test cmd: \"$cmd\"" -all_matched=True +all_matched=true for ((k=0; k<10; k++)); do echo "$cmd" | $binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.dmctl.$ts.$pid.out" DEVEL -master-addr=$master_addr > $dmctl_log 2>&1 - all_matched=True + all_matched=true for ((i=1; i<$#; i+=2)); do j=$((i+1)) value=${!i} @@ -31,7 +31,7 @@ for ((k=0; k<10; k++)); do got=$(sed "s/$value/$value\n/g" $dmctl_log | grep -c "$value") if [ "$got" != "$expected" ]; then echo "command: $cmd $value count: $got != expected: $expected, failed the $k-th time, will retry again" - all_matched=False + all_matched=false break fi done diff --git a/tests/_utils/test_prepare b/tests/_utils/test_prepare index c910d41e7d..60fc1aba09 100644 --- a/tests/_utils/test_prepare +++ b/tests/_utils/test_prepare @@ -7,7 +7,7 @@ MASTER_PORT=8261 WORKER1_PORT=8262 WORKER2_PORT=8263 WORKER3_PORT=8264 -TRACER_PORT=8264 +TRACER_PORT=8265 SOURCE_ID1="mysql-replica-01" SOURCE_ID2="mysql-replica-02" RESET_MASTER=${RESET_MASTER:-true} diff --git a/tests/ha/conf/mysql1.toml b/tests/ha/conf/mysql1.toml index 0e2cf1e32f..02b3a121b9 100644 --- a/tests/ha/conf/mysql1.toml +++ b/tests/ha/conf/mysql1.toml @@ -5,6 +5,7 @@ flavor = "" enable-gtid = false relay-binlog-name = "" relay-binlog-gtid = "" +enable-relay = true [from] host = "127.0.0.1" diff --git a/tests/ha/conf/mysql2.toml b/tests/ha/conf/mysql2.toml index 69cee28cee..a1c6ecc780 100644 --- a/tests/ha/conf/mysql2.toml +++ b/tests/ha/conf/mysql2.toml @@ -5,6 +5,7 @@ flavor = "" enable-gtid = false relay-binlog-name = "" relay-binlog-gtid = "" +enable-relay = true [from] host = "127.0.0.1" diff --git a/tests/ha/run.sh b/tests/ha/run.sh index 45ecbe3708..4f0c0c602d 100755 --- a/tests/ha/run.sh +++ b/tests/ha/run.sh @@ -39,11 +39,17 @@ function run() { echo "use sync_diff_inspector to check full dump loader" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + echo "flush logs to force rotate binlog file" + run_sql "flush logs;" $MYSQL_PORT1 + run_sql "flush logs;" $MYSQL_PORT2 + echo "start dm-worker3 and kill dm-worker2" - run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml - check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT ps aux | grep dm-worker2 |awk '{print $2}'|xargs kill || true check_port_offline $WORKER2_PORT 20 + rm -rf $WORK_DIR/worker2/relay_log + + run_dm_worker $WORK_DIR/worker3 $WORKER3_PORT $cur/conf/dm-worker3.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER3_PORT echo "wait and check task running" check_http_alive 127.0.0.1:$MASTER_PORT/apis/${API_VERSION}/status/test '"name":"test","stage":"Running"' 10