Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add a channel to limit multiple DDL jobs writing at the same time #14342

Merged
merged 13 commits into from
Mar 5, 2020
16 changes: 11 additions & 5 deletions cmd/ddltest/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
zaplog "github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -103,9 +104,8 @@ func (s *TestDDLSuite) SetUpSuite(c *C) {
// Make sure the schema lease of this session is equal to other TiDB servers'.
session.SetSchemaLease(time.Duration(*lease) * time.Second)

dom, err := session.BootstrapSession(s.store)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.dom = dom

s.s, err = session.CreateSession(s.store)
c.Assert(err, IsNil)
Expand All @@ -115,14 +115,20 @@ func (s *TestDDLSuite) SetUpSuite(c *C) {
_, err = s.s.Execute(goCtx, "create database if not exists test_ddl")
c.Assert(err, IsNil)

_, err = s.s.Execute(goCtx, "use test_ddl")
c.Assert(err, IsNil)

s.Bootstrap(c)

// Stop current DDL worker, so that we can't be the owner now.
err = domain.GetDomain(s.ctx).DDL().Stop()
c.Assert(err, IsNil)
ddl.RunWorker = false
session.ResetStoreForWithTiKVTest(s.store)
s.s, err = session.CreateSession(s.store)
c.Assert(err, IsNil)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.ctx = s.s.(sessionctx.Context)
_, err = s.s.Execute(goCtx, "use test_ddl")
c.Assert(err, IsNil)

addEnvPath("..")

Expand Down
4 changes: 0 additions & 4 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) {
}

func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx sessionctx.Context, d *ddl, tblInfo *model.TableInfo) {
d.Stop()
tc := &TestDDLCallback{}
// set up hook
prevState := model.StateNone
Expand Down Expand Up @@ -194,14 +193,12 @@ func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx sessionctx.Cont
}
}
d.SetHook(tc)
d.start(context.Background(), nil)
job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c3", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, nil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
testCheckJobDone(c, d, job, true)
}

func (s *testColumnChangeSuite) testColumnDrop(c *C, ctx sessionctx.Context, d *ddl, tbl table.Table) {
d.Stop()
dropCol := tbl.Cols()[2]
tc := &TestDDLCallback{}
// set up hook
Expand All @@ -223,7 +220,6 @@ func (s *testColumnChangeSuite) testColumnDrop(c *C, ctx sessionctx.Context, d *
}
}
d.SetHook(tc)
d.start(context.Background(), nil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
testDropColumn(c, ctx, d, s.dbInfo, tbl.Meta(), dropCol.Name.L, false)
}
Expand Down
111 changes: 50 additions & 61 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,22 @@ var _ = Suite(&testColumnSuite{})
type testColumnSuite struct {
store kv.Storage
dbInfo *model.DBInfo

d *ddl
}

func (s *testColumnSuite) SetUpSuite(c *C) {
s.store = testCreateStore(c, "test_column")
s.d = newDDL(
d := newDDL(
context.Background(),
WithStore(s.store),
WithLease(testLease),
)

s.dbInfo = testSchemaInfo(c, s.d, "test_column")
testCreateSchema(c, testNewContext(s.d), s.d, s.dbInfo)
s.dbInfo = testSchemaInfo(c, d, "test_column")
testCreateSchema(c, testNewContext(d), d, s.dbInfo)
d.Stop()
}

func (s *testColumnSuite) TearDownSuite(c *C) {
testDropSchema(c, testNewContext(s.d), s.d, s.dbInfo)
s.d.Stop()

err := s.store.Close()
c.Assert(err, IsNil)
}
Expand Down Expand Up @@ -118,11 +114,18 @@ func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo,
}

