Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

relay: start with the min binlog position in checkpoint #457

Merged
merged 30 commits into from
Feb 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
f7c67bd
fix get subtask
WangXiangUSTC Jan 11, 2020
bc8ed8c
add test
WangXiangUSTC Jan 11, 2020
6338cc8
minor fix
WangXiangUSTC Jan 11, 2020
c58e767
Apply suggestions from code review
WangXiangUSTC Jan 12, 2020
34cd486
address comment
WangXiangUSTC Jan 12, 2020
fca171d
remove deploy config in dm-master
WangXiangUSTC Jan 12, 2020
3721168
check dm-worker offline
WangXiangUSTC Jan 13, 2020
2a26366
merge ha-dev and resolve conflicts
WangXiangUSTC Jan 13, 2020
0d200ea
add function to get min pos in all subtasks
WangXiangUSTC Jan 15, 2020
c4a6c25
set relay binlog name if not set
WangXiangUSTC Jan 15, 2020
7cf89d8
minor fix
WangXiangUSTC Jan 15, 2020
6a0b42e
remove useless log
WangXiangUSTC Jan 15, 2020
f044c4a
minor fix
WangXiangUSTC Jan 15, 2020
7951f72
merge ha-dev
WangXiangUSTC Jan 15, 2020
594320f
update true and false
WangXiangUSTC Jan 15, 2020
83b7e9a
merge ha-dev and some fix
WangXiangUSTC Jan 16, 2020
14ef879
add db close
WangXiangUSTC Jan 17, 2020
b93ca7f
add simple unit test
WangXiangUSTC Jan 17, 2020
65a75bf
merge ha-dev and resolve conflicts
WangXiangUSTC Jan 17, 2020
8617e18
minor fix
WangXiangUSTC Jan 19, 2020
99e32ce
address comment
WangXiangUSTC Jan 20, 2020
70d6e8e
update log
WangXiangUSTC Jan 20, 2020
736aa48
add TODO
WangXiangUSTC Jan 20, 2020
79442de
use minPos when minPos is not nil
WangXiangUSTC Jan 20, 2020
69363d7
use checkpoint
WangXiangUSTC Jan 20, 2020
e3a4ba7
return error with annotate
WangXiangUSTC Jan 20, 2020
ce3635a
fix context
WangXiangUSTC Jan 20, 2020
7802a15
fix unit test
WangXiangUSTC Feb 3, 2020
4b3f764
merge ha-dev
WangXiangUSTC Feb 3, 2020
1f82f5a
fix test
WangXiangUSTC Feb 3, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions dm/master/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,18 +409,26 @@ func (c *Coordinator) tryRestartMysqlTask() {
for hasTaskToSchedule {
select {
case source := <-c.waitingTask:
log.L().Info("will schedule source", zap.String("source", source))
if cfg, ok := c.sourceConfigs[source]; ok {
ret := false
if w, ok := c.upstreams[source]; ok {
// Try start mysql task at the same worker.
c.mu.RUnlock()
log.L().Info("try start mysql task at the same worker", zap.String("worker", w.Address()))
ret = c.restartMysqlTask(w, &cfg)
c.mu.RLock()
} else {
c.mu.RUnlock()
w, _ := c.AcquireWorkerForSource(source)
if w != nil {
ret = c.restartMysqlTask(w, &cfg)
w, err := c.AcquireWorkerForSource(source)
if err != nil {
log.L().Error("acquire worker for source", zap.String("source", source), zap.Error(err))
} else {
if w != nil {
ret = c.restartMysqlTask(w, &cfg)
} else {
log.L().Info("acquire worker for source get nil worker")
}
}
c.mu.RLock()
}
Expand Down Expand Up @@ -472,9 +480,9 @@ func (c *Coordinator) restartMysqlTask(w *Worker, cfg *config.MysqlConfig) bool
}
}
} else {
log.L().Warn("restartMysqlTask failed", zap.Error(err))
// Error means there is something wrong about network, set worker to close.
// remove sourceID from upstreams. So the source would be schedule in other worker.
log.L().Warn("operate mysql worker", zap.Error(err), zap.Stringer("request", req))
delete(c.upstreams, cfg.SourceID)
delete(c.workerToSource, w.Address())
w.SetStatus(WorkerClosed)
Expand Down
130 changes: 103 additions & 27 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ import (
"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/binlog"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/syncer"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go/sync2"
"github.com/soheilhy/cmux"
"go.etcd.io/etcd/clientv3"
Expand All @@ -35,11 +39,12 @@ import (
)

var (
cmuxReadTimeout = 10 * time.Second
dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryConnectSleepTime = 2 * time.Second
cmuxReadTimeout = 10 * time.Second
dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryConnectSleepTime = 2 * time.Second
getMinPosForSubTaskFunc = getMinPosForSubTask
)

// Server accepts RPC requests
Expand Down Expand Up @@ -653,46 +658,70 @@ func (s *Server) startWorker(cfg *config.MysqlConfig) error {
if w := s.getWorker(false); w != nil {
if w.cfg.SourceID == cfg.SourceID {
// This mysql task has started. It may be a repeated request. Just return true
log.L().Info("This mysql task has started. It may be a repeated request. Just return true", zap.String("sourceID", s.worker.cfg.SourceID))
return nil
}
return terror.ErrWorkerAlreadyStart.Generate()
}
w, err := NewWorker(cfg)
if err != nil {
return err
}
s.setWorker(w, false)
go func() {
w.Start()
}()

ectx, cancel := context.WithTimeout(s.etcdClient.Ctx(), time.Second*3)
defer cancel()
subTaskCfgs := make([]*config.SubTaskConfig, 0, 3)

ectx, ecancel := context.WithTimeout(s.etcdClient.Ctx(), time.Second*3)
defer ecancel()
key := common.UpstreamSubTaskKeyAdapter.Encode(cfg.SourceID)
resp, err := s.etcdClient.KV.Get(ectx, key, clientv3.WithPrefix())
if err != nil {
return err
}
for _, kv := range resp.Kvs {
infos, err := common.UpstreamSubTaskKeyAdapter.Decode(string(kv.Key))
if err != nil {
log.L().Error("decode upstream subtask key from etcd failed", zap.Error(err))
return err
}
taskName := infos[1]
task := string(kv.Value)
cfg := config.NewSubTaskConfig()
if err = cfg.Decode(task); err != nil {
subTaskcfg := config.NewSubTaskConfig()
if err = subTaskcfg.Decode(task); err != nil {
return err
}
cfg.LogLevel = s.cfg.LogLevel
cfg.LogFile = s.cfg.LogFile
subTaskcfg.LogLevel = s.cfg.LogLevel
subTaskcfg.LogFile = s.cfg.LogFile

subTaskCfgs = append(subTaskCfgs, subTaskcfg)
}

if err = w.StartSubTask(cfg); err != nil {
dctx, dcancel := context.WithTimeout(s.etcdClient.Ctx(), time.Duration(len(subTaskCfgs))*3*time.Second)
defer dcancel()
minPos, err := getMinPosInAllSubTasks(dctx, subTaskCfgs)
if err != nil {
return err
}

// TODO: support GTID
// don't contain GTID information in checkpoint table, just set it to empty
if minPos != nil {
cfg.RelayBinLogName = binlog.AdjustPosition(*minPos).Name
cfg.RelayBinlogGTID = ""
}

log.L().Info("start workers", zap.Reflect("subTasks", subTaskCfgs))

w, err := NewWorker(cfg)
if err != nil {
return err
}
s.setWorker(w, false)
go func() {
w.Start()
}()

// FIXME: worker's closed will be set to false in Start.
// when start sub task, will check the `closed`, if closed is true, will ignore start subTask
// just sleep and make test success, will refine this later
time.Sleep(1 * time.Second)

for _, subTaskCfg := range subTaskCfgs {
if err = w.StartSubTask(subTaskCfg); err != nil {
return err
}
log.L().Info("load subtask successful", zap.String("sourceID", cfg.SourceID), zap.String("task", taskName))
log.L().Info("load subtask successful", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
}

return nil
}

Expand Down Expand Up @@ -753,3 +782,50 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {
}
return resp
}

// all subTask in subTaskCfgs should have same source
// this function return the min position in all subtasks, used for relay's position
func getMinPosInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minPos *mysql.Position, err error) {
for _, subTaskCfg := range subTaskCfgs {
pos, err := getMinPosForSubTaskFunc(ctx, subTaskCfg)
if err != nil {
return nil, err
}

if pos == nil {
continue
}

if minPos == nil {
minPos = pos
} else {
if minPos.Compare(*pos) >= 1 {
minPos = pos
}
}
}

return minPos, nil
}

func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
if subTaskCfg.Mode == config.ModeFull {
return nil, nil
}

tctx := tcontext.NewContext(ctx, log.L())
checkpoint := syncer.NewRemoteCheckPoint(tctx, subTaskCfg, subTaskCfg.SourceID)
err = checkpoint.Init(tctx)
if err != nil {
return nil, errors.Annotate(err, "get min position from checkpoint")
}
defer checkpoint.Close()

err = checkpoint.Load(tctx, nil)
if err != nil {
return nil, errors.Annotate(err, "get min position from checkpoint")
}

pos := checkpoint.GlobalPoint()
return &pos, nil
}
49 changes: 49 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/pd/pkg/tempurl"
"github.com/siddontang/go-mysql/mysql"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"google.golang.org/grpc"
Expand All @@ -47,6 +48,12 @@ var _ = Suite(&testServer{})
func (t *testServer) SetUpSuite(c *C) {
err := log.InitLogger(&log.Config{})
c.Assert(err, IsNil)

getMinPosForSubTaskFunc = getFakePosForSubTask
}

func (t *testServer) TearDownSuite(c *C) {
getMinPosForSubTaskFunc = getMinPosForSubTask
}

func createMockETCD(dir string, host string) (*embed.Etcd, error) {
Expand Down Expand Up @@ -107,6 +114,11 @@ func (t *testServer) TestServer(c *C) {
// check worker would retry connecting master rather than stop worker directly.
ETCD = t.testRetryConnectMaster(c, s, ETCD, etcdDir, hostName)

mysqlCfg := &config.MysqlConfig{}
c.Assert(mysqlCfg.LoadFromFile("./dm-mysql.toml"), IsNil)
err = s.startWorker(mysqlCfg)
c.Assert(err, IsNil)

// test condition hub
t.testConidtionHub(c, s)

Expand Down Expand Up @@ -323,3 +335,40 @@ func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, ETCD *embed.
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.retryConnectMaster.Get(), IsFalse)
}

func (t *testServer) TestGetMinPosInAllSubTasks(c *C) {
subTaskCfg := []*config.SubTaskConfig{
{
Name: "test2",
}, {
Name: "test3",
}, {
Name: "test1",
},
}
minPos, err := getMinPosInAllSubTasks(context.Background(), subTaskCfg)
c.Assert(err, IsNil)
c.Assert(minPos.Name, Equals, "mysql-binlog.00001")
c.Assert(minPos.Pos, Equals, uint32(12))
}

func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
switch subTaskCfg.Name {
case "test1":
return &mysql.Position{
Name: "mysql-binlog.00001",
Pos: 123,
}, nil
case "test2":
return &mysql.Position{
Name: "mysql-binlog.00001",
Pos: 12,
}, nil
case "test3":
return &mysql.Position{
Name: "mysql-binlog.00003",
}, nil
default:
return nil, nil
}
}
2 changes: 1 addition & 1 deletion dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewWorker(cfg *config.MysqlConfig) (w *Worker, err error) {

InitConditionHub(w)

w.l.Info("initialized")
w.l.Info("initialized", zap.Stringer("cfg", cfg))

return w, nil
}
Expand Down
2 changes: 2 additions & 0 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (t *testServer) TestTaskAutoResume(c *C) {
defer failpoint.Disable("github.com/pingcap/dm/mydumper/dumpUnitProcessWithError")
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/worker/mockCreateUnitsDumpOnly", `return(true)`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/dm/worker/mockCreateUnitsDumpOnly")
c.Assert(failpoint.Enable("github.com/pingcap/dm/loader/ignoreLoadCheckpointErr", `return()`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/loader/ignoreLoadCheckpointErr")

s := NewServer(cfg)

Expand Down
4 changes: 4 additions & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ func (l *Loader) Init(ctx context.Context) (err error) {
tctx := l.logCtx.WithContext(ctx)

checkpoint, err := newRemoteCheckPoint(tctx, l.cfg, l.checkpointID())
failpoint.Inject("ignoreLoadCheckpointErr", func(_ failpoint.Value) {
l.logCtx.L().Info("", zap.String("failpoint", "ignoreLoadCheckpointErr"))
err = nil
})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
// TODO: try auto fix GTID, and can support auto switching between upstream server later.
cfg := r.cfg.From
r.tctx.L().Error("the requested binlog files have purged in the master server or the master server have switched, currently DM do no support to handle this error",
zap.String("db host", cfg.Host), zap.Int("db port", cfg.Port), log.ShortError(err))
zap.String("db host", cfg.Host), zap.Int("db port", cfg.Port), zap.Stringer("last pos", lastPos), log.ShortError(err))
// log the status for debug
pos, gs, err2 := utils.GetMasterStatus(r.db, r.cfg.Flavor)
if err2 == nil {
Expand Down
13 changes: 8 additions & 5 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,11 +607,14 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.T
if err = json.Unmarshal(tiBytes, &ti); err != nil {
return terror.ErrSchemaTrackerInvalidJSON.Delegate(err, cpSchema, cpTable)
}
if err = schemaTracker.CreateSchemaIfNotExists(cpSchema); err != nil {
return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, cpSchema)
}
if err = schemaTracker.CreateTableIfNotExists(cpSchema, cpTable, &ti); err != nil {
return terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, cpSchema, cpTable)

if schemaTracker != nil {
if err = schemaTracker.CreateSchemaIfNotExists(cpSchema); err != nil {
return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, cpSchema)
}
if err = schemaTracker.CreateTableIfNotExists(cpSchema, cpTable, &ti); err != nil {
return terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, cpSchema, cpTable)
}
}

mSchema, ok := cp.points[cpSchema]
Expand Down
6 changes: 3 additions & 3 deletions tests/_utils/run_dm_ctl_with_retry
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ pid=$$
echo "dmctl test cmd: \"$cmd\""


all_matched=True
all_matched=true
for ((k=0; k<10; k++)); do
echo "$cmd" | $binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.dmctl.$ts.$pid.out" DEVEL -master-addr=$master_addr > $dmctl_log 2>&1
all_matched=True
all_matched=true
for ((i=1; i<$#; i+=2)); do
j=$((i+1))
value=${!i}
expected=${!j}
got=$(sed "s/$value/$value\n/g" $dmctl_log | grep -c "$value")
if [ "$got" != "$expected" ]; then
echo "command: $cmd $value count: $got != expected: $expected, failed the $k-th time, will retry again"
all_matched=False
all_matched=false
break
fi
done
Expand Down
2 changes: 1 addition & 1 deletion tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ MASTER_PORT=8261
WORKER1_PORT=8262
WORKER2_PORT=8263
WORKER3_PORT=8264
TRACER_PORT=8264
TRACER_PORT=8265
SOURCE_ID1="mysql-replica-01"
SOURCE_ID2="mysql-replica-02"
RESET_MASTER=${RESET_MASTER:-true}
Expand Down
1 change: 1 addition & 0 deletions tests/ha/conf/mysql1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""
enable-relay = true

[from]
host = "127.0.0.1"
Expand Down
1 change: 1 addition & 0 deletions tests/ha/conf/mysql2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""
enable-relay = true

[from]
host = "127.0.0.1"
Expand Down
Loading