From f240aedb94b405ecf45ab43f64b402b8fa5cbf03 Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Fri, 8 Nov 2019 12:19:50 +0800
Subject: [PATCH] *: address lints (#40)

* *: address lints

Signed-off-by: Neil Shen

* MarkPersistentFlagRequired

Signed-off-by: Neil Shen

* fix tests and goimports

Signed-off-by: Neil Shen

* do not require pd and storage

Signed-off-by: Neil Shen

* address comments

Signed-off-by: Neil Shen

* Remove makefile variable

Signed-off-by: Neil Shen
---
 .golangci.yml             | 11 +++++++
 Makefile                  | 12 ++++---
 cmd/cmd.go                |  7 ++--
 cmd/meta.go               | 41 ++++--------------------
 cmd/raw.go                | 28 ++++++++++------
 cmd/restore.go            | 24 ++++++++++----
 cmd/version.go            |  3 +-
 pkg/meta/meta.go          |  2 --
 pkg/raw/full.go           | 57 +++++++++++++++++++--------------
 pkg/raw/push.go           |  3 +-
 pkg/raw/range_tree.go     | 23 ++++----------
 pkg/restore/client.go     | 67 ++++++++++++++++++++++++++-------------
 pkg/restore/db.go         |  7 ++--
 pkg/restore/db_test.go    | 13 +++++---
 pkg/restore/import.go     | 26 +++++++++------
 pkg/restore/util.go       | 23 +++++++++++---
 pkg/utils/storage.go      |  3 +-
 pkg/utils/storage_test.go |  3 +-
 tools.json                |  2 +-
 19 files changed, 206 insertions(+), 149 deletions(-)
 create mode 100644 .golangci.yml

diff --git a/.golangci.yml b/.golangci.yml
new file mode 100644
index 0000000000000..0f012cb83723e
--- /dev/null
+++ b/.golangci.yml
@@ -0,0 +1,11 @@
+linters-settings:
+  gocyclo:
+    min-complexity: 40
+
+issues:
+  # Excluding configuration per-path, per-linter, per-text and per-source
+  exclude-rules:
+    - path: _test\.go
+      text: "Potential HTTP request made with variable url"
+      linters:
+        - gosec
diff --git a/Makefile b/Makefile
index dbc50c7e4bf80..b6430f10a2fca 100644
--- a/Makefile
+++ b/Makefile
@@ -42,13 +42,15 @@ check: tools check-all
 static: export GO111MODULE=on
 static:
 	@ # Not running vet and fmt through metalinter becauase it ends up looking at vendor
-	gofmt -s -l $$($(PACKAGE_DIRECTORIES)) 2>&1 | $(GOCHECKER)
+	retool do goimports -w -d -format-only -local $(BR_PKG) $$($(PACKAGE_DIRECTORIES)) 2>&1 | $(GOCHECKER)
 	retool do govet --shadow $$($(PACKAGE_DIRECTORIES)) 2>&1 | $(GOCHECKER)

-	CGO_ENABLED=0 retool do golangci-lint run --disable-all --deadline 120s \
-	  --enable misspell \
-	  --enable staticcheck \
-	  --enable ineffassign \
+	CGO_ENABLED=0 retool do golangci-lint run --enable-all --deadline 120s \
+	  --disable gochecknoglobals \
+	  --disable gochecknoinits \
+	  --disable interfacer \
+	  --disable goimports \
+	  --disable gofmt \
 	  $$($(PACKAGE_DIRECTORIES))

 lint:
diff --git a/cmd/cmd.go b/cmd/cmd.go
index 1ba521e794536..ff24ced9d7988 100644
--- a/cmd/cmd.go
+++ b/cmd/cmd.go
@@ -5,11 +5,12 @@ import (
 	"net/http"
 	"sync"

-	"github.com/pingcap/br/pkg/meta"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/spf13/cobra"
 	"go.uber.org/zap"
+
+	"github.com/pingcap/br/pkg/meta"
 )

 var (
@@ -54,8 +55,6 @@ func AddFlags(cmd *cobra.Command) {
 		"Set the log file path. If not set, logs will output to stdout")
 	cmd.PersistentFlags().String(FlagStatusAddr, "",
 		"Set the HTTP listening address for the status report service. Set to empty string to disable")
-	cmd.MarkFlagRequired(FlagPD)
-	cmd.MarkFlagRequired(FlagStorage)
 }

 // Init ...
@@ -100,7 +99,7 @@ func Init(cmd *cobra.Command) (err error) {
 			return
 		}
 	})
