
dm-worker: split DDL sync from sync function (#289) (#295)
* split DDL sync from sync function
3pointer authored Sep 23, 2019
1 parent 0fe4013 commit 51c55d9
Showing 1 changed file with 58 additions and 53 deletions.
syncer/syncer.go: 111 changes (58 additions, 53 deletions)
@@ -855,6 +855,62 @@ func (s *Syncer) flushCheckPoints() error {
	return nil
}

// DDL synced one by one, so we only need to process one DDL at a time
func (s *Syncer) syncDDL(ctx *tcontext.Context, queueBucket string, db *Conn, ddlJobChan chan *job) {
	defer s.wg.Done()

	var err error
	for {
		sqlJob, ok := <-ddlJobChan
		if !ok {
			return
		}

		if sqlJob.ddlExecItem != nil && sqlJob.ddlExecItem.req != nil && !sqlJob.ddlExecItem.req.Exec {
			s.tctx.L().Info("ignore sharding DDLs", zap.Strings("ddls", sqlJob.ddls))
		} else {
			_, err = db.executeSQLWithIgnore(s.tctx, ignoreDDLError, sqlJob.ddls)
			if err != nil {
				err = terror.WithScope(err, terror.ScopeDownstream)
			}

			if s.tracer.Enable() {
				syncerJobState := s.tracer.FinishedSyncerJobState(err)
				var execDDLReq *pb.ExecDDLRequest
				if sqlJob.ddlExecItem != nil {
					execDDLReq = sqlJob.ddlExecItem.req
				}
				_, traceErr := s.tracer.CollectSyncerJobEvent(sqlJob.traceID, sqlJob.traceGID, int32(sqlJob.tp), sqlJob.pos, sqlJob.currentPos, queueBucket, sqlJob.sql, sqlJob.ddls, nil, execDDLReq, syncerJobState)
				if traceErr != nil {
					s.tctx.L().Error("fail to collect binlog replication job event", log.ShortError(traceErr))
				}
			}
		}
		if err != nil {
			s.appendExecErrors(&ExecErrorContext{
				err:  err,
				pos:  sqlJob.currentPos,
				jobs: fmt.Sprintf("%v", sqlJob.ddls),
			})
		}

		if s.cfg.IsSharding {
			// for sharding DDL syncing, send result back
			if sqlJob.ddlExecItem != nil {
				sqlJob.ddlExecItem.resp <- err
			}
			s.ddlExecInfo.ClearBlockingDDL()
		}
		s.jobWg.Done()
		if err != nil {
			s.execErrorDetected.Set(true)
			s.runFatalChan <- unit.NewProcessError(pb.ErrorType_ExecSQL, errors.ErrorStack(err))
			continue
		}
		s.addCount(true, queueBucket, sqlJob.tp, int64(len(sqlJob.ddls)))
	}
}

func (s *Syncer) sync(ctx *tcontext.Context, queueBucket string, db *Conn, jobChan chan *job) {
	defer s.wg.Done()

@@ -918,58 +974,7 @@ func (s *Syncer) sync(ctx *tcontext.Context, queueBucket string, db *Conn, jobCh
			}
			idx++

			if sqlJob.tp == ddl {
				err = executeSQLs()
				if err != nil {
					fatalF(err, pb.ErrorType_ExecSQL)
					continue
				}

				if sqlJob.ddlExecItem != nil && sqlJob.ddlExecItem.req != nil && !sqlJob.ddlExecItem.req.Exec {
					s.tctx.L().Info("ignore sharding DDLs", zap.Strings("ddls", sqlJob.ddls))
				} else {
					_, err = db.executeSQLWithIgnore(s.tctx, ignoreDDLError, sqlJob.ddls)
					if err != nil {
						err = terror.WithScope(err, terror.ScopeDownstream)
					}

					if s.tracer.Enable() {
						syncerJobState := s.tracer.FinishedSyncerJobState(err)
						var execDDLReq *pb.ExecDDLRequest
						if sqlJob.ddlExecItem != nil {
							execDDLReq = sqlJob.ddlExecItem.req
						}
						_, traceErr := s.tracer.CollectSyncerJobEvent(sqlJob.traceID, sqlJob.traceGID, int32(sqlJob.tp), sqlJob.pos, sqlJob.currentPos, queueBucket, sqlJob.sql, sqlJob.ddls, nil, execDDLReq, syncerJobState)
						if traceErr != nil {
							s.tctx.L().Error("fail to collect binlog replication job event", log.ShortError(traceErr))
						}
					}
				}
				if err != nil {
					s.appendExecErrors(&ExecErrorContext{
						err:  err,
						pos:  sqlJob.currentPos,
						jobs: fmt.Sprintf("%v", sqlJob.ddls),
					})
				}

				if s.cfg.IsSharding {
					// for sharding DDL syncing, send result back
					if sqlJob.ddlExecItem != nil {
						sqlJob.ddlExecItem.resp <- err
					}
					s.ddlExecInfo.ClearBlockingDDL()
				}
				if err != nil {
					// error, then pause.
					fatalF(err, pb.ErrorType_ExecSQL)
					continue
				}

				tpCnt[sqlJob.tp] += int64(len(sqlJob.ddls))
				clearF()

			} else if sqlJob.tp != flush && len(sqlJob.sql) > 0 {
			if sqlJob.tp != flush && len(sqlJob.sql) > 0 {
				jobs = append(jobs, sqlJob)
				tpCnt[sqlJob.tp]++
			}
@@ -1064,7 +1069,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
	go func() {
		ctx2, cancel := context.WithCancel(ctx)
		ctctx := s.tctx.WithContext(ctx2)
		s.sync(ctctx, adminQueueName, s.ddlDB, s.jobs[s.cfg.WorkerCount])
		s.syncDDL(ctctx, adminQueueName, s.ddlDB, s.jobs[s.cfg.WorkerCount])
		cancel()
	}()
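
After this change, Run starts syncDDL on the extra queue s.jobs[s.cfg.WorkerCount], so each DDL is applied one at a time in its own goroutine, while sync keeps handling the batched DML path. The following is a minimal, self-contained sketch of that dispatch pattern, not the DM implementation: the job struct, the exec helper, the queue sizes, and the batch size are invented for illustration.

package main

import (
	"fmt"
	"sync"
)

// job is a simplified stand-in for DM's internal job type: a SQL statement
// plus an optional response channel used when the sender waits for the result.
type job struct {
	sql  string
	resp chan error
}

// exec is a placeholder for applying a statement to the downstream database.
func exec(sql string) error {
	fmt.Println("exec:", sql)
	return nil
}

// syncDDL mirrors the shape of the new Syncer.syncDDL: it drains a dedicated
// queue and applies one DDL at a time, sending the result back if requested.
func syncDDL(wg *sync.WaitGroup, ddlJobs <-chan job) {
	defer wg.Done()
	for j := range ddlJobs {
		err := exec(j.sql)
		if j.resp != nil {
			j.resp <- err // e.g. sharding-DDL coordination waits on this
		}
	}
}

// syncDML mirrors what Syncer.sync still does after the split: batch DML jobs.
func syncDML(wg *sync.WaitGroup, id int, dmlJobs <-chan job) {
	defer wg.Done()
	const batchSize = 4
	batch := make([]string, 0, batchSize)
	flush := func() {
		if len(batch) > 0 {
			fmt.Printf("worker %d executes a batch of %d statements\n", id, len(batch))
			batch = batch[:0]
		}
	}
	for j := range dmlJobs {
		batch = append(batch, j.sql)
		if len(batch) >= batchSize {
			flush()
		}
	}
	flush()
}

func main() {
	const workerCount = 2
	// workerCount DML queues plus one extra queue reserved for DDL,
	// analogous to s.jobs[s.cfg.WorkerCount] in the commit.
	queues := make([]chan job, workerCount+1)
	for i := range queues {
		queues[i] = make(chan job, 16)
	}

	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go syncDML(&wg, i, queues[i])
	}
	wg.Add(1)
	go syncDDL(&wg, queues[workerCount])

	queues[0] <- job{sql: "INSERT INTO t VALUES (1)"}
	queues[1] <- job{sql: "UPDATE t SET c = 2"}

	resp := make(chan error, 1)
	queues[workerCount] <- job{sql: "ALTER TABLE t ADD COLUMN c2 INT", resp: resp}
	if err := <-resp; err != nil {
		fmt.Println("DDL failed:", err)
	}

	for _, q := range queues {
		close(q)
	}
	wg.Wait()
}

One effect of the split, visible in the diff, is that the DDL path (including the ddlExecItem.resp acknowledgment used for sharding DDL) no longer runs inside the batching loop of sync.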
