Skip to content

Commit

Permalink
domain: avoid check schema changed when change is tiflash replica rep…
Browse files Browse the repository at this point in the history
…lica status (#13034) (#14884)
  • Loading branch information
crazycs520 authored Feb 21, 2020
1 parent 28c9efc commit f90a789
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 5 deletions.
32 changes: 32 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3331,6 +3331,38 @@ func (s *testDBSuite4) TestAlterShardRowIDBits(c *C) {
c.Assert(err.Error(), Equals, "[autoid:1467]Failed to read auto-increment value from storage engine")
}

func (s *testDBSuite2) TestSkipSchemaChecker(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
tk := s.tk
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
defer tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int)")
tk2 := testkit.NewTestKit(c, s.store)
tk2.MustExec("use test")

// Test skip schema checker for ActionSetTiFlashReplica.
tk.MustExec("begin")
tk.MustExec("insert into t1 set a=1;")
tk2.MustExec("alter table t1 set tiflash replica 2 location labels 'a','b';")
tk.MustExec("commit")

// Test skip schema checker for ActionUpdateTiFlashReplicaStatus.
tk.MustExec("begin")
tk.MustExec("insert into t1 set a=1;")
tb := testGetTableByName(c, s.tk.Se, "test", "t1")
err := domain.GetDomain(s.tk.Se).DDL().UpdateTableReplicaInfo(s.tk.Se, tb.Meta().ID, true)
c.Assert(err, IsNil)
tk.MustExec("commit")

// Test can't skip schema checker.
tk.MustExec("begin")
tk.MustExec("insert into t1 set a=1;")
tk2.MustExec("alter table t1 add column b int;")
_, err = tk.Exec("commit")
c.Assert(terror.ErrorEqual(domain.ErrInfoSchemaChanged, err), IsTrue)
}

func (s *testDBSuite2) TestLockTables(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
tk := s.tk
Expand Down
11 changes: 11 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,12 +254,23 @@ func (do *Domain) tryLoadSchemaDiffs(m *meta.Meta, usedVersion, newVersion int64
if err != nil {
return false, nil, err
}
if canSkipSchemaCheckerDDL(diff.Type) {
continue
}
tblIDs = append(tblIDs, ids...)
}
builder.Build()
return true, tblIDs, nil
}

func canSkipSchemaCheckerDDL(tp model.ActionType) bool {
switch tp {
case model.ActionUpdateTiFlashReplicaStatus, model.ActionSetTiFlashReplica:
return true
}
return false
}

// InfoSchema gets information schema from domain.
func (do *Domain) InfoSchema() infoschema.InfoSchema {
return do.infoHandle.Get()
Expand Down
6 changes: 4 additions & 2 deletions store/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,15 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, peerIDs []uint
}

// SplitRaw splits a Region at the key (not encoded) and creates new Region.
func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) *Region {
func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) *metapb.Region {
c.Lock()
defer c.Unlock()

newRegion := c.regions[regionID].split(newRegionID, rawKey, peerIDs, leaderPeerID)
c.regions[newRegionID] = newRegion
return newRegion
// The mocktikv should return a deep copy of meta info to avoid data race
meta := proto.Clone(newRegion.Meta)
return meta.(*metapb.Region)
}

// Merge merges 2 regions, their key ranges should be adjacent.
Expand Down
4 changes: 1 addition & 3 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,9 +624,7 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb
}
newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0])
// The mocktikv should return a deep copy of meta info to avoid data race
metaCloned := proto.Clone(newRegion.Meta)
resp.Regions = append(resp.Regions, metaCloned.(*metapb.Region))
resp.Regions = append(resp.Regions, newRegion)
}
return resp
}
Expand Down

0 comments on commit f90a789

Please sign in to comment.