Skip to content

Commit

Permalink
validator(dm): fix deadlock of validator (#7269)
Browse files Browse the repository at this point in the history
close #7241
  • Loading branch information
buchuitoudegou authored Oct 10, 2022
1 parent 0f2a253 commit 2e06aef
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
7 changes: 6 additions & 1 deletion dm/syncer/data_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,10 +725,10 @@ func (v *DataValidator) Stop() {

func (v *DataValidator) stopInner() {
v.Lock()
defer v.Unlock()
v.L.Info("stopping")
if v.Stage() != pb.Stage_Running {
v.L.Warn("not started")
v.Unlock()
return
}

Expand All @@ -737,8 +737,13 @@ func (v *DataValidator) stopInner() {
v.fromDB.Close()
v.toDB.Close()

// release the lock so that the error routine can process errors
// wait until all errors are recorded
v.Unlock()
v.wg.Wait()
close(v.errChan) // close error chan after all possible sender goroutines stopped
v.Lock() // lock and modify the stage
defer v.Unlock()

v.setStage(pb.Stage_Stopped)
v.L.Info("stopped")
Expand Down
36 changes: 36 additions & 0 deletions dm/syncer/data_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,42 @@ func TestValidatorErrorProcessRoutine(t *testing.T) {
require.Len(t, validator.result.Errors, 1)
}

// TestValidatorDeadLock starts a validator, registers an extra goroutine on
// its wait group that repeatedly calls sendError, and then asserts that Stop
// returns and leaves the validator in Stage_Stopped.
func TestValidatorDeadLock(t *testing.T) {
	const (
		mockTZFailpoint = "github.com/pingcap/tiflow/dm/syncer/ValidatorMockUpstreamTZ"
		errorsToSend    = 100
	)

	require.NoError(t, failpoint.Enable(mockTZFailpoint, `return()`))
	defer func() {
		require.NoError(t, failpoint.Disable(mockTZFailpoint))
	}()

	subtaskCfg := genSubtaskConfig(t)
	syncerObj := NewSyncer(subtaskCfg, nil, nil)
	_, _, err := conn.InitMockDBFull()
	require.NoError(t, err)
	// Put a fresh default provider back when the test ends
	// (presumably InitMockDBFull swapped in a mock one — confirm in conn).
	defer func() {
		conn.DefaultDBProvider = &conn.DefaultDBProviderImpl{}
	}()

	dv := NewContinuousDataValidator(subtaskCfg, syncerObj, false)
	dv.persistHelper.schemaInitialized.Store(true)
	dv.Start(pb.Stage_Running)
	require.Equal(t, pb.Stage_Running, dv.Stage())

	// Register the sender below on the validator's own wait group, so Stop
	// has to wait for it as well.
	dv.wg.Add(1)
	go func() {
		defer func() {
			// A panic from inserting an error into a closed channel, which
			// can happen once the validator has been stopped successfully,
			// is expected and deliberately swallowed here.
			dv.wg.Done()
			// nolint:errcheck
			recover()
		}()
		for sent := 0; sent < errorsToSend; sent++ {
			// keep errors flowing so the validator cannot stop right away
			dv.sendError(context.Canceled)
		}
	}()

	// Stop gets stuck if the validator doesn't unlock before waiting on wg.
	dv.Stop()
	require.Equal(t, pb.Stage_Stopped, dv.Stage())
}

type mockedCheckPointForValidator struct {
CheckPoint
cnt int
Expand Down

0 comments on commit 2e06aef

Please sign in to comment.