Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherry pick #1297 to release-2.0 (#1303)
Browse files (browse the repository at this point in the history)
  • Loading branch information
ti-srebot authored Nov 26, 2020
1 parent 38caab6 commit 82e4c5f
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 29 deletions.
5 changes: 4 additions & 1 deletion syncer/handle_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ func (s *Syncer) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque
pos := req.BinlogPos

if len(pos) == 0 {
startLocation := s.getErrLocation()
startLocation, isQueryEvent := s.getErrLocation()
if startLocation == nil {
return fmt.Errorf("source '%s' has no error", s.cfg.SourceID)
}
if !isQueryEvent {
return fmt.Errorf("only support to handle ddl error currently, see https://docs.pingcap.com/tidb-data-migration/stable/error-handling for other errors")
}
pos = startLocation.Position.String()
} else {
startLocation, err := binlog.VerifyBinlogPos(pos)
Expand Down
26 changes: 14 additions & 12 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type Syncer struct {
sync.RWMutex
startLocation *binlog.Location
endLocation *binlog.Location
isQueryEvent bool
}

addJobFunc func(*job) error
Expand Down Expand Up @@ -433,7 +434,7 @@ func (s *Syncer) reset() {
s.newJobChans(s.cfg.WorkerCount + 1)

s.execError.Set(nil)
s.setErrLocation(nil, nil)
s.setErrLocation(nil, nil, false)
s.isReplacingErr = false

switch s.cfg.ShardMode {
Expand Down Expand Up @@ -936,7 +937,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
if err != nil {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation)
err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation, true)
s.runFatalChan <- unit.NewProcessError(err)
}
s.jobWg.Done()
Expand Down Expand Up @@ -972,7 +973,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
if err != nil {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation)
err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation, true)
s.runFatalChan <- unit.NewProcessError(err)
}
s.jobWg.Done()
Expand Down Expand Up @@ -1007,7 +1008,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo
fatalF := func(affected int, err error) {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, jobs[affected].startLocation, jobs[affected].currentLocation)
err = s.handleEventError(err, jobs[affected].startLocation, jobs[affected].currentLocation, false)
s.runFatalChan <- unit.NewProcessError(err)
}
clearF()
Expand Down Expand Up @@ -1328,11 +1329,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header))

// TODO: support all events
// we calculate startLocation and endLocation(currentLocation) for Rows/Query event here
// we calculate startLocation and endLocation(currentLocation) for Query event here
// set startLocation empty for other events to avoid misuse
startLocation = binlog.Location{}
switch e.Event.(type) {
case *replication.RowsEvent, *replication.QueryEvent:
case *replication.QueryEvent:
startLocation = binlog.InitLocation(
mysql.Position{
Name: lastLocation.Position.Name,
Expand Down Expand Up @@ -1469,7 +1470,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
}
if err2 != nil {
if err := s.handleEventError(err2, startLocation, currentLocation); err != nil {
if err := s.handleEventError(err2, startLocation, currentLocation, e.Header.EventType == replication.QUERY_EVENT); err != nil {
return err
}
}
Expand Down Expand Up @@ -2709,10 +2710,11 @@ func (s *Syncer) ShardDDLOperation() *pessimism.Operation {
return s.pessimist.PendingOperation()
}

func (s *Syncer) setErrLocation(startLocation, endLocation *binlog.Location) {
func (s *Syncer) setErrLocation(startLocation, endLocation *binlog.Location, isQueryEventEvent bool) {
s.errLocation.Lock()
defer s.errLocation.Unlock()

s.errLocation.isQueryEvent = isQueryEventEvent
if s.errLocation.startLocation == nil || startLocation == nil {
s.errLocation.startLocation = startLocation
} else if binlog.CompareLocation(*startLocation, *s.errLocation.startLocation, s.cfg.EnableGTID) < 0 {
Expand All @@ -2726,18 +2728,18 @@ func (s *Syncer) setErrLocation(startLocation, endLocation *binlog.Location) {
}
}

func (s *Syncer) getErrLocation() *binlog.Location {
func (s *Syncer) getErrLocation() (*binlog.Location, bool) {
s.errLocation.Lock()
defer s.errLocation.Unlock()
return s.errLocation.startLocation
return s.errLocation.startLocation, s.errLocation.isQueryEvent
}

func (s *Syncer) handleEventError(err error, startLocation, endLocation binlog.Location) error {
func (s *Syncer) handleEventError(err error, startLocation, endLocation binlog.Location, isQueryEvent bool) error {
if err == nil {
return nil
}

s.setErrLocation(&startLocation, &endLocation)
s.setErrLocation(&startLocation, &endLocation, isQueryEvent)
return terror.Annotatef(err, "startLocation: [%s], endLocation: [%s]", startLocation, endLocation)
}

Expand Down
20 changes: 4 additions & 16 deletions tests/handle_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,20 @@ function DM_SKIP_ERROR_CASE() {
# skip one source
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"handle-error test skip -s mysql-replica-01" \
"\"result\": true" 2
"only support to handle ddl error currently" 1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 1 \
"\"stage\": \"Paused\"" 1
"\"stage\": \"Paused\"" 2

# skip all sources
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"handle-error test skip" \
"\"result\": true" 2 \
"\"source 'mysql-replica-01' has no error\"" 1
"only support to handle ddl error currently" 2

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 2

# '11' -> 11, '22' -> 22, no error
run_sql_source1 "insert into ${db}.${tb1} values('111',7)"
run_sql_source2 "insert into ${db}.${tb2} values('222',8)"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 2

run_sql_tidb_with_retry "select count(1) from ${db}.${tb1} where id=111;" "count(1): 1"
run_sql_tidb_with_retry "select count(1) from ${db}.${tb2} where id=222;" "count(1): 1"
"\"stage\": \"Paused\"" 2
}

function DM_SKIP_ERROR() {
Expand Down

0 comments on commit 82e4c5f

Please sign in to comment.