Merge branch 'master' into bug-fix/fix_enum_batch_point_get
lzmhhh123 committed May 13, 2021
2 parents 208eb58 + 7c8ddd8 commit 7c74030
Showing 36 changed files with 782 additions and 647 deletions.
2 changes: 1 addition & 1 deletion session/session.go
@@ -517,7 +517,7 @@ func (s *session) doCommit(ctx context.Context) error {
s.txn.SetOption(tikvstore.EnableAsyncCommit, s.GetSessionVars().EnableAsyncCommit)
s.txn.SetOption(tikvstore.Enable1PC, s.GetSessionVars().Enable1PC)
// priority of the sysvar is lower than `start transaction with causal consistency only`
if s.txn.GetOption(tikvstore.GuaranteeLinearizability) == nil {
if val := s.txn.GetOption(tikvstore.GuaranteeLinearizability); val == nil || val.(bool) {
// We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions
// because the property naturally holds:
// We guarantee the commitTS of any transaction must not exceed the next timestamp from the TSO.
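The new guard enters the block when the GuaranteeLinearizability option is unset or explicitly true, instead of only when it is unset. A minimal, self-contained sketch of that nil-or-true check on an untyped option store (the `options` type and key below are illustrative stand-ins, not the commit's types):

```go
package main

import "fmt"

// options mimics an untyped per-transaction option store.
type options map[int]interface{}

// guaranteeLinearizability is an illustrative option key, not the real tikvstore constant.
const guaranteeLinearizability = 0

// needLinearizability reports whether the option is unset or explicitly true,
// mirroring the `val == nil || val.(bool)` guard added in session.go.
func (o options) needLinearizability() bool {
	val, ok := o[guaranteeLinearizability]
	return !ok || val.(bool)
}

func main() {
	fmt.Println(options{}.needLinearizability())                                // true: option never set
	fmt.Println(options{guaranteeLinearizability: true}.needLinearizability())  // true: explicitly requested
	fmt.Println(options{guaranteeLinearizability: false}.needLinearizability()) // false: explicitly opted out
}
```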
4 changes: 2 additions & 2 deletions store/copr/batch_coprocessor.go
@@ -177,7 +177,7 @@ func (c *CopClient) sendBatch(ctx context.Context, req *kv.Request, vars *tikv.V
if req.KeepOrder || req.Desc {
return copErrorResponse{errors.New("batch coprocessor cannot prove keep order or desc property")}
}
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildBatchCopTasks(bo, c.store.GetRegionCache(), ranges, req.StoreType)
@@ -381,7 +381,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
return nil
}

if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
return errors.Trace(err)
}

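`tikv.TxnStartKey` is now invoked as a function rather than referenced as a value when stashing the start timestamp in the request context. A small sketch of the underlying pattern, using an illustrative unexported key type and accessor (names are stand-ins, not the client-go API):

```go
package main

import (
	"context"
	"fmt"
)

// txnStartKeyType is an unexported key type so other packages cannot collide
// with the context key; the accessor below plays the role of tikv.TxnStartKey().
type txnStartKeyType struct{}

// txnStartKey returns the key under which the transaction start timestamp is
// stored in the context (illustrative stand-in, not the client-go function).
func txnStartKey() interface{} { return txnStartKeyType{} }

func main() {
	// Stash the start ts the way the copr client does before building tasks.
	ctx := context.WithValue(context.Background(), txnStartKey(), uint64(425_000_000))

	if ts, ok := ctx.Value(txnStartKey()).(uint64); ok {
		fmt.Println("start ts:", ts)
	}
}
```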
11 changes: 7 additions & 4 deletions store/copr/coprocessor.go
@@ -72,7 +72,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
logutil.BgLogger().Debug("send batch requests")
return c.sendBatch(ctx, req, vars)
}
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTs)
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
ranges := toTiKVKeyRanges(req.KeyRanges)
tasks, err := buildCopTasks(bo, c.store.GetRegionCache(), ranges, req)
@@ -829,11 +829,14 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *backoffer, rpcCtx *ti
return nil, nil
}

boRPCType := tikv.BoTiKVRPC
err1 := errors.Errorf("recv stream response error: %v, task: %s", err, task)
if task.storeType == kv.TiFlash {
boRPCType = tikv.BoTiFlashRPC
err1 = bo.Backoff(tikv.BoTiFlashRPC, err1)
} else {
err1 = bo.b.BackoffTiKVRPC(err1)
}
if err1 := bo.Backoff(boRPCType, errors.Errorf("recv stream response error: %v, task: %s", err, task)); err1 != nil {

if err1 != nil {
return nil, errors.Trace(err)
}

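The stream-error path no longer selects a backoff type constant and then makes a single `bo.Backoff` call; it branches on the store type, calling `bo.Backoff(tikv.BoTiFlashRPC, ...)` for TiFlash and delegating to `bo.b.BackoffTiKVRPC(...)` otherwise, then checks the one resulting error. A self-contained sketch of that control flow with a toy backoffer (types and method names are illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

// backoffer is a toy stand-in for the copr backoffer: the real one delegates
// TiKV RPC backoff to an embedded client-go backoffer (the bo.b in the diff).
type backoffer struct{ attempts, limit int }

func (b *backoffer) backoffTiFlashRPC(reason error) error { return b.sleep(reason) }
func (b *backoffer) backoffTiKVRPC(reason error) error    { return b.sleep(reason) }

func (b *backoffer) sleep(reason error) error {
	if b.attempts >= b.limit {
		return fmt.Errorf("backoff budget exhausted: %w", reason)
	}
	b.attempts++
	return nil
}

// handleStreamErr mirrors the restructured control flow from the diff: choose
// the backoff kind by store type first, then test the single resulting error.
func handleStreamErr(bo *backoffer, isTiFlash bool, recvErr error) error {
	err1 := fmt.Errorf("recv stream response error: %w", recvErr)
	if isTiFlash {
		err1 = bo.backoffTiFlashRPC(err1)
	} else {
		err1 = bo.backoffTiKVRPC(err1)
	}
	if err1 != nil {
		return recvErr // give up and surface the original receive error
	}
	return nil // backoff succeeded; caller retries the stream
}

func main() {
	bo := &backoffer{limit: 1}
	fmt.Println(handleStreamErr(bo, false, errors.New("connection reset"))) // <nil>: retry
	fmt.Println(handleStreamErr(bo, false, errors.New("connection reset"))) // error: budget spent
}
```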
4 changes: 2 additions & 2 deletions store/copr/mpp.go
@@ -55,7 +55,7 @@ func (c *MPPClient) selectAllTiFlashStore() []kv.MPPTaskMeta {

// ConstructMPPTasks receives ScheduleRequest, which is actually a collection of kv ranges. We allocate MPPTaskMeta for them and return.
func (c *MPPClient) ConstructMPPTasks(ctx context.Context, req *kv.MPPBuildTasksRequest) ([]kv.MPPTaskMeta, error) {
ctx = context.WithValue(ctx, tikv.TxnStartKey, req.StartTS)
ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTS)
bo := newBackofferWithVars(ctx, copBuildTaskMaxBackoff, nil)
if req.KeyRanges == nil {
return c.selectAllTiFlashStore(), nil
@@ -343,7 +343,7 @@ func (m *mppIterator) establishMPPConns(bo *backoffer, req *kv.MPPDispatchReques
return
}

if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if err1 := bo.b.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
} else {
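As in the coprocessor paths, the MPP stream handler now backs off via `bo.b.BackoffTiKVRPC`; when the backoff budget is exhausted it still distinguishes a cancelled context from a genuine stream failure. A sketch of that classification, using the stdlib `errors.Is` in place of `errors.Cause` from pingcap/errors:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// classifyRecvErr mirrors the handling in establishMPPConns once backoff fails:
// a cancelled context is only worth an informational log, anything else is
// treated as a real stream failure.
func classifyRecvErr(err error) string {
	// errors.Is is the stdlib analogue of the diff's
	// errors.Cause(err) == context.Canceled check (pingcap/errors).
	if errors.Is(err, context.Canceled) {
		return "info: stream recv timeout: " + err.Error()
	}
	return "error: " + err.Error()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	fmt.Println(classifyRecvErr(ctx.Err()))                        // cancelled: log and move on
	fmt.Println(classifyRecvErr(errors.New("region unavailable"))) // real failure
}
```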
4 changes: 3 additions & 1 deletion store/driver/tikv_driver.go
@@ -206,6 +206,8 @@ var (
ldflagGetEtcdAddrsFromConfig = "0" // 1:Yes, otherwise:No
)

const getAllMembersBackoff = 5000

// EtcdAddrs returns etcd server addresses.
func (s *tikvStore) EtcdAddrs() ([]string, error) {
if s.etcdAddrs == nil {
@@ -220,7 +222,7 @@ func (s *tikvStore) EtcdAddrs() ([]string, error) {
}

ctx := context.Background()
bo := tikv.NewBackoffer(ctx, tikv.GetAllMembersBackoff)
bo := tikv.NewBackoffer(ctx, getAllMembersBackoff)
etcdAddrs := make([]string, 0)
pdClient := s.GetPDClient()
if pdClient == nil {
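The backoff budget for fetching PD members is now a package-local constant (`getAllMembersBackoff = 5000`) passed to `tikv.NewBackoffer`, rather than an exported constant from the tikv package. A toy sketch of a retry loop driven by such a millisecond budget (the `retryWithBudget` helper is illustrative, not the client-go backoffer):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Each caller now owns its backoff budget instead of importing one from the
// tikv package; 5000 is the total sleep budget in milliseconds, as in the diff.
const getAllMembersBackoff = 5000

// retryWithBudget is an illustrative stand-in for tikv.NewBackoffer + Backoff:
// it retries fn with exponential sleeps until the budget would be exceeded.
func retryWithBudget(ctx context.Context, maxSleepMs int, fn func() error) error {
	sleep := 10 * time.Millisecond
	var total time.Duration
	for {
		err := fn()
		if err == nil {
			return nil
		}
		if total+sleep > time.Duration(maxSleepMs)*time.Millisecond {
			return fmt.Errorf("backoff budget exhausted: %w", err)
		}
		select {
		case <-time.After(sleep):
		case <-ctx.Done():
			return ctx.Err()
		}
		total += sleep
		sleep *= 2
	}
}

func main() {
	calls := 0
	err := retryWithBudget(context.Background(), getAllMembersBackoff, func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("pd not ready")
		}
		return nil
	})
	fmt.Println(calls, err) // 3 <nil>
}
```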
11 changes: 9 additions & 2 deletions store/driver/txn/snapshot.go
@@ -81,12 +81,19 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) {
s.KVSnapshot.SetSampleStep(val.(uint32))
case tikvstore.TaskID:
s.KVSnapshot.SetTaskID(val.(uint64))
case tikvstore.CollectRuntimeStats:
s.KVSnapshot.SetRuntimeStats(val.(*tikv.SnapshotRuntimeStats))
case tikvstore.IsStalenessReadOnly:
s.KVSnapshot.SetIsStatenessReadOnly(val.(bool))
case tikvstore.MatchStoreLabels:
s.KVSnapshot.SetMatchStoreLabels(val.([]*metapb.StoreLabel))
default:
s.KVSnapshot.SetOption(opt, val)
}
}

func (s *tikvSnapshot) DelOption(opt int) {
switch opt {
case tikvstore.CollectRuntimeStats:
s.KVSnapshot.SetRuntimeStats(nil)
}
}

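The driver snapshot gains a `DelOption` that clears the runtime-stats collector installed via `SetOption(CollectRuntimeStats, ...)` by passing nil. A condensed sketch of that set/clear symmetry with stand-in types:

```go
package main

import "fmt"

// snapshotRuntimeStats stands in for tikv.SnapshotRuntimeStats.
type snapshotRuntimeStats struct{ rpcCount int }

type snapshot struct{ stats *snapshotRuntimeStats }

func (s *snapshot) SetRuntimeStats(st *snapshotRuntimeStats) { s.stats = st }

// collectRuntimeStats is an illustrative option key, not the real tikvstore constant.
const collectRuntimeStats = 1

// SetOption and DelOption mirror the driver pattern in the diff: setting the
// option installs a stats collector, deleting it clears the collector again.
func (s *snapshot) SetOption(opt int, val interface{}) {
	switch opt {
	case collectRuntimeStats:
		s.SetRuntimeStats(val.(*snapshotRuntimeStats))
	}
}

func (s *snapshot) DelOption(opt int) {
	switch opt {
	case collectRuntimeStats:
		s.SetRuntimeStats(nil)
	}
}

func main() {
	s := &snapshot{}
	s.SetOption(collectRuntimeStats, &snapshotRuntimeStats{})
	fmt.Println(s.stats != nil) // true: collecting
	s.DelOption(collectRuntimeStats)
	fmt.Println(s.stats != nil) // false: cleared
}
```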
15 changes: 15 additions & 0 deletions store/driver/txn/txn_driver.go
@@ -152,6 +152,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.KVTxn.GetSnapshot().SetTaskID(val.(uint64))
case tikvstore.InfoSchema:
txn.SetSchemaVer(val.(tikv.SchemaVer))
case tikvstore.CollectRuntimeStats:
txn.KVTxn.GetSnapshot().SetRuntimeStats(val.(*tikv.SnapshotRuntimeStats))
case tikvstore.SchemaAmender:
txn.SetSchemaAmender(val.(tikv.SchemaAmender))
case tikvstore.SampleStep:
@@ -162,6 +164,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.SetEnableAsyncCommit(val.(bool))
case tikvstore.Enable1PC:
txn.SetEnable1PC(val.(bool))
case tikvstore.GuaranteeLinearizability:
txn.SetCausalConsistency(!val.(bool))
case tikvstore.TxnScope:
txn.SetScope(val.(string))
case tikvstore.IsStalenessReadOnly:
@@ -175,13 +179,24 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {

func (txn *tikvTxn) GetOption(opt int) interface{} {
switch opt {
case tikvstore.GuaranteeLinearizability:
return !txn.KVTxn.IsCasualConsistency()
case tikvstore.TxnScope:
return txn.KVTxn.GetScope()
default:
return txn.KVTxn.GetOption(opt)
}
}

func (txn *tikvTxn) DelOption(opt int) {
switch opt {
case tikvstore.CollectRuntimeStats:
txn.KVTxn.GetSnapshot().SetRuntimeStats(nil)
default:
txn.KVTxn.DelOption(opt)
}
}

// SetVars sets variables to the transaction.
func (txn *tikvTxn) SetVars(vars interface{}) {
if vs, ok := vars.(*tikv.Variables); ok {
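The transaction wrapper now translates between the option surface, which speaks `GuaranteeLinearizability`, and the KV transaction, which stores causal consistency: `SetOption` calls `SetCausalConsistency(!val)` and `GetOption` returns `!IsCasualConsistency()`. A tiny sketch showing the double negation round-trips (stand-in type, not the tikv API):

```go
package main

import "fmt"

// txn stands in for the KV transaction: internally it stores "causal
// consistency", while the option surface speaks "guarantee linearizability",
// so both accessors negate, as the diff does with SetCausalConsistency(!val)
// and !IsCasualConsistency().
type txn struct{ causalConsistency bool }

func (t *txn) SetGuaranteeLinearizability(v bool) { t.causalConsistency = !v }
func (t *txn) GuaranteeLinearizability() bool     { return !t.causalConsistency }

func main() {
	t := &txn{}
	for _, v := range []bool{true, false} {
		t.SetGuaranteeLinearizability(v)
		fmt.Println(v, "->", t.GuaranteeLinearizability()) // round-trips unchanged
	}
}
```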
14 changes: 8 additions & 6 deletions store/gcworker/gc_worker.go
@@ -1050,7 +1050,7 @@ func (w *GCWorker) resolveLocksForRange(ctx context.Context, safePoint uint64, s

var stat tikv.RangeTaskStat
key := startKey
bo := tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
bo := tikv.NewGcResolveLockMaxBackoffer(ctx)
failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) {
sleep := v.(int)
// cooperate with github.com/pingcap/tidb/store/tikv/invalidCacheAndRetry
@@ -1147,7 +1147,7 @@ retryScanAndResolve:
if len(key) == 0 || (len(endKey) != 0 && bytes.Compare(key, endKey) >= 0) {
break
}
bo = tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
bo = tikv.NewGcResolveLockMaxBackoffer(ctx)
failpoint.Inject("setGcResolveMaxBackoff", func(v failpoint.Value) {
sleep := v.(int)
bo = tikv.NewBackofferWithVars(ctx, sleep, nil)
@@ -1460,7 +1460,7 @@ func (w *GCWorker) resolveLocksAcrossRegions(ctx context.Context, locks []*tikv.
failpoint.Return(errors.New("injectedError"))
})

bo := tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
bo := tikv.NewGcResolveLockMaxBackoffer(ctx)

for {
if len(locks) == 0 {
@@ -1496,18 +1496,20 @@ func (w *GCWorker) resolveLocksAcrossRegions(ctx context.Context, locks []*tikv.
}

// Recreate backoffer for next region
bo = tikv.NewBackofferWithVars(ctx, tikv.GcResolveLockMaxBackoff, nil)
bo = tikv.NewGcResolveLockMaxBackoffer(ctx)
locks = locks[len(locksInRegion):]
}

return nil
}

const gcOneRegionMaxBackoff = 20000

func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) error {
var newSafePoint uint64
var err error

bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
for {
newSafePoint, err = w.pdClient.UpdateGCSafePoint(ctx, safePoint)
if err != nil {
@@ -1544,7 +1546,7 @@ func (w *GCWorker) doGCForRange(ctx context.Context, startKey []byte, endKey []b
}()
key := startKey
for {
bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
loc, err := w.tikvStore.GetRegionCache().LocateKey(bo, key)
if err != nil {
return stat, errors.Trace(err)
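The GC worker now builds its lock-resolving backoffer through `tikv.NewGcResolveLockMaxBackoffer(ctx)`, so that budget constant no longer needs to be exported, while the 20000 ms per-region GC budget becomes a local `gcOneRegionMaxBackoff`. A sketch of wrapping a fixed budget in a dedicated constructor (toy types; the resolve-lock budget value below is an assumption, it is not shown in the diff):

```go
package main

import (
	"context"
	"fmt"
)

// backoffer is a toy stand-in for the tikv backoffer used by the GC worker.
type backoffer struct {
	ctx        context.Context
	maxSleepMs int
}

func newBackofferWithVars(ctx context.Context, maxSleepMs int, _ interface{}) *backoffer {
	return &backoffer{ctx: ctx, maxSleepMs: maxSleepMs}
}

// The resolve-lock budget is baked into a dedicated constructor, so callers no
// longer import the constant; the value here is assumed, not taken from the diff.
const gcResolveLockMaxBackoff = 100000

// The per-region GC budget stays a caller-side constant, as in the diff.
const gcOneRegionMaxBackoff = 20000

// newGcResolveLockMaxBackoffer mirrors the shape of tikv.NewGcResolveLockMaxBackoffer.
func newGcResolveLockMaxBackoffer(ctx context.Context) *backoffer {
	return newBackofferWithVars(ctx, gcResolveLockMaxBackoff, nil)
}

func main() {
	ctx := context.Background()
	fmt.Println(newGcResolveLockMaxBackoffer(ctx).maxSleepMs)                     // 100000
	fmt.Println(newBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil).maxSleepMs) // 20000
}
```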
5 changes: 3 additions & 2 deletions store/gcworker/gc_worker_test.go
@@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/store/tikv/mockstore/mocktikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/retry"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
pd "github.com/tikv/pd/client"
)
@@ -412,7 +413,7 @@ func (s *testGCWorkerSuite) TestStatusVars(c *C) {

func (s *testGCWorkerSuite) TestDoGCForOneRegion(c *C) {
ctx := context.Background()
bo := tikv.NewBackofferWithVars(ctx, tikv.GcOneRegionMaxBackoff, nil)
bo := tikv.NewBackofferWithVars(ctx, gcOneRegionMaxBackoff, nil)
loc, err := s.tikvStore.GetRegionCache().LocateKey(bo, []byte(""))
c.Assert(err, IsNil)
var regionErr *errorpb.Error
@@ -943,7 +944,7 @@ func (s *testGCWorkerSuite) TestResolveLockRangeMeetRegionEnlargeCausedByRegionM
mCluster.Merge(s.initRegion.regionID, region2)
regionMeta, _ := mCluster.GetRegion(s.initRegion.regionID)
err := s.tikvStore.GetRegionCache().OnRegionEpochNotMatch(
tikv.NewNoopBackoff(context.Background()),
retry.NewNoopBackoff(context.Background()),
&tikv.RPCContext{Region: regionID, Store: &tikv.Store{}},
[]*metapb.Region{regionMeta})
c.Assert(err, IsNil)
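The test now takes the no-op backoffer from the `retry` subpackage (`retry.NewNoopBackoff`). A sketch of what such a backoffer looks like in tests, with illustrative names: it never sleeps and never exhausts a budget, so retried code runs immediately.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// noopBackoff is a test-friendly backoffer in the spirit of retry.NewNoopBackoff:
// it never sleeps and never gives up, so code under test retries without delay.
type noopBackoff struct{ ctx context.Context }

func newNoopBackoff(ctx context.Context) *noopBackoff { return &noopBackoff{ctx: ctx} }

// Backoff records nothing and always allows another attempt.
func (b *noopBackoff) Backoff(reason error) error {
	_ = reason
	return nil
}

func main() {
	bo := newNoopBackoff(context.Background())
	fmt.Println(bo.Backoff(errors.New("region epoch not match")) == nil) // true: always retryable
}
```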