Skip to content

Commit

Permalink
relay: start with the min binlog position in checkpoint (pingcap#457)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored Feb 4, 2020
1 parent ec1bddb commit 256723e
Show file tree
Hide file tree
Showing 13 changed files with 194 additions and 44 deletions.
16 changes: 12 additions & 4 deletions dm/master/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,18 +409,26 @@ func (c *Coordinator) tryRestartMysqlTask() {
for hasTaskToSchedule {
select {
case source := <-c.waitingTask:
log.L().Info("will schedule source", zap.String("source", source))
if cfg, ok := c.sourceConfigs[source]; ok {
ret := false
if w, ok := c.upstreams[source]; ok {
// Try start mysql task at the same worker.
c.mu.RUnlock()
log.L().Info("try start mysql task at the same worker", zap.String("worker", w.Address()))
ret = c.restartMysqlTask(w, &cfg)
c.mu.RLock()
} else {
c.mu.RUnlock()
w, _ := c.AcquireWorkerForSource(source)
if w != nil {
ret = c.restartMysqlTask(w, &cfg)
w, err := c.AcquireWorkerForSource(source)
if err != nil {
log.L().Error("acquire worker for source", zap.String("source", source), zap.Error(err))
} else {
if w != nil {
ret = c.restartMysqlTask(w, &cfg)
} else {
log.L().Info("acquire worker for source get nil worker")
}
}
c.mu.RLock()
}
Expand Down Expand Up @@ -472,9 +480,9 @@ func (c *Coordinator) restartMysqlTask(w *Worker, cfg *config.MysqlConfig) bool
}
}
} else {
log.L().Warn("restartMysqlTask failed", zap.Error(err))
// Error means there is something wrong about network, set worker to close.
// remove sourceID from upstreams. So the source would be schedule in other worker.
log.L().Warn("operate mysql worker", zap.Error(err), zap.Stringer("request", req))
delete(c.upstreams, cfg.SourceID)
delete(c.workerToSource, w.Address())
w.SetStatus(WorkerClosed)
Expand Down
130 changes: 103 additions & 27 deletions dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ import (
"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/dm/config"
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/binlog"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/syncer"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go/sync2"
"github.com/soheilhy/cmux"
"go.etcd.io/etcd/clientv3"
Expand All @@ -35,11 +39,12 @@ import (
)

var (
cmuxReadTimeout = 10 * time.Second
dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryConnectSleepTime = 2 * time.Second
cmuxReadTimeout = 10 * time.Second
dialTimeout = 3 * time.Second
keepaliveTimeout = 3 * time.Second
keepaliveTime = 3 * time.Second
retryConnectSleepTime = 2 * time.Second
getMinPosForSubTaskFunc = getMinPosForSubTask
)

// Server accepts RPC requests
Expand Down Expand Up @@ -653,46 +658,70 @@ func (s *Server) startWorker(cfg *config.MysqlConfig) error {
if w := s.getWorker(false); w != nil {
if w.cfg.SourceID == cfg.SourceID {
// This mysql task has started. It may be a repeated request. Just return true
log.L().Info("This mysql task has started. It may be a repeated request. Just return true", zap.String("sourceID", s.worker.cfg.SourceID))
return nil
}
return terror.ErrWorkerAlreadyStart.Generate()
}
w, err := NewWorker(cfg)
if err != nil {
return err
}
s.setWorker(w, false)
go func() {
w.Start()
}()

ectx, cancel := context.WithTimeout(s.etcdClient.Ctx(), time.Second*3)
defer cancel()
subTaskCfgs := make([]*config.SubTaskConfig, 0, 3)

ectx, ecancel := context.WithTimeout(s.etcdClient.Ctx(), time.Second*3)
defer ecancel()
key := common.UpstreamSubTaskKeyAdapter.Encode(cfg.SourceID)
resp, err := s.etcdClient.KV.Get(ectx, key, clientv3.WithPrefix())
if err != nil {
return err
}
for _, kv := range resp.Kvs {
infos, err := common.UpstreamSubTaskKeyAdapter.Decode(string(kv.Key))
if err != nil {
log.L().Error("decode upstream subtask key from etcd failed", zap.Error(err))
return err
}
taskName := infos[1]
task := string(kv.Value)
cfg := config.NewSubTaskConfig()
if err = cfg.Decode(task); err != nil {
subTaskcfg := config.NewSubTaskConfig()
if err = subTaskcfg.Decode(task); err != nil {
return err
}
cfg.LogLevel = s.cfg.LogLevel
cfg.LogFile = s.cfg.LogFile
subTaskcfg.LogLevel = s.cfg.LogLevel
subTaskcfg.LogFile = s.cfg.LogFile

subTaskCfgs = append(subTaskCfgs, subTaskcfg)
}

if err = w.StartSubTask(cfg); err != nil {
dctx, dcancel := context.WithTimeout(s.etcdClient.Ctx(), time.Duration(len(subTaskCfgs))*3*time.Second)
defer dcancel()
minPos, err := getMinPosInAllSubTasks(dctx, subTaskCfgs)
if err != nil {
return err
}

// TODO: support GTID
// don't contain GTID information in checkpoint table, just set it to empty
if minPos != nil {
cfg.RelayBinLogName = binlog.AdjustPosition(*minPos).Name
cfg.RelayBinlogGTID = ""
}

log.L().Info("start workers", zap.Reflect("subTasks", subTaskCfgs))

w, err := NewWorker(cfg)
if err != nil {
return err
}
s.setWorker(w, false)
go func() {
w.Start()
}()

// FIXME: worker's closed will be set to false in Start.
// when start sub task, will check the `closed`, if closed is true, will ignore start subTask
// just sleep and make test success, will refine this later
time.Sleep(1 * time.Second)

for _, subTaskCfg := range subTaskCfgs {
if err = w.StartSubTask(subTaskCfg); err != nil {
return err
}
log.L().Info("load subtask successful", zap.String("sourceID", cfg.SourceID), zap.String("task", taskName))
log.L().Info("load subtask successful", zap.String("sourceID", subTaskCfg.SourceID), zap.String("task", subTaskCfg.Name))
}

return nil
}

Expand Down Expand Up @@ -753,3 +782,50 @@ func makeCommonWorkerResponse(reqErr error) *pb.CommonWorkerResponse {
}
return resp
}

// all subTask in subTaskCfgs should have same source
// this function return the min position in all subtasks, used for relay's position
func getMinPosInAllSubTasks(ctx context.Context, subTaskCfgs []*config.SubTaskConfig) (minPos *mysql.Position, err error) {
for _, subTaskCfg := range subTaskCfgs {
pos, err := getMinPosForSubTaskFunc(ctx, subTaskCfg)
if err != nil {
return nil, err
}

if pos == nil {
continue
}

if minPos == nil {
minPos = pos
} else {
if minPos.Compare(*pos) >= 1 {
minPos = pos
}
}
}

return minPos, nil
}

func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
if subTaskCfg.Mode == config.ModeFull {
return nil, nil
}

tctx := tcontext.NewContext(ctx, log.L())
checkpoint := syncer.NewRemoteCheckPoint(tctx, subTaskCfg, subTaskCfg.SourceID)
err = checkpoint.Init(tctx)
if err != nil {
return nil, errors.Annotate(err, "get min position from checkpoint")
}
defer checkpoint.Close()

err = checkpoint.Load(tctx, nil)
if err != nil {
return nil, errors.Annotate(err, "get min position from checkpoint")
}

pos := checkpoint.GlobalPoint()
return &pos, nil
}
49 changes: 49 additions & 0 deletions dm/worker/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/pd/pkg/tempurl"
"github.com/siddontang/go-mysql/mysql"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
"google.golang.org/grpc"
Expand All @@ -47,6 +48,12 @@ var _ = Suite(&testServer{})
func (t *testServer) SetUpSuite(c *C) {
err := log.InitLogger(&log.Config{})
c.Assert(err, IsNil)

getMinPosForSubTaskFunc = getFakePosForSubTask
}

func (t *testServer) TearDownSuite(c *C) {
getMinPosForSubTaskFunc = getMinPosForSubTask
}

func createMockETCD(dir string, host string) (*embed.Etcd, error) {
Expand Down Expand Up @@ -107,6 +114,11 @@ func (t *testServer) TestServer(c *C) {
// check worker would retry connecting master rather than stop worker directly.
ETCD = t.testRetryConnectMaster(c, s, ETCD, etcdDir, hostName)

mysqlCfg := &config.MysqlConfig{}
c.Assert(mysqlCfg.LoadFromFile("./dm-mysql.toml"), IsNil)
err = s.startWorker(mysqlCfg)
c.Assert(err, IsNil)

// test condition hub
t.testConidtionHub(c, s)

Expand Down Expand Up @@ -323,3 +335,40 @@ func (t *testServer) testStopWorkerWhenLostConnect(c *C, s *Server, ETCD *embed.
c.Assert(s.getWorker(true), IsNil)
c.Assert(s.retryConnectMaster.Get(), IsFalse)
}

func (t *testServer) TestGetMinPosInAllSubTasks(c *C) {
subTaskCfg := []*config.SubTaskConfig{
{
Name: "test2",
}, {
Name: "test3",
}, {
Name: "test1",
},
}
minPos, err := getMinPosInAllSubTasks(context.Background(), subTaskCfg)
c.Assert(err, IsNil)
c.Assert(minPos.Name, Equals, "mysql-binlog.00001")
c.Assert(minPos.Pos, Equals, uint32(12))
}

func getFakePosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig) (minPos *mysql.Position, err error) {
switch subTaskCfg.Name {
case "test1":
return &mysql.Position{
Name: "mysql-binlog.00001",
Pos: 123,
}, nil
case "test2":
return &mysql.Position{
Name: "mysql-binlog.00001",
Pos: 12,
}, nil
case "test3":
return &mysql.Position{
Name: "mysql-binlog.00003",
}, nil
default:
return nil, nil
}
}
2 changes: 1 addition & 1 deletion dm/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func NewWorker(cfg *config.MysqlConfig) (w *Worker, err error) {

InitConditionHub(w)

w.l.Info("initialized")
w.l.Info("initialized", zap.Stringer("cfg", cfg))

return w, nil
}
Expand Down
2 changes: 2 additions & 0 deletions dm/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (t *testServer) TestTaskAutoResume(c *C) {
defer failpoint.Disable("github.com/pingcap/dm/mydumper/dumpUnitProcessWithError")
c.Assert(failpoint.Enable("github.com/pingcap/dm/dm/worker/mockCreateUnitsDumpOnly", `return(true)`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/dm/worker/mockCreateUnitsDumpOnly")
c.Assert(failpoint.Enable("github.com/pingcap/dm/loader/ignoreLoadCheckpointErr", `return()`), IsNil)
defer failpoint.Disable("github.com/pingcap/dm/loader/ignoreLoadCheckpointErr")

s := NewServer(cfg)

Expand Down
4 changes: 4 additions & 0 deletions loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,10 @@ func (l *Loader) Init(ctx context.Context) (err error) {
tctx := l.logCtx.WithContext(ctx)

checkpoint, err := newRemoteCheckPoint(tctx, l.cfg, l.checkpointID())
failpoint.Inject("ignoreLoadCheckpointErr", func(_ failpoint.Value) {
l.logCtx.L().Info("", zap.String("failpoint", "ignoreLoadCheckpointErr"))
err = nil
})
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
// TODO: try auto fix GTID, and can support auto switching between upstream server later.
cfg := r.cfg.From
r.tctx.L().Error("the requested binlog files have purged in the master server or the master server have switched, currently DM do no support to handle this error",
zap.String("db host", cfg.Host), zap.Int("db port", cfg.Port), log.ShortError(err))
zap.String("db host", cfg.Host), zap.Int("db port", cfg.Port), zap.Stringer("last pos", lastPos), log.ShortError(err))
// log the status for debug
pos, gs, err2 := utils.GetMasterStatus(r.db, r.cfg.Flavor)
if err2 == nil {
Expand Down
13 changes: 8 additions & 5 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,11 +607,14 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.T
if err = json.Unmarshal(tiBytes, &ti); err != nil {
return terror.ErrSchemaTrackerInvalidJSON.Delegate(err, cpSchema, cpTable)
}
if err = schemaTracker.CreateSchemaIfNotExists(cpSchema); err != nil {
return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, cpSchema)
}
if err = schemaTracker.CreateTableIfNotExists(cpSchema, cpTable, &ti); err != nil {
return terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, cpSchema, cpTable)

if schemaTracker != nil {
if err = schemaTracker.CreateSchemaIfNotExists(cpSchema); err != nil {
return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, cpSchema)
}
if err = schemaTracker.CreateTableIfNotExists(cpSchema, cpTable, &ti); err != nil {
return terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, cpSchema, cpTable)
}
}

mSchema, ok := cp.points[cpSchema]
Expand Down
6 changes: 3 additions & 3 deletions tests/_utils/run_dm_ctl_with_retry
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ pid=$$
echo "dmctl test cmd: \"$cmd\""


all_matched=True
all_matched=true
for ((k=0; k<10; k++)); do
echo "$cmd" | $binary -test.coverprofile="$TEST_DIR/cov.$TEST_NAME.dmctl.$ts.$pid.out" DEVEL -master-addr=$master_addr > $dmctl_log 2>&1
all_matched=True
all_matched=true
for ((i=1; i<$#; i+=2)); do
j=$((i+1))
value=${!i}
expected=${!j}
got=$(sed "s/$value/$value\n/g" $dmctl_log | grep -c "$value")
if [ "$got" != "$expected" ]; then
echo "command: $cmd $value count: $got != expected: $expected, failed the $k-th time, will retry again"
all_matched=False
all_matched=false
break
fi
done
Expand Down
2 changes: 1 addition & 1 deletion tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ MASTER_PORT=8261
WORKER1_PORT=8262
WORKER2_PORT=8263
WORKER3_PORT=8264
TRACER_PORT=8264
TRACER_PORT=8265
SOURCE_ID1="mysql-replica-01"
SOURCE_ID2="mysql-replica-02"
RESET_MASTER=${RESET_MASTER:-true}
Expand Down
1 change: 1 addition & 0 deletions tests/ha/conf/mysql1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""
enable-relay = true

[from]
host = "127.0.0.1"
Expand Down
1 change: 1 addition & 0 deletions tests/ha/conf/mysql2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ flavor = ""
enable-gtid = false
relay-binlog-name = ""
relay-binlog-gtid = ""
enable-relay = true

[from]
host = "127.0.0.1"
Expand Down
Loading

0 comments on commit 256723e

Please sign in to comment.