ddl, executor: fix data race
zimulala committed Mar 4, 2020
1 parent a50ad38 commit 943f125
Showing 6 changed files with 43 additions and 32 deletions.
20 changes: 13 additions & 7 deletions ddl/ddl.go
@@ -310,6 +310,7 @@ type limitJobTask struct {
type ddl struct {
    m sync.RWMutex
    quitCh chan struct{}
    wg sync.WaitGroup // It's only used to deal with data race in state_test and schema_test.
    limitJobCh chan *limitJobTask

    *ddlCtx
@@ -456,13 +457,17 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
    logutil.BgLogger().Info("[ddl] start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", RunWorker))
    d.quitCh = make(chan struct{})

    go tidbutil.WithRecovery(
        func() { d.limitDDLJobs() },
        func(r interface{}) {
            logutil.BgLogger().Error("[ddl] limit DDL jobs meet panic",
                zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
            metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
        })
    d.wg.Add(1)
    go func() {
        defer d.wg.Done()
        tidbutil.WithRecovery(
            func() { d.limitDDLJobs() },
            func(r interface{}) {
                logutil.BgLogger().Error("[ddl] limit DDL jobs meet panic",
                    zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
                metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
            })
    }()

    // If RunWorker is true, we need campaign owner and do DDL job.
    // Otherwise, we needn't do that.
@@ -513,6 +518,7 @@ func (d *ddl) close() {

    startTime := time.Now()
    close(d.quitCh)
    d.wg.Wait()
    d.ownerManager.Cancel()
    d.schemaSyncer.CloseCleanWork()
    err := d.schemaSyncer.RemoveSelfVersionPath()
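
The ddl.go change makes close() wait for the limitDDLJobs goroutine before anything else is torn down: the goroutine is registered with the new wg field, and d.wg.Wait() runs right after close(d.quitCh) (the added comment notes this only exists to deal with the data race in state_test and schema_test). A minimal, self-contained sketch of that start/stop discipline follows; the worker type, Start, Close, and state names are illustrative stand-ins, not code from the repository.

package main

import (
    "fmt"
    "sync"
)

// worker is a toy stand-in for the ddl struct: it owns a quit channel, a
// WaitGroup, and some state that must not be touched once Close returns.
type worker struct {
    quitCh chan struct{}
    wg     sync.WaitGroup
    state  int
}

// Start launches the background goroutine, registering it with the
// WaitGroup before the go statement so Close can never miss it.
func (w *worker) Start() {
    w.quitCh = make(chan struct{})
    w.wg.Add(1)
    go func() {
        defer w.wg.Done()
        for {
            select {
            case <-w.quitCh:
                return
            default:
                w.state++ // background work that touches shared state
            }
        }
    }()
}

// Close signals the goroutine to stop and then waits for it, mirroring
// close(d.quitCh) followed by d.wg.Wait() in the patch.
func (w *worker) Close() {
    close(w.quitCh)
    w.wg.Wait() // past this point no background writer remains
}

func main() {
    w := &worker{}
    w.Start()
    w.Close()
    fmt.Println("final state:", w.state) // safe to read: the writer has exited
}

Without the Wait, stopping and immediately restarting can leave two copies of the goroutine alive and racing on the same fields; that is what waiting inside close() rules out.
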
19 changes: 12 additions & 7 deletions ddl/ddl_test.go
@@ -71,13 +71,18 @@ func (d *ddl) generalWorker() *worker {
// It only starts the original workers.
func (d *ddl) restartWorkers(ctx context.Context) {
    d.quitCh = make(chan struct{})
    go util.WithRecovery(
        func() { d.limitDDLJobs() },
        func(r interface{}) {
            logutil.BgLogger().Error("[ddl] DDL add batch DDL jobs meet panic",
                zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
            metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
        })

    d.wg.Add(1)
    go func() {
        defer d.wg.Done()
        util.WithRecovery(
            func() { d.limitDDLJobs() },
            func(r interface{}) {
                logutil.BgLogger().Error("[ddl] DDL add batch DDL jobs meet panic",
                    zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
                metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
            })
    }()
    if !RunWorker {
        return
    }
23 changes: 11 additions & 12 deletions ddl/reorg_test.go
@@ -116,18 +116,6 @@ func (s *testDDLSuite) TestReorg(c *C) {
    }
    c.Assert(err, IsNil)

    d.Stop()
    err = d.generalWorker().runReorgJob(m, rInfo, nil, d.lease, func() error {
        time.Sleep(4 * testLease)
        return nil
    })
    c.Assert(err, NotNil)
    txn, err = ctx.Txn(true)
    c.Assert(err, IsNil)
    err = txn.Commit(context.Background())
    c.Assert(err, IsNil)

    d.start(context.Background(), nil)
    job = &model.Job{
        ID: 2,
        SchemaID: 1,
@@ -157,6 +145,17 @@ func (s *testDDLSuite) TestReorg(c *C) {
        return nil
    })
    c.Assert(err, IsNil)

    d.Stop()
    err = d.generalWorker().runReorgJob(m, rInfo, nil, d.lease, func() error {
        time.Sleep(4 * testLease)
        return nil
    })
    c.Assert(err, NotNil)
    txn, err = ctx.Txn(true)
    c.Assert(err, IsNil)
    err = txn.Commit(context.Background())
    c.Assert(err, IsNil)
}

func (s *testDDLSuite) TestReorgOwner(c *C) {
2 changes: 1 addition & 1 deletion ddl/schema_test.go
@@ -226,14 +226,14 @@ func (s *testSchemaSuite) TestSchemaWaitJob(c *C) {
func testRunInterruptedJob(c *C, d *ddl, job *model.Job) {
    ctx := mock.NewContext()
    ctx.Store = d.store

    done := make(chan error, 1)
    go func() {
        done <- d.doDDLJob(ctx, job)
    }()

    ticker := time.NewTicker(d.lease * 1)
    defer ticker.Stop()

LOOP:
    for {
        select {
5 changes: 1 addition & 4 deletions ddl/stat_test.go
@@ -51,8 +51,6 @@ func (s *testStatSuite) TestStat(c *C) {
    )
    defer d.Stop()

    time.Sleep(testLease)

    dbInfo := testSchemaInfo(c, d, "test")
    testCreateSchema(c, testNewContext(d), d, dbInfo)

@@ -77,13 +75,12 @@

    ticker := time.NewTicker(d.lease * 1)
    defer ticker.Stop()

    ver := s.getDDLSchemaVer(c, d)
LOOP:
    for {
        select {
        case <-ticker.C:
            d.close()
            d.Stop()
            c.Assert(s.getDDLSchemaVer(c, d), GreaterEqual, ver)
            d.restartWorkers(context.Background())
            time.Sleep(time.Millisecond * 20)
6 changes: 5 additions & 1 deletion executor/seqtest/seq_executor_test.go
@@ -1340,7 +1340,7 @@ func (s *testOOMSuite) TearDownSuite(c *C) {
func (s *testOOMSuite) registerHook() {
    conf := &log.Config{Level: os.Getenv("log_level"), File: log.FileLogConfig{}}
    _, r, _ := log.InitLogger(conf)
    s.oom = &oomCapturer{r.Core, ""}
    s.oom = &oomCapturer{r.Core, "", sync.Mutex{}}
    lg := zap.New(s.oom)
    log.ReplaceGlobals(lg, r)
}
@@ -1483,6 +1483,7 @@ func (s *testOOMSuite) TestMemTracker4DeleteExec(c *C) {
type oomCapturer struct {
    zapcore.Core
    tracker string
    mu sync.Mutex
}

func (h *oomCapturer) Write(entry zapcore.Entry, fields []zapcore.Field) error {
@@ -1500,7 +1501,10 @@ func (h *oomCapturer) Write(entry zapcore.Entry, fields []zapcore.Field) error {
        h.tracker = str[begin+len("8001]") : end]
        return nil
    }

    h.mu.Lock()
    h.tracker = entry.Message
    h.mu.Unlock()
    return nil
}

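The executor change is the same idea at smaller scale: oomCapturer.tracker is assigned on the logger's Write path and inspected by the test, so the struct gains a sync.Mutex and the entry.Message assignment is wrapped in Lock/Unlock (the composite literal in registerHook is updated to match). Below is a hedged sketch of that guard, using an illustrative capturer type with Write/Last helpers rather than the zapcore plumbing from the test.

package main

import (
    "fmt"
    "sync"
)

// capturer plays the role of oomCapturer: one goroutine records log
// messages while another goroutine reads the last message captured.
type capturer struct {
    mu      sync.Mutex
    tracker string
}

// Write records a message; in the test this happens on the logger's goroutine.
func (c *capturer) Write(msg string) {
    c.mu.Lock()
    c.tracker = msg
    c.mu.Unlock()
}

// Last returns the most recent message; called from the test goroutine.
// Taking the same mutex makes the read race-free.
func (c *capturer) Last() string {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.tracker
}

func main() {
    c := &capturer{}
    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 100; i++ {
            c.Write(fmt.Sprintf("message %d", i))
        }
    }()
    _ = c.Last() // concurrent read while the writer is running: no race
    wg.Wait()
    fmt.Println("last message:", c.Last())
}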
