Skip to content

Commit

Permalink
Add follower read support to TiDB (#11347)
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaoguang authored and sre-bot committed Aug 16, 2019
1 parent 8a16172 commit 523b936
Show file tree
Hide file tree
Showing 33 changed files with 616 additions and 99 deletions.
3 changes: 2 additions & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,13 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
}

// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
// "Concurrency", "IsolationLevel", "NotFillCache".
// "Concurrency", "IsolationLevel", "NotFillCache", "Priority", "ReplicaRead".
func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder {
	// All writes go to the builder's own request; the pointer only shortens the field paths.
	req := &builder.Request

	req.Concurrency = sv.DistSQLScanConcurrency
	req.IsolationLevel = builder.getIsolationLevel()
	req.NotFillCache = sv.StmtCtx.NotFillCache
	req.Priority = builder.getKVPriority(sv)
	// Carry the session's replica-read choice down to the KV request.
	req.ReplicaRead = sv.ReplicaRead

	// Return the receiver so further Set* calls can be chained.
	return builder
}

Expand Down
33 changes: 33 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -388,6 +389,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -432,6 +434,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -477,6 +480,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
Streaming: true,
NotFillCache: false,
SyncLog: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -558,6 +562,35 @@ func (s *testSuite) TestRequestBuilder6(c *C) {
c.Assert(actual, DeepEquals, expect)
}

// TestRequestBuilder7 checks that a follower-read setting in the session
// variables ends up in the ReplicaRead field of the built kv.Request.
func (s *testSuite) TestRequestBuilder7(c *C) {
	const concurrency = 10

	sv := variable.NewSessionVars()
	sv.ReplicaRead = kv.ReplicaReadFollower

	builder := &RequestBuilder{}
	got, err := builder.SetFromSessionVars(sv).SetConcurrency(concurrency).Build()
	c.Assert(err, IsNil)

	// Every field except Concurrency and ReplicaRead is expected to keep its zero value.
	want := &kv.Request{
		Tp:             0,
		StartTs:        0x0,
		KeepOrder:      false,
		Desc:           false,
		Concurrency:    concurrency,
		IsolationLevel: 0,
		Priority:       0,
		NotFillCache:   false,
		SyncLog:        false,
		Streaming:      false,
		ReplicaRead:    kv.ReplicaReadFollower,
	}
	c.Assert(got, DeepEquals, want)
}

