Skip to content

Commit

Permalink
ddl: add a channel to limit multiple DDL jobs writing at the same time (
Browse files Browse the repository at this point in the history
  • Loading branch information
sre-bot authored Mar 11, 2020
1 parent 3d41a61 commit 315b9f2
Show file tree
Hide file tree
Showing 17 changed files with 266 additions and 127 deletions.
16 changes: 11 additions & 5 deletions cmd/ddltest/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
zaplog "github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -104,9 +105,8 @@ func (s *TestDDLSuite) SetUpSuite(c *C) {
// Make sure the schema lease of this session is equal to other TiDB servers'.
session.SetSchemaLease(time.Duration(*lease) * time.Second)

dom, err := session.BootstrapSession(s.store)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.dom = dom

s.s, err = session.CreateSession(s.store)
c.Assert(err, IsNil)
Expand All @@ -116,14 +116,20 @@ func (s *TestDDLSuite) SetUpSuite(c *C) {
_, err = s.s.Execute(goCtx, "create database if not exists test_ddl")
c.Assert(err, IsNil)

_, err = s.s.Execute(goCtx, "use test_ddl")
c.Assert(err, IsNil)

s.Bootstrap(c)

// Stop current DDL worker, so that we can't be the owner now.
err = domain.GetDomain(s.ctx).DDL().Stop()
c.Assert(err, IsNil)
ddl.RunWorker = false
session.ResetStoreForWithTiKVTest(s.store)
s.s, err = session.CreateSession(s.store)
c.Assert(err, IsNil)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
s.ctx = s.s.(sessionctx.Context)
_, err = s.s.Execute(goCtx, "use test_ddl")
c.Assert(err, IsNil)

addEnvPath("..")

Expand Down
4 changes: 0 additions & 4 deletions ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) {
}

func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx sessionctx.Context, d *ddl, tblInfo *model.TableInfo) {
d.Stop()
tc := &TestDDLCallback{}
// set up hook
prevState := model.StateNone
Expand Down Expand Up @@ -190,14 +189,12 @@ func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx sessionctx.Cont
}
}
d.SetHook(tc)
d.start(context.Background(), nil)
job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c3", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, nil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
testCheckJobDone(c, d, job, true)
}

func (s *testColumnChangeSuite) testColumnDrop(c *C, ctx sessionctx.Context, d *ddl, tbl table.Table) {
d.Stop()
dropCol := tbl.Cols()[2]
tc := &TestDDLCallback{}
// set up hook
Expand All @@ -219,7 +216,6 @@ func (s *testColumnChangeSuite) testColumnDrop(c *C, ctx sessionctx.Context, d *
}
}
d.SetHook(tc)
d.start(context.Background(), nil)
c.Assert(errors.ErrorStack(checkErr), Equals, "")
testDropColumn(c, ctx, d, s.dbInfo, tbl.Meta(), dropCol.Name.L, false)
}
Expand Down
107 changes: 46 additions & 61 deletions ddl/column_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,18 @@ var _ = Suite(&testColumnSuite{})
type testColumnSuite struct {
store kv.Storage
dbInfo *model.DBInfo

d *ddl
}

func (s *testColumnSuite) SetUpSuite(c *C) {
s.store = testCreateStore(c, "test_column")
s.d = testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)

s.dbInfo = testSchemaInfo(c, s.d, "test_column")
testCreateSchema(c, testNewContext(s.d), s.d, s.dbInfo)
s.dbInfo = testSchemaInfo(c, d, "test_column")
testCreateSchema(c, testNewContext(d), d, s.dbInfo)
d.Stop()
}

func (s *testColumnSuite) TearDownSuite(c *C) {
testDropSchema(c, testNewContext(s.d), s.d, s.dbInfo)
s.d.Stop()

err := s.store.Close()
c.Assert(err, IsNil)
}
Expand Down Expand Up @@ -114,11 +110,14 @@ func testDropColumn(c *C, ctx sessionctx.Context, d *ddl, dbInfo *model.DBInfo,
}

