Skip to content

Commit

Permalink
ddl: implement ALTER DATABASE to alter charset/collation (#10393)
Browse files Browse the repository at this point in the history
  • Loading branch information
bb7133 authored and winkyao committed May 15, 2019
1 parent 480c605 commit 5c21fa4
Show file tree
Hide file tree
Showing 14 changed files with 261 additions and 15 deletions.
103 changes: 103 additions & 0 deletions ddl/db_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,3 +1638,106 @@ func (s *testIntegrationSuite3) TestDefaultValueIsString(c *C) {
tbl := testGetTableByName(c, s.ctx, "test", "t")
c.Assert(tbl.Meta().Columns[0].DefaultValue, Equals, "1")
}

func (s *testIntegrationSuite11) TestChangingDBCharset(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("DROP DATABASE IF EXISTS alterdb1")
tk.MustExec("CREATE DATABASE alterdb1 CHARSET=utf8 COLLATE=utf8_unicode_ci")

// No default DB errors.
noDBFailedCases := []struct {
stmt string
errMsg string
}{
{
"ALTER DATABASE CHARACTER SET = 'utf8'",
"[planner:1046]No database selected",
},
{
"ALTER SCHEMA `` CHARACTER SET = 'utf8'",
"[ddl:1102]Incorrect database name ''",
},
}
for _, fc := range noDBFailedCases {
c.Assert(tk.ExecToErr(fc.stmt).Error(), Equals, fc.errMsg, Commentf("%v", fc.stmt))
}

verifyDBCharsetAndCollate := func(dbName, chs string, coll string) {
// check `SHOW CREATE SCHEMA`.
r := tk.MustQuery("SHOW CREATE SCHEMA " + dbName).Rows()[0][1].(string)
c.Assert(strings.Contains(r, "CHARACTER SET "+chs), IsTrue)

template := `SELECT
DEFAULT_CHARACTER_SET_NAME,
DEFAULT_COLLATION_NAME
FROM INFORMATION_SCHEMA.SCHEMATA
WHERE SCHEMA_NAME = '%s'`
sql := fmt.Sprintf(template, dbName)
tk.MustQuery(sql).Check(testkit.Rows(fmt.Sprintf("%s %s", chs, coll)))

dom := domain.GetDomain(s.ctx)
// Make sure the table schema is the new schema.
err := dom.Reload()
c.Assert(err, IsNil)
dbInfo, ok := dom.InfoSchema().SchemaByName(model.NewCIStr(dbName))
c.Assert(ok, Equals, true)
c.Assert(dbInfo.Charset, Equals, chs)
c.Assert(dbInfo.Collate, Equals, coll)
}

tk.MustExec("ALTER SCHEMA alterdb1 COLLATE = utf8mb4_general_ci")
verifyDBCharsetAndCollate("alterdb1", "utf8mb4", "utf8mb4_general_ci")

tk.MustExec("DROP DATABASE IF EXISTS alterdb2")
tk.MustExec("CREATE DATABASE alterdb2 CHARSET=utf8 COLLATE=utf8_unicode_ci")
tk.MustExec("USE alterdb2")

failedCases := []struct {
stmt string
errMsg string
}{
{
"ALTER SCHEMA `` CHARACTER SET = 'utf8'",
"[ddl:1102]Incorrect database name ''",
},
{
"ALTER DATABASE CHARACTER SET = ''",
"[parser:1115]Unknown character set: ''",
},
{
"ALTER DATABASE CHARACTER SET = 'INVALID_CHARSET'",
"[parser:1115]Unknown character set: 'INVALID_CHARSET'",
},
{
"ALTER SCHEMA COLLATE = ''",
"[ddl:1273]Unknown collation: ''",
},
{
"ALTER DATABASE COLLATE = 'INVALID_COLLATION'",
"[ddl:1273]Unknown collation: 'INVALID_COLLATION'",
},
{
"ALTER DATABASE CHARACTER SET = 'utf8' DEFAULT CHARSET = 'utf8mb4'",
"[ddl:1302]Conflicting declarations: 'CHARACTER SET utf8' and 'CHARACTER SET utf8mb4'",
},
{
"ALTER SCHEMA CHARACTER SET = 'utf8' COLLATE = 'utf8mb4_bin'",
"[ddl:1302]Conflicting declarations: 'CHARACTER SET utf8' and 'CHARACTER SET utf8mb4'",
},
{
"ALTER DATABASE COLLATE = 'utf8mb4_bin' COLLATE = 'utf8_bin'",
"[ddl:1302]Conflicting declarations: 'CHARACTER SET utf8mb4' and 'CHARACTER SET utf8'",
},
}

for _, fc := range failedCases {
c.Assert(tk.ExecToErr(fc.stmt).Error(), Equals, fc.errMsg, Commentf("%v", fc.stmt))
}

tk.MustExec("ALTER SCHEMA CHARACTER SET = 'utf8mb4'")
verifyDBCharsetAndCollate("alterdb2", "utf8mb4", "utf8mb4_bin")

err := tk.ExecToErr("ALTER SCHEMA CHARACTER SET = 'utf8mb4' COLLATE = 'utf8mb4_general_ci'")
c.Assert(err.Error(), Equals, "[ddl:210]unsupported modify collate from utf8mb4_bin to utf8mb4_general_ci")
}
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ var (
// DDL is responsible for updating schema in data store and maintaining in-memory InfoSchema cache.
type DDL interface {
CreateSchema(ctx sessionctx.Context, name model.CIStr, charsetInfo *ast.CharsetOpt) error
AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) error
DropSchema(ctx sessionctx.Context, schema model.CIStr) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
Expand Down
57 changes: 57 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,63 @@ func (d *ddl) CreateSchema(ctx sessionctx.Context, schema model.CIStr, charsetIn
err = d.callHookOnChanged(err)
return errors.Trace(err)
}
func (d *ddl) AlterSchema(ctx sessionctx.Context, stmt *ast.AlterDatabaseStmt) (err error) {
// Resolve target charset and collation from options.
var toCharset, toCollate string
for _, val := range stmt.Options {
switch val.Tp {
case ast.DatabaseOptionCharset:
if toCharset == "" {
toCharset = val.Value
} else if toCharset != val.Value {
return ErrConflictingDeclarations.GenWithStackByArgs(toCharset, val.Value)
}
case ast.DatabaseOptionCollate:
info, err := charset.GetCollationByName(val.Value)
if err != nil {
return errors.Trace(err)
}
if toCharset == "" {
toCharset = info.CharsetName
} else if toCharset != info.CharsetName {
return ErrConflictingDeclarations.GenWithStackByArgs(toCharset, info.CharsetName)
}
toCollate = info.Name
}
}
if toCollate == "" {
if toCollate, err = charset.GetDefaultCollation(toCharset); err != nil {
return errors.Trace(err)
}
}

// Check if need to change charset/collation.
dbName := model.NewCIStr(stmt.Name)
is := d.GetInfoSchemaWithInterceptor(ctx)
dbInfo, ok := is.SchemaByName(dbName)
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(dbName.O)
}
if dbInfo.Charset == toCharset && dbInfo.Charset == toCollate {
return nil
}

// Check the current TiDB limitations.
if err = modifiableCharsetAndCollation(toCharset, toCollate, dbInfo.Charset, dbInfo.Collate); err != nil {
return errors.Trace(err)
}

// Do the DDL job.
job := &model.Job{
SchemaID: dbInfo.ID,
Type: model.ActionModifySchemaCharsetAndCollate,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{toCharset, toCollate},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

func (d *ddl) DropSchema(ctx sessionctx.Context, schema model.CIStr) (err error) {
is := d.GetInfoSchemaWithInterceptor(ctx)
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
switch job.Type {
case model.ActionCreateSchema:
ver, err = onCreateSchema(d, t, job)
case model.ActionModifySchemaCharsetAndCollate:
ver, err = onModifySchemaCharsetAndCollate(t, job)
case model.ActionDropSchema:
ver, err = onDropSchema(t, job)
case model.ActionCreateTable:
Expand Down
3 changes: 2 additions & 1 deletion ddl/rollingback.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,8 @@ func convertJob2RollbackJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
case model.ActionRebaseAutoID, model.ActionShardRowID,
model.ActionModifyColumn, model.ActionAddForeignKey,
model.ActionDropForeignKey, model.ActionRenameTable,
model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition:
model.ActionModifyTableCharsetAndCollate, model.ActionTruncateTablePartition,
model.ActionModifySchemaCharsetAndCollate:
ver, err = cancelOnlyNotHandledJob(job)
default:
job.State = model.JobStateCancelled
Expand Down
31 changes: 31 additions & 0 deletions ddl/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,37 @@ func checkSchemaNotExistsFromStore(t *meta.Meta, schemaID int64, dbInfo *model.D
return nil
}

func onModifySchemaCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var toCharset, toCollate string
if err := job.DecodeArgs(&toCharset, &toCollate); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if dbInfo.Charset == toCharset && dbInfo.Collate == toCollate {
job.State = model.JobStateCancelled
return ver, nil
}

dbInfo.Charset = toCharset
dbInfo.Collate = toCollate

if err = t.UpdateDatabase(dbInfo); err != nil {
return ver, errors.Trace(err)
}
if ver, err = updateSchemaVersion(t, job); err != nil {
return ver, errors.Trace(err)
}
job.FinishDBJob(model.JobStateDone, model.StatePublic, ver, dbInfo)
return ver, nil
}

func onDropSchema(t *meta.Meta, job *model.Job) (ver int64, _ error) {
dbInfo, err := checkSchemaExistAndCancelNotExistJob(t, job)
if err != nil {
Expand Down
27 changes: 17 additions & 10 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,30 @@ func (e *DDLExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error)
defer func() { e.ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = false }()

switch x := e.stmt.(type) {
case *ast.TruncateTableStmt:
err = e.executeTruncateTable(x)
case *ast.AlterDatabaseStmt:
err = e.executeAlterDatabase(x)
case *ast.AlterTableStmt:
err = e.executeAlterTable(x)
case *ast.CreateIndexStmt:
err = e.executeCreateIndex(x)
case *ast.CreateDatabaseStmt:
err = e.executeCreateDatabase(x)
case *ast.CreateTableStmt:
err = e.executeCreateTable(x)
case *ast.CreateViewStmt:
err = e.executeCreateView(x)
case *ast.CreateIndexStmt:
err = e.executeCreateIndex(x)
case *ast.DropIndexStmt:
err = e.executeDropIndex(x)
case *ast.DropDatabaseStmt:
err = e.executeDropDatabase(x)
case *ast.DropTableStmt:
err = e.executeDropTableOrView(x)
case *ast.DropIndexStmt:
err = e.executeDropIndex(x)
case *ast.AlterTableStmt:
err = e.executeAlterTable(x)
case *ast.RenameTableStmt:
err = e.executeRenameTable(x)
case *ast.RecoverTableStmt:
err = e.executeRecoverTable(x)
case *ast.RenameTableStmt:
err = e.executeRenameTable(x)
case *ast.TruncateTableStmt:
err = e.executeTruncateTable(x)
}
if err != nil {
return e.toErr(err)
Expand Down Expand Up @@ -163,6 +165,11 @@ func (e *DDLExec) executeCreateDatabase(s *ast.CreateDatabaseStmt) error {
return err
}

func (e *DDLExec) executeAlterDatabase(s *ast.AlterDatabaseStmt) error {
err := domain.GetDomain(e.ctx).DDL().AlterSchema(e.ctx, s)
return err
}

func (e *DDLExec) executeCreateTable(s *ast.CreateTableStmt) error {
err := domain.GetDomain(e.ctx).DDL().CreateTable(e.ctx, s)
return err
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190425131531-4ed0aa16f7ea
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825
github.com/pingcap/parser v0.0.0-20190509110453-7a8657e6052b
github.com/pingcap/pd v0.0.0-20190424024702-bd1e2496a669
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible
github.com/pingcap/tipb v0.0.0-20190428032612-535e1abaa330
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ github.com/pingcap/kvproto v0.0.0-20190425131531-4ed0aa16f7ea/go.mod h1:QMdbTAXC
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825 h1:U9Kdnknj4n2v76Mg7wazevZ5N9U1OIaMwSNRVLEcLX0=
github.com/pingcap/parser v0.0.0-20190506092653-e336082eb825/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20190509110453-7a8657e6052b h1:IC1S/ZfKdQRccGJG7C3IkWB6S+d0OkYj8vnmNrMCu2c=
github.com/pingcap/parser v0.0.0-20190509110453-7a8657e6052b/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v0.0.0-20190424024702-bd1e2496a669 h1:ZoKjndm/Ig7Ru/wojrQkc/YLUttUdQXoH77gtuWCvL4=
github.com/pingcap/pd v0.0.0-20190424024702-bd1e2496a669/go.mod h1:MUCxRzOkYiWZtlyi4MhxjCIj9PgQQ/j+BLNGm7aUsnM=
github.com/pingcap/tidb-tools v2.1.3-0.20190321065848-1e8b48f5c168+incompatible h1:MkWCxgZpJBgY2f4HtwWMMFzSBb3+JPzeJgF3VrXE/bU=
Expand Down
19 changes: 19 additions & 0 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
} else if diff.Type == model.ActionDropSchema {
tblIDs := b.applyDropSchema(diff.SchemaID)
return tblIDs, nil
} else if diff.Type == model.ActionModifySchemaCharsetAndCollate {
return nil, b.applyModifySchemaCharsetAndCollate(m, diff)
}

roDBInfo, ok := b.is.SchemaByID(diff.SchemaID)
Expand Down Expand Up @@ -127,6 +129,23 @@ func (b *Builder) applyCreateSchema(m *meta.Meta, diff *model.SchemaDiff) error
return nil
}

func (b *Builder) applyModifySchemaCharsetAndCollate(m *meta.Meta, diff *model.SchemaDiff) error {
di, err := m.GetDatabase(diff.SchemaID)
if err != nil {
return errors.Trace(err)
}
if di == nil {
// This should never happen.
return ErrDatabaseNotExists.GenWithStackByArgs(
fmt.Sprintf("(Schema ID %d)", diff.SchemaID),
)
}
newDbInfo := b.copySchemaTables(di.Name.O)
newDbInfo.Charset = di.Charset
newDbInfo.Collate = di.Collate
return nil
}

func (b *Builder) applyDropSchema(schemaID int64) []int64 {
di, ok := b.is.SchemaByID(schemaID)
if !ok {
Expand Down
12 changes: 12 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1650,6 +1650,18 @@ func (b *PlanBuilder) buildSplitIndexRegion(node *ast.SplitIndexRegionStmt) (Pla
func (b *PlanBuilder) buildDDL(node ast.DDLNode) (Plan, error) {
var authErr error
switch v := node.(type) {
case *ast.AlterDatabaseStmt:
if v.AlterDefaultDatabase {
v.Name = b.ctx.GetSessionVars().CurrentDB
}
if v.Name == "" {
return nil, ErrNoDB
}
if b.ctx.GetSessionVars().User != nil {
authErr = ErrDBaccessDenied.GenWithStackByArgs("ALTER", b.ctx.GetSessionVars().User.Hostname,
b.ctx.GetSessionVars().User.Username, v.Name)
}
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.AlterPriv, v.Name, "", "", authErr)
case *ast.AlterTableStmt:
if b.ctx.GetSessionVars().User != nil {
authErr = ErrTableaccessDenied.GenWithStackByArgs("ALTER", b.ctx.GetSessionVars().User.Hostname,
Expand Down
9 changes: 9 additions & 0 deletions planner/core/preprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
p.checkAlterTableGrammar(node)
case *ast.CreateDatabaseStmt:
p.checkCreateDatabaseGrammar(node)
case *ast.AlterDatabaseStmt:
p.checkAlterDatabaseGrammar(node)
case *ast.DropDatabaseStmt:
p.checkDropDatabaseGrammar(node)
case *ast.ShowStmt:
Expand Down Expand Up @@ -324,6 +326,13 @@ func (p *preprocessor) checkCreateDatabaseGrammar(stmt *ast.CreateDatabaseStmt)
}
}

func (p *preprocessor) checkAlterDatabaseGrammar(stmt *ast.AlterDatabaseStmt) {
// for 'ALTER DATABASE' statement, database name can be empty to alter default database.
if isIncorrectName(stmt.Name) && !stmt.AlterDefaultDatabase {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.Name)
}
}

func (p *preprocessor) checkDropDatabaseGrammar(stmt *ast.DropDatabaseStmt) {
if isIncorrectName(stmt.Name) {
p.err = ddl.ErrWrongDBName.GenWithStackByArgs(stmt.Name)
Expand Down
3 changes: 3 additions & 0 deletions planner/core/preprocess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ func (s *testValidatorSuite) TestValidator(c *C) {
{"drop table `t `", true, errors.New("[ddl:1103]Incorrect table name 't '")},
{"create database ``", true, errors.New("[ddl:1102]Incorrect database name ''")},
{"create database `test `", true, errors.New("[ddl:1102]Incorrect database name 'test '")},
{"alter database collate = 'utf8mb4_bin'", true, nil},
{"alter database `` collate = 'utf8mb4_bin'", true, errors.New("[ddl:1102]Incorrect database name ''")},
{"alter database `test ` collate = 'utf8mb4_bin'", true, errors.New("[ddl:1102]Incorrect database name 'test '")},
{"drop database ``", true, errors.New("[ddl:1102]Incorrect database name ''")},
{"drop database `test `", true, errors.New("[ddl:1102]Incorrect database name 'test '")},
{"alter table `t ` add column c int", true, errors.New("[ddl:1103]Incorrect table name 't '")},
Expand Down
Loading

0 comments on commit 5c21fa4

Please sign in to comment.