From 348e9af170687e035a6be99e012198b493563f7c Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 21 Feb 2020 15:36:24 +0800 Subject: [PATCH 1/8] conn, restore: paginate scan regions Signed-off-by: Neil Shen --- pkg/conn/conn.go | 9 +++- pkg/restore/import.go | 4 +- pkg/restore/split.go | 3 +- pkg/restore/split_test.go | 26 ++++++---- pkg/restore/util.go | 35 +++++++++++++ pkg/restore/util_test.go | 103 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 166 insertions(+), 14 deletions(-) diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 3695a2a0c..5098ad059 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -14,6 +14,7 @@ import ( "sync/atomic" "time" + "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/metapb" @@ -34,6 +35,7 @@ const ( clusterVersionPrefix = "pd/api/v1/config/cluster-version" regionCountPrefix = "pd/api/v1/stats/region" schdulerPrefix = "pd/api/v1/schedulers" + maxMsgSize = int(128 * utils.MB) // pd.ScanRegion may return a large response ) // Mgr manages connections to a TiDB cluster. @@ -103,7 +105,12 @@ func NewMgr(ctx context.Context, pdAddrs string, storage tikv.Storage) (*Mgr, er return nil, errors.Annotatef(failure, "pd address (%s) not available, please check network", pdAddrs) } - pdClient, err := pd.NewClient(addrs, pd.SecurityOption{}) + maxCallMsgSize := []grpc.DialOption{ + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)), + grpc.WithDefaultCallOptions(grpc.MaxCallSendMsgSize(maxMsgSize)), + } + pdClient, err := pd.NewClient( + addrs, pd.SecurityOption{}, pd.WithGRPCDialOptions(maxCallMsgSize...)) if err != nil { log.Error("fail to create pd client", zap.Error(err)) return nil, err diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 01f8456ef..689a23ff9 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -21,6 +21,7 @@ import ( ) const importScanRegionTime = 10 * time.Second +const scanRegionPaginationLimit = int(128) // ImporterClient is used to import a file to TiKV type ImporterClient interface { @@ -163,7 +164,8 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul ctx, cancel := context.WithTimeout(importer.ctx, importScanRegionTime) defer cancel() // Scan regions covered by the file range - regionInfos, err1 := importer.metaClient.ScanRegions(ctx, startKey, endKey, 0) + regionInfos, err1 := paginateScanRegion( + ctx, importer.metaClient, startKey, endKey, scanRegionPaginationLimit) if err1 != nil { return errors.Trace(err1) } diff --git a/pkg/restore/split.go b/pkg/restore/split.go index 3248fdd0d..c0f421c2a 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -87,8 +87,7 @@ func (rs *RegionSplitter) Split( scatterRegions := make([]*RegionInfo, 0) SplitRegions: for i := 0; i < SplitRetryTimes; i++ { - var regions []*RegionInfo - regions, err = rs.client.ScanRegions(ctx, minKey, maxKey, 0) + regions, err := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit) if err != nil { return errors.Trace(err) } diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 509c4cfa0..62e7f2c76 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -3,6 +3,7 @@ package restore import ( "bytes" "context" + "fmt" "sync" . "github.com/pingcap/check" @@ -10,6 +11,7 @@ import ( "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/schedule/placement" "github.com/pingcap/tidb/util/codec" ) @@ -18,13 +20,19 @@ type testClient struct { mu sync.RWMutex stores map[uint64]*metapb.Store regions map[uint64]*RegionInfo + regionsInfo *core.RegionsInfo // For now it's only used in ScanRegions nextRegionID uint64 } func newTestClient(stores map[uint64]*metapb.Store, regions map[uint64]*RegionInfo, nextRegionID uint64) *testClient { + regionsInfo := core.NewRegionsInfo() + for _, regionInfo := range regions { + regionsInfo.AddRegion(core.NewRegionInfo(regionInfo.Region, regionInfo.Leader)) + } return &testClient{ stores: stores, regions: regions, + regionsInfo: regionsInfo, nextRegionID: nextRegionID, } } @@ -142,16 +150,14 @@ func (c *testClient) GetOperator(ctx context.Context, regionID uint64) (*pdpb.Ge } func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*RegionInfo, error) { - regions := make([]*RegionInfo, 0) - for _, region := range c.regions { - if limit > 0 && len(regions) >= limit { - break - } - if (len(region.Region.GetEndKey()) != 0 && bytes.Compare(region.Region.GetEndKey(), key) <= 0) || - bytes.Compare(region.Region.GetStartKey(), endKey) > 0 { - continue - } - regions = append(regions, region) + infos := c.regionsInfo.ScanRange(key, endKey, limit) + regions := make([]*RegionInfo, 0, len(infos)) + for _, info := range infos { + fmt.Printf("region %v\n", info) + regions = append(regions, &RegionInfo{ + Region: info.GetMeta(), + Leader: info.GetLeader(), + }) } return regions, nil } diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 63ee92969..811cfecdd 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -3,6 +3,7 @@ package restore import ( "bytes" "context" + "fmt" "strings" "time" @@ -333,3 +334,37 @@ func escapeTableName(cis model.CIStr) string { quote := "`" return quote + strings.Replace(cis.O, quote, quote+quote, -1) + quote } + +// paginateScanRegion scan regions with a limit pagination and +// return all regions at once. +// It reduces max gRPC message size. +func paginateScanRegion( + ctx context.Context, client SplitClient, startKey, endKey []byte, limit int, +) ([]*RegionInfo, error) { + regions := []*RegionInfo{} + + if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 { + log.Fatal("startKey >= endKey", + zap.Binary("startKey", startKey), zap.Binary("endKey", endKey)) + } + + for { + batch, err := client.ScanRegions(ctx, startKey, endKey, limit) + if err != nil { + return nil, errors.Trace(err) + } + fmt.Println("scan regions length", len(batch)) + regions = append(regions, batch...) + if len(batch) < limit { + // No more region + break + } + startKey = batch[len(batch)-1].Region.GetEndKey() + if len(startKey) == 0 || + (len(endKey) > 0 && bytes.Compare(startKey, endKey) >= 0) { + // All key space have scanned + break + } + } + return regions, nil +} diff --git a/pkg/restore/util_test.go b/pkg/restore/util_test.go index bc4da9168..5b2168e3f 100644 --- a/pkg/restore/util_test.go +++ b/pkg/restore/util_test.go @@ -1,11 +1,15 @@ package restore import ( + "context" + "encoding/binary" + . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/codec" ) var _ = Suite(&testRestoreUtilSuite{}) @@ -103,3 +107,102 @@ func (s *testRestoreUtilSuite) TestValidateFileRanges(c *C) { ) c.Assert(err, ErrorMatches, "unexpected rewrite rules") } + +func (s *testRestoreUtilSuite) TestPaginateScanRegion(c *C) { + peers := make([]*metapb.Peer, 1) + peers[0] = &metapb.Peer{ + Id: 1, + StoreId: 1, + } + stores := make(map[uint64]*metapb.Store) + stores[1] = &metapb.Store{ + Id: 1, + } + + makeRegions := func(num uint64) (map[uint64]*RegionInfo, []*RegionInfo) { + regionsMap := make(map[uint64]*RegionInfo, num) + regions := make([]*RegionInfo, 0, num) + endKey := make([]byte, 8) + for i := uint64(0); i < num-1; i++ { + ri := &RegionInfo{ + Region: &metapb.Region{ + Id: i + 1, + Peers: peers, + }, + } + + if i != 0 { + startKey := make([]byte, 8) + binary.BigEndian.PutUint64(startKey, i) + ri.Region.StartKey = codec.EncodeBytes([]byte{}, startKey) + } + endKey = make([]byte, 8) + binary.BigEndian.PutUint64(endKey, i+1) + ri.Region.EndKey = codec.EncodeBytes([]byte{}, endKey) + + regionsMap[i] = ri + regions = append(regions, ri) + } + + if num == 1 { + endKey = []byte{} + } else { + endKey = codec.EncodeBytes([]byte{}, endKey) + } + ri := &RegionInfo{ + Region: &metapb.Region{ + Id: num, + Peers: peers, + StartKey: endKey, + EndKey: []byte{}, + }, + } + regionsMap[num] = ri + regions = append(regions, ri) + + return regionsMap, regions + } + + ctx := context.Background() + regionMap := make(map[uint64]*RegionInfo) + regions := []*RegionInfo{} + batch, err := paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(1) + batch, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(2) + batch, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(3) + batch, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(8) + batch, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{}, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions) + + regionMap, regions = makeRegions(8) + batch, err = paginateScanRegion( + ctx, newTestClient(stores, regionMap, 0), regions[1].Region.StartKey, []byte{}, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions[1:]) + + batch, err = paginateScanRegion( + ctx, newTestClient(stores, regionMap, 0), []byte{}, regions[6].Region.EndKey, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions[:7]) + + batch, err = paginateScanRegion( + ctx, newTestClient(stores, regionMap, 0), regions[1].Region.StartKey, regions[1].Region.EndKey, 3) + c.Assert(err, IsNil) + c.Assert(batch, DeepEquals, regions[1:2]) +} From 01ad07ae83e040cafa79c5013e7d54f39b6d05fe Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 21 Feb 2020 16:02:37 +0800 Subject: [PATCH 2/8] address lints Signed-off-by: Neil Shen --- pkg/conn/conn.go | 3 ++- pkg/restore/split.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 5098ad059..1fab81132 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -14,7 +14,6 @@ import ( "sync/atomic" "time" - "github.com/pingcap/br/pkg/utils" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/metapb" @@ -28,6 +27,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/backoff" "google.golang.org/grpc/keepalive" + + "github.com/pingcap/br/pkg/utils" ) const ( diff --git a/pkg/restore/split.go b/pkg/restore/split.go index c0f421c2a..e1d7a84f6 100644 --- a/pkg/restore/split.go +++ b/pkg/restore/split.go @@ -87,9 +87,9 @@ func (rs *RegionSplitter) Split( scatterRegions := make([]*RegionInfo, 0) SplitRegions: for i := 0; i < SplitRetryTimes; i++ { - regions, err := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit) - if err != nil { - return errors.Trace(err) + regions, err1 := paginateScanRegion(ctx, rs.client, minKey, maxKey, scanRegionPaginationLimit) + if err1 != nil { + return errors.Trace(err1) } if len(regions) == 0 { log.Warn("cannot scan any region") From 3eb663910661e4682e683f89ce2af7d0f04bd5f0 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Sat, 22 Feb 2020 15:03:56 +0800 Subject: [PATCH 3/8] claen up Signed-off-by: Neil Shen --- pkg/restore/split_test.go | 2 -- pkg/restore/util.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/pkg/restore/split_test.go b/pkg/restore/split_test.go index 62e7f2c76..cdb1e936b 100644 --- a/pkg/restore/split_test.go +++ b/pkg/restore/split_test.go @@ -3,7 +3,6 @@ package restore import ( "bytes" "context" - "fmt" "sync" . "github.com/pingcap/check" @@ -153,7 +152,6 @@ func (c *testClient) ScanRegions(ctx context.Context, key, endKey []byte, limit infos := c.regionsInfo.ScanRange(key, endKey, limit) regions := make([]*RegionInfo, 0, len(infos)) for _, info := range infos { - fmt.Printf("region %v\n", info) regions = append(regions, &RegionInfo{ Region: info.GetMeta(), Leader: info.GetLeader(), diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 811cfecdd..08f3481da 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -3,7 +3,6 @@ package restore import ( "bytes" "context" - "fmt" "strings" "time" @@ -353,7 +352,6 @@ func paginateScanRegion( if err != nil { return nil, errors.Trace(err) } - fmt.Println("scan regions length", len(batch)) regions = append(regions, batch...) if len(batch) < limit { // No more region From 478e4a68e6ba063406b086526a3efac0b8ada1fb Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 24 Feb 2020 14:15:20 +0800 Subject: [PATCH 4/8] address comment Signed-off-by: Neil Shen --- pkg/restore/util.go | 8 ++++---- pkg/restore/util_test.go | 3 +++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/restore/util.go b/pkg/restore/util.go index 548d10bcf..747b7e34c 100644 --- a/pkg/restore/util.go +++ b/pkg/restore/util.go @@ -3,6 +3,7 @@ package restore import ( "bytes" "context" + "encoding/hex" "strings" "time" @@ -331,13 +332,12 @@ func encodeKeyPrefix(key []byte) []byte { func paginateScanRegion( ctx context.Context, client SplitClient, startKey, endKey []byte, limit int, ) ([]*RegionInfo, error) { - regions := []*RegionInfo{} - if len(endKey) != 0 && bytes.Compare(startKey, endKey) >= 0 { - log.Fatal("startKey >= endKey", - zap.Binary("startKey", startKey), zap.Binary("endKey", endKey)) + return nil, errors.Errorf("startKey >= endKey, startKey %s, endkey %s", + hex.EncodeToString(startKey), hex.EncodeToString(endKey)) } + regions := []*RegionInfo{} for { batch, err := client.ScanRegions(ctx, startKey, endKey, limit) if err != nil { diff --git a/pkg/restore/util_test.go b/pkg/restore/util_test.go index 5b2168e3f..1b5e86b96 100644 --- a/pkg/restore/util_test.go +++ b/pkg/restore/util_test.go @@ -205,4 +205,7 @@ func (s *testRestoreUtilSuite) TestPaginateScanRegion(c *C) { ctx, newTestClient(stores, regionMap, 0), regions[1].Region.StartKey, regions[1].Region.EndKey, 3) c.Assert(err, IsNil) c.Assert(batch, DeepEquals, regions[1:2]) + + _, err = paginateScanRegion(ctx, newTestClient(stores, regionMap, 0), []byte{2}, []byte{1}, 3) + c.Assert(err, ErrorMatches, "startKey >= endKey.*") } From fdb23ec96f8a4012ca77da566841550aadb92c4d Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 2 Mar 2020 14:10:58 +0800 Subject: [PATCH 5/8] restore: log leader on ingest Signed-off-by: Neil Shen --- pkg/restore/import.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 6eda691b3..c7df7d43e 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -306,7 +306,7 @@ func (importer *FileImporter) ingestSST( Context: reqCtx, Sst: sstMeta, } - log.Debug("download SST", zap.Stringer("sstMeta", sstMeta)) + log.Debug("download SST", zap.Stringer("sstMeta", sstMeta), zap.Reflect("leader", leader)) resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req) if err != nil { if strings.Contains(err.Error(), "RegionNotFound") { From d2e120f2a2eb7603cab72c43b8e75263d02fb9d7 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 2 Mar 2020 14:16:20 +0800 Subject: [PATCH 6/8] restore: correct log message Signed-off-by: Neil Shen --- pkg/restore/import.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index c7df7d43e..9672c9430 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -306,7 +306,7 @@ func (importer *FileImporter) ingestSST( Context: reqCtx, Sst: sstMeta, } - log.Debug("download SST", zap.Stringer("sstMeta", sstMeta), zap.Reflect("leader", leader)) + log.Debug("ingest SST", zap.Stringer("sstMeta", sstMeta), zap.Reflect("leader", leader)) resp, err := importer.importClient.IngestSST(importer.ctx, leader.GetStoreId(), req) if err != nil { if strings.Contains(err.Error(), "RegionNotFound") { From 8b3e8ed64ea96ce6f4ef4b85faf85efebf126501 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 3 Mar 2020 13:43:40 +0800 Subject: [PATCH 7/8] restore: log new region on not leader Signed-off-by: Neil Shen --- pkg/restore/import.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/restore/import.go b/pkg/restore/import.go index 9672c9430..84d515263 100644 --- a/pkg/restore/import.go +++ b/pkg/restore/import.go @@ -204,10 +204,11 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *RewriteRul err1 = importer.ingestSST(downloadMeta, info) // If error is `NotLeader`, update the region info and retry for errors.Cause(err1) == errNotLeader { - log.Debug("ingest sst returns not leader error, retry it", - zap.Stringer("region", info.Region)) var newInfo *RegionInfo newInfo, err1 = importer.metaClient.GetRegion(importer.ctx, info.Region.GetStartKey()) + log.Debug("ingest sst returns not leader error, retry it", + zap.Stringer("region", info.Region), + zap.Stringer("newRegion", newInfo.Region)) if err1 != nil { break } From d5563eaf11f5f286fb99ae984d218c52fc3acd7a Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Thu, 5 Mar 2020 14:15:27 +0800 Subject: [PATCH 8/8] tests: large timeout Signed-off-by: Neil Shen --- tests/_utils/run_services | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/tests/_utils/run_services b/tests/_utils/run_services index 1118d7ccc..769b9b22a 100644 --- a/tests/_utils/run_services +++ b/tests/_utils/run_services @@ -48,7 +48,7 @@ start_services() { i=0 while ! curl -o /dev/null -sf "http://$PD_ADDR/pd/api/v1/version"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to start PD' exit 1 fi @@ -70,7 +70,7 @@ start_services() { echo "Waiting initializing TiKV..." while ! curl -sf "http://$PD_ADDR/pd/api/v1/cluster/status" | grep '"is_initialized": true'; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to initialize TiKV cluster' exit 1 fi @@ -90,7 +90,7 @@ start_services() { i=0 while ! curl -o /dev/null -sf "http://$TIDB_IP:10080/status"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to start TiDB' exit 1 fi @@ -100,7 +100,7 @@ start_services() { i=0 while ! curl "http://$PD_ADDR/pd/api/v1/cluster/status" -sf | grep -q "\"is_initialized\": true"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to bootstrap cluster' exit 1 fi @@ -132,7 +132,7 @@ start_services_withTLS() { --key $1/certificates/client-key.pem \ -o /dev/null -sf "https://$PD_ADDR/pd/api/v1/version"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to start PD' exit 1 fi @@ -155,7 +155,7 @@ start_services_withTLS() { --key $1/certificates/client-key.pem \ -sf "https://$PD_ADDR/pd/api/v1/cluster/status" | grep '"is_initialized": true'; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to initialize TiKV cluster' exit 1 fi @@ -178,7 +178,7 @@ start_services_withTLS() { --key $1/certificates/client-key.pem \ -o /dev/null -sf "https://$TIDB_IP:10080/status"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to start TiDB' exit 1 fi @@ -191,7 +191,7 @@ start_services_withTLS() { --key $1/certificates/client-key.pem \ "https://$PD_ADDR/pd/api/v1/cluster/status" -sf | grep -q "\"is_initialized\": true"; do i=$((i+1)) - if [ "$i" -gt 10 ]; then + if [ "$i" -gt 20 ]; then echo 'Failed to bootstrap cluster' exit 1 fi