func (s *testColumnSuite) TestColumn(c *C) {
tblInfo := testTableInfo(c, s.d, "t1", 3)
ctx := testNewContext(s.d)
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
defer d.Stop()

tblInfo := testTableInfo(c, d, "t1", 3)
ctx := testNewContext(d)

testCreateTable(c, ctx, s.d, s.dbInfo, tblInfo)
t := testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

num := 10
for i := 0; i < num; i++ {
Expand All @@ -143,10 +142,10 @@ func (s *testColumnSuite) TestColumn(c *C) {

c.Assert(table.FindCol(t.Cols(), "c4"), IsNil)

job := testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c4", &ast.ColumnPosition{Tp: ast.ColumnPositionAfter, RelativeColumn: &ast.ColumnName{Name: model.NewCIStr("c3")}}, 100)
testCheckJobDone(c, s.d, job, true)
job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c4", &ast.ColumnPosition{Tp: ast.ColumnPositionAfter, RelativeColumn: &ast.ColumnName{Name: model.NewCIStr("c3")}}, 100)
testCheckJobDone(c, d, job, true)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
c.Assert(table.FindCol(t.Cols(), "c4"), NotNil)

i = int64(0)
Expand All @@ -173,40 +172,40 @@ func (s *testColumnSuite) TestColumn(c *C) {
c.Assert(values, HasLen, 4)
c.Assert(values[3].GetInt64(), Equals, int64(14))

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c4", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c4", false)
testCheckJobDone(c, d, job, false)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
values, err = t.RowWithCols(ctx, h, t.Cols())
c.Assert(err, IsNil)

c.Assert(values, HasLen, 3)
c.Assert(values[2].GetInt64(), Equals, int64(13))

job = testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c4", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 111)
testCheckJobDone(c, s.d, job, true)
job = testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c4", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 111)
testCheckJobDone(c, d, job, true)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
values, err = t.RowWithCols(ctx, h, t.Cols())
c.Assert(err, IsNil)

c.Assert(values, HasLen, 4)
c.Assert(values[3].GetInt64(), Equals, int64(111))

job = testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c5", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 101)
testCheckJobDone(c, s.d, job, true)
job = testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c5", &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, 101)
testCheckJobDone(c, d, job, true)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
values, err = t.RowWithCols(ctx, h, t.Cols())
c.Assert(err, IsNil)

c.Assert(values, HasLen, 5)
c.Assert(values[4].GetInt64(), Equals, int64(101))

job = testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c6", &ast.ColumnPosition{Tp: ast.ColumnPositionFirst}, 202)
testCheckJobDone(c, s.d, job, true)
job = testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, "c6", &ast.ColumnPosition{Tp: ast.ColumnPositionFirst}, 202)
testCheckJobDone(c, d, job, true)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
cols := t.Cols()
c.Assert(cols, HasLen, 6)
c.Assert(cols[0].Offset, Equals, 0)
Expand All @@ -229,40 +228,40 @@ func (s *testColumnSuite) TestColumn(c *C) {
c.Assert(values[0].GetInt64(), Equals, int64(202))
c.Assert(values[5].GetInt64(), Equals, int64(101))

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c2", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c2", false)
testCheckJobDone(c, d, job, false)

t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)

values, err = t.RowWithCols(ctx, h, t.Cols())
c.Assert(err, IsNil)
c.Assert(values, HasLen, 5)
c.Assert(values[0].GetInt64(), Equals, int64(202))
c.Assert(values[4].GetInt64(), Equals, int64(101))

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c1", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c1", false)
testCheckJobDone(c, d, job, false)

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c3", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c3", false)
testCheckJobDone(c, d, job, false)

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c4", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c4", false)
testCheckJobDone(c, d, job, false)

job = testCreateIndex(c, ctx, s.d, s.dbInfo, tblInfo, false, "c5_idx", "c5")
testCheckJobDone(c, s.d, job, true)
job = testCreateIndex(c, ctx, d, s.dbInfo, tblInfo, false, "c5_idx", "c5")
testCheckJobDone(c, d, job, true)

testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c5", true)
testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c5", true)

testDropIndex(c, ctx, s.d, s.dbInfo, tblInfo, "c5_idx")
testCheckJobDone(c, s.d, job, true)
testDropIndex(c, ctx, d, s.dbInfo, tblInfo, "c5_idx")
testCheckJobDone(c, d, job, true)

job = testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c5", false)
testCheckJobDone(c, s.d, job, false)
job = testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c5", false)
testCheckJobDone(c, d, job, false)

testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, "c6", true)
testDropColumn(c, ctx, d, s.dbInfo, tblInfo, "c6", true)

testDropTable(c, ctx, s.d, s.dbInfo, tblInfo)
testDropTable(c, ctx, d, s.dbInfo, tblInfo)
}

