Skip to content

Commit

Permalink
ddl: correct the job.SchemaState in DROP cases (#34235)
Browse files Browse the repository at this point in the history
ref #32939
  • Loading branch information
tangenta authored Apr 28, 2022
1 parent d22a8ce commit 93859dd
Show file tree
Hide file tree
Showing 14 changed files with 112 additions and 124 deletions.
48 changes: 22 additions & 26 deletions ddl/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ import (
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/util/logutil"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

type testCancelJob struct {
Expand All @@ -49,8 +47,7 @@ var allTestCase = []testCancelJob{
{"alter table t add primary key idx_pc2 (c2)", true, model.StateWriteReorganization, true, true, nil},
{"alter table t add primary key idx_pc2 (c2)", false, model.StatePublic, false, true, nil},
// Drop primary key
// TODO: fix schema state.
{"alter table t drop primary key", true, model.StateNone, true, false, nil},
{"alter table t drop primary key", true, model.StatePublic, true, false, nil},
{"alter table t drop primary key", false, model.StateWriteOnly, true, false, nil},
{"alter table t drop primary key", false, model.StateWriteOnly, true, false, []string{"alter table t add primary key idx_pc2 (c2)"}},
{"alter table t drop primary key", false, model.StateDeleteOnly, true, false, []string{"alter table t add primary key idx_pc2 (c2)"}},
Expand All @@ -75,36 +72,32 @@ var allTestCase = []testCancelJob{
{"create table test_create_table(a int)", true, model.StateNone, true, false, nil},
{"create table test_create_table(a int)", false, model.StatePublic, false, true, nil},
// Drop table.
// TODO: fix schema state.
{"drop table test_create_table", true, model.StateNone, true, false, nil},
{"drop table test_create_table", true, model.StatePublic, true, false, nil},
{"drop table test_create_table", false, model.StateWriteOnly, true, true, []string{"create table if not exists test_create_table(a int)"}},
{"drop table test_create_table", false, model.StateDeleteOnly, true, true, []string{"create table if not exists test_create_table(a int)"}},
{"drop table test_create_table", false, model.StatePublic, false, true, []string{"create table if not exists test_create_table(a int)"}},
{"drop table test_create_table", false, model.StateNone, false, true, []string{"create table if not exists test_create_table(a int)"}},
// Create schema.
{"create database test_create_db", true, model.StateNone, true, false, nil},
{"create database test_create_db", false, model.StatePublic, false, true, nil},
// Drop schema.
// TODO: fix schema state.
{"drop database test_create_db", true, model.StateNone, true, false, nil},
{"drop database test_create_db", true, model.StatePublic, true, false, nil},
{"drop database test_create_db", false, model.StateWriteOnly, true, true, []string{"create database if not exists test_create_db"}},
{"drop database test_create_db", false, model.StateDeleteOnly, true, true, []string{"create database if not exists test_create_db"}},
{"drop database test_create_db", false, model.StatePublic, false, true, []string{"create database if not exists test_create_db"}},
{"drop database test_create_db", false, model.StateNone, false, true, []string{"create database if not exists test_create_db"}},
// Drop column.
// TODO: fix schema state.
{"alter table t drop column c3", true, model.StateNone, true, false, nil},
{"alter table t drop column c3", true, model.StatePublic, true, false, nil},
{"alter table t drop column c3", false, model.StateDeleteOnly, true, false, nil},
{"alter table t drop column c3", false, model.StateDeleteOnly, false, true, []string{"alter table t add column c3 bigint"}},
{"alter table t drop column c3", false, model.StateWriteOnly, true, true, []string{"alter table t add column c3 bigint"}},
{"alter table t drop column c3", false, model.StateDeleteReorganization, true, true, []string{"alter table t add column c3 bigint"}},
{"alter table t drop column c3", false, model.StatePublic, false, true, []string{"alter table t add column c3 bigint"}},
{"alter table t drop column c3", false, model.StateNone, false, true, []string{"alter table t add column c3 bigint"}},
// Drop column with index.
// TODO: fix schema state.
{"alter table t drop column c3", true, model.StateNone, true, false, []string{"alter table t add column c3 bigint", "alter table t add index idx_c3(c3)"}},
{"alter table t drop column c3", true, model.StatePublic, true, false, []string{"alter table t add column c3 bigint", "alter table t add index idx_c3(c3)"}},
{"alter table t drop column c3", false, model.StateDeleteOnly, true, false, nil},
{"alter table t drop column c3", false, model.StateDeleteOnly, false, true, []string{"alter table t add column c3 bigint", "alter table t add index idx_c3(c3)"}},
{"alter table t drop column c3", false, model.StateWriteOnly, true, true, []string{"alter table t add column c3 bigint", "alter table t add index idx_c3(c3)"}},
{"alter table t drop column c3", false, model.StateDeleteReorganization, true, true, []string{"alter table t add column c3 bigint", "alter table t add index idx_c3(c3)"}},
{"alter table t drop column c3", false, model.StatePublic, false, true, []string{"alter table t add column c3 bigint", "alter table t add index idx_c3(c3)"}},
{"alter table t drop column c3", false, model.StateNone, false, true, []string{"alter table t add column c3 bigint", "alter table t add index idx_c3(c3)"}},
// rebase auto ID.
{"alter table t_rebase auto_increment = 6000", true, model.StateNone, true, false, []string{"create table t_rebase (c1 bigint auto_increment primary key, c2 bigint);"}},
{"alter table t_rebase auto_increment = 9000", false, model.StatePublic, false, true, nil},
Expand All @@ -128,9 +121,8 @@ var allTestCase = []testCancelJob{
{"alter table t add constraint fk foreign key a(c1) references t_ref(c1)", true, model.StateNone, true, false, []string{"create table t_ref (c1 int, c2 int, c3 int, c11 tinyint);"}},
{"alter table t add constraint fk foreign key a(c1) references t_ref(c1)", false, model.StatePublic, false, true, nil},
// Drop foreign key.
// TODO: fix schema state.
{"alter table t drop foreign key fk", true, model.StateNone, true, false, nil},
{"alter table t drop foreign key fk", false, model.StatePublic, false, true, nil},
{"alter table t drop foreign key fk", true, model.StatePublic, true, false, nil},
{"alter table t drop foreign key fk", false, model.StateNone, false, true, nil},
// Rename table.
{"rename table t_rename1 to t_rename11", true, model.StateNone, true, false, []string{"create table t_rename1 (c1 bigint , c2 bigint);", "create table t_rename2 (c1 bigint , c2 bigint);"}},
{"rename table t_rename1 to t_rename11", false, model.StatePublic, false, true, nil},
Expand Down Expand Up @@ -179,12 +171,11 @@ var allTestCase = []testCancelJob{
{"alter table t_partition add partition (partition p6 values less than (8192))", true, model.StateReplicaOnly, true, true, nil},
{"alter table t_partition add partition (partition p6 values less than (8192))", false, model.StatePublic, false, true, nil},
// Drop partition.
// TODO: fix schema state.
{"alter table t_partition drop partition p6", true, model.StateNone, true, false, nil},
{"alter table t_partition drop partition p6", true, model.StatePublic, true, false, nil},
{"alter table t_partition drop partition p6", false, model.StateDeleteOnly, true, false, nil},
{"alter table t_partition drop partition p6", false, model.StateDeleteOnly, false, true, []string{"alter table t_partition add partition (partition p6 values less than (8192))"}},
{"alter table t_partition drop partition p6", false, model.StateDeleteReorganization, true, true, []string{"alter table t_partition add partition (partition p6 values less than (8192))"}},
{"alter table t_partition drop partition p6", false, model.StatePublic, true, true, []string{"alter table t_partition add partition (partition p6 values less than (8192))"}},
{"alter table t_partition drop partition p6", false, model.StateNone, true, true, []string{"alter table t_partition add partition (partition p6 values less than (8192))"}},
// Drop indexes.
// TODO: fix schema state.
{"alter table t drop index mul_idx1, drop index mul_idx2", true, model.StateNone, true, false, []string{"alter table t add index mul_idx1(c1)", "alter table t add index mul_idx2(c1)"}},
Expand Down Expand Up @@ -242,6 +233,7 @@ func TestCancel(t *testing.T) {
hook := &ddl.TestDDLCallback{Do: dom}
i := 0
cancel := false
cancelResult := false
cancelWhenReorgNotStart := false

hookFunc := func(job *model.Job) {
Expand All @@ -250,7 +242,7 @@ func TestCancel(t *testing.T) {
return
}
rs := tkCancel.MustQuery(fmt.Sprintf("admin cancel ddl jobs %d", job.ID))
require.Equal(t, allTestCase[i].ok, cancelSuccess(rs))
cancelResult = cancelSuccess(rs)
cancel = true
}
}
Expand All @@ -270,6 +262,7 @@ func TestCancel(t *testing.T) {

for j, tc := range allTestCase {
i = j
msg := fmt.Sprintf("sql: %s, state: %s", tc.sql, tc.cancelState)
if tc.onJobBefore {
restHook(hook)
for _, prepareSQL := range tc.prepareSQL {
Expand All @@ -279,28 +272,31 @@ func TestCancel(t *testing.T) {
cancel = false
cancelWhenReorgNotStart = true
registHook(hook, true)
logutil.BgLogger().Info("test case", zap.Int("", i))
if tc.ok {
tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob)
} else {
tk.MustExec(tc.sql)
}
if cancel {
require.Equal(t, tc.ok, cancelResult, msg)
}
}
if tc.onJobUpdate {
restHook(hook)
for _, prepareSQL := range tc.prepareSQL {
tk.MustExec(prepareSQL)
}

cancel = false
cancelWhenReorgNotStart = false
registHook(hook, false)
logutil.BgLogger().Info("test case", zap.Int("", i))
if tc.ok {
tk.MustGetErrCode(tc.sql, errno.ErrCancelledDDLJob)
} else {
tk.MustExec(tc.sql)
}
if cancel {
require.Equal(t, tc.ok, cancelResult, msg)
}
}
}
}
6 changes: 2 additions & 4 deletions ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,6 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateWriteOnly
case model.StateWriteOnly:
// write only -> delete only
colInfo.State = model.StateDeleteOnly
Expand All @@ -574,15 +573,13 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}
job.Args = append(job.Args, indexInfosToIDList(idxInfos))
job.SchemaState = model.StateDeleteOnly
case model.StateDeleteOnly:
// delete only -> reorganization
colInfo.State = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(d, t, job, tblInfo, originalState != colInfo.State)
if err != nil {
return ver, errors.Trace(err)
}
job.SchemaState = model.StateDeleteReorganization
case model.StateDeleteReorganization:
// reorganization -> absent
// All reorganization jobs are done, drop this column.
Expand All @@ -602,8 +599,9 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
job.Args = append(job.Args, getPartitionIDs(tblInfo))
}
default:
err = dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State)
return ver, errors.Trace(dbterror.ErrInvalidDDLJob.GenWithStackByArgs("table", tblInfo.State))
}
job.SchemaState = colInfo.State
return ver, errors.Trace(err)
}

Expand Down
2 changes: 1 addition & 1 deletion ddl/column_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestColumnAdd(t *testing.T) {
tc.OnJobUpdatedExported = func(job *model.Job) {
jobID = job.ID
tbl := external.GetTableByName(t, internal, "test", "t")
if job.SchemaState != model.StateNone {
if job.SchemaState != model.StatePublic {
for _, col := range tbl.Cols() {
require.NotEqualf(t, col.ID, dropCol.ID, "column is not dropped")
}
Expand Down
5 changes: 2 additions & 3 deletions ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,16 +822,15 @@ func (s *stateChangeSuite) runTestInSchemaState(
callback := &ddl.TestDDLCallback{Do: s.dom}
prevState := model.StateNone
var checkErr error
times := 0
se, err := session.CreateSession(s.store)
s.Require().NoError(err)
_, err = se.Execute(context.Background(), "use test_db_state")
s.Require().NoError(err)
cbFunc := func(job *model.Job) {
if job.SchemaState == prevState || checkErr != nil || times >= 3 {
if job.SchemaState == prevState || checkErr != nil {
return
}
times++
prevState = job.SchemaState
if job.SchemaState != state {
return
}
Expand Down
94 changes: 51 additions & 43 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,10 +527,11 @@ func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error)
return errors.Trace(infoschema.ErrDatabaseNotExists)
}
job := &model.Job{
SchemaID: old.ID,
SchemaName: old.Name.L,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
SchemaID: old.ID,
SchemaName: old.Name.L,
SchemaState: old.State,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{},
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -3784,13 +3785,14 @@ func (d *ddl) DropTablePartition(ctx sessionctx.Context, ident ast.Ident, spec *
}

job := &model.Job{
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
TableName: meta.Name.L,
Type: model.ActionDropTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partNames},
SchemaID: schema.ID,
TableID: meta.ID,
SchemaName: schema.Name.L,
SchemaState: model.StatePublic,
TableName: meta.Name.L,
Type: model.ActionDropTablePartition,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{partNames},
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -4029,6 +4031,7 @@ func (d *ddl) DropColumn(ctx sessionctx.Context, ti ast.Ident, spec *ast.AlterTa
Type: model.ActionDropColumn,
BinlogInfo: &model.HistoryInfo{},
MultiSchemaInfo: multiSchemaInfo,
SchemaState: model.StatePublic,
Args: []interface{}{colName},
}

Expand Down Expand Up @@ -5267,12 +5270,13 @@ func (d *ddl) DropTable(ctx sessionctx.Context, ti ast.Ident) (err error) {
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
TableName: tb.Meta().Name.L,
Type: model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{},
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: schema.State,
TableName: tb.Meta().Name.L,
Type: model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{},
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -5301,12 +5305,13 @@ func (d *ddl) DropView(ctx sessionctx.Context, ti ast.Ident) (err error) {
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
TableName: tb.Meta().Name.L,
Type: model.ActionDropView,
BinlogInfo: &model.HistoryInfo{},
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: tb.Meta().State,
TableName: tb.Meta().Name.L,
Type: model.ActionDropView,
BinlogInfo: &model.HistoryInfo{},
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -5969,13 +5974,14 @@ func (d *ddl) DropForeignKey(ctx sessionctx.Context, ti ast.Ident, fkName model.
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: model.ActionDropForeignKey,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{fkName},
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: model.StatePublic,
TableName: t.Meta().Name.L,
Type: model.ActionDropForeignKey,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{fkName},
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -6025,13 +6031,14 @@ func (d *ddl) DropIndex(ctx sessionctx.Context, ti ast.Ident, indexName model.CI
}

job := &model.Job{
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: jobTp,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{indexName},
SchemaID: schema.ID,
TableID: t.Meta().ID,
SchemaName: schema.Name.L,
TableName: t.Meta().Name.L,
Type: jobTp,
BinlogInfo: &model.HistoryInfo{},
SchemaState: indexInfo.State,
Args: []interface{}{indexName},
}

err = d.DoDDLJob(ctx, job)
Expand Down Expand Up @@ -6596,12 +6603,13 @@ func (d *ddl) DropSequence(ctx sessionctx.Context, ti ast.Ident, ifExists bool)
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tbl.Meta().ID,
SchemaName: schema.Name.L,
TableName: tbl.Meta().Name.L,
Type: model.ActionDropSequence,
BinlogInfo: &model.HistoryInfo{},
SchemaID: schema.ID,
TableID: tbl.Meta().ID,
SchemaName: schema.Name.L,
SchemaState: tbl.Meta().State,
TableName: tbl.Meta().Name.L,
Type: model.ActionDropSequence,
BinlogInfo: &model.HistoryInfo{},
}

err = d.DoDDLJob(ctx, job)
Expand Down
1 change: 1 addition & 0 deletions ddl/foreign_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func onDropForeignKey(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ err
}
// Finish this job.
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
job.SchemaState = fkInfo.State
return ver, nil
default:
return ver, dbterror.ErrInvalidDDLState.GenWithStackByArgs("foreign key", fkInfo.State)
Expand Down
Loading

0 comments on commit 93859dd

Please sign in to comment.