diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 6818579c5a2a5..ee88b98cf8ed7 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -153,12 +153,13 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int { } // SetFromSessionVars sets the following fields for "kv.Request" from session variables: -// "Concurrency", "IsolationLevel", "NotFillCache". +// "Concurrency", "IsolationLevel", "NotFillCache", "ReplicaRead". func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder { builder.Request.Concurrency = sv.DistSQLScanConcurrency builder.Request.IsolationLevel = builder.getIsolationLevel() builder.Request.NotFillCache = sv.StmtCtx.NotFillCache builder.Request.Priority = builder.getKVPriority(sv) + builder.Request.ReplicaRead = sv.ReplicaRead return builder } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index a2b472b5ad833..7b353d8f29c82 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -311,6 +311,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) { NotFillCache: false, SyncLog: false, Streaming: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -385,6 +386,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) { NotFillCache: false, SyncLog: false, Streaming: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -429,6 +431,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) { NotFillCache: false, SyncLog: false, Streaming: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -474,6 +477,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) { Streaming: true, NotFillCache: false, SyncLog: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -554,3 +558,32 @@ func (s *testSuite) TestRequestBuilder6(c *C) { c.Assert(actual, DeepEquals, expect) } + +func (s *testSuite) TestRequestBuilder7(c *C) { + vars := variable.NewSessionVars() + vars.ReplicaRead = kv.ReplicaReadFollower + + concurrency := 10 + + actual, err := (&RequestBuilder{}). + SetFromSessionVars(vars). + SetConcurrency(concurrency). 
+ Build() + c.Assert(err, IsNil) + + expect := &kv.Request{ + Tp: 0, + StartTs: 0x0, + KeepOrder: false, + Desc: false, + Concurrency: concurrency, + IsolationLevel: 0, + Priority: 0, + NotFillCache: false, + SyncLog: false, + Streaming: false, + ReplicaRead: kv.ReplicaReadFollower, + } + + c.Assert(actual, DeepEquals, expect) +} diff --git a/executor/analyze.go b/executor/analyze.go index b0568abefd789..eb6f5c17a5818 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -620,7 +620,8 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild } var resp *tikvrpc.Response var rpcCtx *tikv.RPCContext - rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region) + // we always use the first follower when follower read is enabled + rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0) if *err != nil { return } @@ -914,6 +915,9 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) error { if err != nil { return err } + if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } for _, t := range e.scanTasks { iter, err := snapshot.Iter(t.StartKey, t.EndKey) if err != nil { @@ -931,10 +935,14 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e defer e.wg.Done() var snapshot kv.Snapshot snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion) - rander := rand.New(rand.NewSource(e.randSeed + int64(workID))) if *err != nil { return } + if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + rander := rand.New(rand.NewSource(e.randSeed + int64(workID))) + for i := workID; i < len(e.sampTasks); i += e.concurrency { task := e.sampTasks[i] if task.SampSize == 0 { diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 6509192073006..5eb773cb3880b 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -105,6 +105,16 @@ PARTITION BY RANGE ( a ) ( } } +func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + ctx := tk.Se.(sessionctx.Context) + ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower + tk.MustExec("analyze table t") +} + func (s *testSuite1) TestAnalyzeParameters(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/point_get.go b/executor/point_get.go index d480f1ac1f913..aeba761ffec1f 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -91,6 +91,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if err != nil { return err } + if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } if e.idxInfo != nil { idxKey, err1 := e.encodeIndexKey() if err1 != nil && !kv.ErrNotExist.Equal(err1) { diff --git a/go.mod b/go.mod index a133839a2971a..f59025debc90d 100644 --- a/go.mod +++ b/go.mod @@ -40,7 +40,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 + github.com/pingcap/kvproto v0.0.0-20190923055505-97975d13947d github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 github.com/pingcap/parser v0.0.0-20190917040145-a90dba59f50d 
	github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9
diff --git a/go.sum b/go.sum
index 04dfdb8bc1c5b..3e211d1f19be0 100644
--- a/go.sum
+++ b/go.sum
@@ -160,8 +160,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
 github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
-github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
-github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
+github.com/pingcap/kvproto v0.0.0-20190923055505-97975d13947d h1:vgmgBxetJN6YXdlbpjC0sxVceFrCV2aWlXETt7hGwH0=
+github.com/pingcap/kvproto v0.0.0-20190923055505-97975d13947d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
 github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
diff --git a/kv/kv.go b/kv/kv.go
index 87d993717fb27..2911b4ba0a2e5 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -49,6 +49,8 @@ const (
 	Pessimistic
 	// SnapshotTS is defined to set snapshot ts.
 	SnapshotTS
+	// ReplicaRead is used to set the replica read type.
+	ReplicaRead
 )

 // Priority value for transaction priority.
@@ -68,6 +70,23 @@ const (
 	RC
 )

