From 7fa8297fd8306a19d19946e242823ed9b8006836 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 3 Sep 2019 13:30:49 +0800 Subject: [PATCH 1/2] *: support a region divided into multiple regions (#11739) --- ddl/split_region.go | 39 +++--- executor/executor_test.go | 13 +- executor/split.go | 61 +++------ go.mod | 2 +- go.sum | 4 +- kv/kv.go | 2 +- store/mockstore/mocktikv/rpc.go | 27 ++-- store/tikv/2pc.go | 2 +- store/tikv/backoff.go | 1 + store/tikv/rawkv.go | 4 +- store/tikv/region_cache.go | 6 +- store/tikv/region_request_test.go | 6 +- store/tikv/scan_test.go | 4 +- store/tikv/snapshot.go | 2 +- store/tikv/split_region.go | 216 +++++++++++++++++++++++------- store/tikv/split_test.go | 2 +- 16 files changed, 252 insertions(+), 139 deletions(-) mode change 100644 => 100755 store/mockstore/mocktikv/rpc.go diff --git a/ddl/split_region.go b/ddl/split_region.go index 11287e2455ae9..18e6cbad2afc1 100644 --- a/ddl/split_region.go +++ b/ddl/split_region.go @@ -70,20 +70,20 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat // And the max _tidb_rowid is 9223372036854775807, it won't be negative number. // Split table region. - regionIDs := make([]uint64, 0, 1<<(tbInfo.PreSplitRegions)+len(tbInfo.Indices)) step := int64(1 << (tbInfo.ShardRowIDBits - tbInfo.PreSplitRegions)) max := int64(1 << tbInfo.ShardRowIDBits) + splitTableKeys := make([][]byte, 0, 1<<(tbInfo.PreSplitRegions)) for p := int64(step); p < max; p += step { recordID := p << (64 - tbInfo.ShardRowIDBits - 1) recordPrefix := tablecodec.GenTableRecordPrefix(tbInfo.ID) key := tablecodec.EncodeRecordKey(recordPrefix, recordID) - regionID, err := store.SplitRegion(key, scatter) - if err != nil { - logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", zap.Int64("recordID", recordID), - zap.Error(err)) - } else { - regionIDs = append(regionIDs, regionID) - } + splitTableKeys = append(splitTableKeys, key) + } + var err error + regionIDs, err := store.SplitRegions(context.Background(), splitTableKeys, scatter) + if err != nil { + logutil.Logger(context.Background()).Warn("[ddl] pre split table region failed", + zap.Stringer("table", tbInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err)) } regionIDs = append(regionIDs, splitIndexRegion(store, tbInfo, scatter)...) if scatter { @@ -93,26 +93,27 @@ func splitPreSplitedTable(store kv.SplitableStore, tbInfo *model.TableInfo, scat func splitRecordRegion(store kv.SplitableStore, tableID int64, scatter bool) uint64 { tableStartKey := tablecodec.GenTablePrefix(tableID) - regionID, err := store.SplitRegion(tableStartKey, scatter) + regionIDs, err := store.SplitRegions(context.Background(), [][]byte{tableStartKey}, scatter) if err != nil { // It will be automatically split by TiKV later. logutil.Logger(context.Background()).Warn("[ddl] split table region failed", zap.Error(err)) } - return regionID + if len(regionIDs) == 1 { + return regionIDs[0] + } + return 0 } func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter bool) []uint64 { - regionIDs := make([]uint64, 0, len(tblInfo.Indices)) + splitKeys := make([][]byte, 0, len(tblInfo.Indices)) for _, idx := range tblInfo.Indices { indexPrefix := tablecodec.EncodeTableIndexPrefix(tblInfo.ID, idx.ID) - regionID, err := store.SplitRegion(indexPrefix, scatter) - if err != nil { - logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed", - zap.Stringer("table", tblInfo.Name), - zap.Stringer("index", idx.Name), - zap.Error(err)) - } - regionIDs = append(regionIDs, regionID) + splitKeys = append(splitKeys, indexPrefix) + } + regionIDs, err := store.SplitRegions(context.Background(), splitKeys, scatter) + if err != nil { + logutil.Logger(context.Background()).Warn("[ddl] pre split table index region failed", + zap.Stringer("table", tblInfo.Name), zap.Int("successful region count", len(regionIDs)), zap.Error(err)) } return regionIDs } diff --git a/executor/executor_test.go b/executor/executor_test.go index e16bb22621c6b..582cfcf24bf21 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -1963,7 +1963,7 @@ func (s *testSuite) TestPointGetRepeatableRead(c *C) { } func (s *testSuite) TestSplitRegionTimeout(c *C) { - c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout", `return(true)`), IsNil) + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout", `return(true)`), IsNil) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -1972,7 +1972,7 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) { tk.MustExec(`set @@tidb_wait_split_region_timeout=1`) // result 0 0 means split 0 region and 0 region finish scatter regions before timeout. tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0")) - c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil) } func (s *testSuite) TestRow(c *C) { @@ -3849,7 +3849,7 @@ func (s *testSuite) TestReadPartitionedTable(c *C) { func (s *testSuite) TestSplitRegion(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("drop table if exists t") + tk.MustExec("drop table if exists t, t1") tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))") tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`) _, err := tk.Exec(`split table t index idx1 by ("abcd");`) @@ -3926,8 +3926,13 @@ func (s *testSuite) TestSplitRegion(c *C) { c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "Split table `t` region step value should more than 1000, step 10 is invalid") - // Test split region by syntax + // Test split region by syntax. tk.MustExec(`split table t by (0),(1000),(1000000)`) + + // Test split region twice to test for multiple batch split region requests. + tk.MustExec("create table t1(a int, b int)") + tk.MustQuery("split table t1 between(0) and (10000) regions 10;").Check(testkit.Rows("9 1")) + tk.MustQuery("split table t1 between(10) and (10010) regions 5;").Check(testkit.Rows("4 1")) } func (s *testSuite) TestShowTableRegion(c *C) { diff --git a/executor/split.go b/executor/split.go index 0b4cf8a3b57ef..771b6effbb7a4 100755 --- a/executor/split.go +++ b/executor/split.go @@ -23,7 +23,6 @@ import ( "github.com/cznic/mathutil" "github.com/pingcap/errors" - "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -94,27 +93,18 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error { start := time.Now() ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout()) defer cancel() - regionIDs := make([]uint64, 0, len(splitIdxKeys)) - for _, idxKey := range splitIdxKeys { - if isCtxDone(ctxWithTimeout) { - break - } - - regionID, err := s.SplitRegion(idxKey, true) - if err != nil { - logutil.Logger(context.Background()).Warn("split table index region failed", - zap.String("table", e.tableInfo.Name.L), - zap.String("index", e.indexInfo.Name.L), - zap.Error(err)) - continue - } - if regionID == 0 { - continue - } - regionIDs = append(regionIDs, regionID) - + regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true) + if err != nil { + logutil.Logger(context.Background()).Warn("split table index region failed", + zap.String("table", e.tableInfo.Name.L), + zap.String("index", e.indexInfo.Name.L), + zap.Error(err)) } e.splitRegions = len(regionIDs) + if e.splitRegions == 0 { + return nil + } + if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } @@ -294,30 +284,17 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error { if err != nil { return err } - regionIDs := make([]uint64, 0, len(splitKeys)) - for _, key := range splitKeys { - failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) { - if val.(bool) { - time.Sleep(time.Second*1 + time.Millisecond*10) - } - }) - if isCtxDone(ctxWithTimeout) { - break - } - regionID, err := s.SplitRegion(key, true) - if err != nil { - logutil.Logger(context.Background()).Warn("split table region failed", - zap.String("table", e.tableInfo.Name.L), - zap.Error(err)) - continue - } - if regionID == 0 { - continue - } - regionIDs = append(regionIDs, regionID) - + regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true) + if err != nil { + logutil.Logger(context.Background()).Warn("split table region failed", + zap.String("table", e.tableInfo.Name.L), + zap.Error(err)) } e.splitRegions = len(regionIDs) + if e.splitRegions == 0 { + return nil + } + if !e.ctx.GetSessionVars().WaitSplitRegionFinish { return nil } diff --git a/go.mod b/go.mod index ab04607638962..8dc89708410de 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 + github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 github.com/pingcap/parser v0.0.0-20190910041007-2a177b291004 github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9 diff --git a/go.sum b/go.sum index abb651e5a721e..32e61f0eea591 100644 --- a/go.sum +++ b/go.sum @@ -162,8 +162,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI= -github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 h1:5vQV8S/8B9nE+I+0Me6vZGyASeXl/QymwqtaOL5e5ZA= +github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ= github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= diff --git a/kv/kv.go b/kv/kv.go index 87d993717fb27..f8f86f953ffff 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -298,7 +298,7 @@ type Iterator interface { // SplitableStore is the kv store which supports split regions. type SplitableStore interface { - SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error) + SplitRegions(ctx context.Context, splitKey [][]byte, scatter bool) (regionID []uint64, err error) WaitScatterRegionFinish(regionID uint64, backOff int) error CheckRegionInScattering(regionID uint64) (bool, error) } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go old mode 100644 new mode 100755 index b7be40913fb50..10e76c6cbf16f --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -592,14 +592,25 @@ func (h *rpcHandler) handleKvRawScan(req *kvrpcpb.RawScanRequest) *kvrpcpb.RawSc } func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.SplitRegionResponse { - key := NewMvccKey(req.GetSplitKey()) - region, _ := h.cluster.GetRegionByKey(key) - if bytes.Equal(region.GetStartKey(), key) { - return &kvrpcpb.SplitRegionResponse{} - } - newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers)) - newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0]) - return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta} + keys := req.GetSplitKeys() + resp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)+1)} + for i, key := range keys { + k := NewMvccKey(key) + region, _ := h.cluster.GetRegionByKey(k) + if bytes.Equal(region.GetStartKey(), key) { + continue + } + if i == 0 { + // Set the leftmost region. + resp.Regions = append(resp.Regions, region) + } + newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers)) + newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, k, newPeerIDs, newPeerIDs[0]) + // The mocktikv should return a deep copy of meta info to avoid data race + metaCloned := proto.Clone(newRegion.Meta) + resp.Regions = append(resp.Regions, metaCloned.(*metapb.Region)) + } + return resp } // RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 9ed406862641b..6ae3f56d8e24d 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -317,7 +317,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA if len(keys) == 0 { return nil } - groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys) + groups, firstRegion, err := c.store.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 3dd36d219230b..95c9b5aaa1d34 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -219,6 +219,7 @@ const ( deleteRangeOneRegionMaxBackoff = 100000 rawkvMaxBackoff = 20000 splitRegionBackoff = 20000 + maxSplitRegionsBackoff = 120000 scatterRegionBackoff = 20000 waitScatterRegionFinishBackoff = 120000 locateRegionMaxBackoff = 20000 diff --git a/store/tikv/rawkv.go b/store/tikv/rawkv.go index aea8c9abf52dd..b3fbfaf064026 100644 --- a/store/tikv/rawkv.go +++ b/store/tikv/rawkv.go @@ -412,7 +412,7 @@ func (c *RawKVClient) sendReq(key []byte, req *tikvrpc.Request, reverse bool) (* } func (c *RawKVClient) sendBatchReq(bo *Backoffer, keys [][]byte, cmdType tikvrpc.CmdType) (*tikvrpc.Response, error) { // split the keys - groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys) + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return nil, errors.Trace(err) } @@ -570,7 +570,7 @@ func (c *RawKVClient) sendBatchPut(bo *Backoffer, keys, values [][]byte) error { for i, key := range keys { keyToValue[string(key)] = values[i] } - groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys) + groups, _, err := c.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 237cc80e0d3b5..9ef2f36a1083a 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -440,7 +440,8 @@ func (c *RegionCache) LocateRegionByID(bo *Backoffer, regionID uint64) (*KeyLoca // GroupKeysByRegion separates keys into groups by their belonging Regions. // Specially it also returns the first key's region which may be used as the // 'PrimaryLockKey' and should be committed ahead of others. -func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[RegionVerID][][]byte, RegionVerID, error) { +// filter is used to filter some unwanted keys. +func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte, filter func(key, regionStartKey []byte) bool) (map[RegionVerID][][]byte, RegionVerID, error) { groups := make(map[RegionVerID][][]byte) var first RegionVerID var lastLoc *KeyLocation @@ -451,6 +452,9 @@ func (c *RegionCache) GroupKeysByRegion(bo *Backoffer, keys [][]byte) (map[Regio if err != nil { return nil, first, errors.Trace(err) } + if filter != nil && filter(k, lastLoc.StartKey) { + continue + } } id := lastLoc.Region if i == 0 { diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 896253c3217e0..3533d59a71a7f 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -310,18 +310,18 @@ func (s *mockTikvGrpcServer) MvccGetByStartTs(context.Context, *kvrpcpb.MvccGetB func (s *mockTikvGrpcServer) SplitRegion(context.Context, *kvrpcpb.SplitRegionRequest) (*kvrpcpb.SplitRegionResponse, error) { return nil, errors.New("unreachable") } - func (s *mockTikvGrpcServer) CoprocessorStream(*coprocessor.Request, tikvpb.Tikv_CoprocessorStreamServer) error { return errors.New("unreachable") } - func (s *mockTikvGrpcServer) BatchCommands(tikvpb.Tikv_BatchCommandsServer) error { return errors.New("unreachable") } - func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) { return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) { + return nil, errors.New("unreachable") +} func (s *testRegionRequestSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) { // prepare a mock tikv grpc server diff --git a/store/tikv/scan_test.go b/store/tikv/scan_test.go index 605e2c17d28d4..41d0280935a97 100644 --- a/store/tikv/scan_test.go +++ b/store/tikv/scan_test.go @@ -91,12 +91,12 @@ func (s *testScanSuite) TestScan(c *C) { c.Assert(err, IsNil) if rowNum > 123 { - _, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 123)), false) + _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 123))}, false) c.Assert(err, IsNil) } if rowNum > 456 { - _, err = s.store.SplitRegion(encodeKey(s.prefix, s08d("key", 456)), false) + _, err = s.store.SplitRegions(context.Background(), [][]byte{encodeKey(s.prefix, s08d("key", 456))}, false) c.Assert(err, IsNil) } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 34b8da9518462..a1a9fda17ef29 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -143,7 +143,7 @@ func (s *tikvSnapshot) BatchGet(keys []kv.Key) (map[string][]byte, error) { } func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error { - groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys) + groups, _, err := s.store.regionCache.GroupKeysByRegion(bo, keys, nil) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 4a84f29ddb774..4d55d10d3a357 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -16,71 +16,186 @@ package tikv import ( "bytes" "context" + "math" + "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" ) -// SplitRegion splits the region contains splitKey into 2 regions: [start, -// splitKey) and [splitKey, end). -func (s *tikvStore) SplitRegion(splitKey kv.Key, scatter bool) (regionID uint64, err error) { - logutil.Logger(context.Background()).Info("start split region", - zap.Binary("at", splitKey)) - bo := NewBackoffer(context.Background(), splitRegionBackoff) - sender := NewRegionRequestSender(s.regionCache, s.client) - req := &tikvrpc.Request{ - Type: tikvrpc.CmdSplitRegion, - SplitRegion: &kvrpcpb.SplitRegionRequest{ - SplitKey: splitKey, - }, +func equalRegionStartKey(key, regionStartKey []byte) bool { + if bytes.Equal(key, regionStartKey) { + return true } - req.Context.Priority = kvrpcpb.CommandPri_Normal - for { - loc, err := s.regionCache.LocateKey(bo, splitKey) - if err != nil { - return 0, errors.Trace(err) + return false +} + +func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter bool) (*tikvrpc.Response, error) { + // equalRegionStartKey is used to filter split keys. + // If the split key is equal to the start key of the region, then the key has been split, we need to skip the split key. + groups, _, err := s.regionCache.GroupKeysByRegion(bo, keys, equalRegionStartKey) + if err != nil { + return nil, errors.Trace(err) + } + + var batches []batch + for regionID, groupKeys := range groups { + batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount) + } + + if len(batches) == 0 { + return nil, nil + } + // The first time it enters this function. + if bo.totalSleep == 0 { + logutil.Logger(context.Background()).Info("split batch regions request", + zap.Int("split key count", len(keys)), + zap.Int("batch count", len(batches)), + zap.Uint64("first batch, region ID", batches[0].regionID.id), + zap.Binary("first split key", batches[0].keys[0])) + } + if len(batches) == 1 { + resp := s.batchSendSingleRegion(bo, batches[0], scatter) + return resp.resp, errors.Trace(resp.err) + } + ch := make(chan singleBatchResp, len(batches)) + for _, batch1 := range batches { + batch := batch1 + go func() { + backoffer, cancel := bo.Fork() + defer cancel() + + util.WithRecovery(func() { + select { + case ch <- s.batchSendSingleRegion(backoffer, batch, scatter): + case <-bo.ctx.Done(): + ch <- singleBatchResp{err: bo.ctx.Err()} + } + }, func(r interface{}) { + if r != nil { + ch <- singleBatchResp{err: errors.Errorf("%v", r)} + } + }) + }() + } + + srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)} + for i := 0; i < len(batches); i++ { + batchResp := <-ch + if batchResp.err != nil { + logutil.Logger(context.Background()).Debug("tikv store batch send failed", + zap.Error(batchResp.err)) + if err == nil { + err = batchResp.err + } + continue } - if bytes.Equal(splitKey, loc.StartKey) { - logutil.Logger(context.Background()).Info("skip split region", - zap.Binary("at", splitKey)) - return 0, nil + + spResp := batchResp.resp.SplitRegion + regions := spResp.GetRegions() + srResp.Regions = append(srResp.Regions, regions...) + } + return &tikvrpc.Response{SplitRegion: srResp}, errors.Trace(err) +} + +func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bool) singleBatchResp { + failpoint.Inject("MockSplitRegionTimeout", func(val failpoint.Value) { + if val.(bool) { + time.Sleep(time.Second*1 + time.Millisecond*10) } - res, err := sender.SendReq(bo, req, loc.Region, readTimeoutShort) + }) + + req := &tikvrpc.Request{ + Type: tikvrpc.CmdSplitRegion, + SplitRegion: &kvrpcpb.SplitRegionRequest{SplitKeys: batch.keys}, + Context: kvrpcpb.Context{Priority: kvrpcpb.CommandPri_Normal}, + } + + sender := NewRegionRequestSender(s.regionCache, s.client) + resp, err := sender.SendReq(bo, req, batch.regionID, readTimeoutShort) + + batchResp := singleBatchResp{resp: resp} + if err != nil { + batchResp.err = errors.Trace(err) + return batchResp + } + regionErr, err := resp.GetRegionError() + if err != nil { + batchResp.err = errors.Trace(err) + return batchResp + } + if regionErr != nil { + err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) if err != nil { - return 0, errors.Trace(err) + batchResp.err = errors.Trace(err) + return batchResp } - regionErr, err := res.GetRegionError() - if err != nil { - return 0, errors.Trace(err) + resp, err = s.splitBatchRegionsReq(bo, batch.keys, scatter) + batchResp.resp = resp + batchResp.err = err + return batchResp + } + + spResp := resp.SplitRegion + regions := spResp.GetRegions() + if len(regions) > 0 { + // Divide a region into n, one of them may not need to be scattered, + // so n-1 needs to be scattered to other stores. + spResp.Regions = regions[:len(regions)-1] + } + if !scatter { + if len(spResp.Regions) == 0 { + return batchResp } - if regionErr != nil { - err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())) - if err != nil { - return 0, errors.Trace(err) - } + logutil.Logger(context.Background()).Info("batch split regions complete", + zap.Uint64("batch region ID", batch.regionID.id), + zap.Binary("first at", batch.keys[0]), + zap.String("first new region left", spResp.Regions[0].String()), + zap.Int("new region count", len(spResp.Regions))) + return batchResp + } + + for i, r := range spResp.Regions { + if err = s.scatterRegion(r.Id); err == nil { + logutil.Logger(context.Background()).Info("batch split regions, scatter a region complete", + zap.Uint64("batch region ID", batch.regionID.id), + zap.Binary("at", batch.keys[i]), + zap.String("new region left", r.String())) continue } - logutil.Logger(context.Background()).Info("split region complete", - zap.Binary("at", splitKey), - zap.Stringer("new region left", res.SplitRegion.GetLeft()), - zap.Stringer("new region right", res.SplitRegion.GetRight())) - left := res.SplitRegion.GetLeft() - if left == nil { - return 0, nil + + logutil.Logger(context.Background()).Info("batch split regions, scatter a region failed", + zap.Uint64("batch region ID", batch.regionID.id), + zap.Binary("at", batch.keys[i]), + zap.String("new region left", r.String()), + zap.Error(err)) + if batchResp.err == nil { + batchResp.err = err } - if scatter { - err = s.scatterRegion(left.Id) - if err != nil { - return 0, errors.Trace(err) - } + } + return batchResp +} + +// SplitRegions splits regions by splitKeys. +func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatter bool) (regionIDs []uint64, err error) { + bo := NewBackoffer(ctx, int(math.Min(float64(len(splitKeys))*splitRegionBackoff, maxSplitRegionsBackoff))) + resp, err := s.splitBatchRegionsReq(bo, splitKeys, scatter) + regionIDs = make([]uint64, 0, len(splitKeys)) + if resp != nil && resp.SplitRegion != nil { + spResp := resp.SplitRegion + for _, r := range spResp.Regions { + regionIDs = append(regionIDs, r.Id) } - return left.Id, nil + logutil.Logger(context.Background()).Info("split regions complete", zap.Uint64s("region IDs", regionIDs)) } + return regionIDs, errors.Trace(err) } func (s *tikvStore) scatterRegion(regionID uint64) error { @@ -89,14 +204,13 @@ func (s *tikvStore) scatterRegion(regionID uint64) error { bo := NewBackoffer(context.Background(), scatterRegionBackoff) for { err := s.pdClient.ScatterRegion(context.Background(), regionID) + if err == nil { + break + } + err = bo.Backoff(BoRegionMiss, errors.New(err.Error())) if err != nil { - err = bo.Backoff(BoRegionMiss, errors.New(err.Error())) - if err != nil { - return errors.Trace(err) - } - continue + return errors.Trace(err) } - break } logutil.Logger(context.Background()).Info("scatter region complete", zap.Uint64("regionID", regionID)) diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go index 3a40c844b14e4..ff0c7066b3924 100644 --- a/store/tikv/split_test.go +++ b/store/tikv/split_test.go @@ -61,7 +61,7 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) { snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}) keys := [][]byte{{'a'}, {'b'}, {'c'}} - _, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys) + _, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys, nil) c.Assert(err, IsNil) batch := batchKeys{ region: region, From 041b8e123fb34343b16b01103256c09030387425 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 17 Sep 2019 12:00:15 +0800 Subject: [PATCH 2/2] executor: fix scatter region timeout issues and "show processlist" display issues (#12057) --- executor/executor_test.go | 7 ++++- executor/split.go | 48 ++++++++++++++++-------------- store/tikv/split_region.go | 60 +++++++++++++++++++++++--------------- 3 files changed, 68 insertions(+), 47 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 582cfcf24bf21..0685150707ffd 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -1973,6 +1973,11 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) { // result 0 0 means split 0 region and 0 region finish scatter regions before timeout. tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0")) c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockSplitRegionTimeout"), IsNil) + + // Test scatter regions timeout. + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout", `return(true)`), IsNil) + tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("10 1")) + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/MockScatterRegionTimeout"), IsNil) } func (s *testSuite) TestRow(c *C) { @@ -3891,7 +3896,7 @@ func (s *testSuite) TestSplitRegion(c *C) { // Test for split table region. tk.MustExec(`split table t between (0) and (1000000000) regions 10`) - // Check the ower value is more than the upper value. + // Check the lower value is more than the upper value. _, err = tk.Exec(`split table t between (2) and (1) regions 10`) c.Assert(err, NotNil) c.Assert(err.Error(), Equals, "Split table `t` region lower value 2 should less than the upper value 1") diff --git a/executor/split.go b/executor/split.go index 771b6effbb7a4..ce1afaa4288b3 100755 --- a/executor/split.go +++ b/executor/split.go @@ -43,12 +43,13 @@ import ( type SplitIndexRegionExec struct { baseExecutor - tableInfo *model.TableInfo - indexInfo *model.IndexInfo - lower []types.Datum - upper []types.Datum - num int - valueLists [][]types.Datum + tableInfo *model.TableInfo + indexInfo *model.IndexInfo + lower []types.Datum + upper []types.Datum + num int + valueLists [][]types.Datum + splitIdxKeys [][]byte done bool splitRegionResult @@ -60,8 +61,9 @@ type splitRegionResult struct { } // Open implements the Executor Open interface. -func (e *SplitIndexRegionExec) Open(ctx context.Context) error { - return e.splitIndexRegion(ctx) +func (e *SplitIndexRegionExec) Open(ctx context.Context) (err error) { + e.splitIdxKeys, err = e.getSplitIdxKeys() + return err } // Next implements the Executor Next interface. @@ -70,8 +72,12 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error if e.done { return nil } - appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum) e.done = true + if err := e.splitIndexRegion(ctx); err != nil { + return err + } + + appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum) return nil } @@ -85,15 +91,11 @@ func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error { if !ok { return nil } - splitIdxKeys, err := e.getSplitIdxKeys() - if err != nil { - return err - } start := time.Now() ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout()) defer cancel() - regionIDs, err := s.SplitRegions(context.Background(), splitIdxKeys, true) + regionIDs, err := s.SplitRegions(context.Background(), e.splitIdxKeys, true) if err != nil { logutil.Logger(context.Background()).Warn("split table index region failed", zap.String("table", e.tableInfo.Name.L), @@ -248,14 +250,16 @@ type SplitTableRegionExec struct { upper types.Datum num int valueLists [][]types.Datum + splitKeys [][]byte done bool splitRegionResult } // Open implements the Executor Open interface. -func (e *SplitTableRegionExec) Open(ctx context.Context) error { - return e.splitTableRegion(ctx) +func (e *SplitTableRegionExec) Open(ctx context.Context) (err error) { + e.splitKeys, err = e.getSplitTableKeys() + return err } // Next implements the Executor Next interface. @@ -264,8 +268,12 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error if e.done { return nil } - appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum) e.done = true + + if err := e.splitTableRegion(ctx); err != nil { + return err + } + appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum) return nil } @@ -280,11 +288,7 @@ func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error { ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout()) defer cancel() - splitKeys, err := e.getSplitTableKeys() - if err != nil { - return err - } - regionIDs, err := s.SplitRegions(ctxWithTimeout, splitKeys, true) + regionIDs, err := s.SplitRegions(ctxWithTimeout, e.splitKeys, true) if err != nil { logutil.Logger(context.Background()).Warn("split table region failed", zap.String("table", e.tableInfo.Name.L), diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go index 4d55d10d3a357..1538ea6476a3c 100644 --- a/store/tikv/split_region.go +++ b/store/tikv/split_region.go @@ -47,7 +47,7 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b var batches []batch for regionID, groupKeys := range groups { - batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPairCount) + batches = appendKeyBatches(batches, regionID, groupKeys, rawBatchPutSize) } if len(batches) == 0 { @@ -67,14 +67,13 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b } ch := make(chan singleBatchResp, len(batches)) for _, batch1 := range batches { - batch := batch1 - go func() { + go func(b batch) { backoffer, cancel := bo.Fork() defer cancel() util.WithRecovery(func() { select { - case ch <- s.batchSendSingleRegion(backoffer, batch, scatter): + case ch <- s.batchSendSingleRegion(backoffer, b, scatter): case <-bo.ctx.Done(): ch <- singleBatchResp{err: bo.ctx.Err()} } @@ -83,24 +82,25 @@ func (s *tikvStore) splitBatchRegionsReq(bo *Backoffer, keys [][]byte, scatter b ch <- singleBatchResp{err: errors.Errorf("%v", r)} } }) - }() + }(batch1) } srResp := &kvrpcpb.SplitRegionResponse{Regions: make([]*metapb.Region, 0, len(keys)*2)} for i := 0; i < len(batches); i++ { batchResp := <-ch if batchResp.err != nil { - logutil.Logger(context.Background()).Debug("tikv store batch send failed", - zap.Error(batchResp.err)) + logutil.Logger(context.Background()).Debug("batch split regions failed", zap.Error(batchResp.err)) if err == nil { err = batchResp.err } - continue } - spResp := batchResp.resp.SplitRegion - regions := spResp.GetRegions() - srResp.Regions = append(srResp.Regions, regions...) + // If the split succeeds and the scatter fails, we also need to add the region IDs. + if batchResp.resp != nil { + spResp := batchResp.resp.SplitRegion + regions := spResp.GetRegions() + srResp.Regions = append(srResp.Regions, regions...) + } } return &tikvrpc.Response{SplitRegion: srResp}, errors.Trace(err) } @@ -150,35 +150,39 @@ func (s *tikvStore) batchSendSingleRegion(bo *Backoffer, batch batch, scatter bo // so n-1 needs to be scattered to other stores. spResp.Regions = regions[:len(regions)-1] } + logutil.Logger(context.Background()).Info("batch split regions complete", + zap.Uint64("batch region ID", batch.regionID.id), + zap.Binary("first at", batch.keys[0]), + zap.Stringer("first new region left", spResp.Regions[0]), + zap.Int("new region count", len(spResp.Regions))) + if !scatter { if len(spResp.Regions) == 0 { return batchResp } - logutil.Logger(context.Background()).Info("batch split regions complete", - zap.Uint64("batch region ID", batch.regionID.id), - zap.Binary("first at", batch.keys[0]), - zap.String("first new region left", spResp.Regions[0].String()), - zap.Int("new region count", len(spResp.Regions))) return batchResp } for i, r := range spResp.Regions { if err = s.scatterRegion(r.Id); err == nil { - logutil.Logger(context.Background()).Info("batch split regions, scatter a region complete", + logutil.Logger(context.Background()).Info("batch split regions, scatter region complete", zap.Uint64("batch region ID", batch.regionID.id), zap.Binary("at", batch.keys[i]), zap.String("new region left", r.String())) continue } - logutil.Logger(context.Background()).Info("batch split regions, scatter a region failed", + logutil.Logger(context.Background()).Info("batch split regions, scatter region failed", zap.Uint64("batch region ID", batch.regionID.id), zap.Binary("at", batch.keys[i]), - zap.String("new region left", r.String()), + zap.Stringer("new region left", r), zap.Error(err)) if batchResp.err == nil { batchResp.err = err } + if ErrPDServerTimeout.Equal(err) { + break + } } return batchResp } @@ -193,12 +197,19 @@ func (s *tikvStore) SplitRegions(ctx context.Context, splitKeys [][]byte, scatte for _, r := range spResp.Regions { regionIDs = append(regionIDs, r.Id) } - logutil.Logger(context.Background()).Info("split regions complete", zap.Uint64s("region IDs", regionIDs)) + logutil.Logger(context.Background()).Info("split regions complete", + zap.Int("region count", len(regionIDs)), zap.Uint64s("region IDs", regionIDs)) } return regionIDs, errors.Trace(err) } func (s *tikvStore) scatterRegion(regionID uint64) error { + failpoint.Inject("MockScatterRegionTimeout", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(ErrPDServerTimeout) + } + }) + logutil.Logger(context.Background()).Info("start scatter region", zap.Uint64("regionID", regionID)) bo := NewBackoffer(context.Background(), scatterRegionBackoff) @@ -207,12 +218,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error { if err == nil { break } - err = bo.Backoff(BoRegionMiss, errors.New(err.Error())) + err = bo.Backoff(BoPDRPC, errors.New(err.Error())) if err != nil { return errors.Trace(err) } } - logutil.Logger(context.Background()).Info("scatter region complete", + logutil.Logger(context.Background()).Debug("scatter region complete", zap.Uint64("regionID", regionID)) return nil } @@ -221,11 +232,12 @@ func (s *tikvStore) scatterRegion(regionID uint64) error { // backOff is the back off time of the wait scatter region.(Milliseconds) // if backOff <= 0, the default wait scatter back off time will be used. func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error { - logutil.Logger(context.Background()).Info("wait scatter region", - zap.Uint64("regionID", regionID)) if backOff <= 0 { backOff = waitScatterRegionFinishBackoff } + logutil.Logger(context.Background()).Info("wait scatter region", + zap.Uint64("regionID", regionID), zap.Int("backoff(ms)", backOff)) + bo := NewBackoffer(context.Background(), backOff) logFreq := 0 for {