func (s *testColumnSuite) checkColumnKVExist(ctx sessionctx.Context, t table.Table, handle int64, col *table.Column, columnValue interface{}, isExist bool) error {
Expand Down Expand Up @@ -810,12 +809,6 @@ func (s *testColumnSuite) TestAddColumn(c *C) {

d.SetHook(tc)

// Use local ddl for callback test.
s.d.Stop()

d.Stop()
d.start(context.Background(), nil)

job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, newColName, &ast.ColumnPosition{Tp: ast.ColumnPositionNone}, defaultColValue)

testCheckJobDone(c, d, job, true)
Expand All @@ -838,12 +831,11 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
c.Assert(err, IsNil)

d.Stop()
s.d.start(context.Background(), nil)
}

func (s *testColumnSuite) TestDropColumn(c *C) {
d := testNewDDL(context.Background(), nil, s.store, nil, nil, testLease)
tblInfo := testTableInfo(c, d, "t", 4)
tblInfo := testTableInfo(c, d, "t2", 4)
ctx := testNewContext(d)

err := ctx.NewTxn(context.Background())
Expand Down Expand Up @@ -888,13 +880,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {

d.SetHook(tc)

// Use local ddl for callback test.
s.d.Stop()

d.Stop()
d.start(context.Background(), nil)

job := testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, false)
job := testDropColumn(c, ctx, d, s.dbInfo, tblInfo, colName, false)
testCheckJobDone(c, d, job, false)
mu.Lock()
hErr := hookErr
Expand All @@ -915,7 +901,6 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
c.Assert(err, IsNil)

d.Stop()
s.d.start(context.Background(), nil)
}

func (s *testColumnSuite) TestModifyColumn(c *C) {
Expand Down
38 changes: 33 additions & 5 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ const (

shardRowIDBitsMax = 15

batchAddingJobs = 10

// PartitionCountLimit is limit of the number of partitions in a table.
// Mysql maximum number of partitions is 8192, our maximum number of partitions is 1024.
// Reference linking https://dev.mysql.com/doc/refman/5.7/en/partitioning-limitations.html.
Expand Down Expand Up @@ -277,10 +279,17 @@ type DDL interface {
GetHook() Callback
}

type limitJobTask struct {
job *model.Job
err chan error
}

// ddl is used to handle the statements that define the structure or schema of the database.
type ddl struct {
m sync.RWMutex
quitCh chan struct{}
m sync.RWMutex
quitCh chan struct{}
wg sync.WaitGroup // It's only used to deal with data race in state_test and schema_test.
limitJobCh chan *limitJobTask

*ddlCtx
workers map[workerType]*worker
Expand Down Expand Up @@ -385,7 +394,8 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
ddlCtx.mu.hook = hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
d := &ddl{
ddlCtx: ddlCtx,
ddlCtx: ddlCtx,
limitJobCh: make(chan *limitJobTask, batchAddingJobs),
}

d.start(ctx, ctxPool)
Expand Down Expand Up @@ -424,6 +434,20 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
logutil.Logger(ddlLogCtx).Info("[ddl] start DDL", zap.String("ID", d.uuid), zap.Bool("runWorker", RunWorker))
d.quitCh = make(chan struct{})

d.wg.Add(1)
go func() {
defer d.wg.Done()
tidbutil.WithRecovery(
func() { d.limitDDLJobs() },
func(r interface{}) {
if r != nil {
logutil.Logger(ddlLogCtx).Error("[ddl] limit DDL jobs meet panic",
zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace"))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
}
})
}()

// If RunWorker is true, we need campaign owner and do DDL job.
// Otherwise, we needn't do that.
if RunWorker {
Expand All @@ -443,7 +467,7 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
func(r interface{}) {
if r != nil {
logutil.Logger(w.logCtx).Error("[ddl] DDL worker meet panic", zap.String("ID", d.uuid))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDL).Inc()
metrics.PanicCounter.WithLabelValues(metrics.LabelDDLWorker).Inc()
}
})
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDL, worker.String())).Inc()
Expand Down Expand Up @@ -473,6 +497,7 @@ func (d *ddl) close() {

startTime := time.Now()
close(d.quitCh)
d.wg.Wait()
d.ownerManager.Cancel()
d.schemaSyncer.CloseCleanWork()
err := d.schemaSyncer.RemoveSelfVersionPath()
Expand Down Expand Up @@ -578,7 +603,10 @@ func (d *ddl) asyncNotifyWorker(jobTp model.ActionType) {

func (d *ddl) doDDLJob(ctx sessionctx.Context, job *model.Job) error {
// Get a global job ID and put the DDL job in the queue.
err := d.addDDLJob(ctx, job)
job.Query, _ = ctx.Value(sessionctx.QueryString).(string)
task := &limitJobTask{job, make(chan error)}
d.limitJobCh <- task
err := <-task.err
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 315b9f2

Please sign in to comment.