Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

dm-worker: split DDL sync from sync function #289

Merged
merged 4 commits into from
Sep 23, 2019
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 58 additions & 53 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,62 @@ func (s *Syncer) flushCheckPoints() error {
return nil
}

// syncDDL executes DDL jobs received from ddlJobChan serially. DDLs are
// synced one by one, so only a single DDL job is processed at a time.
// It exits when ddlJobChan is closed.
// NOTE(review): the ctx parameter is currently unused — logging goes through
// s.tctx; confirm whether ctx should be plumbed into the SQL execution.
func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *Conn, ddlJobChan chan *job) {
	defer s.wg.Done()

	for {
		sqlJob, ok := <-ddlJobChan
		if !ok {
			// channel closed: no more DDL jobs will arrive.
			return
		}

		// err is scoped per job so a failure from a previous job cannot leak
		// into the next one. (Previously it was declared outside the loop and
		// never reset on the "ignore sharding DDLs" branch, so a stale error
		// could be re-reported for a later, untouched job.)
		var err error
		if sqlJob.ddlExecItem != nil && sqlJob.ddlExecItem.req != nil && !sqlJob.ddlExecItem.req.Exec {
			// the coordinator decided this sharding DDL should not be
			// executed downstream by this worker.
			s.tctx.L().Info("ignore sharding DDLs", zap.Strings("ddls", sqlJob.ddls))
		} else {
			_, err = db.executeSQLWithIgnore(s.tctx, ignoreDDLError, sqlJob.ddls)
			if err != nil {
				err = terror.WithScope(err, terror.ScopeDownstream)
			}

			// report the job outcome to the tracer regardless of success.
			if s.tracer.Enable() {
				syncerJobState := s.tracer.FinishedSyncerJobState(err)
				var execDDLReq *pb.ExecDDLRequest
				if sqlJob.ddlExecItem != nil {
					execDDLReq = sqlJob.ddlExecItem.req
				}
				_, traceErr := s.tracer.CollectSyncerJobEvent(sqlJob.traceID, sqlJob.traceGID, int32(sqlJob.tp), sqlJob.pos, sqlJob.currentPos, queueBucket, sqlJob.sql, sqlJob.ddls, nil, execDDLReq, syncerJobState)
				if traceErr != nil {
					s.tctx.L().Error("fail to collect binlog replication job event", log.ShortError(traceErr))
				}
			}
		}
		if err != nil {
			// record the failed DDLs with their binlog position for later
			// inspection / manual resolution.
			s.appendExecErrors(&ExecErrorContext{
				err:  err,
				pos:  sqlJob.currentPos,
				jobs: fmt.Sprintf("%v", sqlJob.ddls),
			})
		}

		if s.cfg.IsSharding {
			// for sharding DDL syncing, send result back
			if sqlJob.ddlExecItem != nil {
				sqlJob.ddlExecItem.resp <- err
			}
			s.ddlExecInfo.ClearBlockingDDL()
		}
		// mark the job done BEFORE reporting a fatal error, so waiters on
		// jobWg are not blocked by the (possibly blocking) fatal-chan send.
		s.jobWg.Done()
		if err != nil {
			s.execErrorDetected.Set(true)
			s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, errors.ErrorStack(err))
			continue
		}
		s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls)))
	}
}

func (s *Syncer) sync(ctx *tcontext.Context, queueBucket string, db *Conn, jobChan chan *job) {
defer s.wg.Done()

Expand Down Expand Up @@ -919,58 +975,7 @@ func (s *Syncer) sync(ctx *tcontext.Context, queueBucket string, db *Conn, jobCh
}
idx++

if sqlJob.tp == ddl {
err = executeSQLs()
if err != nil {
fatalF(err, pb.ErrorType_ExecSQL)
continue
}

if sqlJob.ddlExecItem != nil && sqlJob.ddlExecItem.req != nil && !sqlJob.ddlExecItem.req.Exec {
s.tctx.L().Info("ignore sharding DDLs", zap.Strings("ddls", sqlJob.ddls))
} else {
_, err = db.executeSQLWithIgnore(s.tctx, ignoreDDLError, sqlJob.ddls)
if err != nil {
err = terror.WithScope(err, terror.ScopeDownstream)
}

if s.tracer.Enable() {
syncerJobState := s.tracer.FinishedSyncerJobState(err)
var execDDLReq *pb.ExecDDLRequest
if sqlJob.ddlExecItem != nil {
execDDLReq = sqlJob.ddlExecItem.req
}
_, traceErr := s.tracer.CollectSyncerJobEvent(sqlJob.traceID, sqlJob.traceGID, int32(sqlJob.tp), sqlJob.pos, sqlJob.currentPos, queueBucket, sqlJob.sql, sqlJob.ddls, nil, execDDLReq, syncerJobState)
if traceErr != nil {
s.tctx.L().Error("fail to collect binlog replication job event", log.ShortError(traceErr))
}
}
}
if err != nil {
s.appendExecErrors(&ExecErrorContext{
err: err,
pos: sqlJob.currentPos,
jobs: fmt.Sprintf("%v", sqlJob.ddls),
})
}

if s.cfg.IsSharding {
// for sharding DDL syncing, send result back
if sqlJob.ddlExecItem != nil {
sqlJob.ddlExecItem.resp <- err
}
s.ddlExecInfo.ClearBlockingDDL()
}
if err != nil {
// error then pause.
fatalF(err, pb.ErrorType_ExecSQL)
continue
}

tpCnt[sqlJob.tp] += int64(len(sqlJob.ddls))
clearF()

} else if sqlJob.tp != flush && len(sqlJob.sql) > 0 {
if sqlJob.tp != flush && len(sqlJob.sql) > 0 {
jobs = append(jobs, sqlJob)
tpCnt[sqlJob.tp]++
}
Expand Down Expand Up @@ -1065,7 +1070,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
go func() {
ctx2, cancel := context.WithCancel(ctx)
ctctx := s.tctx.WithContext(ctx2)
s.sync(ctctx, adminQueueName, s.ddlDB, s.jobs[s.cfg.WorkerCount])
s.syncDDL(ctctx, adminQueueName, s.ddlDB, s.jobs[s.cfg.WorkerCount])
cancel()
}()

Expand Down