diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 10c91e93598d1..f1ed9f77ad637 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -148,12 +148,13 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int { } // SetFromSessionVars sets the following fields for "kv.Request" from session variables: -// "Concurrency", "IsolationLevel", "NotFillCache". +// "Concurrency", "IsolationLevel", "NotFillCache", "ReplicaRead". func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder { builder.Request.Concurrency = sv.DistSQLScanConcurrency builder.Request.IsolationLevel = builder.getIsolationLevel() builder.Request.NotFillCache = sv.StmtCtx.NotFillCache builder.Request.Priority = builder.getKVPriority(sv) + builder.Request.ReplicaRead = sv.ReplicaRead return builder } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index d94d200e50c15..c73f775096193 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -314,6 +314,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) { NotFillCache: false, SyncLog: false, Streaming: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -388,6 +389,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) { NotFillCache: false, SyncLog: false, Streaming: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -432,6 +434,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) { NotFillCache: false, SyncLog: false, Streaming: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -477,6 +480,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) { Streaming: true, NotFillCache: false, SyncLog: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -558,6 +562,35 @@ func (s *testSuite) TestRequestBuilder6(c *C) { c.Assert(actual, DeepEquals, expect) } +func (s *testSuite) TestRequestBuilder7(c *C) { + vars := variable.NewSessionVars() + vars.ReplicaRead = kv.ReplicaReadFollower + + concurrency := 10 + + actual, err := (&RequestBuilder{}). + SetFromSessionVars(vars). + SetConcurrency(concurrency). 
+ Build() + c.Assert(err, IsNil) + + expect := &kv.Request{ + Tp: 0, + StartTs: 0x0, + KeepOrder: false, + Desc: false, + Concurrency: concurrency, + IsolationLevel: 0, + Priority: 0, + NotFillCache: false, + SyncLog: false, + Streaming: false, + ReplicaRead: kv.ReplicaReadFollower, + } + + c.Assert(actual, DeepEquals, expect) +} + func (s *testSuite) TestTableRangesToKVRangesWithFbs(c *C) { ranges := []*ranger.Range{ { diff --git a/executor/analyze.go b/executor/analyze.go index 144be79ec8525..a155684377cf8 100755 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -623,7 +623,8 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild }) var resp *tikvrpc.Response var rpcCtx *tikv.RPCContext - rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region) + // we always use the first follower when follower read is enabled + rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0) if *err != nil { return } @@ -924,6 +925,9 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err if err != nil { return 0, err } + if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } for _, t := range e.scanTasks { iter, err := snapshot.Iter(t.StartKey, t.EndKey) if err != nil { @@ -942,10 +946,14 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e defer e.wg.Done() var snapshot kv.Snapshot snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion) - rander := rand.New(rand.NewSource(e.randSeed + int64(workID))) if *err != nil { return } + if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + rander := rand.New(rand.NewSource(e.randSeed + int64(workID))) + for i := workID; i < len(e.sampTasks); i += e.concurrency { task := e.sampTasks[i] if task.SampSize == 0 { diff --git a/executor/analyze_test.go b/executor/analyze_test.go index ca1b6356fca78..70f5617849f3f 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -106,6 +106,16 @@ PARTITION BY RANGE ( a ) ( } } +func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + ctx := tk.Se.(sessionctx.Context) + ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower + tk.MustExec("analyze table t") +} + func (s *testSuite1) TestAnalyzeRestrict(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/point_get.go b/executor/point_get.go index 665742635a5e3..383e24d0164e2 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -91,6 +91,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if err != nil { return err } + if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } if e.idxInfo != nil { idxKey, err1 := e.encodeIndexKey() if err1 != nil && !kv.ErrNotExist.Equal(err1) { diff --git a/go.mod b/go.mod index 77d15922bc4f9..70d7d5ffb1157 100644 --- a/go.mod +++ b/go.mod @@ -39,7 +39,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 + github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 
 	github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
 	github.com/pingcap/parser v0.0.0-20190815020744-315a7ae5e232
 	github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b
diff --git a/go.sum b/go.sum
index 202ca1479c97a..1263b8eb28275 100644
--- a/go.sum
+++ b/go.sum
@@ -160,8 +160,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
 github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
 github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
-github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
-github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
+github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 h1:BMrtxXqQeZ9y27LN/V3PHA/tSyDWHK+90VLYaymrXQE=
+github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
 github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
 github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
diff --git a/kv/kv.go b/kv/kv.go
index 895d8ebed1338..24977006f9208 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -50,6 +50,8 @@ const (
 	Pessimistic
 	// SnapshotTS is defined to set snapshot ts.
 	SnapshotTS
+	// ReplicaRead is used to set the replica read type.
+	ReplicaRead
 )
 
 // Priority value for transaction priority.
@@ -69,6 +71,23 @@ const (
 	RC
 )
 
+// ReplicaReadType is the type of replica to read data from
+type ReplicaReadType byte
+
+const (
+	// ReplicaReadLeader stands for 'read from leader'.
+	ReplicaReadLeader ReplicaReadType = 1 << iota
+	// ReplicaReadFollower stands for 'read from follower'.
+	ReplicaReadFollower
+	// ReplicaReadLearner stands for 'read from learner'.
+	ReplicaReadLearner
+)
+
+// IsFollowerRead checks if a follower is going to be used to read data.
+func (r ReplicaReadType) IsFollowerRead() bool {
+	return r == ReplicaReadFollower
+}
+
 // Those limits is enforced to make sure the transaction can be well handled by TiKV.
 var (
 	// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
@@ -221,6 +240,8 @@ type Request struct {
 	// Streaming indicates using streaming API for this request, result in that one Next()
 	// call would not corresponds to a whole region result.
 	Streaming bool
+	// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
+	ReplicaRead ReplicaReadType
 }
 
 // ResultSubset represents a result subset from a single storage unit.
@@ -252,6 +273,12 @@ type Snapshot interface {
 	BatchGet(ctx context.Context, keys []Key) (map[string][]byte, error)
 	// SetPriority snapshot set the priority
 	SetPriority(priority int)
+
+	// SetOption sets an option with a value, when val is nil, uses the default
+	// value of this option. Only ReplicaRead is supported for snapshot
+	SetOption(opt Option, val interface{})
+	// DelOption deletes an option.
+	DelOption(opt Option)
 }
 
 // Driver is the interface that must be implemented by a KV storage.
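The kv-layer additions above are what the executor changes later in this diff (analyze.go, point_get.go, admin.go) build on: take a snapshot, and opt it into follower read when the session asks for it. A minimal sketch of that pattern, assuming the hypothetical helper name readSnapshot (not part of the patch):

```go
package example

import "github.com/pingcap/tidb/kv"

// readSnapshot mirrors how the executors in this PR opt a snapshot into
// follower read. The helper itself is illustrative, not patch code.
func readSnapshot(store kv.Storage, ver kv.Version, rr kv.ReplicaReadType) (kv.Snapshot, error) {
	snap, err := store.GetSnapshot(ver)
	if err != nil {
		return nil, err
	}
	if rr.IsFollowerRead() {
		// Only the ReplicaRead option is honoured by the snapshot in this patch.
		snap.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
	}
	return snap, nil
}
```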
diff --git a/kv/mock.go b/kv/mock.go index ca50a069ec813..70f5ce831d0d7 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -229,3 +229,6 @@ func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) { func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) { return s.store.IterReverse(k) } + +func (s *mockSnapshot) SetOption(opt Option, val interface{}) {} +func (s *mockSnapshot) DelOption(opt Option) {} diff --git a/session/session.go b/session/session.go index f56e760599eef..c49b55956dfdd 100644 --- a/session/session.go +++ b/session/session.go @@ -183,6 +183,9 @@ type session struct { ddlOwnerChecker owner.DDLOwnerChecker // lockedTables use to record the table locks hold by the session. lockedTables map[int64]model.TableLockTpInfo + + // shared coprocessor client per session + client kv.Client } // AddTableLock adds table lock to the session lock map. @@ -546,7 +549,7 @@ func (s *session) RollbackTxn(ctx context.Context) { } func (s *session) GetClient() kv.Client { - return s.store.GetClient() + return s.client } func (s *session) String() string { @@ -1255,6 +1258,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable() + if s.sessionVars.ReplicaRead.IsFollowerRead() { + s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } } return &s.txn, nil } @@ -1315,6 +1321,9 @@ func (s *session) NewTxn(ctx context.Context) error { } txn.SetCap(s.getMembufCap()) txn.SetVars(s.sessionVars.KVVars) + if s.GetSessionVars().ReplicaRead.IsFollowerRead() { + txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } s.txn.changeInvalidToValid(txn) is := domain.GetDomain(s).InfoSchema() s.sessionVars.TxnCtx = &variable.TransactionContext{ @@ -1607,6 +1616,7 @@ func createSession(store kv.Storage) (*session, error) { parser: parser.New(), sessionVars: variable.NewSessionVars(), ddlOwnerChecker: dom.DDL().OwnerManager(), + client: store.GetClient(), } if plannercore.PreparedPlanCacheEnabled() { s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity, @@ -1631,6 +1641,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er store: store, parser: parser.New(), sessionVars: variable.NewSessionVars(), + client: store.GetClient(), } if plannercore.PreparedPlanCacheEnabled() { s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity, diff --git a/session/session_test.go b/session/session_test.go index 5973742d411c6..3ca8974b56afe 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2796,3 +2796,15 @@ func (s *testSessionSuite) TestLoadClientInteractive(c *C) { tk.Se.GetSessionVars().ClientCapability = tk.Se.GetSessionVars().ClientCapability | mysql.ClientInteractive tk.MustQuery("select @@wait_timeout").Check(testkit.Rows("28800")) } + +func (s *testSessionSuite) TestReplicaRead(c *C) { + var err error + tk := testkit.NewTestKit(c, s.store) + tk.Se, err = session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) + tk.MustExec("set @@tidb_replica_read = 'follower';") + c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower) + tk.MustExec("set @@tidb_replica_read = 'leader';") + c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index cbc015eecc4fa..dede1b68640fe 100644 --- 
a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -399,6 +399,9 @@ type SessionVars struct { // use noop funcs or not EnableNoopFuncs bool + + // ReplicaRead is used for reading data from replicas, only follower is supported at this time. + ReplicaRead kv.ReplicaReadType } // ConnectionInfo present connection used by audit. @@ -453,6 +456,7 @@ func NewSessionVars() *SessionVars { WaitSplitRegionTimeout: DefWaitSplitRegionTimeout, EnableIndexMerge: false, EnableNoopFuncs: DefTiDBEnableNoopFuncs, + ReplicaRead: kv.ReplicaReadLeader, } vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency, @@ -831,6 +835,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { s.EnableIndexMerge = TiDBOptOn(val) case TiDBEnableNoopFuncs: s.EnableNoopFuncs = TiDBOptOn(val) + case TiDBReplicaRead: + if strings.EqualFold(val, "follower") { + s.ReplicaRead = kv.ReplicaReadFollower + } else if strings.EqualFold(val, "leader") || len(val) == 0 { + s.ReplicaRead = kv.ReplicaReadLeader + } } s.systems[name] = val return nil diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index f42c04bf3c99e..9d8ade09e4748 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -704,6 +704,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBLowResolutionTSO, "0"}, {ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)}, {ScopeGlobal | ScopeSession, TiDBEnableNoopFuncs, BoolToIntStr(DefTiDBEnableNoopFuncs)}, + {ScopeSession, TiDBReplicaRead, "leader"}, } // SynonymsSysVariables is synonyms of system variables. diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index f4b2e50db44d2..702db0ac7fce1 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -46,6 +46,9 @@ func (*testSysVarSuite) TestSysVar(c *C) { f = GetSysVar("tidb_low_resolution_tso") c.Assert(f.Value, Equals, "0") + + f = GetSysVar("tidb_replica_read") + c.Assert(f.Value, Equals, "leader") } func (*testSysVarSuite) TestTxnMode(c *C) { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index ade964cb85966..725b229c0089c 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -141,6 +141,9 @@ const ( // TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds TiDBLowResolutionTSO = "tidb_low_resolution_tso" + + // TiDBReplicaRead is used for reading data from replicas, followers for example. + TiDBReplicaRead = "tidb_replica_read" ) // TiDB system variable names that both in session and global scope. 
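At the SQL layer, tidb_replica_read is a plain string; SetSystemVar above maps it onto kv.ReplicaReadType, with "follower" enabling follower read and "leader" or an empty string keeping the default. A standalone restatement of that mapping, using a made-up parseReplicaRead helper:

```go
package example

import (
	"strings"

	"github.com/pingcap/tidb/kv"
)

// parseReplicaRead restates the string-to-type mapping done in SetSystemVar;
// the helper name is invented for illustration.
func parseReplicaRead(val string) kv.ReplicaReadType {
	if strings.EqualFold(val, "follower") {
		return kv.ReplicaReadFollower
	}
	// "leader" or the empty string keeps the default leader read.
	return kv.ReplicaReadLeader
}
```

Invalid values never reach this mapping, because ValidateSetSystemVar (see the varsutil.go hunk below) rejects anything other than "leader", "follower", or the empty string.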
diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index 47a60f1a7b7ed..ae244d93face2 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -560,6 +560,13 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, if v <= 0 { return value, errors.Errorf("tidb_wait_split_region_timeout(%d) cannot be smaller than 1", v) } + case TiDBReplicaRead: + if strings.EqualFold(value, "follower") { + return "follower", nil + } else if strings.EqualFold(value, "leader") || len(value) == 0 { + return "leader", nil + } + return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) } return value, nil } diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index 4ee860cc7f2f8..d36d9143380ab 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/testleak" ) @@ -294,6 +295,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(err, IsNil) c.Assert(val, Equals, "0") c.Assert(v.CorrelationThreshold, Equals, float64(0)) + + SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("follower")) + val, err = GetSessionSystemVar(v, TiDBReplicaRead) + c.Assert(err, IsNil) + c.Assert(val, Equals, "follower") + c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadFollower) + SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("leader")) + val, err = GetSessionSystemVar(v, TiDBReplicaRead) + c.Assert(err, IsNil) + c.Assert(val, Equals, "leader") + c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadLeader) } func (s *testVarsutilSuite) TestSetOverflowBehave(c *C) { @@ -362,6 +374,7 @@ func (s *testVarsutilSuite) TestValidate(c *C) { {TiDBMaxChunkSize, "-1", true}, {TiDBOptJoinReorderThreshold, "a", true}, {TiDBOptJoinReorderThreshold, "-1", true}, + {TiDBReplicaRead, "invalid", true}, } for _, t := range tests { diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go index e49815f47f67d..1dea973858225 100644 --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -44,7 +44,8 @@ var tikvTxnRegionsNumHistogramWithCoprocessor = metrics.TiKVTxnRegionsNumHistogr // CopClient is coprocessor client. type CopClient struct { kv.RequestTypeSupportedChecker - store *tikvStore + store *tikvStore + replicaReadSeed uint32 } // Send builds the request and gets the coprocessor iterator response. @@ -56,12 +57,13 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable return copErrorResponse{err} } it := &copIterator{ - store: c.store, - req: req, - concurrency: req.Concurrency, - finishCh: make(chan struct{}), - vars: vars, - memTracker: req.MemTracker, + store: c.store, + req: req, + concurrency: req.Concurrency, + finishCh: make(chan struct{}), + vars: vars, + memTracker: req.MemTracker, + replicaReadSeed: c.replicaReadSeed, } it.tasks = tasks if it.concurrency > len(tasks) { @@ -345,6 +347,8 @@ type copIterator struct { memTracker *memory.Tracker + replicaReadSeed uint32 + wg sync.WaitGroup // closed represents when the Close is called. 
// There are two cases we need to close the `finishCh` channel, one is when context is done, the other one is @@ -363,6 +367,8 @@ type copIteratorWorker struct { vars *kv.Variables memTracker *memory.Tracker + + replicaReadSeed uint32 } // copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit. @@ -462,6 +468,8 @@ func (it *copIterator) open(ctx context.Context) { vars: it.vars, memTracker: it.memTracker, + + replicaReadSeed: it.replicaReadSeed, } go worker.run(ctx) } @@ -630,11 +638,11 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch }) sender := NewRegionRequestSender(worker.store.regionCache, worker.store.client) - req := tikvrpc.NewRequest(task.cmdType, &coprocessor.Request{ + req := tikvrpc.NewReplicaReadRequest(task.cmdType, &coprocessor.Request{ Tp: worker.req.Tp, Data: worker.req.Data, Ranges: task.ranges.toPBRanges(), - }, kvrpcpb.Context{ + }, worker.req.ReplicaRead, worker.replicaReadSeed, kvrpcpb.Context{ IsolationLevel: pbIsolationLevel(worker.req.IsolationLevel), Priority: kvPriorityToCommandPri(worker.req.Priority), NotFillCache: worker.req.NotFillCache, diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 8f2cbe68ea2d7..8090c98b39ee3 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -21,6 +21,7 @@ import ( "net/url" "strings" "sync" + "sync/atomic" "time" "github.com/coreos/etcd/clientv3" @@ -146,6 +147,8 @@ type tikvStore struct { spTime time.Time spMutex sync.RWMutex // this is used to update safePoint and spTime closed chan struct{} // this is used to nofity when the store is closed + + replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled } func (s *tikvStore) UpdateSPCache(cachedSP uint64, cachedTime time.Time) { @@ -181,16 +184,17 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie return nil, errors.Trace(err) } store := &tikvStore{ - clusterID: pdClient.GetClusterID(context.TODO()), - uuid: uuid, - oracle: o, - client: client, - pdClient: pdClient, - regionCache: NewRegionCache(pdClient), - kv: spkv, - safePoint: 0, - spTime: time.Now(), - closed: make(chan struct{}), + clusterID: pdClient.GetClusterID(context.TODO()), + uuid: uuid, + oracle: o, + client: client, + pdClient: pdClient, + regionCache: NewRegionCache(pdClient), + kv: spkv, + safePoint: 0, + spTime: time.Now(), + closed: make(chan struct{}), + replicaReadSeed: rand.Uint32(), } store.lockResolver = newLockResolver(store) store.enableGC = enableGC @@ -264,7 +268,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { // BeginWithStartTS begins a transaction with startTS. 
func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { - txn, err := newTikvTxnWithStartTS(s, startTS) + txn, err := newTikvTxnWithStartTS(s, startTS, s.nextReplicaReadSeed()) if err != nil { return nil, errors.Trace(err) } @@ -273,7 +277,7 @@ func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { } func (s *tikvStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) { - snapshot := newTiKVSnapshot(s, ver) + snapshot := newTiKVSnapshot(s, ver, s.nextReplicaReadSeed()) metrics.TiKVSnapshotCounter.Inc() return snapshot, nil } @@ -343,9 +347,14 @@ func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) { } } +func (s *tikvStore) nextReplicaReadSeed() uint32 { + return atomic.AddUint32(&s.replicaReadSeed, 1) +} + func (s *tikvStore) GetClient() kv.Client { return &CopClient{ - store: s, + store: s, + replicaReadSeed: s.nextReplicaReadSeed(), } } diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index 2fa2c8e12c20a..2675ef436011f 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -158,7 +158,7 @@ func (s *testLockSuite) TestScanLockResolveWithBatchGet(c *C) { ver, err := s.store.CurrentVersion() c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(s.store, ver) + snapshot := newTiKVSnapshot(s.store, ver, 0) m, err := snapshot.BatchGet(context.Background(), keys) c.Assert(err, IsNil) c.Assert(len(m), Equals, int('z'-'a'+1)) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index eda9a5260713d..bfcc6ade57d58 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/client" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -86,6 +87,26 @@ func (r *RegionStore) clone() *RegionStore { } } +// return next follower store's index +func (r *RegionStore) follower(seed uint32) int32 { + l := uint32(len(r.stores)) + if l <= 1 { + return r.workStoreIdx + } + + for retry := l - 1; retry > 0; retry-- { + followerIdx := int32(seed % (l - 1)) + if followerIdx >= r.workStoreIdx { + followerIdx++ + } + if r.storeFails[followerIdx] == atomic.LoadUint32(&r.stores[followerIdx].fail) { + return followerIdx + } + seed++ + } + return r.workStoreIdx +} + // init initializes region after constructed. func (r *Region) init(c *RegionCache) { // region store pull used store from global store map @@ -257,7 +278,7 @@ func (c *RPCContext) String() string { // GetRPCContext returns RPCContext for a region. If it returns nil, the region // must be out of date and already dropped from cache. 
-func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error) { +func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID, replicaRead kv.ReplicaReadType, followerStoreSeed uint32) (*RPCContext, error) { ts := time.Now().Unix() cachedRegion := c.getCachedRegionWithRLock(id) @@ -270,7 +291,15 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, } regionStore := cachedRegion.getStore() - store, peer, storeIdx := cachedRegion.WorkStorePeer(regionStore) + var store *Store + var peer *metapb.Peer + var storeIdx int + switch replicaRead { + case kv.ReplicaReadFollower: + store, peer, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed) + default: + store, peer, storeIdx = cachedRegion.WorkStorePeer(regionStore) + } addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx) if err != nil { return nil, err @@ -282,7 +311,7 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, } storeFailEpoch := atomic.LoadUint32(&store.fail) - if storeFailEpoch != regionStore.storeFails[regionStore.workStoreIdx] { + if storeFailEpoch != regionStore.storeFails[storeIdx] { cachedRegion.invalidate() logutil.BgLogger().Info("invalidate current region, because others failed on same store", zap.Uint64("region", id.GetID()), @@ -912,12 +941,21 @@ func (r *Region) GetLeaderStoreID() uint64 { return r.meta.Peers[int(r.getStore().workStoreIdx)].StoreId } +func (r *Region) getStorePeer(rs *RegionStore, pidx int32) (store *Store, peer *metapb.Peer, idx int) { + store = rs.stores[pidx] + peer = r.meta.Peers[pidx] + idx = int(pidx) + return +} + // WorkStorePeer returns current work store with work peer. func (r *Region) WorkStorePeer(rs *RegionStore) (store *Store, peer *metapb.Peer, idx int) { - idx = int(rs.workStoreIdx) - store = rs.stores[rs.workStoreIdx] - peer = r.meta.Peers[rs.workStoreIdx] - return + return r.getStorePeer(rs, rs.workStoreIdx) +} + +// FollowerStorePeer returns a follower store with follower peer. +func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (store *Store, peer *metapb.Peer, idx int) { + return r.getStorePeer(rs, rs.follower(followerStoreSeed)) } // RegionVerID is a unique ID that can identify a Region at a specific version. @@ -961,19 +999,20 @@ func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool) func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) { rs := r.getStore() - if int(rs.workStoreIdx) != currentPeerIdx { - return - } if err != nil { // TODO: refine err, only do this for some errors. - s := rs.stores[rs.workStoreIdx] - epoch := rs.storeFails[rs.workStoreIdx] + s := rs.stores[currentPeerIdx] + epoch := rs.storeFails[currentPeerIdx] if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) { logutil.BgLogger().Info("mark store's regions need be refill", zap.String("store", s.addr)) tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc() } } + if int(rs.workStoreIdx) != currentPeerIdx { + return + } + nextIdx := (currentPeerIdx + 1) % len(rs.stores) newRegionStore := rs.clone() newRegionStore.workStoreIdx = int32(nextIdx) diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index 4d9ebecd768f2..ab721473f9431 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -17,22 +17,24 @@ import ( "context" "errors" "fmt" + "math/rand" "testing" "time" "github.com/google/btree" . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/mocktikv" ) type testRegionCacheSuite struct { OneByOneSuite cluster *mocktikv.Cluster - store1 uint64 - store2 uint64 - peer1 uint64 - peer2 uint64 + store1 uint64 // store1 is leader + store2 uint64 // store2 is follower + peer1 uint64 // peer1 is leader + peer2 uint64 // peer2 is follower region1 uint64 cache *RegionCache bo *Backoffer @@ -105,10 +107,10 @@ func (s *testRegionCacheSuite) getRegionWithEndKey(c *C, key []byte) *Region { return r } -func (s *testRegionCacheSuite) getAddr(c *C, key []byte) string { +func (s *testRegionCacheSuite) getAddr(c *C, key []byte, replicaRead kv.ReplicaReadType, seed uint32) string { loc, err := s.cache.LocateKey(s.bo, key) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, replicaRead, seed) c.Assert(err, IsNil) if ctx == nil { return "" @@ -117,10 +119,12 @@ func (s *testRegionCacheSuite) getAddr(c *C, key []byte) string { } func (s *testRegionCacheSuite) TestSimple(c *C) { + seed := rand.Uint32() r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) s.checkCache(c, 1) c.Assert(r.GetMeta(), DeepEquals, r.meta) c.Assert(r.GetLeaderID(), Equals, r.meta.Peers[r.getStore().workStoreIdx].Id) @@ -134,7 +138,10 @@ func (s *testRegionCacheSuite) TestDropStore(c *C) { s.cluster.RemoveStore(s.store1) loc, err := s.cache.LocateKey(bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(bo, loc.Region) + ctx, err := s.cache.GetRPCContext(bo, loc.Region, kv.ReplicaReadLeader, 0) + c.Assert(err, IsNil) + c.Assert(ctx, IsNil) + ctx, err = s.cache.GetRPCContext(bo, loc.Region, kv.ReplicaReadFollower, rand.Uint32()) c.Assert(err, IsNil) c.Assert(ctx, IsNil) s.checkCache(c, 0) @@ -155,6 +162,7 @@ func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) { } func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // tikv-server reports `NotLeader` @@ -163,15 +171,18 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store1)) r = s.getRegionWithEndKey(c, []byte("z")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("z")), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("z"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store1)) } func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // new store3 becomes leader @@ -186,7 +197,20 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) 
c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + follower := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed) + if seed%2 == 0 { + c.Assert(follower, Equals, s.storeAddr(s.store2)) + } else { + c.Assert(follower, Equals, s.storeAddr(store3)) + } + follower2 := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1) + if (seed+1)%2 == 0 { + c.Assert(follower2, Equals, s.storeAddr(s.store2)) + } else { + c.Assert(follower2, Equals, s.storeAddr(store3)) + } + c.Assert(follower, Not(Equals), follower2) // tikv-server notifies new leader to pd-server. s.cluster.ChangeLeader(s.region1, peer3) @@ -195,10 +219,24 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { r = s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(store3)) + follower = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed) + if seed%2 == 0 { + c.Assert(follower, Equals, s.storeAddr(s.store1)) + } else { + c.Assert(follower, Equals, s.storeAddr(s.store2)) + } + follower2 = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1) + if (seed+1)%2 == 0 { + c.Assert(follower2, Equals, s.storeAddr(s.store1)) + } else { + c.Assert(follower2, Equals, s.storeAddr(s.store2)) + } + c.Assert(follower, Not(Equals), follower2) } func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // store2 becomes leader @@ -219,11 +257,16 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { c.Assert(err, IsNil) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - addr := s.getAddr(c, []byte("a")) + addr := s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0) c.Assert(addr, Equals, "") s.getRegion(c, []byte("a")) // pd-server should return the new leader. 
- c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(store3)) + addr = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed) + c.Assert(addr == s.storeAddr(s.store1) || len(addr) == 0, IsTrue) + addr2 := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1) + c.Assert(addr2 == s.storeAddr(s.store1) || len(addr2) == 0, IsTrue) + c.Assert((len(addr2) == 0 && len(addr) == 0) || addr != addr2, IsTrue) } func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) { @@ -236,22 +279,74 @@ func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) c.Assert(len(ctx.Meta.Peers), Equals, 3) + // verify follower to be one of store2 and store3 + seed := rand.Uint32() + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // send fail leader switch to 2 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + // access 1 it will return NotLeader, leader back to 2 again s.cache.UpdateLeader(loc.Region, s.store2, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) } func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) { @@ -264,30 +359,100 @@ func (s 
*testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) c.Assert(len(ctx.Meta.Peers), Equals, 3) + // verify follower to be one of store2 and store3 + seed := rand.Uint32() + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // send fail leader switch to 2 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id == s.peer1 || ctxFollower1.Peer.Id == peer3, IsTrue) + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + // access 2, it's in hibernate and return 0 leader, so switch to 3 s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, peer3) + // verify follower to be one of store1 and store2 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // again peer back to 1 - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) + + // verify follower to be one of store2 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } 
else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) } func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) { @@ -305,12 +470,12 @@ func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) { c.Assert(loc2.Region.id, Equals, region2) // Send fail on region1 - ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region) + ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0) s.checkCache(c, 2) s.cache.OnSendFail(s.bo, ctx, false, errors.New("test error")) // Get region2 cache will get nil then reload. - ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region) + ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region, kv.ReplicaReadLeader, 0) c.Assert(ctx2, IsNil) c.Assert(err, IsNil) } @@ -325,34 +490,106 @@ func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) c.Assert(len(ctx.Meta.Peers), Equals, 3) + // verify follower to be one of store2 and store3 + seed := rand.Uint32() + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // send fail leader switch to 2 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + // send 2 fail leader switch to 3 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, peer3) + // verify follower to be one of store1 and store2 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + 
} + c.Assert(ctxFollower1.Peer.Id == s.peer1 || ctxFollower1.Peer.Id == s.peer2, IsTrue) + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // 3 can be access, so switch to 1 s.cache.UpdateLeader(loc.Region, s.store1, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) + + // verify follower to be one of store2 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) } func (s *testRegionCacheSuite) TestSplit(c *C) { + seed := rand.Uint32() r := s.getRegion(c, []byte("x")) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("x")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) // split to ['' - 'm' - 'z'] region2 := s.cluster.AllocID() @@ -365,7 +602,8 @@ func (s *testRegionCacheSuite) TestSplit(c *C) { r = s.getRegion(c, []byte("x")) c.Assert(r.GetID(), Equals, region2) - c.Assert(s.getAddr(c, []byte("x")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) s.checkCache(c, 1) r = s.getRegionWithEndKey(c, []byte("m")) @@ -397,6 +635,7 @@ func (s *testRegionCacheSuite) TestMerge(c *C) { } func (s *testRegionCacheSuite) TestReconnect(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) @@ -406,7 +645,8 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) { r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) s.checkCache(c, 1) } @@ -626,6 +866,37 @@ func (s *testRegionCacheSuite) TestBatchLoadRegions(c *C) { s.checkCache(c, len(regions)) } +func (s *testRegionCacheSuite) TestFollowerReadFallback(c *C) { + // 3 nodes and no.1 is leader. 
+ store3 := s.cluster.AllocID() + peer3 := s.cluster.AllocID() + s.cluster.AddStore(store3, s.storeAddr(store3)) + s.cluster.AddPeer(s.region1, store3, peer3) + s.cluster.ChangeLeader(s.region1, s.peer1) + + loc, err := s.cache.LocateKey(s.bo, []byte("a")) + c.Assert(err, IsNil) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer1) + c.Assert(len(ctx.Meta.Peers), Equals, 3) + + // verify follower to be store2 and store3 + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 0) + c.Assert(err, IsNil) + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 1) + c.Assert(err, IsNil) + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + + // send fail on store2, next follower read is going to fallback to store3 + s.cache.OnSendFail(s.bo, ctxFollower1, false, errors.New("test error")) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 0) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, peer3) +} + func createSampleRegion(startKey, endKey []byte) *Region { return &Region{ meta: &metapb.Region{ diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 95ffd4d239974..ce4756840f956 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -94,8 +94,14 @@ func (s *RegionRequestSender) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, re } }) + var replicaRead kv.ReplicaReadType + if req.ReplicaRead { + replicaRead = kv.ReplicaReadFollower + } else { + replicaRead = kv.ReplicaReadLeader + } for { - ctx, err := s.regionCache.GetRPCContext(bo, regionID) + ctx, err := s.regionCache.GetRPCContext(bo, regionID, replicaRead, req.ReplicaReadSeed) if err != nil { return nil, nil, errors.Trace(err) } diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index 2bb824126fbbb..3a64e13f775c0 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -135,6 +135,11 @@ func (s *testRegionRequestSuite) TestSendReqCtx(c *C) { c.Assert(err, IsNil) c.Assert(resp.Resp, NotNil) c.Assert(ctx, NotNil) + req.ReplicaRead = true + resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second) + c.Assert(err, IsNil) + c.Assert(resp.Resp, NotNil) + c.Assert(ctx, NotNil) } func (s *testRegionRequestSuite) TestOnSendFailedWithCancelled(c *C) { diff --git a/store/tikv/safepoint_test.go b/store/tikv/safepoint_test.go index 03d84467d23ea..93d889c3243f6 100644 --- a/store/tikv/safepoint_test.go +++ b/store/tikv/safepoint_test.go @@ -113,7 +113,7 @@ func (s *testSafePointSuite) TestSafePoint(c *C) { s.waitUntilErrorPlugIn(txn4.startTS) - snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn4.StartTS()}) + snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn4.StartTS()}, 0) _, batchgeterr := snapshot.BatchGet(context.Background(), keys) c.Assert(batchgeterr, NotNil) isFallBehind = terror.ErrorEqual(errors.Cause(geterr2), ErrGCTooEarly) diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 645ecd93f04b5..70e51cd7d58f1 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -198,7 +198,7 @@ func (s *Scanner) getData(bo *Backoffer) error { sreq.EndKey = reqStartKey sreq.Reverse = true } - req := tikvrpc.NewRequest(tikvrpc.CmdScan, sreq, pb.Context{ + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdScan, 
sreq, s.snapshot.replicaRead, s.snapshot.replicaReadSeed, pb.Context{ Priority: s.snapshot.priority, NotFillCache: s.snapshot.notFillCache, }) diff --git a/store/tikv/scan_mock_test.go b/store/tikv/scan_mock_test.go index 204bcc95783d9..05cd2f89d3898 100644 --- a/store/tikv/scan_mock_test.go +++ b/store/tikv/scan_mock_test.go @@ -41,7 +41,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { txn, err = store.Begin() c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) + snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}, 0) scanner, err := newScanner(snapshot, []byte("a"), nil, 10, false) c.Assert(err, IsNil) for ch := byte('a'); ch <= byte('z'); ch++ { @@ -74,7 +74,7 @@ func (s *testScanMockSuite) TestReverseScan(c *C) { txn, err = store.Begin() c.Assert(err, IsNil) - snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}) + snapshot := newTiKVSnapshot(store, kv.Version{Ver: txn.StartTS()}, 0) scanner, err := newScanner(snapshot, nil, []byte("z"), 10, true) c.Assert(err, IsNil) for ch := byte('y'); ch >= byte('a'); ch-- { diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 98ce8eb8954b3..875b45ae45212 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -49,22 +49,25 @@ var ( // tikvSnapshot implements the kv.Snapshot interface. type tikvSnapshot struct { - store *tikvStore - version kv.Version - priority pb.CommandPri - notFillCache bool - syncLog bool - keyOnly bool - vars *kv.Variables + store *tikvStore + version kv.Version + priority pb.CommandPri + notFillCache bool + syncLog bool + keyOnly bool + vars *kv.Variables + replicaRead kv.ReplicaReadType + replicaReadSeed uint32 } // newTiKVSnapshot creates a snapshot of an TiKV store. -func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot { +func newTiKVSnapshot(store *tikvStore, ver kv.Version, replicaReadSeed uint32) *tikvSnapshot { return &tikvSnapshot{ - store: store, - version: ver, - priority: pb.CommandPri_Normal, - vars: kv.DefaultVars, + store: store, + version: ver, + priority: pb.CommandPri_Normal, + vars: kv.DefaultVars, + replicaReadSeed: replicaReadSeed, } } @@ -154,10 +157,10 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll pending := batch.keys for { - req := tikvrpc.NewRequest(tikvrpc.CmdBatchGet, &pb.BatchGetRequest{ + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdBatchGet, &pb.BatchGetRequest{ Keys: pending, Version: s.version.Ver, - }, pb.Context{ + }, s.replicaRead, s.replicaReadSeed, pb.Context{ Priority: s.priority, NotFillCache: s.notFillCache, }) @@ -232,11 +235,11 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { sender := NewRegionRequestSender(s.store.regionCache, s.store.client) - req := tikvrpc.NewRequest(tikvrpc.CmdGet, + req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &pb.GetRequest{ Key: k, Version: s.version.Ver, - }, pb.Context{ + }, s.replicaRead, s.replicaReadSeed, pb.Context{ Priority: s.priority, NotFillCache: s.notFillCache, }) @@ -298,6 +301,23 @@ func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) { return scanner, errors.Trace(err) } +// SetOption sets an option with a value, when val is nil, uses the default +// value of this option. 
Only ReplicaRead is supported for snapshot
+func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
+	switch opt {
+	case kv.ReplicaRead:
+		s.replicaRead = val.(kv.ReplicaReadType)
+	}
+}
+
+// DelOption deletes an option. Deleting ReplicaRead makes the snapshot fall back to reading from the leader.
+func (s *tikvSnapshot) DelOption(opt kv.Option) {
+	switch opt {
+	case kv.ReplicaRead:
+		s.replicaRead = kv.ReplicaReadLeader
+	}
+}
+
 func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
 	if locked := keyErr.GetLocked(); locked != nil {
 		return NewLock(locked), nil
diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go
index 6c15cf1ac8249..7ea31aebe801e 100644
--- a/store/tikv/snapshot_test.go
+++ b/store/tikv/snapshot_test.go
@@ -67,7 +67,7 @@ func (s *testSnapshotSuite) beginTxn(c *C) *tikvTxn {
 
 func (s *testSnapshotSuite) checkAll(keys []kv.Key, c *C) {
 	txn := s.beginTxn(c)
-	snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()})
+	snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0)
 	m, err := snapshot.BatchGet(context.Background(), keys)
 	c.Assert(err, IsNil)
 
diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go
index 4ee508282e564..fe5caa718891e 100644
--- a/store/tikv/split_test.go
+++ b/store/tikv/split_test.go
@@ -58,7 +58,7 @@ func (s *testSplitSuite) TestSplitBatchGet(c *C) {
 	c.Assert(err, IsNil)
 
 	txn := s.begin(c)
-	snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()})
+	snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0)
 
 	keys := [][]byte{{'a'}, {'b'}, {'c'}}
 	_, region, err := s.store.regionCache.GroupKeysByRegion(s.bo, keys)
diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go
index c016fbd694e53..9f2e1f6eb14fb 100644
--- a/store/tikv/tikvrpc/tikvrpc.go
+++ b/store/tikv/tikvrpc/tikvrpc.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/tikvpb"
+	"github.com/pingcap/tidb/kv"
 )
 
 // CmdType represents the concrete request type in Request or response type in Response.
@@ -137,6 +138,7 @@ type Request struct {
 	Type CmdType
 	req  interface{}
 	kvrpcpb.Context
+	ReplicaReadSeed uint32
 }
 
 // NewRequest returns new kv rpc request.
@@ -154,6 +156,14 @@ func NewRequest(typ CmdType, pointer interface{}, ctxs ...kvrpcpb.Context) *Requ
 	}
 }
 
+// NewReplicaReadRequest returns new kv rpc request with replica read.
+func NewReplicaReadRequest(typ CmdType, pointer interface{}, replicaReadType kv.ReplicaReadType, replicaReadSeed uint32, ctxs ...kvrpcpb.Context) *Request {
+	req := NewRequest(typ, pointer, ctxs...)
+	req.ReplicaRead = replicaReadType.IsFollowerRead()
+	req.ReplicaReadSeed = replicaReadSeed
+	return req
+}
+
 // Get returns GetRequest in request.
 func (req *Request) Get() *kvrpcpb.GetRequest {
 	return req.req.(*kvrpcpb.GetRequest)
diff --git a/store/tikv/txn.go b/store/tikv/txn.go
index ec7a266e916af..1c7a32b024daa 100644
--- a/store/tikv/txn.go
+++ b/store/tikv/txn.go
@@ -82,13 +82,13 @@ func newTiKVTxn(store *tikvStore) (*tikvTxn, error) {
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return newTikvTxnWithStartTS(store, startTS)
+	return newTikvTxnWithStartTS(store, startTS, store.nextReplicaReadSeed())
 }
 
 // newTikvTxnWithStartTS creates a txn with startTS.
-func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) { +func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uint32) (*tikvTxn, error) { ver := kv.NewVersion(startTS) - snapshot := newTiKVSnapshot(store, ver) + snapshot := newTiKVSnapshot(store, ver, replicaReadSeed) return &tikvTxn{ snapshot: snapshot, us: kv.NewUnionStore(snapshot), diff --git a/util/admin/admin.go b/util/admin/admin.go index 2dbbe77fd7ade..540f2d5244512 100644 --- a/util/admin/admin.go +++ b/util/admin/admin.go @@ -555,6 +555,10 @@ func ScanSnapshotTableRecord(sessCtx sessionctx.Context, store kv.Storage, ver k return nil, 0, errors.Trace(err) } + if sessCtx.GetSessionVars().ReplicaRead.IsFollowerRead() { + snap.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + records, nextHandle, err := ScanTableRecord(sessCtx, snap, t, startHandle, limit) return records, nextHandle, errors.Trace(err)
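Taken together, the region-cache changes balance follower reads as follows: each snapshot and coprocessor client carries a replicaReadSeed drawn from the store's atomic counter (nextReplicaReadSeed), and RegionStore.follower uses that seed to pick a follower peer, skipping the leader slot and any store whose fail epoch has advanced, falling back to the leader when no healthy follower remains. A simplified, dependency-free sketch of that selection rule (the pickFollower name and the healthy callback are illustrative, not patch code):

```go
package example

// pickFollower restates the selection done by RegionStore.follower, using
// plain values instead of the cache's internal types. leaderIdx is the index
// of the current work store (the leader); healthy(i) stands in for the
// store-fail-epoch check in the real code.
func pickFollower(numStores, leaderIdx int, healthy func(i int) bool, seed uint32) int {
	if numStores <= 1 {
		return leaderIdx
	}
	for retry := numStores - 1; retry > 0; retry-- {
		idx := int(seed % uint32(numStores-1))
		if idx >= leaderIdx {
			idx++ // skip the leader slot so only followers are candidates
		}
		if healthy(idx) {
			return idx
		}
		seed++
	}
	// No healthy follower was found: fall back to the leader.
	return leaderIdx
}
```

Because successive requests carry successive seeds, healthy followers are visited round-robin, which is what the TestFollowerReadFallback and TestUpdateLeader2 cases above assert.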