+// ReplicaReadType is the type of replica to read data from.
+type ReplicaReadType byte
+
+const (
+	// ReplicaReadLeader stands for 'read from leader'.
+	ReplicaReadLeader ReplicaReadType = 1 << iota
+	// ReplicaReadFollower stands for 'read from follower'.
+	ReplicaReadFollower
+	// ReplicaReadLearner stands for 'read from learner'.
+	ReplicaReadLearner
+)
+
+// IsFollowerRead checks if a follower is going to be used to read data.
+func (r ReplicaReadType) IsFollowerRead() bool {
+	return r == ReplicaReadFollower
+}
+
 // Those limits is enforced to make sure the transaction can be well handled by TiKV.
 var (
 	// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
@@ -214,6 +233,8 @@ type Request struct {
 	Streaming bool
 	// MemTracker is used to trace and control memory usage in co-processor layer.
 	MemTracker *memory.Tracker
+	// ReplicaRead is used for reading data from replicas; only follower read is supported at this time.
+	ReplicaRead ReplicaReadType
 }

 // ResultSubset represents a result subset from a single storage unit.
@@ -245,6 +266,12 @@ type Snapshot interface {
 	BatchGet(keys []Key) (map[string][]byte, error)
 	// SetPriority snapshot set the priority
 	SetPriority(priority int)
+
+	// SetOption sets an option with a value. When val is nil, it uses the default
+	// value of the option. Only ReplicaRead is supported for snapshots.
+	SetOption(opt Option, val interface{})
+	// DelOption deletes an option.
+	DelOption(opt Option)
 }

 // Driver is the interface that must be implemented by a KV storage.
diff --git a/kv/mock.go b/kv/mock.go index 8d007e64e6893..1af5e4c8a7a3d 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -232,3 +232,6 @@ func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) { func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) { return s.store.IterReverse(k) } + +func (s *mockSnapshot) SetOption(opt Option, val interface{}) {} +func (s *mockSnapshot) DelOption(opt Option) {} diff --git a/session/session.go b/session/session.go index df3e3f24e1d7a..4d90bc0bc7b49 100644 --- a/session/session.go +++ b/session/session.go @@ -186,6 +186,9 @@ type session struct { statsCollector *handle.SessionStatsCollector // ddlOwnerChecker is used in `select tidb_is_ddl_owner()` statement; ddlOwnerChecker owner.DDLOwnerChecker + + // shared coprocessor client per session + client kv.Client } // DDLOwnerChecker returns s.ddlOwnerChecker. @@ -495,7 +498,7 @@ func (s *session) RollbackTxn(ctx context.Context) { } func (s *session) GetClient() kv.Client { - return s.store.GetClient() + return s.client } func (s *session) String() string { @@ -1268,6 +1271,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { if !s.sessionVars.IsAutocommit() { s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } + if s.sessionVars.ReplicaRead.IsFollowerRead() { + s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } } return &s.txn, nil } @@ -1291,6 +1297,9 @@ func (s *session) NewTxn(ctx context.Context) error { } txn.SetCap(s.getMembufCap()) txn.SetVars(s.sessionVars.KVVars) + if s.GetSessionVars().ReplicaRead.IsFollowerRead() { + txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } s.txn.changeInvalidToValid(txn) is := domain.GetDomain(s).InfoSchema() s.sessionVars.TxnCtx = &variable.TransactionContext{ @@ -1570,6 +1579,7 @@ func createSession(store kv.Storage) (*session, error) { parser: parser.New(), sessionVars: variable.NewSessionVars(), ddlOwnerChecker: dom.DDL().OwnerManager(), + client: store.GetClient(), } if plannercore.PreparedPlanCacheEnabled() { s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity, @@ -1593,6 +1603,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er store: store, parser: parser.New(), sessionVars: variable.NewSessionVars(), + client: store.GetClient(), } if plannercore.PreparedPlanCacheEnabled() { s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity, diff --git a/session/session_test.go b/session/session_test.go index 6a279e80f230d..857c415070fc7 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2714,3 +2714,15 @@ func (s *testSessionSuite) TestGrantViewRelated(c *C) { tkUser.MustQuery("select current_user();").Check(testkit.Rows("u_version29@%")) tkUser.MustExec("create view v_version29_c as select * from v_version29;") } + +func (s *testSessionSuite) TestReplicaRead(c *C) { + var err error + tk := testkit.NewTestKit(c, s.store) + tk.Se, err = session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) + tk.MustExec("set @@tidb_replica_read = 'follower';") + c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower) + tk.MustExec("set @@tidb_replica_read = 'leader';") + c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 005e930e86f5e..a709832df8a1f 100644 --- a/sessionctx/variable/session.go +++ 
b/sessionctx/variable/session.go @@ -392,6 +392,9 @@ type SessionVars struct { // ConnectionInfo indicates current connection info used by current session, only be lazy assigned by plugin. ConnectionInfo *ConnectionInfo + + // ReplicaRead is used for reading data from replicas, only follower is supported at this time. + ReplicaRead kv.ReplicaReadType } // ConnectionInfo present connection used by audit. @@ -444,6 +447,7 @@ func NewSessionVars() *SessionVars { SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish, WaitSplitRegionTimeout: DefWaitSplitRegionTimeout, + ReplicaRead: kv.ReplicaReadLeader, } vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency, @@ -828,6 +832,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { // It's a global variable, but it also wants to be cached in server. case TiDBMaxDeltaSchemaCount: SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount)) + case TiDBReplicaRead: + if strings.EqualFold(val, "follower") { + s.ReplicaRead = kv.ReplicaReadFollower + } else if strings.EqualFold(val, "leader") || len(val) == 0 { + s.ReplicaRead = kv.ReplicaReadLeader + } } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index ef56743a9d879..4bab540ceb85a 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -711,6 +711,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)}, {ScopeSession, TiDBLowResolutionTSO, "0"}, {ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)}, + {ScopeSession, TiDBReplicaRead, "leader"}, {ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"}, } diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index a084c39414014..5a23978f98f56 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -46,6 +46,9 @@ func (*testSysVarSuite) TestSysVar(c *C) { f = GetSysVar("tidb_low_resolution_tso") c.Assert(f.Value, Equals, "0") + + f = GetSysVar("tidb_replica_read") + c.Assert(f.Value, Equals, "leader") } func (*testSysVarSuite) TestTxnMode(c *C) { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 3771b7d9145e9..38c56f52c5b69 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -142,6 +142,9 @@ const ( // TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds TiDBLowResolutionTSO = "tidb_low_resolution_tso" + + // TiDBReplicaRead is used for reading data from replicas, followers for example. + TiDBReplicaRead = "tidb_replica_read" ) // TiDB system variable names that both in session and global scope. 
diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index c45704729b007..345600be7cda5 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -583,6 +583,13 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, default: return value, ErrWrongValueForVar.GenWithStackByArgs(TiDBTxnMode, value) } + case TiDBReplicaRead: + if strings.EqualFold(value, "follower") { + return "follower", nil + } else if strings.EqualFold(value, "leader") || len(value) == 0 { + return "leader", nil + } + return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) case TiDBEnableStmtSummary: switch { case strings.EqualFold(value, "ON") || value == "1": diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index ec97078c4d849..e15d95b7c2ffc 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/testleak" ) @@ -294,6 +295,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(err, IsNil) c.Assert(val, Equals, "0") c.Assert(v.CorrelationThreshold, Equals, float64(0)) + + SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("follower")) + val, err = GetSessionSystemVar(v, TiDBReplicaRead) + c.Assert(err, IsNil) + c.Assert(val, Equals, "follower") + c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadFollower) + SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("leader")) + val, err = GetSessionSystemVar(v, TiDBReplicaRead) + c.Assert(err, IsNil) + c.Assert(val, Equals, "leader") + c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadLeader) } func (s *testVarsutilSuite) TestValidate(c *C) { @@ -348,6 +360,7 @@ func (s *testVarsutilSuite) TestValidate(c *C) { {TiDBTxnMode, "pessimistic", false}, {TiDBTxnMode, "optimistic", false}, {TiDBTxnMode, "", false}, + {TiDBReplicaRead, "invalid", true}, } for _, t := range tests { diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go old mode 100644 new mode 100755 index 712de3f1b7b30..92f42df178304 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -44,7 +44,8 @@ var tikvTxnRegionsNumHistogramWithCoprocessor = metrics.TiKVTxnRegionsNumHistogr // CopClient is coprocessor client. type CopClient struct { - store *tikvStore + store *tikvStore + replicaReadSeed uint32 } // IsRequestTypeSupported checks whether reqType is supported. @@ -93,12 +94,13 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable return copErrorResponse{err} } it := &copIterator{ - store: c.store, - req: req, - concurrency: req.Concurrency, - finishCh: make(chan struct{}), - vars: vars, - memTracker: req.MemTracker, + store: c.store, + req: req, + concurrency: req.Concurrency, + finishCh: make(chan struct{}), + vars: vars, + memTracker: req.MemTracker, + replicaReadSeed: c.replicaReadSeed, } it.tasks = tasks if it.concurrency > len(tasks) { @@ -386,6 +388,8 @@ type copIterator struct { vars *kv.Variables memTracker *memory.Tracker + + replicaReadSeed uint32 } // copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan. 
@@ -399,6 +403,8 @@ type copIteratorWorker struct { vars *kv.Variables memTracker *memory.Tracker + + replicaReadSeed uint32 } // copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit. @@ -498,6 +504,8 @@ func (it *copIterator) open(ctx context.Context) { vars: it.vars, memTracker: it.memTracker, + + replicaReadSeed: it.replicaReadSeed, } go worker.run(ctx) } @@ -666,6 +674,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch }) sender := NewRegionRequestSender(worker.store.regionCache, worker.store.client) + req := &tikvrpc.Request{ Type: task.cmdType, Cop: &coprocessor.Request{ @@ -679,7 +688,9 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch NotFillCache: worker.req.NotFillCache, HandleTime: true, ScanDetail: true, + ReplicaRead: worker.req.ReplicaRead.IsFollowerRead(), }, + ReplicaReadSeed: worker.replicaReadSeed, } startTime := time.Now() resp, rpcCtx, err := sender.SendReqCtx(bo, req, task.region, ReadTimeoutMedium) diff --git a/store/tikv/kv.go b/store/tikv/kv.go old mode 100644 new mode 100755 index 245af37af1c5b..b157f4b977f6d --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -21,6 +21,7 @@ import ( "net/url" "strings" "sync" + "sync/atomic" "time" "github.com/coreos/etcd/clientv3" @@ -145,6 +146,8 @@ type tikvStore struct { spTime time.Time spMutex sync.RWMutex // this is used to update safePoint and spTime closed chan struct{} // this is used to nofity when the store is closed + + replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled } func (s *tikvStore) UpdateSPCache(cachedSP uint64, cachedTime time.Time) { @@ -180,16 +183,17 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie return nil, errors.Trace(err) } store := &tikvStore{ - clusterID: pdClient.GetClusterID(context.TODO()), - uuid: uuid, - oracle: o, - client: client, - pdClient: pdClient, - regionCache: NewRegionCache(pdClient), - kv: spkv, - safePoint: 0, - spTime: time.Now(), - closed: make(chan struct{}), + clusterID: pdClient.GetClusterID(context.TODO()), + uuid: uuid, + oracle: o, + client: client, + pdClient: pdClient, + regionCache: NewRegionCache(pdClient), + kv: spkv, + safePoint: 0, + spTime: time.Now(), + closed: make(chan struct{}), + replicaReadSeed: rand.Uint32(), } store.lockResolver = newLockResolver(store) store.enableGC = enableGC @@ -263,7 +267,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { // BeginWithStartTS begins a transaction with startTS. 
func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { - txn, err := newTikvTxnWithStartTS(s, startTS) + txn, err := newTikvTxnWithStartTS(s, startTS, s.nextReplicaReadSeed()) if err != nil { return nil, errors.Trace(err) } @@ -272,7 +276,7 @@ func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { } func (s *tikvStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) { - snapshot := newTiKVSnapshot(s, ver) + snapshot := newTiKVSnapshot(s, ver, s.nextReplicaReadSeed()) metrics.TiKVSnapshotCounter.Inc() return snapshot, nil } @@ -336,9 +340,14 @@ func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) { } } +func (s *tikvStore) nextReplicaReadSeed() uint32 { + return atomic.AddUint32(&s.replicaReadSeed, 1) +} + func (s *tikvStore) GetClient() kv.Client { return &CopClient{ - store: s, + store: s, + replicaReadSeed: s.nextReplicaReadSeed(), } } diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 3b9ed28f3fded..7e805611dc4a3 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -158,7 +158,7 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet(c *C) { ver, err := s.store.CurrentVersion() c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(s.store, ver) + snapshot := newTiKVSnapshot(s.store, ver, 0) m, err := snapshot.BatchGet(keys) c.Assert(err, IsNil) c.Assert(len(m), Equals, int('z'-'a'+1)) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 237cc80e0d3b5..7b94edd3d8e87 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/client" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -84,6 +85,26 @@ func (r *RegionStore) clone() *RegionStore { } } +// return next follower store's index +func (r *RegionStore) follower(seed uint32) int32 { + l := uint32(len(r.stores)) + if l <= 1 { + return r.workStoreIdx + } + + for retry := l - 1; retry > 0; retry-- { + followerIdx := int32(seed % (l - 1)) + if followerIdx >= r.workStoreIdx { + followerIdx++ + } + if r.storeFails[followerIdx] == atomic.LoadUint32(&r.stores[followerIdx].fail) { + return followerIdx + } + seed++ + } + return r.workStoreIdx +} + // init initializes region after constructed. func (r *Region) init(c *RegionCache) { // region store pull used store from global store map @@ -255,7 +276,7 @@ func (c *RPCContext) String() string { // GetRPCContext returns RPCContext for a region. If it returns nil, the region // must be out of date and already dropped from cache. 
-func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error) { +func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID, replicaRead kv.ReplicaReadType, followerStoreSeed uint32) (*RPCContext, error) { ts := time.Now().Unix() cachedRegion := c.getCachedRegionWithRLock(id) @@ -268,7 +289,15 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, } regionStore := cachedRegion.getStore() - store, peer, storeIdx := cachedRegion.WorkStorePeer(regionStore) + var store *Store + var peer *metapb.Peer + var storeIdx int + switch replicaRead { + case kv.ReplicaReadFollower: + store, peer, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed) + default: + store, peer, storeIdx = cachedRegion.WorkStorePeer(regionStore) + } addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx) if err != nil { return nil, err @@ -280,7 +309,7 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, } storeFailEpoch := atomic.LoadUint32(&store.fail) - if storeFailEpoch != regionStore.storeFails[regionStore.workStoreIdx] { + if storeFailEpoch != regionStore.storeFails[storeIdx] { cachedRegion.invalidate() logutil.Logger(context.Background()).Info("invalidate current region, because others failed on same store", zap.Uint64("region", id.GetID()), @@ -828,12 +857,21 @@ func (r *Region) GetLeaderStoreID() uint64 { return r.meta.Peers[int(r.getStore().workStoreIdx)].StoreId } +func (r *Region) getStorePeer(rs *RegionStore, pidx int32) (store *Store, peer *metapb.Peer, idx int) { + store = rs.stores[pidx] + peer = r.meta.Peers[pidx] + idx = int(pidx) + return +} + // WorkStorePeer returns current work store with work peer. func (r *Region) WorkStorePeer(rs *RegionStore) (store *Store, peer *metapb.Peer, idx int) { - idx = int(rs.workStoreIdx) - store = rs.stores[rs.workStoreIdx] - peer = r.meta.Peers[rs.workStoreIdx] - return + return r.getStorePeer(rs, rs.workStoreIdx) +} + +// FollowerStorePeer returns a follower store with follower peer. +func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (store *Store, peer *metapb.Peer, idx int) { + return r.getStorePeer(rs, rs.follower(followerStoreSeed)) } // RegionVerID is a unique ID that can identify a Region at a specific version. @@ -877,19 +915,20 @@ func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool) func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) { rs := r.getStore() - if int(rs.workStoreIdx) != currentPeerIdx { - return - } if err != nil { // TODO: refine err, only do this for some errors. - s := rs.stores[rs.workStoreIdx] - epoch := rs.storeFails[rs.workStoreIdx] + s := rs.stores[currentPeerIdx] + epoch := rs.storeFails[currentPeerIdx] if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) { logutil.Logger(context.Background()).Info("mark store's regions need be refill", zap.String("store", s.addr)) tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc() } } + if int(rs.workStoreIdx) != currentPeerIdx { + return + } + nextIdx := (currentPeerIdx + 1) % len(rs.stores) newRegionStore := rs.clone() newRegionStore.workStoreIdx = int32(nextIdx) diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index c1261dadb3d17..0d0503aca38b6 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -17,22 +17,24 @@ import ( "context" "errors" "fmt" + "math/rand" "testing" "time" "github.com/google/btree" . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/mocktikv" ) type testRegionCacheSuite struct { OneByOneSuite cluster *mocktikv.Cluster - store1 uint64 - store2 uint64 - peer1 uint64 - peer2 uint64 + store1 uint64 // store1 is leader + store2 uint64 // store2 is follower + peer1 uint64 // peer1 is leader + peer2 uint64 // peer2 is follower region1 uint64 cache *RegionCache bo *Backoffer @@ -105,10 +107,10 @@ func (s *testRegionCacheSuite) getRegionWithEndKey(c *C, key []byte) *Region { return r } -func (s *testRegionCacheSuite) getAddr(c *C, key []byte) string { +func (s *testRegionCacheSuite) getAddr(c *C, key []byte, replicaRead kv.ReplicaReadType, seed uint32) string { loc, err := s.cache.LocateKey(s.bo, key) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, replicaRead, seed) c.Assert(err, IsNil) if ctx == nil { return "" @@ -117,10 +119,12 @@ func (s *testRegionCacheSuite) getAddr(c *C, key []byte) string { } func (s *testRegionCacheSuite) TestSimple(c *C) { + seed := rand.Uint32() r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) s.checkCache(c, 1) c.Assert(r.GetMeta(), DeepEquals, r.meta) c.Assert(r.GetLeaderID(), Equals, r.meta.Peers[r.getStore().workStoreIdx].Id) @@ -134,7 +138,10 @@ func (s *testRegionCacheSuite) TestDropStore(c *C) { s.cluster.RemoveStore(s.store1) loc, err := s.cache.LocateKey(bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(bo, loc.Region) + ctx, err := s.cache.GetRPCContext(bo, loc.Region, kv.ReplicaReadLeader, 0) + c.Assert(err, IsNil) + c.Assert(ctx, IsNil) + ctx, err = s.cache.GetRPCContext(bo, loc.Region, kv.ReplicaReadFollower, rand.Uint32()) c.Assert(err, IsNil) c.Assert(ctx, IsNil) s.checkCache(c, 0) @@ -155,6 +162,7 @@ func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) { } func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // tikv-server reports `NotLeader` @@ -163,15 +171,18 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store1)) r = s.getRegionWithEndKey(c, []byte("z")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("z")), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("z"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store1)) } func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // new store3 becomes leader @@ -186,7 +197,20 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) 
c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + follower := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed) + if seed%2 == 0 { + c.Assert(follower, Equals, s.storeAddr(s.store2)) + } else { + c.Assert(follower, Equals, s.storeAddr(store3)) + } + follower2 := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1) + if (seed+1)%2 == 0 { + c.Assert(follower2, Equals, s.storeAddr(s.store2)) + } else { + c.Assert(follower2, Equals, s.storeAddr(store3)) + } + c.Assert(follower, Not(Equals), follower2) // tikv-server notifies new leader to pd-server. s.cluster.ChangeLeader(s.region1, peer3) @@ -195,10 +219,24 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { r = s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(store3)) + follower = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed) + if seed%2 == 0 { + c.Assert(follower, Equals, s.storeAddr(s.store1)) + } else { + c.Assert(follower, Equals, s.storeAddr(s.store2)) + } + follower2 = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1) + if (seed+1)%2 == 0 { + c.Assert(follower2, Equals, s.storeAddr(s.store1)) + } else { + c.Assert(follower2, Equals, s.storeAddr(s.store2)) + } + c.Assert(follower, Not(Equals), follower2) } func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // store2 becomes leader @@ -219,11 +257,16 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { c.Assert(err, IsNil) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - addr := s.getAddr(c, []byte("a")) + addr := s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0) c.Assert(addr, Equals, "") s.getRegion(c, []byte("a")) // pd-server should return the new leader. 
- c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(store3)) + addr = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed) + c.Assert(addr == s.storeAddr(s.store1) || len(addr) == 0, IsTrue) + addr2 := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1) + c.Assert(addr2 == s.storeAddr(s.store1) || len(addr2) == 0, IsTrue) + c.Assert((len(addr2) == 0 && len(addr) == 0) || addr != addr2, IsTrue) } func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) { @@ -236,22 +279,74 @@ func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) c.Assert(len(ctx.Meta.Peers), Equals, 3) + // verify follower to be one of store2 and store3 + seed := rand.Uint32() + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // send fail leader switch to 2 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + // access 1 it will return NotLeader, leader back to 2 again s.cache.UpdateLeader(loc.Region, s.store2, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) } func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) { @@ -264,30 +359,100 @@ func (s 
*testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) c.Assert(len(ctx.Meta.Peers), Equals, 3) + // verify follower to be one of store2 and store3 + seed := rand.Uint32() + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // send fail leader switch to 2 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id == s.peer1 || ctxFollower1.Peer.Id == peer3, IsTrue) + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + // access 2, it's in hibernate and return 0 leader, so switch to 3 s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, peer3) + // verify follower to be one of store1 and store2 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // again peer back to 1 - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) + + // verify follower to be one of store2 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } 
else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) } func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) { @@ -305,12 +470,12 @@ func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) { c.Assert(loc2.Region.id, Equals, region2) // Send fail on region1 - ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region) + ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0) s.checkCache(c, 2) s.cache.OnSendFail(s.bo, ctx, false, errors.New("test error")) // Get region2 cache will get nil then reload. - ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region) + ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region, kv.ReplicaReadLeader, 0) c.Assert(ctx2, IsNil) c.Assert(err, IsNil) } @@ -325,34 +490,106 @@ func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) c.Assert(len(ctx.Meta.Peers), Equals, 3) + // verify follower to be one of store2 and store3 + seed := rand.Uint32() + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // send fail leader switch to 2 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + // send 2 fail leader switch to 3 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, peer3) + // verify follower to be one of store1 and store2 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + 
} + c.Assert(ctxFollower1.Peer.Id == s.peer1 || ctxFollower1.Peer.Id == s.peer2, IsTrue) + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // 3 can be access, so switch to 1 s.cache.UpdateLeader(loc.Region, s.store1, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) + + // verify follower to be one of store2 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) } func (s *testRegionCacheSuite) TestSplit(c *C) { + seed := rand.Uint32() r := s.getRegion(c, []byte("x")) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("x")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) // split to ['' - 'm' - 'z'] region2 := s.cluster.AllocID() @@ -365,7 +602,8 @@ func (s *testRegionCacheSuite) TestSplit(c *C) { r = s.getRegion(c, []byte("x")) c.Assert(r.GetID(), Equals, region2) - c.Assert(s.getAddr(c, []byte("x")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) s.checkCache(c, 1) r = s.getRegionWithEndKey(c, []byte("m")) @@ -397,6 +635,7 @@ func (s *testRegionCacheSuite) TestMerge(c *C) { } func (s *testRegionCacheSuite) TestReconnect(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) @@ -406,7 +645,8 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) { r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) s.checkCache(c, 1) } @@ -530,6 +770,37 @@ func (s *testRegionCacheSuite) TestListRegionIDsInCache(c *C) { c.Assert(regionIDs, DeepEquals, []uint64{s.region1, region2}) } +func (s *testRegionCacheSuite) TestFollowerReadFallback(c *C) { + // 3 nodes and no.1 is leader. 
+ store3 := s.cluster.AllocID() + peer3 := s.cluster.AllocID() + s.cluster.AddStore(store3, s.storeAddr(store3)) + s.cluster.AddPeer(s.region1, store3, peer3) + s.cluster.ChangeLeader(s.region1, s.peer1) + + loc, err := s.cache.LocateKey(s.bo, []byte("a")) + c.Assert(err, IsNil) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer1) + c.Assert(len(ctx.Meta.Peers), Equals, 3) + + // verify follower to be store2 and store3 + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 0) + c.Assert(err, IsNil) + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 1) + c.Assert(err, IsNil) + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + + // send fail on store2, next follower read is going to fallback to store3 + s.cache.OnSendFail(s.bo, ctxFollower1, false, errors.New("test error")) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 0) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, peer3) +} + func createSampleRegion(startKey, endKey []byte) *Region { return &Region{ meta: &metapb.Region{ diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go old mode 100644 new mode 100755 index c1a82fb344a44..72b5dfaf5cf67 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -96,8 +96,14 @@ func (s *RegionRequestSender) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, re } }) + var replicaRead kv.ReplicaReadType + if req.ReplicaRead { + replicaRead = kv.ReplicaReadFollower + } else { + replicaRead = kv.ReplicaReadLeader + } for { - ctx, err := s.regionCache.GetRPCContext(bo, regionID) + ctx, err := s.regionCache.GetRPCContext(bo, regionID, replicaRead, req.ReplicaReadSeed) if err != nil { return nil, nil, errors.Trace(err) } diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 896253c3217e0..c881367ecce8d 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -144,6 +144,11 @@ func (s *testRegionRequestSuite) TestSendReqCtx(c *C) { c.Assert(err, IsNil) c.Assert(resp.RawPut, NotNil) c.Assert(ctx, NotNil) + req.ReplicaRead = true + resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second) + c.Assert(err, IsNil) + c.Assert(resp.RawPut, NotNil) + c.Assert(ctx, NotNil) } func (s *testRegionRequestSuite) TestOnSendFailedWithCancelled(c *C) { @@ -253,6 +258,9 @@ func (s *mockTikvGrpcServer) KvPessimisticLock(context.Context, *kvrpcpb.Pessimi func (s *mockTikvGrpcServer) KVPessimisticRollback(context.Context, *kvrpcpb.PessimisticRollbackRequest) (*kvrpcpb.PessimisticRollbackResponse, error) { return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) { + return nil, errors.New("unreachable") +} func (s *mockTikvGrpcServer) KvGC(context.Context, *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) { return nil, errors.New("unreachable") } diff --git a/store/tikv/safepoint_test.go b/store/tikv/safepoint_test.go index fecacd642ad47..0f26aa58f6caf 100644 --- a/store/tikv/safepoint_test.go +++ b/store/tikv/safepoint_test.go @@ -113,7 +113,7 @@ func (s *testSafePointSuite) TestSafePoint(c *C) { s.waitUntilErrorPlugIn(txn4.startTS) - snapshot := newTiKVSnapshot(s.store, 
kv.Version{Ver: txn4.StartTS()}) + snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn4.StartTS()}, 0) _, batchgeterr := snapshot.BatchGet(keys) c.Assert(batchgeterr, NotNil) isFallBehind = terror.ErrorEqual(errors.Cause(geterr2), ErrGCTooEarly) diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 4ce5979270c02..0ef220941d77e 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -197,7 +197,9 @@ func (s *Scanner) getData(bo *Backoffer) error { Context: pb.Context{ Priority: s.snapshot.priority, NotFillCache: s.snapshot.notFillCache, + ReplicaRead: s.snapshot.replicaRead.IsFollowerRead(), }, + ReplicaReadSeed: s.snapshot.replicaReadSeed, } if s.reverse { req.Scan.StartKey = s.nextEndKey diff --git a/store/tikv/scan_mock_test.go b/store/tikv/scan_mock_test.go index 204bcc95783d9..05cd2f89d3898 100644 --- a/store/tikv/scan_mock_test.go +++ b/store/tikv/scan_mock_test.go @@ -41,7 +41,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { txn, err = store.Begin() c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) + snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}, 0) scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false) c.Assert(err, IsNil) for ch := byte('a'); ch <= byte('z'); ch++ { @@ -74,7 +74,7 @@ func (s *testScanMockSuite) TestReverseScan(c *C) { txn, err = store.Begin() c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) + snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}, 0) scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true) c.Assert(err, IsNil) for ch := byte('y'); ch >= byte('a'); ch-- { diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go old mode 100644 new mode 100755 index 08df06751ac66..c4d5814855568 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -49,22 +49,25 @@ var ( // tikvSnapshot implements the kv.Snapshot interface. type tikvSnapshot struct { - store *tikvStore - version kv.Version - priority pb.CommandPri - notFillCache bool - syncLog bool - keyOnly bool - vars *kv.Variables + store *tikvStore + version kv.Version + priority pb.CommandPri + notFillCache bool + syncLog bool + keyOnly bool + vars *kv.Variables + replicaRead kv.ReplicaReadType + replicaReadSeed uint32 } // newTiKVSnapshot creates a snapshot of an TiKV store. 
-func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
+func newTiKVSnapshot(store *tikvStore, ver kv.Version, replicaReadSeed uint32) *tikvSnapshot {
 	return &tikvSnapshot{
-		store:        store,
-		version:      ver,
-		priority:     pb.CommandPri_Normal,
-		vars:         kv.DefaultVars,
+		store:           store,
+		version:         ver,
+		priority:        pb.CommandPri_Normal,
+		vars:            kv.DefaultVars,
+		replicaReadSeed: replicaReadSeed,
 	}
 }

@@ -163,7 +166,9 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
 		Context: pb.Context{
 			Priority:     s.priority,
 			NotFillCache: s.notFillCache,
+			ReplicaRead:  s.replicaRead.IsFollowerRead(),
 		},
+		ReplicaReadSeed: s.replicaReadSeed,
 	}
 	resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutMedium)
 	if err != nil {
@@ -245,7 +250,9 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
 		Context: pb.Context{
 			Priority:     s.priority,
 			NotFillCache: s.notFillCache,
+			ReplicaRead:  s.replicaRead.IsFollowerRead(),
 		},
+		ReplicaReadSeed: s.replicaReadSeed,
 	}
 	for {
 		loc, err := s.store.regionCache.LocateKey(bo, k)
@@ -305,6 +312,23 @@ func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
 	return scanner, errors.Trace(err)
 }

