From e7e365714d55253d3f871ac4029dde84285d790c Mon Sep 17 00:00:00 2001 From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com> Date: Mon, 15 Nov 2021 22:21:06 +0800 Subject: [PATCH] compactor: fix data race (#3417) --- dm/syncer/syncer.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index 3dfe640503b..a23662d1edc 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -926,7 +926,11 @@ func (s *Syncer) addJob(job *job) error { s.tctx.L().Info("All jobs is completed before syncer close, the coming job will be reject", zap.Any("job", job)) return nil } - switch job.tp { + + // avoid job.type data race with compactor.run() + // simply copy the opType for performance, though copy a new job in compactor is better + tp := job.tp + switch tp { case xid: s.waitXIDJob.CAS(int64(waiting), int64(waitComplete)) s.saveGlobalPoint(job.location) @@ -978,7 +982,7 @@ func (s *Syncer) addJob(job *job) error { return nil } - switch job.tp { + switch tp { case ddl: failpoint.Inject("ExitAfterDDLBeforeFlush", func() { s.tctx.L().Warn("exit triggered", zap.String("failpoint", "ExitAfterDDLBeforeFlush")) @@ -1015,7 +1019,7 @@ func (s *Syncer) addJob(job *job) error { } } - if needFlush || job.tp == ddl { + if needFlush || tp == ddl { // interrupted after save checkpoint and before flush checkpoint. failpoint.Inject("FlushCheckpointStage", func(val failpoint.Value) { err := handleFlushCheckpointStage(4, val.(int), "before flush checkpoint")