diff --git a/ddl/ddl.go b/ddl/ddl.go
index 86d0c144045c8..c7476b816a4c0 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -310,6 +310,7 @@ type limitJobTask struct {
 type ddl struct {
 	m          sync.RWMutex
 	quitCh     chan struct{}
+	wg         sync.WaitGroup // It's only used to deal with data race in state_test and schema_test.
 	limitJobCh chan *limitJobTask
 
 	*ddlCtx
@@ -456,13 +457,17 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
 	logutil.BgLogger().Info("[ddl] start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", RunWorker))
 	d.quitCh = make(chan struct{})
 
-	go tidbutil.WithRecovery(
-		func() { d.limitDDLJobs() },
-		func(r interface{}) {
-			logutil.BgLogger().Error("[ddl] limit DDL jobs meet panic",
-				zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
-			metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
-		})
+	d.wg.Add(1)
+	go func() {
+		defer d.wg.Done()
+		tidbutil.WithRecovery(
+			func() { d.limitDDLJobs() },
+			func(r interface{}) {
+				logutil.BgLogger().Error("[ddl] limit DDL jobs meet panic",
+					zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
+				metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
+			})
+	}()
 
 	// If RunWorker is true, we need campaign owner and do DDL job.
 	// Otherwise, we needn't do that.
@@ -513,6 +518,7 @@ func (d *ddl) close() {
 
 	startTime := time.Now()
 	close(d.quitCh)
+	d.wg.Wait()
 	d.ownerManager.Cancel()
 	d.schemaSyncer.CloseCleanWork()
 	err := d.schemaSyncer.RemoveSelfVersionPath()
diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go
index d39db842f5c3d..64e67bcfb8065 100644
--- a/ddl/ddl_test.go
+++ b/ddl/ddl_test.go
@@ -71,13 +71,18 @@ func (d *ddl) generalWorker() *worker {
 // It only starts the original workers.
 func (d *ddl) restartWorkers(ctx context.Context) {
 	d.quitCh = make(chan struct{})
-	go util.WithRecovery(
-		func() { d.limitDDLJobs() },
-		func(r interface{}) {
-			logutil.BgLogger().Error("[ddl] DDL add batch DDL jobs meet panic",
-				zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
-			metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
-		})
+
+	d.wg.Add(1)
+	go func() {
+		defer d.wg.Done()
+		util.WithRecovery(
+			func() { d.limitDDLJobs() },
+			func(r interface{}) {
+				logutil.BgLogger().Error("[ddl] DDL add batch DDL jobs meet panic",
+					zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
+				metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
+			})
+	}()
 	if !RunWorker {
 		return
 	}
diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go
index 3bd4edc7f3e81..9f84b5e5389a5 100644
--- a/ddl/reorg_test.go
+++ b/ddl/reorg_test.go
@@ -116,18 +116,6 @@ func (s *testDDLSuite) TestReorg(c *C) {
 	}
 	c.Assert(err, IsNil)
 
-	d.Stop()
-	err = d.generalWorker().runReorgJob(m, rInfo, nil, d.lease, func() error {
-		time.Sleep(4 * testLease)
-		return nil
-	})
-	c.Assert(err, NotNil)
-	txn, err = ctx.Txn(true)
-	c.Assert(err, IsNil)
-	err = txn.Commit(context.Background())
-	c.Assert(err, IsNil)
-
-	d.start(context.Background(), nil)
 	job = &model.Job{
 		ID:       2,
 		SchemaID: 1,
@@ -157,6 +145,17 @@ func (s *testDDLSuite) TestReorg(c *C) {
 		return nil
 	})
 	c.Assert(err, IsNil)
+
+	d.Stop()
+	err = d.generalWorker().runReorgJob(m, rInfo, nil, d.lease, func() error {
+		time.Sleep(4 * testLease)
+		return nil
+	})
+	c.Assert(err, NotNil)
+	txn, err = ctx.Txn(true)
+	c.Assert(err, IsNil)
+	err = txn.Commit(context.Background())
+	c.Assert(err, IsNil)
 }
 
 func (s *testDDLSuite) TestReorgOwner(c *C) {
diff --git a/ddl/schema_test.go b/ddl/schema_test.go
index b9c2485a08189..9cea3ec0b7e97 100644
--- a/ddl/schema_test.go
+++ b/ddl/schema_test.go
@@ -226,6 +226,7 @@ func (s *testSchemaSuite) TestSchemaWaitJob(c *C) {
 func testRunInterruptedJob(c *C, d *ddl, job *model.Job) {
 	ctx := mock.NewContext()
 	ctx.Store = d.store
+
 	done := make(chan error, 1)
 	go func() {
 		done <- d.doDDLJob(ctx, job)
@@ -233,7 +234,6 @@ func testRunInterruptedJob(c *C, d *ddl, job *model.Job) {
 
 	ticker := time.NewTicker(d.lease * 1)
 	defer ticker.Stop()
-
 LOOP:
 	for {
 		select {
diff --git a/ddl/stat_test.go b/ddl/stat_test.go
index 37a60d8a8df72..0bb6b18b6e56a 100644
--- a/ddl/stat_test.go
+++ b/ddl/stat_test.go
@@ -51,8 +51,6 @@ func (s *testStatSuite) TestStat(c *C) {
 	)
 	defer d.Stop()
 
-	time.Sleep(testLease)
-
 	dbInfo := testSchemaInfo(c, d, "test")
 	testCreateSchema(c, testNewContext(d), d, dbInfo)
 
@@ -77,13 +75,12 @@ func (s *testStatSuite) TestStat(c *C) {
 
 	ticker := time.NewTicker(d.lease * 1)
 	defer ticker.Stop()
-
 	ver := s.getDDLSchemaVer(c, d)
 LOOP:
 	for {
 		select {
 		case <-ticker.C:
-			d.close()
+			d.Stop()
 			c.Assert(s.getDDLSchemaVer(c, d), GreaterEqual, ver)
 			d.restartWorkers(context.Background())
 			time.Sleep(time.Millisecond * 20)
diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go
index 5b172ab733461..50f9402ca51d3 100644
--- a/executor/seqtest/seq_executor_test.go
+++ b/executor/seqtest/seq_executor_test.go
@@ -1340,7 +1340,7 @@ func (s *testOOMSuite) TearDownSuite(c *C) {
 func (s *testOOMSuite) registerHook() {
 	conf := &log.Config{Level: os.Getenv("log_level"), File: log.FileLogConfig{}}
 	_, r, _ := log.InitLogger(conf)
-	s.oom = &oomCapturer{r.Core, ""}
+	s.oom = &oomCapturer{r.Core, "", sync.Mutex{}}
 	lg := zap.New(s.oom)
 	log.ReplaceGlobals(lg, r)
 }
@@ -1483,6 +1483,7 @@ func (s *testOOMSuite) TestMemTracker4DeleteExec(c *C) {
 type oomCapturer struct {
 	zapcore.Core
 	tracker string
+	mu      sync.Mutex
 }
 
 func (h *oomCapturer) Write(entry zapcore.Entry, fields []zapcore.Field) error {
@@ -1500,7 +1501,10 @@ func (h *oomCapturer) Write(entry zapcore.Entry, fields []zapcore.Field) error {
 		h.tracker = str[begin+len("8001]") : end]
 		return nil
 	}
+
+	h.mu.Lock()
 	h.tracker = entry.Message
+	h.mu.Unlock()
 	return nil
 }
 
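For reference, a minimal standalone sketch of the shutdown pattern the ddl.go hunks apply: a quit channel plus a sync.WaitGroup, so close() waits for the background goroutine to exit instead of racing with it. The service type and runLoop worker below are illustrative stand-ins, not TiDB code.

// shutdown_sketch.go - illustrative only; names are hypothetical.
package main

import (
	"fmt"
	"sync"
	"time"
)

type service struct {
	quitCh chan struct{}
	wg     sync.WaitGroup // tracks background goroutines so close() can wait for them
}

func (s *service) start() {
	s.quitCh = make(chan struct{})

	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.runLoop()
	}()
}

// runLoop stands in for a worker such as limitDDLJobs: it does periodic
// work until the quit channel is closed.
func (s *service) runLoop() {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			// periodic work would go here
		case <-s.quitCh:
			return
		}
	}
}

func (s *service) close() {
	close(s.quitCh)
	// Without this Wait, close() could return while runLoop is still
	// touching shared state - the kind of data race the diff fixes.
	s.wg.Wait()
}

func main() {
	s := &service{}
	s.start()
	time.Sleep(50 * time.Millisecond)
	s.close()
	fmt.Println("worker stopped cleanly")
}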