func (s *testColumnSuite) TestColumn(c *C) {
tblInfo := testTableInfo(c, s.d, "t1", 3)
ctx := testNewContext(s.d)
d := newDDL(
context.Background(),
WithStore(s.store),
WithLease(testLease),
)
defer d.Stop()

tblInfo := testTableInfo(c, d, "t1", 3)
ctx := testNewContext(d)

testCreateTable(c, ctx, s.d, s.dbInfo, tblInfo)
t := testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

num := 10
for i := 0; i < num; i++ {
Expand All @@ -147,10 +150,10 @@ func (s *testColumnSuite) TestColumn(c *C) {

c.Assert(table.FindCol(t.Cols(), "c4"), IsNil)

job := testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c4", &ast.ColumnPosition{Tp: ast.ColumnPositionAfter, RelativeColumn: &ast.ColumnName{Name: model.NewCIStr("c3")}}, 100)
testCheckJobDone(c, s.d, job, true)
job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c4", &ast.ColumnPosition{Tp: ast.ColumnPositionAfter, RelativeColumn: &ast.ColumnName{Name: model.NewCIStr("c3")}}, 100)
testCheckJobDone(c, d, job, true)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
c.Assert(table.FindCol(t.Cols(), "c4"), NotNil)

i = int64(0)
Expand All @@ -177,40 +180,40 @@ func (s *testColumnSuite) TestColumn(c *C) {
c.Assert(values, HasLen, 4)
c.Assert(values[3].GetInt64(), Equals, int64(14))

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c4", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c4", false)
testCheckJobDone(c, d, job, false)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
values, err = t.RowWithCols(ctx, h, t.Cols())
c.Assert(err, IsNil)

c.Assert(values, HasLen, 3)
c.Assert(values[2].GetInt64(), Equals, int64(13))

job = testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c4", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 111)
testCheckJobDone(c, s.d, job, true)
job = testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c4", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 111)
testCheckJobDone(c, d, job, true)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
values, err = t.RowWithCols(ctx, h, t.Cols())
c.Assert(err, IsNil)

c.Assert(values, HasLen, 4)
c.Assert(values[3].GetInt64(), Equals, int64(111))

job = testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c5", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 101)
testCheckJobDone(c, s.d, job, true)
job = testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c5", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 101)
testCheckJobDone(c, d, job, true)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
values, err = t.RowWithCols(ctx, h, t.Cols())
c.Assert(err, IsNil)

c.Assert(values, HasLen, 5)
c.Assert(values[4].GetInt64(), Equals, int64(101))

job = testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c6", &ast.ColumnPosition{Tp: ast.ColumnPositionFirst}, 202)
testCheckJobDone(c, s.d, job, true)
job = testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c6", &ast.ColumnPosition{Tp: ast.ColumnPositionFirst}, 202)
testCheckJobDone(c, d, job, true)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
cols := t.Cols()
c.Assert(cols, HasLen, 6)
c.Assert(cols[0].Offset, Equals, 0)
Expand All @@ -233,40 +236,40 @@ func (s *testColumnSuite) TestColumn(c *C) {
c.Assert(values[0].GetInt64(), Equals, int64(202))
c.Assert(values[5].GetInt64(), Equals, int64(101))

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c2", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c2", false)
testCheckJobDone(c, d, job, false)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

values, err = t.RowWithCols(ctx, h, t.Cols())
c.Assert(err, IsNil)
c.Assert(values, HasLen, 5)
c.Assert(values[0].GetInt64(), Equals, int64(202))
c.Assert(values[4].GetInt64(), Equals, int64(101))

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c1", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c1", false)
testCheckJobDone(c, d, job, false)

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c3", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c3", false)
testCheckJobDone(c, d, job, false)

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c4", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c4", false)
testCheckJobDone(c, d, job, false)

job = testCreateIndex(c, ctx, s.d, s.dbInfo, tblInfo, false, "c5_idx", "c5")
testCheckJobDone(c, s.d, job, true)
job = testCreateIndex(c, ctx, d, s.dbInfo, tblInfo, false, "c5_idx", "c5")
testCheckJobDone(c, d, job, true)

testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c5", true)
testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c5", true)

testDropIndex(c, ctx, s.d, s.dbInfo, tblInfo, "c5_idx")
testCheckJobDone(c, s.d, job, true)
testDropIndex(c, ctx, d, s.dbInfo, tblInfo, "c5_idx")
testCheckJobDone(c, d, job, true)

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c5", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c5", false)
testCheckJobDone(c, d, job, false)

testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c6", true)
testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c6", true)

testDropTable(c, ctx, s.d, s.dbInfo, tblInfo)
testDropTable(c, ctx, d, s.dbInfo, tblInfo)
}