func (s *testSuite) TestTableRangesToKVRangesWithFbs(c *C) {
ranges := []*ranger.Range{
{
Expand Down
12 changes: 10 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
})
var resp *tikvrpc.Response
var rpcCtx *tikv.RPCContext
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region)
// we always use the first follower when follower read is enabled
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0)
if *err != nil {
return
}
Expand Down Expand Up @@ -924,6 +925,9 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
if err != nil {
return 0, err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(t.StartKey, t.EndKey)
if err != nil {
Expand All @@ -942,10 +946,14 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
defer e.wg.Done()
var snapshot kv.Snapshot
snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
if *err != nil {
return
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))

for i := workID; i < len(e.sampTasks); i += e.concurrency {
task := e.sampTasks[i]
if task.SampSize == 0 {
Expand Down
10 changes: 10 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ PARTITION BY RANGE ( a ) (
}
}

// TestAnalyzeReplicaReadFollower runs "analyze table" on an empty table after
// switching the session's ReplicaRead to follower. It makes no assertions of
// its own beyond what tk.MustExec performs.
func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) {
	tk := testkit.NewTestKit(c, s.store)

	// Prepare a fresh table t in the test database.
	setup := []string{
		"use test",
		"drop table if exists t",
		"create table t(a int)",
	}
	for _, stmt := range setup {
		tk.MustExec(stmt)
	}

	// Flip the session to follower read before analyzing.
	sessVars := tk.Se.(sessionctx.Context).GetSessionVars()
	sessVars.ReplicaRead = kv.ReplicaReadFollower

	tk.MustExec("analyze table t")
}

func (s *testSuite1) TestAnalyzeRestrict(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 3 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
if e.idxInfo != nil {
idxKey, err1 := e.encodeIndexKey()
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190815020744-315a7ae5e232
github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7 h1:BMrtxXqQeZ9y27LN/V3PHA/tSyDWHK+90VLYaymrXQE=
github.com/pingcap/kvproto v0.0.0-20190724165112-ec9df5f208a7/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
27 changes: 27 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (
Pessimistic
// SnapshotTS is defined to set snapshot ts.
SnapshotTS
// ReplicaRead is the option key used to set which type of replica to read from.
ReplicaRead
)

// Priority value for transaction priority.
Expand All @@ -69,6 +71,23 @@ const (
RC
)

// ReplicaReadType is the type of replica to read data from.
type ReplicaReadType byte

// Each replica kind occupies its own bit, so the zero value of
// ReplicaReadType matches none of the constants below.
const (
	// ReplicaReadLeader stands for 'read from leader'.
	ReplicaReadLeader ReplicaReadType = 1 << 0
	// ReplicaReadFollower stands for 'read from follower'.
	ReplicaReadFollower ReplicaReadType = 1 << 1
	// ReplicaReadLearner stands for 'read from learner'.
	ReplicaReadLearner ReplicaReadType = 1 << 2
)

// IsFollowerRead reports whether t is exactly ReplicaReadFollower, i.e.
// whether a follower is going to be used to read data. It returns false for
// the leader and learner types.
func (t ReplicaReadType) IsFollowerRead() bool {
	return t == ReplicaReadFollower
}

// Those limits is enforced to make sure the transaction can be well handled by TiKV.
var (
// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
Expand Down Expand Up @@ -221,6 +240,8 @@ type Request struct {
// Streaming indicates using streaming API for this request, result in that one Next()
// call would not corresponds to a whole region result.
Streaming bool
// ReplicaRead selects the type of replica to read data from; only follower is supported at this time.
ReplicaRead ReplicaReadType
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down Expand Up @@ -252,6 +273,12 @@ type Snapshot interface {
BatchGet(ctx context.Context, keys []Key) (map[string][]byte, error)
// SetPriority snapshot set the priority
SetPriority(priority int)

// SetOption sets an option with a value. When val is nil, the default
// value of this option is used. Only ReplicaRead is supported for snapshot.
SetOption(opt Option, val interface{})
// DelOption deletes an option.
DelOption(opt Option)
}

// Driver is the interface that must be implemented by a KV storage.
Expand Down
3 changes: 3 additions & 0 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,6 @@ func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
}

// SetOption does nothing on the mock snapshot: the body is empty, so opt and val are ignored.
func (s *mockSnapshot) SetOption(opt Option, val interface{}) {}
// DelOption does nothing on the mock snapshot: the body is empty, so opt is ignored.
func (s *mockSnapshot) DelOption(opt Option) {}
13 changes: 12 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ type session struct {
ddlOwnerChecker owner.DDLOwnerChecker
// lockedTables use to record the table locks hold by the session.
lockedTables map[int64]model.TableLockTpInfo

// shared coprocessor client per session
client kv.Client
}

// AddTableLock adds table lock to the session lock map.
Expand Down Expand Up @@ -546,7 +549,7 @@ func (s *session) RollbackTxn(ctx context.Context) {
}

func (s *session) GetClient() kv.Client {
return s.store.GetClient()
return s.client
}

func (s *session) String() string {
Expand Down Expand Up @@ -1255,6 +1258,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
if s.sessionVars.ReplicaRead.IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
}
return &s.txn, nil
}
Expand Down Expand Up @@ -1315,6 +1321,9 @@ func (s *session) NewTxn(ctx context.Context) error {
}
txn.SetCap(s.getMembufCap())
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().ReplicaRead.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
Expand Down Expand Up @@ -1607,6 +1616,7 @@ func createSession(store kv.Storage) (*session, error) {
parser: parser.New(),
sessionVars: variable.NewSessionVars(),
ddlOwnerChecker: dom.DDL().OwnerManager(),
client: store.GetClient(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand All @@ -1631,6 +1641,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
store: store,
parser: parser.New(),
sessionVars: variable.NewSessionVars(),
client: store.GetClient(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand Down
12 changes: 12 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2796,3 +2796,15 @@ func (s *testSessionSuite) TestLoadClientInteractive(c *C) {
tk.Se.GetSessionVars().ClientCapability = tk.Se.GetSessionVars().ClientCapability | mysql.ClientInteractive
tk.MustQuery("select @@wait_timeout").Check(testkit.Rows("28800"))
}

// TestReplicaRead checks that a new session starts with leader reads and that
// setting tidb_replica_read updates SessionVars.ReplicaRead to match.
func (s *testSessionSuite) TestReplicaRead(c *C) {
	tk := testkit.NewTestKit(c, s.store)
	se, err := session.CreateSession4Test(s.store)
	tk.Se = se
	c.Assert(err, IsNil)

	// Before any SET statement the session reads from the leader.
	c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)

	// Each step runs one SET statement and then checks the resulting value.
	steps := []struct {
		stmt string
		want kv.ReplicaReadType
	}{
		{"set @@tidb_replica_read = 'follower';", kv.ReplicaReadFollower},
		{"set @@tidb_replica_read = 'leader';", kv.ReplicaReadLeader},
	}
	for _, step := range steps {
		tk.MustExec(step.stmt)
		c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, step.want)
	}
}
10 changes: 10 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,9 @@ type SessionVars struct {

// use noop funcs or not
EnableNoopFuncs bool

// ReplicaRead selects the type of replica to read data from; only follower is supported at this time.
ReplicaRead kv.ReplicaReadType
}

// ConnectionInfo present connection used by audit.
Expand Down Expand Up @@ -453,6 +456,7 @@ func NewSessionVars() *SessionVars {
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
EnableIndexMerge: false,
EnableNoopFuncs: DefTiDBEnableNoopFuncs,
ReplicaRead: kv.ReplicaReadLeader,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -831,6 +835,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.EnableIndexMerge = TiDBOptOn(val)
case TiDBEnableNoopFuncs:
s.EnableNoopFuncs = TiDBOptOn(val)
case TiDBReplicaRead:
if strings.EqualFold(val, "follower") {
s.ReplicaRead = kv.ReplicaReadFollower
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.ReplicaRead = kv.ReplicaReadLeader
}
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBLowResolutionTSO, "0"},
{ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)},
{ScopeGlobal | ScopeSession, TiDBEnableNoopFuncs, BoolToIntStr(DefTiDBEnableNoopFuncs)},
{ScopeSession, TiDBReplicaRead, "leader"},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (*testSysVarSuite) TestSysVar(c *C) {

f = GetSysVar("tidb_low_resolution_tso")
c.Assert(f.Value, Equals, "0")

f = GetSysVar("tidb_replica_read")
c.Assert(f.Value, Equals, "leader")
}

func (*testSysVarSuite) TestTxnMode(c *C) {
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ const (

// TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds
TiDBLowResolutionTSO = "tidb_low_resolution_tso"

// TiDBReplicaRead is used for reading data from replicas, followers for example.
TiDBReplicaRead = "tidb_replica_read"
)

// TiDB system variable names that both in session and global scope.
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,13 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
if v <= 0 {
return value, errors.Errorf("tidb_wait_split_region_timeout(%d) cannot be smaller than 1", v)
}
case TiDBReplicaRead:
if strings.EqualFold(value, "follower") {
return "follower", nil
} else if strings.EqualFold(value, "leader") || len(value) == 0 {
return "leader", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
return value, nil
}
Expand Down
Loading

0 comments on commit 523b936

Please sign in to comment.