Skip to content

Commit

Permalink
save work
Browse files Browse the repository at this point in the history
  • Loading branch information
Ehco1996 committed Dec 29, 2021
1 parent bb57be6 commit 08da001
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 27 deletions.
6 changes: 6 additions & 0 deletions dm/dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,12 @@ func (st *SubTask) PrevUnit() unit.Unit {

// closeUnits closes all un-closed units (current unit and all the subsequent units).
func (st *SubTask) closeUnits(graceful bool) {
// When not graceful, we want the syncer to exit immediately, so we call u.Close(false) before calling cancel.
// Note that ungraceful close is only implemented for the sync unit.
if !graceful && st.CurrUnit().Type() == pb.UnitType_Sync {
st.l.Info("closing syncer in ungraceful mode", zap.String("task", st.cfg.Name))
st.CurrUnit().Close(false)
}
st.cancel()
st.resultWg.Wait()

Expand Down
46 changes: 19 additions & 27 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ type Syncer struct {
waitXIDJob atomic.Int64
isTransactionEnd bool
waitTransactionLock sync.Mutex
noWaitCtx context.Context // when this cancel, syncer will exit quickly no need to wait transaction end
noWaitCancel context.CancelFunc
runCtx context.Context // this ctx is injected in `s.Run` and when this ctx cancelled, syncer will exit quickly and not wait transaction end
runCancel context.CancelFunc

tableRouter *router.Table
binlogFilter *bf.BinlogEvent
Expand Down Expand Up @@ -275,10 +275,6 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, relay rel
}
syncer.lastCheckpointFlushedTime = time.Time{}
syncer.relay = relay

noWaitCtx, noWaitCancel := context.WithCancel(context.Background())
syncer.noWaitCtx = noWaitCtx
syncer.noWaitCancel = noWaitCancel
return syncer
}

Expand Down Expand Up @@ -570,9 +566,6 @@ func (s *Syncer) reset() {
s.waitXIDJob.Store(int64(noWait))
s.isTransactionEnd = true
s.flushSeq = 0
noWaitCtx, noWaitCancel := context.WithCancel(context.Background())
s.noWaitCtx = noWaitCtx
s.noWaitCancel = noWaitCancel
switch s.cfg.ShardMode {
case config.ShardPessimistic:
// every time start to re-sync from resume, we reset status to make it like a fresh syncing
Expand Down Expand Up @@ -980,7 +973,7 @@ func (s *Syncer) addJob(job *job) error {
s.isTransactionEnd = false
failpoint.Inject("checkCheckpointInMiddleOfTransaction", func() {
s.tctx.L().Info("receive dml job", zap.Any("dml job", job))
time.Sleep(100 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
})
}

Expand Down Expand Up @@ -1393,41 +1386,39 @@ func (s *Syncer) syncDML() {
}
}

func (s *Syncer) waitTransactionEndBeforeExit(ctx, runCtx context.Context, cancel context.CancelFunc) {
<-ctx.Done()
func (s *Syncer) waitTransactionEndBeforeExit(ctx context.Context) {
select {
case <-runCtx.Done():
default:
log.L().Info("received subtask's done")

case <-ctx.Done(): // hijack the context to wait for the transaction to end.
log.L().Info("received subtask's done, try graceful stop")
s.waitTransactionLock.Lock()
if s.isTransactionEnd {
s.waitXIDJob.Store(int64(waitComplete))
log.L().Info("the last job is transaction end, done directly")
cancel()
s.runCancel()
s.waitTransactionLock.Unlock()
return
}
s.waitXIDJob.Store(int64(waiting))
s.waitTransactionLock.Unlock()

select {
case <-runCtx.Done():
log.L().Info("received syncer's done")
case <-s.noWaitCtx.Done():
log.L().Info("received no wait ctx done")
cancel()
case <-s.runCtx.Done():
log.L().Info("received run ctx done,exit now")
case <-time.After(maxPauseOrStopWaitTime):
log.L().Info("wait transaction end timeout")
cancel()
log.L().Info("wait transaction end timeout,exit now")
}
case <-s.runCtx.Done(): // when no graceful stop, run ctx will canceled first.
log.L().Info("received ungraceful exit ctx,exit now")
}
}

// Run starts running for sync, we should guarantee it can rerun when paused.
func (s *Syncer) Run(ctx context.Context) (err error) {
runCtx, runCancel := context.WithCancel(context.Background())
defer runCancel()
s.RWMutex.Lock()
s.runCtx = runCtx
s.runCancel = runCancel
s.RWMutex.Unlock()
tctx := s.tctx.WithContext(runCtx)

defer func() {
Expand All @@ -1437,7 +1428,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
}()

// start another goroutine to wait until transaction end.
go s.waitTransactionEndBeforeExit(ctx, runCtx, runCancel)
go s.waitTransactionEndBeforeExit(ctx)

// some initialization that can't be put in Syncer.Init
fresh, err := s.IsFreshTask(runCtx)
Expand Down Expand Up @@ -3183,7 +3174,8 @@ func (s *Syncer) Close(graceful bool) {
}

if !graceful {
s.noWaitCancel()
s.tctx.L().Warn("syncer is closed without graceful")
s.runCancel()
}
s.stopSync()
s.closeDBs()
Expand Down
1 change: 1 addition & 0 deletions dm/tests/checkpoint_transaction/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
name = "worker1"
join = "127.0.0.1:8261"
keepalive-ttl = 1
29 changes: 29 additions & 0 deletions dm/tests/checkpoint_transaction/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,35 @@ function run() {
# check diff
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# test ungraceful stop: the worker will not wait for the transaction to finish
run_sql_file $cur/data/db1.increment1.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
sleep 2
# kill dm-master1 so that the worker loses keepalive while a transaction is not finished
echo "kill dm-master1"
ps aux | grep dm-master | awk '{print $2}' | xargs kill || true
check_master_port_offline 1

# check dm-worker1 exits quickly without waiting for the transaction to finish
num=$(grep "closing syncer in ungraceful mod" $WORK_DIR/worker1/log/dm-worker.log | wc -l)
[[ $num -gt 0 ]]

num=$(grep "syncer is closed without graceful" $WORK_DIR/worker1/log/dm-worker.log | wc -l)
[[ $num -gt 0 ]]

num=$(grep "received ungraceful exit ctx,exit now" $WORK_DIR/worker1/log/dm-worker.log | wc -l)
[[ $num -gt 0 ]]
# the row count in tidb should be less than the row count in the source

dataCountSource=$(mysql -uroot -P$MYSQL_PORT1 -p$MYSQL_PASSWORD1 -se "select count(1) from checkpoint_transaction.t1")
dataCountIntidb=$(mysql -uroot -P4000 -se "select count(1) from checkpoint_transaction.t1")
echo "afetr ungraceful exit data in source count: $dataCountSource data in tidb count: $dataCountIntidb"
[[ $dataCountIntidb -lt $dataCountSource ]]

# start dm-master again; the task will be resumed and the data will be synced
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_sql_file $cur/data/db1.increment1.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
# wait transaction start
# you can see why sleep in https://github.com/pingcap/dm/pull/1928#issuecomment-895820239
Expand Down

0 comments on commit 08da001

Please sign in to comment.