+// SetOption sets an option with a value. When val is nil, it uses the default
+// value of the option. Only ReplicaRead is supported for snapshots.
+func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
+	switch opt {
+	case kv.ReplicaRead:
+		s.replicaRead = val.(kv.ReplicaReadType)
+	}
+}
+
+// DelOption deletes an option; deleting ReplicaRead falls back to reading from the leader.
+func (s *tikvSnapshot) DelOption(opt kv.Option) {
+	switch opt {
+	case kv.ReplicaRead:
+		s.replicaRead = kv.ReplicaReadLeader
+	}
+}
+
 func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
 	if locked := keyErr.GetLocked(); locked != nil {
 		return NewLock(locked), nil
diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go
index 4b6e1af441e9f..529271074d1e7 100644
--- a/store/tikv/snapshot_test.go
+++ b/store/tikv/snapshot_test.go
@@ -67,7 +67,7 @@ func (s *testSnapshotSuite) beginTxn(c *C) *tikvTxn {

 func (s *testSnapshotSuite) checkAll(keys []kv.Key, c *C) {
 	txn := s.beginTxn(c)
-	snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()})
+	snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0)
 	m, err := snapshot.BatchGet(keys)
 	c.Assert(err, IsNil)

diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go
index 3a40c844b14e4..443c797f5ee3f 100644
--- a/store/tikv/split_test.go
+++ b/store/tikv/split_test.go
@@ -58,7 +58,7 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) {
 	c.Assert(err, IsNil)

 	txn := s.begin(c)
-	snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()})
+	snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0)

 	keys := [][]byte{{'a'}, {'b'}, {'c'}}
 	_, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys)
diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go
index b3cf78ee5e82e..a19caf5187671 100644
--- a/store/tikv/tikvrpc/tikvrpc.go
+++ b/store/tikv/tikvrpc/tikvrpc.go
@@ -167,6 +167,8 @@ type Request struct {
 	DebugGetRegionProperties *debugpb.GetRegionPropertiesRequest

 	Empty *tikvpb.BatchCommandsEmptyRequest
+
+	ReplicaReadSeed uint32
 }

 // ToBatchCommandsRequest converts the request to an entry in BatchCommands request.
diff --git a/store/tikv/txn.go b/store/tikv/txn.go index a39670fc2f440..7ec5fd8874a73 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -76,13 +76,13 @@ func newTiKVTxn(store *tikvStore) (*tikvTxn, error) { if err != nil { return nil, errors.Trace(err) } - return newTikvTxnWithStartTS(store, startTS) + return newTikvTxnWithStartTS(store, startTS, store.nextReplicaReadSeed()) } // newTikvTxnWithStartTS creates a txn with startTS. -func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) { +func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uint32) (*tikvTxn, error) { ver := kv.NewVersion(startTS) - snapshot := newTiKVSnapshot(store, ver) + snapshot := newTiKVSnapshot(store, ver, replicaReadSeed) return &tikvTxn{ snapshot: snapshot, us: kv.NewUnionStore(snapshot), diff --git a/util/admin/admin.go b/util/admin/admin.go index a580edbb61bae..a8438d1bc8744 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -555,6 +555,10 @@ func ScanSnapshotTableRecord(sessCtx sessionctx.Context, store kv.Storage, ver k return nil, 0, errors.Trace(err) } + if sessCtx.GetSessionVars().ReplicaRead.IsFollowerRead() { + snap.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + records, nextHandle, err := ScanTableRecord(sessCtx, snap, t, startHandle, limit) return records, nextHandle, errors.Trace(err)