-	return
+	return err
 }

 // GetDefaultBacker returns the default backer for command line usage.
diff --git a/cmd/meta.go b/cmd/meta.go
index c14db2fa466cc..f6b5aafa7f289 100644
--- a/cmd/meta.go
+++ b/cmd/meta.go
@@ -6,10 +6,11 @@ import (
 	"encoding/hex"

 	"github.com/gogo/protobuf/proto"
-	"github.com/pingcap/br/pkg/utils"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/backup"
 	"github.com/spf13/cobra"
+
+	"github.com/pingcap/br/pkg/utils"
 )

 // NewMetaCommand return a meta subcommand.
@@ -25,39 +26,6 @@ func NewMetaCommand() *cobra.Command {
 			return nil
 		},
 	}
-	meta.AddCommand(&cobra.Command{
-		Use:   "version",
-		Short: "show cluster version",
-		RunE: func(cmd *cobra.Command, _ []string) error {
-			backer, err := GetDefaultBacker()
-			if err != nil {
-				return err
-			}
-			v, err := backer.GetClusterVersion()
-			if err != nil {
-				return err
-			}
-			cmd.Println(v)
-			return nil
-		},
-	})
-	meta.AddCommand(&cobra.Command{
-		Use:   "safepoint",
-		Short: "show the current GC safepoint of cluster",
-		RunE: func(cmd *cobra.Command, _ []string) error {
-			backer, err := GetDefaultBacker()
-			if err != nil {
-				return err
-			}
-			sp, err := backer.GetGCSafePoint()
-			if err != nil {
-				return err
-			}
-			cmd.Printf("Timestamp { Physical: %d, Logical: %d }\n",
-				sp.Physical, sp.Logical)
-			return nil
-		},
-	})
 	meta.AddCommand(&cobra.Command{
 		Use:   "checksum",
 		Short: "check the backup data",
@@ -94,7 +62,10 @@ func NewMetaCommand() *cobra.Command {
 				hexBytes := make([]byte, hex.EncodedLen(len(s)))
 				hex.Encode(hexBytes, s[:])
 				if !bytes.Equal(hexBytes, file.Sha256) {
-					return errors.Errorf("backup data checksum failed: %s may be changed\n calculated sha256 is %s\n, origin sha256 is %s", file.Name, s, file.Sha256)
+					return errors.Errorf(`
+backup data checksum failed: %s may be changed
+calculated sha256 is %s,
+origin sha256 is %s`, file.Name, s, file.Sha256)
 				}
 			}

diff --git a/cmd/raw.go b/cmd/raw.go
index e00c550246bcb..43c07be2ec521 100644
--- a/cmd/raw.go
+++ b/cmd/raw.go
@@ -3,16 +3,17 @@ package cmd
 import (
 	"context"

-	"github.com/pingcap/br/pkg/raw"
-	"github.com/pingcap/br/pkg/utils"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/spf13/cobra"
+
+	"github.com/pingcap/br/pkg/raw"
+	"github.com/pingcap/br/pkg/utils"
 )

 // NewBackupCommand return a full backup subcommand.
 func NewBackupCommand() *cobra.Command {
-	bp := &cobra.Command{
+	command := &cobra.Command{
 		Use:   "backup",
 		Short: "backup a TiKV cluster",
 		PersistentPreRunE: func(c *cobra.Command, args []string) error {
@@ -23,18 +24,21 @@ func NewBackupCommand() *cobra.Command {
 			return nil
 		},
 	}
-	bp.AddCommand(
+	command.AddCommand(
 		newFullBackupCommand(),
 		newTableBackupCommand(),
 	)

-	bp.PersistentFlags().StringP("timeago", "", "", "The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint")
+	command.PersistentFlags().StringP(
+		"timeago", "", "",
+		"The history version of the backup task, e.g. 1m, 1h. Do not exceed GCSafePoint")

-	bp.PersistentFlags().Uint64P(
+	command.PersistentFlags().Uint64P(
 		"ratelimit", "", 0, "The rate limit of the backup task, MB/s per node")
-	bp.PersistentFlags().Uint32P(
+	command.PersistentFlags().Uint32P(
 		"concurrency", "", 4, "The size of thread pool on each node that execute the backup task")
-	return bp
+
+	return command
 }

 // newFullBackupCommand return a full backup subcommand.
@@ -238,7 +242,11 @@ func newTableBackupCommand() *cobra.Command {
 	}
 	command.Flags().StringP("db", "", "", "backup a table in the specific db")
 	command.Flags().StringP("table", "t", "", "backup the specific table")
-	command.MarkFlagRequired("db")
-	command.MarkFlagRequired("table")
+	if err := command.MarkFlagRequired("db"); err != nil {
+		panic(err)
+	}
+	if err := command.MarkFlagRequired("table"); err != nil {
+		panic(err)
+	}
 	return command
 }
diff --git a/cmd/restore.go b/cmd/restore.go
index 657654da64ac9..b5cc896caddf5 100644
--- a/cmd/restore.go
+++ b/cmd/restore.go
@@ -122,7 +122,9 @@ func newFullRestoreCommand() *cobra.Command {
 	command.Flags().String("connect", "",
 		"the address to connect tidb, format: username:password@protocol(address)/")
 	command.Flags().Uint("concurrency", 128,
 		"The size of thread pool that execute the restore task")
-	command.MarkFlagRequired("connect")
+	if err := command.MarkFlagRequired("connect"); err != nil {
+		panic(err)
+	}
 	return command
 }
@@ -213,8 +215,12 @@ func newDbRestoreCommand() *cobra.Command {

 	command.Flags().String("db", "", "database name")

-	command.MarkFlagRequired("connect")
-	command.MarkFlagRequired("db")
+	if err := command.MarkFlagRequired("connect"); err != nil {
+		panic(err)
+	}
+	if err := command.MarkFlagRequired("db"); err != nil {
+		panic(err)
+	}
 	return command
 }
@@ -310,9 +316,15 @@ func newTableRestoreCommand() *cobra.Command {

 	command.Flags().String("db", "", "database name")
 	command.Flags().String("table", "", "table name")

-	command.MarkFlagRequired("connect")
-	command.MarkFlagRequired("db")
-	command.MarkFlagRequired("table")
+	if err := command.MarkFlagRequired("connect"); err != nil {
+		panic(err)
+	}
+	if err := command.MarkFlagRequired("db"); err != nil {
+		panic(err)
+	}
+	if err := command.MarkFlagRequired("table"); err != nil {
+		panic(err)
+	}
 	return command
 }
diff --git a/cmd/version.go b/cmd/version.go
index efe94dd4434fa..af4f7d386a228 100644
--- a/cmd/version.go
+++ b/cmd/version.go
@@ -1,8 +1,9 @@
 package cmd

 import (
-	"github.com/pingcap/br/pkg/utils"
 	"github.com/spf13/cobra"
+
+	"github.com/pingcap/br/pkg/utils"
 )

 // NewVersionCommand returns a restore subcommand
diff --git a/pkg/meta/meta.go b/pkg/meta/meta.go
index 7bc9b1a65f862..3cdc85e0f39b3 100644
--- a/pkg/meta/meta.go
+++ b/pkg/meta/meta.go
@@ -143,8 +143,6 @@ func (backer *Backer) GetClusterVersion() (string, error) {

 // GetRegionCount returns the total region count in the cluster
 func (backer *Backer) GetRegionCount() (int, error) {
-	var regionCountPrefix = "pd/api/v1/regions/count"
-	var err error
 	for _, addr := range backer.pdHTTP.addrs {
 		v, e := backer.PDHTTPGet(addr, regionCountPrefix, backer.pdHTTP.cli)
diff --git a/pkg/raw/full.go b/pkg/raw/full.go
index 2e588ce336e3f..f0a2ba8acbe8d 100644
--- a/pkg/raw/full.go
+++ b/pkg/raw/full.go
@@ -155,10 +155,8 @@ func (bc *BackupClient) GetBackupTableRanges(
 	}
 	tableInfo.AutoIncID = globalAutoID

-	var tblChecksum, tblKvs, tblBytes uint64
-
 	dbSession.GetSessionVars().SnapshotTS = backupTS
-	tblChecksum, tblKvs, tblBytes, err = bc.getChecksumFromTiDB(dbSession, dbInfo, tableInfo)
+	tblChecksum, err := bc.getChecksumFromTiDB(dbSession, dbInfo, tableInfo)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -177,9 +175,9 @@ func (bc *BackupClient) GetBackupTableRanges(
 	backupSchema := &backup.Schema{
 		Db:         dbData,
 		Table:      tableData,
-		Crc64Xor:   tblChecksum,
-		TotalKvs:   tblKvs,
-		TotalBytes: tblBytes,
+		Crc64Xor:   tblChecksum.checksum,
+		TotalKvs:   tblChecksum.totalKvs,
+		TotalBytes: tblChecksum.totalBytes,
 	}
log.Info("save table schema", zap.Stringer("db", dbInfo.Name), @@ -272,9 +270,7 @@ LoadDb: } idAlloc := autoid.NewAllocator(bc.backer.GetTiKV(), dbInfo.ID, false) for _, tableInfo := range dbInfo.Tables { - var tblChecksum, tblKvs, tblBytes uint64 - - tblChecksum, tblKvs, tblBytes, err = bc.getChecksumFromTiDB(dbSession, dbInfo, tableInfo) + tblChecksum, err := bc.getChecksumFromTiDB(dbSession, dbInfo, tableInfo) if err != nil { return nil, errors.Trace(err) } @@ -292,9 +288,9 @@ LoadDb: backupSchema := &backup.Schema{ Db: dbData, Table: tableData, - Crc64Xor: tblChecksum, - TotalKvs: tblKvs, - TotalBytes: tblBytes, + Crc64Xor: tblChecksum.checksum, + TotalKvs: tblChecksum.totalKvs, + TotalBytes: tblChecksum.totalBytes, } log.Info("save table schema", zap.Stringer("db", dbInfo.Name), @@ -702,19 +698,30 @@ func (bc *BackupClient) FastChecksum() (bool, error) { return true, nil } -func (bc *BackupClient) getChecksumFromTiDB(dbSession session.Session, dbl *model.DBInfo, tbl *model.TableInfo) (checksum uint64, totalKvs uint64, totalBytes uint64, err error) { +type tableChecksum struct { + checksum uint64 + totalKvs uint64 + totalBytes uint64 +} + +func (bc *BackupClient) getChecksumFromTiDB( + dbSession session.Session, + dbl *model.DBInfo, + tbl *model.TableInfo, +) (*tableChecksum, error) { var recordSets []sqlexec.RecordSet // TODO figure out why // must set to true to avoid load global vars, otherwise we got error dbSession.GetSessionVars().CommonGlobalLoaded = true - recordSets, err = dbSession.Execute(bc.ctx, fmt.Sprintf("ADMIN CHECKSUM TABLE %s.%s", utils.EncloseName(dbl.Name.L), utils.EncloseName(tbl.Name.L))) + recordSets, err := dbSession.Execute(bc.ctx, fmt.Sprintf( + "ADMIN CHECKSUM TABLE %s.%s", utils.EncloseName(dbl.Name.L), utils.EncloseName(tbl.Name.L))) if err != nil { - return 0, 0, 0, errors.Trace(err) + return nil, errors.Trace(err) } records, err := utils.ResultSetToStringSlice(bc.ctx, dbSession, recordSets[0]) if err != nil { - return 0, 0, 0, errors.Trace(err) + return nil, errors.Trace(err) } log.Info("tidb table checksum", zap.String("db", dbl.Name.L), @@ -723,17 +730,21 @@ func (bc *BackupClient) getChecksumFromTiDB(dbSession session.Session, dbl *mode ) record := records[0] - checksum, err = strconv.ParseUint(record[2], 10, 64) + checksum, err := strconv.ParseUint(record[2], 10, 64) if err != nil { - return 0, 0, 0, errors.Trace(err) + return nil, errors.Trace(err) } - totalKvs, err = strconv.ParseUint(record[3], 10, 64) + totalKvs, err := strconv.ParseUint(record[3], 10, 64) if err != nil { - return 0, 0, 0, errors.Trace(err) + return nil, errors.Trace(err) } - totalBytes, err = strconv.ParseUint(record[4], 10, 64) + totalBytes, err := strconv.ParseUint(record[4], 10, 64) if err != nil { - return 0, 0, 0, errors.Trace(err) + return nil, errors.Trace(err) } - return + return &tableChecksum{ + checksum: checksum, + totalKvs: totalKvs, + totalBytes: totalBytes, + }, nil } diff --git a/pkg/raw/push.go b/pkg/raw/push.go index 1fc73bf58bd96..644040d17f1f1 100644 --- a/pkg/raw/push.go +++ b/pkg/raw/push.go @@ -4,12 +4,13 @@ import ( "context" "sync" - "github.com/pingcap/br/pkg/meta" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/backup" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" "go.uber.org/zap" + + "github.com/pingcap/br/pkg/meta" ) // pushDown warps a backup task. 
diff --git a/pkg/raw/range_tree.go b/pkg/raw/range_tree.go
index eba59622a92ef..25c820cc23117 100644
--- a/pkg/raw/range_tree.go
+++ b/pkg/raw/range_tree.go
@@ -35,13 +35,14 @@ func (rg *Range) intersect(
 	} else {
 		subStart = rg.StartKey
 	}
-	if len(end) == 0 {
+	switch {
+	case len(end) == 0:
 		subEnd = rg.EndKey
-	} else if len(rg.EndKey) == 0 {
+	case len(rg.EndKey) == 0:
 		subEnd = end
-	} else if bytes.Compare(end, rg.EndKey) < 0 {
+	case bytes.Compare(end, rg.EndKey) < 0:
 		subEnd = end
-	} else {
+	default:
 		subEnd = rg.EndKey
 	}
 	return
@@ -130,18 +131,6 @@ func (rangeTree *RangeTree) update(rg *Range) {
 		rangeTree.tree.Delete(item)
 	}
 	rangeTree.tree.ReplaceOrInsert(rg)
-	return
-}
-
-func (rangeTree *RangeTree) putErr(
-	startKey, endKey []byte, err *backup.Error,
-) {
-	rg := &Range{
-		StartKey: startKey,
-		EndKey:   endKey,
-		Error:    err,
-	}
-	rangeTree.update(rg)
 }

 func (rangeTree *RangeTree) putOk(
@@ -158,7 +147,7 @@ func (rangeTree *RangeTree) putOk(
 func (rangeTree *RangeTree) getIncompleteRange(
 	startKey, endKey []byte,
 ) []Range {
-	if len(startKey) != 0 && bytes.Compare(startKey, endKey) == 0 {
+	if len(startKey) != 0 && bytes.Equal(startKey, endKey) {
 		return []Range{}
 	}
 	incomplete := make([]Range, 0, 64)
diff --git a/pkg/restore/client.go b/pkg/restore/client.go
index e1b4d3d5f2ac0..a338922528323 100644
--- a/pkg/restore/client.go
+++ b/pkg/restore/client.go
@@ -62,16 +62,19 @@ func NewRestoreClient(ctx context.Context, pdAddrs string) (*Client, error) {
 	addrs := strings.Split(pdAddrs, ",")
 	backer, err := meta.NewBacker(ctx, addrs[0])
 	if err != nil {
+		cancel()
 		return nil, errors.Trace(err)
 	}
 	pdClient, err := pd.NewClient(addrs, pd.SecurityOption{})
 	if err != nil {
+		cancel()
 		return nil, errors.Trace(err)
 	}
 	tikvCli, err := tikv.Driver{}.Open(
 		// Disable GC because TiDB enables GC already.
 		fmt.Sprintf("tikv://%s?disableGC=true", pdAddrs))
 	if err != nil {
+		cancel()
 		return nil, errors.Trace(err)
 	}

@@ -403,7 +406,8 @@ func (rc *Client) ValidateChecksum(rewriteRules []*import_sstpb.RewriteRule) err
 		log.Info("Restore Checksum", zap.Duration("take", elapsed))
 	}()

-	var tables []*utils.Table
+	// Assume one database one table.
+	tables := make([]*utils.Table, 0, len(rc.databases))
 	for _, db := range rc.databases {
 		tables = append(tables, db.Tables...)
 	}
@@ -472,7 +476,12 @@ func (rc *Client) ValidateChecksum(rewriteRules []*import_sstpb.RewriteRule) err
 }

 // checksum key range of [boundedStart, boundedEnd)
-func (rc *Client) checksumRange(triedTime int, boundedStart []byte, boundedEnd []byte, reqData []byte) (*tipb.ChecksumResponse, error) {
+func (rc *Client) checksumRange(
+	triedTime int,
+	boundedStart []byte,
+	boundedEnd []byte,
+	reqData []byte,
+) (*tipb.ChecksumResponse, error) {
 	if triedTime >= tikvChecksumRetryTimes {
 		return nil, errors.New("exceeded checksum retry time")
 	}
@@ -480,7 +489,12 @@ func (rc *Client) checksumRange(triedTime int, boundedStart []byte, boundedEnd [
 	resCh := make(chan tipb.ChecksumResponse)
 	errCh := make(chan error)
 	wg := sync.WaitGroup{}
-	regions, peers, err := rc.pdClient.ScanRegions(rc.ctx, codec.EncodeBytes([]byte{}, boundedStart), codec.EncodeBytes([]byte{}, boundedEnd), 10000)
+	regions, peers, err := rc.pdClient.ScanRegions(
+		rc.ctx,
+		codec.EncodeBytes([]byte{}, boundedStart),
+		codec.EncodeBytes([]byte{}, boundedEnd),
+		10000,
+	)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -526,7 +540,14 @@ func (rc *Client) checksumRange(triedTime int, boundedStart []byte, boundedEnd [
 }

 // checksum key range [start, end) in region with retry
-func (rc *Client) checksumRegion(triedTime int, start *[]byte, end *[]byte, region *metapb.Region, peer *metapb.Peer, reqData []byte) (*tipb.ChecksumResponse, error) {
+func (rc *Client) checksumRegion(
+	triedTime int,
+	start *[]byte,
+	end *[]byte,
+	region *metapb.Region,
+	peer *metapb.Peer,
+	reqData []byte,
+) (*tipb.ChecksumResponse, error) {
 	reqCtx := &kvrpcpb.Context{
 		RegionId:    region.GetId(),
 		RegionEpoch: region.GetRegionEpoch(),
@@ -597,35 +618,37 @@ func getTableRewriteRule(tid int64, rules []*import_sstpb.RewriteRule) *tipb.Che
 }

 // get intersect key range of [start, end] and [region.StartKey, region.EndKey]
-func getIntersectRange(start *[]byte, end *[]byte, region *metapb.Region) (innerStart *[]byte, innerEnd *[]byte, err error) {
-	switch {
-	case len(region.GetStartKey()) < 9: // 8 (encode group size) + 1
+func getIntersectRange(
+	start *[]byte,
+	end *[]byte,
+	region *metapb.Region,
+) (innerStart *[]byte, innerEnd *[]byte, err error) {
+	if len(region.GetStartKey()) < 9 { // 8 (encode group size) + 1
 		innerStart = start
-	default:
+	} else {
 		_, regionStart, err := codec.DecodeBytes(region.GetStartKey(), nil)
-		switch {
-		case err != nil:
-			return nil, nil, errors.Trace(err)
-		case bytes.Compare(regionStart, *start) < 0:
+		if err != nil {
+			return nil, nil, err
+		}
+		if bytes.Compare(regionStart, *start) < 0 {
 			innerStart = start
-		default:
+		} else {
 			innerStart = &regionStart
 		}
 	}
-	switch {
-	case len(region.GetEndKey()) < 9: // 8 (encode group size) + 1
+	if len(region.GetEndKey()) < 9 { // 8 (encode group size) + 1
 		innerEnd = end
-	default:
+	} else {
 		_, regionEnd, err := codec.DecodeBytes(region.GetEndKey(), nil)
-		switch {
-		case err != nil:
-			return nil, nil, errors.Trace(err)
-		case bytes.Compare(regionEnd, *end) < 0:
+		if err != nil {
+			return nil, nil, err
+		}
+		if bytes.Compare(regionEnd, *end) < 0 {
 			innerEnd = &regionEnd
-		default:
+		} else {
 			innerEnd = end
 		}
 	}
-	return
+	return innerStart, innerEnd, nil
 }
diff --git a/pkg/restore/db.go b/pkg/restore/db.go
index bdebc17bc4caa..05a1e0f36d638 100644
--- a/pkg/restore/db.go
+++ b/pkg/restore/db.go
@@ -194,11 +194,12 @@ func GetCreateTableSQL(t *model.TableInfo) string {
 	}

 	for i, idx := range publicIndices {
-		if idx.Primary {
+		switch {
+		case idx.Primary:
 			buf.WriteString(" PRIMARY KEY ")
-		} else if idx.Unique {
+		case idx.Unique:
 			fmt.Fprintf(&buf, " UNIQUE KEY %s ", utils.EncloseName(idx.Name.String()))
-		} else {
+		default:
 			fmt.Fprintf(&buf, " KEY %s ", utils.EncloseName(idx.Name.String()))
 		}

diff --git a/pkg/restore/db_test.go b/pkg/restore/db_test.go
index afa04406bec24..b80fc34fc8dff 100644
--- a/pkg/restore/db_test.go
+++ b/pkg/restore/db_test.go
@@ -90,11 +90,15 @@ func (s *testRestoreSchemaSuite) startServer(c *C) {
 	svr, err := server.NewServer(cfg, s.tidbdrv)
 	c.Assert(err, IsNil)
 	s.server = svr
-	go svr.Run()
+	go func() {
+		if err1 := svr.Run(); err1 != nil {
+			panic(err1)
+		}
+	}()
 	waitUntilServerOnline(cfg.Status.StatusPort)
 }

-func (s *testRestoreSchemaSuite) stopServer(c *C) {
+func (s *testRestoreSchemaSuite) stopServer(*C) {
 	if s.dom != nil {
 		s.dom.Close()
 	}
@@ -185,8 +189,9 @@ func waitUntilServerOnline(statusPort uint) {
 	for retry = 0; retry < retryTime; retry++ {
 		resp, err := http.Get(statusURL)
 		if err == nil {
-			ioutil.ReadAll(resp.Body)
-			resp.Body.Close()
+			// Ignore errors.
+			_, _ = ioutil.ReadAll(resp.Body)
+			_ = resp.Body.Close()
 			break
 		}
 		time.Sleep(time.Millisecond * 10)
diff --git a/pkg/restore/import.go b/pkg/restore/import.go
index 17c05c3cc6b60..bbaba596ea3dd 100644
--- a/pkg/restore/import.go
+++ b/pkg/restore/import.go
@@ -106,23 +106,22 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut
 			)
 			continue
 		}
+		info := regionInfo
 		err = withRetry(func() error {
-			err = importer.ingestSST(fileMeta, regionInfo, rewriteRules)
+			err = importer.ingestSST(fileMeta, info)
 			if err != nil {
 				log.Warn("ingest file failed",
 					zap.Reflect("file", file),
 					zap.Reflect("range", fileMeta.GetRange()),
-					zap.Reflect("region", regionInfo.Region),
+					zap.Reflect("region", info.Region),
 					zap.Error(err),
 				)
 				return err
 			}
 			return nil
 		}, func(e error) bool {
-			if e == errEpochNotMatch {
-				return false
-			}
-			return true
+			// Do not retry if epoch not match.
+			return e != errEpochNotMatch
 		}, downloadSSTRetryTimes, downloadSSTWaitInterval, downloadSSTMaxWaitInterval)
 		if err != nil {
 			return err
@@ -135,7 +134,9 @@ func (importer *FileImporter) Import(file *backup.File, rewriteRules *restore_ut
 	return errors.Trace(err)
 }

-func (importer *FileImporter) getImportClient(storeID uint64) (import_sstpb.ImportSSTClient, error) {
+func (importer *FileImporter) getImportClient(
+	storeID uint64,
+) (import_sstpb.ImportSSTClient, error) {
 	importer.mu.Lock()
 	defer importer.mu.Unlock()
 	client, ok := importer.importClients[storeID]
@@ -155,7 +156,11 @@ func (importer *FileImporter) getImportClient(storeID uint64) (import_sstpb.Impo
 	return client, errors.Trace(err)
 }

-func (importer *FileImporter) downloadSST(regionInfo *restore_util.RegionInfo, file *backup.File, rewriteRules *restore_util.RewriteRules) (*import_sstpb.SSTMeta, bool, error) {
+func (importer *FileImporter) downloadSST(
+	regionInfo *restore_util.RegionInfo,
+	file *backup.File,
+	rewriteRules *restore_util.RewriteRules,
+) (*import_sstpb.SSTMeta, bool, error) {
 	id, err := uuid.New().MarshalBinary()
 	if err != nil {
 		return nil, true, errors.Trace(err)
@@ -192,7 +197,10 @@ func (importer *FileImporter) downloadSST(regionInfo *restore_util.RegionInfo, f
 	return &sstMeta, false, nil
 }

-func (importer *FileImporter) ingestSST(fileMeta *import_sstpb.SSTMeta, regionInfo *restore_util.RegionInfo, rewriteRules *restore_util.RewriteRules) error {
+func (importer *FileImporter) ingestSST(
+	fileMeta *import_sstpb.SSTMeta,
+	regionInfo *restore_util.RegionInfo,
+) error {
 	leader := regionInfo.Leader
 	if leader == nil {
 		leader = regionInfo.Region.GetPeers()[0]
diff --git a/pkg/restore/util.go b/pkg/restore/util.go
index 7ef2e75e19da7..3268fcb6cd9bc 100644
--- a/pkg/restore/util.go
+++ b/pkg/restore/util.go
@@ -77,7 +77,12 @@ func GetRewriteRules(newTable *model.TableInfo, oldTable *model.TableInfo) *rest

 // getSSTMetaFromFile compares the keys in file, region and rewrite rules, then returns a sst meta.
 // The range of the returned sst meta is [regionRule.NewKeyPrefix, append(regionRule.NewKeyPrefix, 0xff)]
-func getSSTMetaFromFile(id []byte, file *backup.File, region *metapb.Region, regionRule *import_sstpb.RewriteRule) import_sstpb.SSTMeta {
+func getSSTMetaFromFile(
+	id []byte,
+	file *backup.File,
+	region *metapb.Region,
+	regionRule *import_sstpb.RewriteRule,
+) import_sstpb.SSTMeta {
 	// Get the column family of the file by the file name.
 	var cfName string
 	if strings.Contains(file.GetName(), "default") {
@@ -110,7 +115,13 @@ func getSSTMetaFromFile(id []byte, file *backup.File, region *metapb.Region, reg
 type retryableFunc func() error
 type continueFunc func(error) bool

-func withRetry(retryableFunc retryableFunc, continueFunc continueFunc, attempts uint, delayTime time.Duration, maxDelayTime time.Duration) error {
+func withRetry(
+	retryableFunc retryableFunc,
+	continueFunc continueFunc,
+	attempts uint,
+	delayTime time.Duration,
+	maxDelayTime time.Duration,
+) error {
 	var lastErr error
 	for i := uint(0); i < attempts; i++ {
 		err := retryableFunc()
@@ -151,7 +162,10 @@ func GetRanges(files []*backup.File) []restore_util.Range {
 }

 // rules must be encoded
-func findRegionRewriteRule(region *metapb.Region, rewriteRules *restore_util.RewriteRules) *import_sstpb.RewriteRule {
+func findRegionRewriteRule(
+	region *metapb.Region,
+	rewriteRules *restore_util.RewriteRules,
+) *import_sstpb.RewriteRule {
 	for _, rule := range rewriteRules.Data {
 		// regions may have the new prefix
 		if bytes.HasPrefix(region.GetStartKey(), rule.GetNewKeyPrefix()) {
@@ -185,7 +199,8 @@ func encodeRewriteRules(rewriteRules *restore_util.RewriteRules) *restore_util.R
 func encodeKeyPrefix(key []byte) []byte {
 	encodedPrefix := make([]byte, 0)
 	ungroupedLen := len(key) % 8
-	encodedPrefix = append(encodedPrefix, codec.EncodeBytes([]byte{}, key[:len(key)-ungroupedLen])...)
+	encodedPrefix =
+		append(encodedPrefix, codec.EncodeBytes([]byte{}, key[:len(key)-ungroupedLen])...)
 	return append(encodedPrefix[:len(encodedPrefix)-9], key[len(key)-ungroupedLen:]...)
 }
diff --git a/pkg/utils/storage.go b/pkg/utils/storage.go
index 544e32172577f..52245496d34bd 100644
--- a/pkg/utils/storage.go
+++ b/pkg/utils/storage.go
@@ -1,10 +1,11 @@
 package utils

 import (
-	"github.com/pingcap/errors"
 	"io/ioutil"
 	"net/url"
 	"path"
+
+	"github.com/pingcap/errors"
 )

 // ExternalStorage represents a kind of file system storage
diff --git a/pkg/utils/storage_test.go b/pkg/utils/storage_test.go
index 7c70317c5a5a5..e8e49b5e961c0 100644
--- a/pkg/utils/storage_test.go
+++ b/pkg/utils/storage_test.go
@@ -1,8 +1,9 @@
 package utils

 import (
-	. "github.com/pingcap/check"
 	"testing"
+
+	. "github.com/pingcap/check"
 )

 type testStorageSuite struct{}
diff --git a/tools.json b/tools.json
index 5b91770879026..d492d22b5c683 100644
--- a/tools.json
+++ b/tools.json
@@ -2,7 +2,7 @@
   "Tools": [
     {
       "Repository": "golang.org/x/tools/cmd/goimports",
-      "Commit": "04b5d21e00f1f47bd824a6ade581e7189bacde87"
+      "Commit": "0133cac3176f225883c5d817146de8633ed07ebc"
     },
     {
       "Repository": "github.com/dnephin/govet",