diff --git a/lightning/backend/local.go b/lightning/backend/local.go index ca81581d9..b4b264509 100644 --- a/lightning/backend/local.go +++ b/lightning/backend/local.go @@ -23,6 +23,7 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "sync/atomic" "time" @@ -288,7 +289,7 @@ func (local *local) makeConn(ctx context.Context, storeID uint64) (*grpc.ClientC ) cancel() if err != nil { - return nil, errors.WithStack(err) + return nil, errors.Trace(err) } return conn, nil } @@ -500,7 +501,7 @@ func (local *local) WriteToTiKV( wstream, err := cli.Write(ctx) if err != nil { - return nil, nil, err + return nil, nil, errors.Trace(err) } // Bind uuid for this write request @@ -510,7 +511,7 @@ func (local *local) WriteToTiKV( }, } if err = wstream.Send(req); err != nil { - return nil, nil, err + return nil, nil, errors.Trace(err) } req.Chunk = &sst.WriteRequest_Batch{ Batch: &sst.WriteBatch{ @@ -589,7 +590,7 @@ func (local *local) WriteToTiKV( // if there is not leader currently, we should directly return an error if leaderPeerMetas == nil { - log.L().Error("write to tikv no leader", zap.Reflect("region", region), + log.L().Warn("write to tikv no leader", zap.Reflect("region", region), zap.Uint64("leader_id", leaderID), zap.Reflect("meta", meta), zap.Int("kv_pairs", totalCount), zap.Int64("total_bytes", size)) return nil, nil, errors.Errorf("write to tikv with no leader returned, region '%d', leader: %d", @@ -1085,11 +1086,17 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID) erro for { // split region by given ranges - err = local.SplitAndScatterRegionByRanges(ctx, ranges) + for i := 0; i < maxRetryTimes; i++ { + err = local.SplitAndScatterRegionByRanges(ctx, ranges) + if err == nil { + break + } + } if err != nil { log.L().Error("split & scatter ranges failed", zap.Error(err)) return err } + // start to write to kv and ingest err = local.WriteAndIngestByRanges(ctx, engineFile.(*LocalFile), ranges, remains) if err != nil { @@ -1124,7 +1131,7 @@ func (local *local) CleanupEngine(ctx context.Context, engineUUID uuid.UUID) err } local.engines.Delete(engineUUID) } else { - log.L().Error("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID)) + log.L().Warn("could not find engine in cleanupEngine", zap.Stringer("uuid", engineUUID)) } return nil } @@ -1202,7 +1209,27 @@ func (local *local) isIngestRetryable( return false, nil, nil } + getRegion := func() (*split.RegionInfo, error) { + for i := 0; ; i++ { + newRegion, err := local.splitCli.GetRegion(ctx, region.Region.GetStartKey()) + if err != nil { + return nil, errors.Trace(err) + } + if newRegion != nil { + return newRegion, nil + } + log.L().Warn("get region by key return nil, will retry", zap.Reflect("region", region), + zap.Int("retry", i)) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(time.Second): + } + } + } + var newRegion *split.RegionInfo + var err error switch errPb := resp.GetError(); { case errPb.NotLeader != nil: if newLeader := errPb.GetNotLeader().GetLeader(); newLeader != nil { @@ -1211,65 +1238,32 @@ func (local *local) isIngestRetryable( Region: region.Region, } } else { - var err error - for i := 0; ; i++ { - newRegion, err = local.splitCli.GetRegion(ctx, region.Region.GetStartKey()) - if err != nil { - return false, nil, errors.Trace(err) - } - if newRegion != nil { - break - } - log.L().Warn("get region by key return nil, will retry", zap.Reflect("region", region), - zap.Int("retry", i)) - time.Sleep(time.Second) + newRegion, err = getRegion() + if err != nil { + return false, nil, errors.Trace(err) } } return true, newRegion, errors.Errorf("not leader: %s", errPb.GetMessage()) - case errPb.EpochNotMatch != nil: - if currentRegions := errPb.GetEpochNotMatch().GetCurrentRegions(); currentRegions != nil { - var currentRegion *metapb.Region - for _, r := range currentRegions { - if insideRegion(r, meta) { - currentRegion = r - break - } - } - if currentRegion != nil { - var newLeader *metapb.Peer - for _, p := range currentRegion.Peers { - if p.GetStoreId() == region.Leader.GetStoreId() { - newLeader = p - break - } - } - if newLeader != nil { - newRegion = &split.RegionInfo{ - Leader: newLeader, - Region: currentRegion, - } - } - } + case strings.Contains(errPb.Message, "raft: proposal dropped"): + // TODO: we should change 'Raft raft: proposal dropped' to a error type like 'NotLeader' + newRegion, err = getRegion() + if err != nil { + return false, nil, errors.Trace(err) } - return true, newRegion, errors.Errorf("epoch not match: %s", errPb.GetMessage()) + return true, newRegion, errors.New(errPb.GetMessage()) } - return false, nil, errors.Errorf("non retryable error: %s", resp.GetError().GetMessage()) + return false, nil, errors.Errorf("non-retryable error: %s", resp.GetError().GetMessage()) } +// return the smallest []byte that is bigger than current bytes. +// special case when key is empty, empty bytes means infinity in our context, so directly return itself. func nextKey(key []byte) []byte { if len(key) == 0 { return []byte{} } res := make([]byte, 0, len(key)+1) - pos := 0 - for i := len(key) - 1; i >= 0; i-- { - if key[i] != '\xff' { - pos = i - break - } - } - s, e := key[:pos], key[pos]+1 - res = append(append(res, s...), e) + res = append(res, key...) + res = append(res, 0) return res } diff --git a/lightning/backend/local_test.go b/lightning/backend/local_test.go new file mode 100644 index 000000000..ec33119f7 --- /dev/null +++ b/lightning/backend/local_test.go @@ -0,0 +1,33 @@ +package backend + +import ( + "bytes" + + . "github.com/pingcap/check" +) + +type localSuite struct{} + +var _ = Suite(&localSuite{}) + +func (s *localSuite) TestNextKey(c *C) { + c.Assert(nextKey([]byte{}), DeepEquals, []byte{}) + + cases := [][]byte{ + {0}, + {255}, + {1, 255}, + } + for _, b := range cases { + next := nextKey(b) + c.Assert(next, DeepEquals, append(b, 0)) + } + + // in the old logic, this should return []byte{} which is not the actually smallest eky + next := nextKey([]byte{1, 255}) + c.Assert(bytes.Compare(next, []byte{2}), Equals, -1) + + // another test case, nextkey()'s return should be smaller than key with a prefix of the origin key + next = nextKey([]byte{1, 255}) + c.Assert(bytes.Compare(next, []byte{1, 255, 0, 1, 2}), Equals, -1) +}