Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

handle-error: only support to handle query error #1297

Merged
merged 5 commits into from
Nov 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion syncer/handle_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@ func (s *Syncer) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReque
pos := req.BinlogPos

if len(pos) == 0 {
startLocation := s.getErrLocation()
startLocation, isQueryEvent := s.getErrLocation()
if startLocation == nil {
return fmt.Errorf("source '%s' has no error", s.cfg.SourceID)
}
if !isQueryEvent {
return fmt.Errorf("only support to handle ddl error currently, see https://docs.pingcap.com/tidb-data-migration/stable/error-handling for other errors")
}
pos = startLocation.Position.String()
} else {
startLocation, err := binlog.VerifyBinlogPos(pos)
Expand Down
26 changes: 14 additions & 12 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type Syncer struct {
sync.RWMutex
startLocation *binlog.Location
endLocation *binlog.Location
isQueryEvent bool
}

addJobFunc func(*job) error
Expand Down Expand Up @@ -433,7 +434,7 @@ func (s *Syncer) reset() {
s.newJobChans(s.cfg.WorkerCount + 1)

s.execError.Set(nil)
s.setErrLocation(nil, nil)
s.setErrLocation(nil, nil, false)
s.isReplacingErr = false

switch s.cfg.ShardMode {
Expand Down Expand Up @@ -936,7 +937,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
if err != nil {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation)
err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation, true)
s.runFatalChan <- unit.NewProcessError(err)
}
s.jobWg.Done()
Expand Down Expand Up @@ -972,7 +973,7 @@ func (s *Syncer) syncDDL(tctx *tcontext.Context, queueBucket string, db *DBConn,
if err != nil {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation)
err = s.handleEventError(err, sqlJob.startLocation, sqlJob.currentLocation, true)
s.runFatalChan <- unit.NewProcessError(err)
}
s.jobWg.Done()
Expand Down Expand Up @@ -1007,7 +1008,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo
fatalF := func(affected int, err error) {
s.execError.Set(err)
if !utils.IsContextCanceledError(err) {
err = s.handleEventError(err, jobs[affected].startLocation, jobs[affected].currentLocation)
err = s.handleEventError(err, jobs[affected].startLocation, jobs[affected].currentLocation, false)
s.runFatalChan <- unit.NewProcessError(err)
}
clearF()
Expand Down Expand Up @@ -1328,11 +1329,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
tctx.L().Debug("receive binlog event", zap.Reflect("header", e.Header))

// TODO: support all event
// we calculate startLocation and endLocation(currentLocation) for Rows/Query event here
// we calculate startLocation and endLocation(currentLocation) for Query event here
// set startLocation empty for other events to avoid misuse
startLocation = binlog.Location{}
switch e.Event.(type) {
case *replication.RowsEvent, *replication.QueryEvent:
case *replication.QueryEvent:
startLocation = binlog.InitLocation(
mysql.Position{
Name: lastLocation.Position.Name,
Expand Down Expand Up @@ -1469,7 +1470,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}
}
if err2 != nil {
if err := s.handleEventError(err2, startLocation, currentLocation); err != nil {
if err := s.handleEventError(err2, startLocation, currentLocation, e.Header.EventType == replication.QUERY_EVENT); err != nil {
return err
}
}
Expand Down Expand Up @@ -2709,10 +2710,11 @@ func (s *Syncer) ShardDDLOperation() *pessimism.Operation {
return s.pessimist.PendingOperation()
}

func (s *Syncer) setErrLocation(startLocation, endLocation *binlog.Location) {
func (s *Syncer) setErrLocation(startLocation, endLocation *binlog.Location, isQueryEventEvent bool) {
s.errLocation.Lock()
defer s.errLocation.Unlock()

s.errLocation.isQueryEvent = isQueryEventEvent
if s.errLocation.startLocation == nil || startLocation == nil {
s.errLocation.startLocation = startLocation
} else if binlog.CompareLocation(*startLocation, *s.errLocation.startLocation, s.cfg.EnableGTID) < 0 {
Expand All @@ -2726,18 +2728,18 @@ func (s *Syncer) setErrLocation(startLocation, endLocation *binlog.Location) {
}
}

func (s *Syncer) getErrLocation() *binlog.Location {
func (s *Syncer) getErrLocation() (*binlog.Location, bool) {
s.errLocation.Lock()
defer s.errLocation.Unlock()
return s.errLocation.startLocation
return s.errLocation.startLocation, s.errLocation.isQueryEvent
}

func (s *Syncer) handleEventError(err error, startLocation, endLocation binlog.Location) error {
func (s *Syncer) handleEventError(err error, startLocation, endLocation binlog.Location, isQueryEvent bool) error {
if err == nil {
return nil
}

s.setErrLocation(&startLocation, &endLocation)
s.setErrLocation(&startLocation, &endLocation, isQueryEvent)
return terror.Annotatef(err, "startLocation: [%s], endLocation: [%s]", startLocation, endLocation)
}

Expand Down
20 changes: 4 additions & 16 deletions tests/handle_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,32 +40,20 @@ function DM_SKIP_ERROR_CASE() {
# skip one source
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"handle-error test skip -s mysql-replica-01" \
"\"result\": true" 2
"only support to handle ddl error currently" 1

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 1 \
"\"stage\": \"Paused\"" 1
"\"stage\": \"Paused\"" 2

# skip all sources
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"handle-error test skip" \
"\"result\": true" 2 \
"\"source 'mysql-replica-01' has no error\"" 1
"only support to handle ddl error currently" 2

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 2

# '11' -> 11, '22' -> 22, no error
run_sql_source1 "insert into ${db}.${tb1} values('111',7)"
run_sql_source2 "insert into ${db}.${tb2} values('222',8)"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"stage\": \"Running\"" 2

run_sql_tidb_with_retry "select count(1) from ${db}.${tb1} where id=111;" "count(1): 1"
run_sql_tidb_with_retry "select count(1) from ${db}.${tb2} where id=222;" "count(1): 1"
"\"stage\": \"Paused\"" 2
}

function DM_SKIP_ERROR() {
Expand Down