func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Table, handle int64, col *table.Column, columnValue interface{}, isExist bool) error {
Expand Down Expand Up @@ -818,12 +821,6 @@ func (s *testColumnSuite) TestAddColumn(c *C) {

d.SetHook(tc)

// Use local ddl for callback test.
s.d.Stop()

d.Stop()
d.start(context.Background(), nil)

job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, newColName, &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, defaultColValue)

testCheckJobDone(c, d, job, true)
Expand All @@ -846,7 +843,6 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
c.Assert(err, IsNil)

d.Stop()
s.d.start(context.Background(), nil)
}

func (s *testColumnSuite) TestDropColumn(c *C) {
Expand All @@ -855,7 +851,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
WithStore(s.store),
WithLease(testLease),
)
tblInfo := testTableInfo(c, d, "t", 4)
tblInfo := testTableInfo(c, d, "t2", 4)
ctx := testNewContext(d)

err := ctx.NewTxn(context.Background())
Expand Down Expand Up @@ -900,13 +896,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {

d.SetHook(tc)

// Use local ddl for callback test.
s.d.Stop()

d.Stop()
d.start(context.Background(), nil)

job := testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, false)
job := testDropColumn(c, ctx, d, s.dbInfo, tblInfo, colName, false)
testCheckJobDone(c, d, job, false)
mu.Lock()
hErr := hookErr
Expand All @@ -927,7 +917,6 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
c.Assert(err, IsNil)

d.Stop()
s.d.start(context.Background(), nil)
}

func (s *testColumnSuite) TestModifyColumn(c *C) {
Expand Down
38 changes: 33 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ const (

shardRowIDBitsMax = 15

batchAddingJobs = 10

// PartitionCountLimit is limit of the number of partitions in a table.
// Mysql maximum number of partitions is 8192, our maximum number of partitions is 1024.
// Reference linking https://dev.mysql.com/doc/refman/5.7/en/partitioning-limitations.html.
Expand Down Expand Up @@ -298,10 +300,17 @@ type DDL interface {
GetHook() Callback
}

type limitJobTask struct {
job *model.Job
err chan error
}

// ddl is used to handle the statements that define the structure or schema of the database.
type ddl struct {
m sync.RWMutex
quitCh chan struct{}
m sync.RWMutex
quitCh chan struct{}
wg sync.WaitGroup // It's only used to deal with data race in state_test and schema_test.
limitJobCh chan *limitJobTask

*ddlCtx
workers map[workerType]*worker
Expand Down Expand Up @@ -407,7 +416,8 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
ddlCtx.mu.hook = opt.Hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
d := &ddl{
ddlCtx: ddlCtx,
ddlCtx: ddlCtx,
limitJobCh: make(chan *limitJobTask, batchAddingJobs),
}

d.start(ctx, opt.ResourcePool)
Expand Down Expand Up @@ -446,6 +456,20 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
logutil.BgLogger().Info("[ddl] start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", RunWorker))
d.quitCh = make(chan struct{})

d.wg.Add(1)
go func() {
defer d.wg.Done()
tidbutil.WithRecovery(
func() { d.limitDDLJobs() },
func(r interface{}) {
if r != nil {
logutil.BgLogger().Error("[ddl] limit DDL jobs meet panic",
zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
}
})
}()

// If RunWorker is true, we need campaign owner and do DDL job.
// Otherwise, we needn't do that.
if RunWorker {
Expand All @@ -465,7 +489,7 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
func(r interface{}) {
if r != nil {
logutil.Logger(w.logCtx).Error("[ddl] DDL worker meet panic", zap.String("ID", d.uuid))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
metrics.PanicCounter.WithLabelValues(metrics.LabelDDLWorker).Inc()
}
})
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, worker.String())).Inc()
Expand Down Expand Up @@ -495,6 +519,7 @@ func (d *ddl) close() {

startTime := time.Now()
close(d.quitCh)
d.wg.Wait()
d.ownerManager.Cancel()
d.schemaSyncer.CloseCleanWork()
err := d.schemaSyncer.RemoveSelfVersionPath()
Expand Down Expand Up @@ -596,7 +621,10 @@ func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {

func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
// Get a global job ID and put the DDL job in the queue.
err := d.addDDLJob(ctx, job)
job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
task := &limitJobTask{job, make(chan error)}
d.limitJobCh <- task
err := <-task.err
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading