Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*:fix the bug about auto-increment key after renaming a table …(#5248) #5314

Merged
merged 1 commit into from
Dec 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,11 +786,7 @@ func (d *ddl) CreateTable(ctx context.Context, ident ast.Ident, colDefs []*ast.C
// handleAutoIncID handles auto_increment option in DDL. It creates a ID counter for the table and initiates the counter to a proper value.
// For example if the option sets auto_increment to 10. The counter will be set to 9. So the next allocated ID will be 10.
func (d *ddl) handleAutoIncID(tbInfo *model.TableInfo, schemaID int64) error {
if tbInfo.OldSchemaID != 0 {
schemaID = tbInfo.OldSchemaID
}
alloc := autoid.NewAllocator(d.store, schemaID)

alloc := autoid.NewAllocator(d.store, tbInfo.GetDBID(schemaID))
tbInfo.State = model.StatePublic
tb, err := table.TableFromMeta(alloc, tbInfo)
if err != nil {
Expand Down
27 changes: 19 additions & 8 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
return ver, errors.Trace(err)
}
if err = t.DropTable(job.SchemaID, job.TableID, true); err != nil {
if err = t.DropTable(job.SchemaID, tableID, true); err != nil {
break
}
// Finish this job.
Expand Down Expand Up @@ -154,10 +154,7 @@ func (d *ddl) splitTableRegion(tableID int64) error {
var reorgTableDeleteLimit = 65536

func (d *ddl) getTable(schemaID int64, tblInfo *model.TableInfo) (table.Table, error) {
if tblInfo.OldSchemaID != 0 {
schemaID = tblInfo.OldSchemaID
}
alloc := autoid.NewAllocator(d.store, schemaID)
alloc := autoid.NewAllocator(d.store, tblInfo.GetDBID(schemaID))
tbl, err := table.TableFromMeta(alloc, tblInfo)
return tbl, errors.Trace(err)
}
Expand Down Expand Up @@ -206,7 +203,7 @@ func (d *ddl) onTruncateTable(t *meta.Meta, job *model.Job) (ver int64, _ error)
return ver, errors.Trace(err)
}

err = t.DropTable(schemaID, tableID, true)
err = t.DropTable(schemaID, tblInfo.ID, true)
if err != nil {
job.State = model.JobCancelled
return ver, errors.Trace(err)
Expand Down Expand Up @@ -242,15 +239,21 @@ func (d *ddl) onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
if err != nil {
return ver, errors.Trace(err)
}
var baseID int64
newSchemaID := job.SchemaID
if newSchemaID != oldSchemaID {
err = checkTableNotExists(t, job, newSchemaID, tblInfo.Name.L)
if err != nil {
return ver, errors.Trace(err)
}
if tblInfo.OldSchemaID == 0 {
tblInfo.OldSchemaID = oldSchemaID
baseID, err = t.GetAutoTableID(tblInfo.GetDBID(oldSchemaID), tblInfo.ID)
if err != nil {
job.State = model.JobCancelled
return ver, errors.Trace(err)
}
// It's compatible with old version.
// TODO: Remove it.
tblInfo.OldSchemaID = 0
}

err = t.DropTable(oldSchemaID, tblInfo.ID, false)
Expand All @@ -264,6 +267,14 @@ func (d *ddl) onRenameTable(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.State = model.JobCancelled
return ver, errors.Trace(err)
}
// Update the table's auto-increment ID.
if newSchemaID != oldSchemaID {
_, err = t.GenAutoTableID(newSchemaID, tblInfo.ID, baseID)
if err != nil {
job.State = model.JobCancelled
return ver, errors.Trace(err)
}
}

ver, err = updateSchemaVersion(t, job)
if err != nil {
Expand Down
27 changes: 25 additions & 2 deletions executor/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,37 @@ func (s *testSuite) TestRenameTable(c *C) {
tk.MustExec("create table rename1.t (a int primary key auto_increment)")
tk.MustExec("insert rename1.t values ()")
tk.MustExec("rename table rename1.t to rename2.t")
// Make sure the drop old database doesn't affect the rename3.t's operations.
tk.MustExec("drop database rename1")
tk.MustExec("insert rename2.t values ()")
tk.MustExec("rename table rename2.t to rename3.t")
tk.MustExec("insert rename3.t values ()")
tk.MustQuery("select * from rename3.t").Check(testkit.Rows("1", "2", "3"))
tk.MustQuery("select * from rename3.t").Check(testkit.Rows("1", "5001", "10001"))
// Make sure the drop old database doesn't affect the rename3.t's operations.
tk.MustExec("drop database rename2")
tk.MustExec("insert rename3.t values ()")
tk.MustQuery("select * from rename3.t").Check(testkit.Rows("1", "5001", "10001", "10002"))
tk.MustExec("drop database rename3")

tk.MustExec("create database rename1")
tk.MustExec("create database rename2")
tk.MustExec("create table rename1.t (a int primary key auto_increment)")
tk.MustExec("rename table rename1.t to rename2.t1")
tk.MustExec("insert rename2.t1 values ()")
result := tk.MustQuery("select * from rename2.t1")
result.Check(testkit.Rows("1"))
// Make sure the drop old database doesn't affect the t1's operations.
tk.MustExec("drop database rename1")
tk.MustExec("insert rename2.t1 values ()")
result = tk.MustQuery("select * from rename2.t1")
result.Check(testkit.Rows("1", "2"))
// Rename a table to another table in the same database.
tk.MustExec("rename table rename2.t1 to rename2.t2")
tk.MustExec("insert rename2.t2 values ()")
result = tk.MustQuery("select * from rename2.t2")
result.Check(testkit.Rows("1", "2", "5001"))

tk.MustExec("drop database rename2")
tk.MustExec("drop database rename3")
}

func (s *testSuite) TestUnsupportedCharset(c *C) {
Expand Down
12 changes: 3 additions & 9 deletions infoschema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
// We try to reuse the old allocator, so the cached auto ID can be reused.
var alloc autoid.Allocator
if tableIDIsValid(oldTableID) {
if oldTableID == newTableID {
if oldTableID == newTableID && diff.Type != model.ActionRenameTable {
alloc, _ = b.is.AllocByID(oldTableID)
}
if diff.Type == model.ActionRenameTable {
Expand Down Expand Up @@ -162,10 +162,7 @@ func (b *Builder) applyCreateTable(m *meta.Meta, roDBInfo *model.DBInfo, tableID
}
if alloc == nil {
schemaID := roDBInfo.ID
if tblInfo.OldSchemaID != 0 {
schemaID = tblInfo.OldSchemaID
}
alloc = autoid.NewAllocator(b.handle.store, schemaID)
alloc = autoid.NewAllocator(b.handle.store, tblInfo.GetDBID(schemaID))
}
tbl, err := tables.TableFromMeta(alloc, tblInfo)
if err != nil {
Expand Down Expand Up @@ -267,10 +264,7 @@ func (b *Builder) createSchemaTablesForDB(di *model.DBInfo) error {
b.is.schemaMap[di.Name.L] = schTbls
for _, t := range di.Tables {
schemaID := di.ID
if t.OldSchemaID != 0 {
schemaID = t.OldSchemaID
}
alloc := autoid.NewAllocator(b.handle.store, schemaID)
alloc := autoid.NewAllocator(b.handle.store, t.GetDBID(schemaID))
var tbl table.Table
tbl, err := tables.TableFromMeta(alloc, t)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ type allocator struct {
base int64
end int64
store kv.Storage
dbID int64
// dbID is current database's ID.
dbID int64
}

// GetStep is only used by tests
Expand Down
19 changes: 9 additions & 10 deletions meta/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (m *Meta) parseDatabaseID(key string) (int64, error) {
return n, errors.Trace(err)
}

func (m *Meta) autoTalbeIDKey(tableID int64) []byte {
func (m *Meta) autoTableIDKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mTableIDPrefix, tableID))
}

Expand All @@ -140,9 +140,9 @@ func (m *Meta) parseTableID(key string) (int64, error) {
return n, errors.Trace(err)
}

// GenAutoTableID adds step to the auto id of the table and returns the sum.
func (m *Meta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, error) {
// Check if db exists.
// GenAutoTableID adds step to the auto ID of the table and returns the sum.
func (m *Meta) GenAutoTableID(dbID, tableID, step int64) (int64, error) {
// Check if DB exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return 0, errors.Trace(err)
Expand All @@ -154,12 +154,12 @@ func (m *Meta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, err
return 0, errors.Trace(err)
}

return m.txn.HInc(dbKey, m.autoTalbeIDKey(tableID), step)
return m.txn.HInc(dbKey, m.autoTableIDKey(tableID), step)
}

// GetAutoTableID gets current auto id with table id.
func (m *Meta) GetAutoTableID(dbID int64, tableID int64) (int64, error) {
return m.txn.HGetInt64(m.dbKey(dbID), m.autoTalbeIDKey(tableID))
return m.txn.HGetInt64(m.dbKey(dbID), m.autoTableIDKey(tableID))
}

// GetSchemaVersion gets current global schema version.
Expand Down Expand Up @@ -294,25 +294,24 @@ func (m *Meta) DropDatabase(dbID int64) error {
// DropTable drops table in database.
// If delAutoID is true, it will delete the auto_increment id key-value of the table.
// For rename table, we do not need to rename auto_increment id key-value.
func (m *Meta) DropTable(dbID int64, tableID int64, delAutoID bool) error {
func (m *Meta) DropTable(dbID int64, tblID int64, delAutoID bool) error {
// Check if db exists.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return errors.Trace(err)
}

// Check if table exists.
tableKey := m.tableKey(tableID)
tableKey := m.tableKey(tblID)
if err := m.checkTableExists(dbKey, tableKey); err != nil {
return errors.Trace(err)
}

if err := m.txn.HDel(dbKey, tableKey); err != nil {
return errors.Trace(err)
}

if delAutoID {
if err := m.txn.HDel(dbKey, m.autoTalbeIDKey(tableID)); err != nil {
if err := m.txn.HDel(dbKey, m.autoTableIDKey(tblID)); err != nil {
return errors.Trace(err)
}
}
Expand Down
27 changes: 14 additions & 13 deletions meta/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *testSuite) TestMeta(c *C) {
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))

err = t.DropTable(1, 2, true)
err = t.DropTable(1, tbInfo2.ID, true)
c.Assert(err, IsNil)
// Make sure auto id key-value entry is gone.
n, err = t.GetAutoTableID(1, 2)
Expand All @@ -164,20 +164,21 @@ func (s *testSuite) TestMeta(c *C) {
// Create table.
err = t.CreateTable(1, tbInfo100)
c.Assert(err, IsNil)
// Update auto id.
n, err = t.GenAutoTableID(1, tid, 10)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
n, err = t.GetAutoTableID(1, tid)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
// Drop table without touch auto id key-value entry.
err = t.DropTable(1, 100, false)
c.Assert(err, IsNil)
// Make sure that auto id key-value entry is still there.
n, err = t.GetAutoTableID(1, tid)
// Update auto ID.
currentDBID := int64(1)
n, err = t.GenAutoTableID(currentDBID, tid, 10)
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(10))
// Fail to update auto ID.
// The table ID doesn't exist.
nonExistentID := int64(1234)
_, err = t.GenAutoTableID(currentDBID, nonExistentID, 10)
c.Assert(err, NotNil)
// Fail to update auto ID.
// The current database ID doesn't exist.
currentDBID = nonExistentID
_, err = t.GenAutoTableID(currentDBID, tid, 10)
c.Assert(err, NotNil)

err = t.DropDatabase(1)
c.Assert(err, IsNil)
Expand Down
11 changes: 11 additions & 0 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,20 @@ type TableInfo struct {
// Because auto increment ID has schemaID as prefix,
// We need to save original schemaID to keep autoID unchanged
// while renaming a table from one database to another.
// TODO: Remove it.
// Now it only uses for compatibility with the old version that already uses this field.
OldSchemaID int64 `json:"old_schema_id,omitempty"`
}

// GetDBID returns the schema ID that is used to create an allocator.
// TODO: Remove it after removing OldSchemaID.
func (t *TableInfo) GetDBID(dbID int64) int64 {
if t.OldSchemaID != 0 {
return t.OldSchemaID
}
return dbID
}

// Clone clones TableInfo.
func (t *TableInfo) Clone() *TableInfo {
nt := *t
Expand Down