Skip to content

Commit

Permalink
Cherry pick follower read to TiDB (#11347) (#12535)
Browse files Browse the repository at this point in the history
  • Loading branch information
shldreams authored and sre-bot committed Oct 12, 2019
1 parent 9ca4666 commit 7459d01
Show file tree
Hide file tree
Showing 33 changed files with 613 additions and 92 deletions.
3 changes: 2 additions & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,13 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
}

// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
// "Concurrency", "IsolationLevel", "NotFillCache".
// "Concurrency", "IsolationLevel", "NotFillCache", "ReplicaRead".
func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder {
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.ReplicaRead
return builder
}

Expand Down
33 changes: 33 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -385,6 +386,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -429,6 +431,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -474,6 +477,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
Streaming: true,
NotFillCache: false,
SyncLog: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -554,3 +558,32 @@ func (s *testSuite) TestRequestBuilder6(c *C) {

c.Assert(actual, DeepEquals, expect)
}

func (s *testSuite) TestRequestBuilder7(c *C) {
vars := variable.NewSessionVars()
vars.ReplicaRead = kv.ReplicaReadFollower

concurrency := 10

actual, err := (&RequestBuilder{}).
SetFromSessionVars(vars).
SetConcurrency(concurrency).
Build()
c.Assert(err, IsNil)

expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
KeepOrder: false,
Desc: false,
Concurrency: concurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadFollower,
}

c.Assert(actual, DeepEquals, expect)
}
12 changes: 10 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,8 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
}
var resp *tikvrpc.Response
var rpcCtx *tikv.RPCContext
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region)
// we always use the first follower when follower read is enabled
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0)
if *err != nil {
return
}
Expand Down Expand Up @@ -914,6 +915,9 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(t.StartKey, t.EndKey)
if err != nil {
Expand All @@ -931,10 +935,14 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
defer e.wg.Done()
var snapshot kv.Snapshot
snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
if *err != nil {
return
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))

for i := workID; i < len(e.sampTasks); i += e.concurrency {
task := e.sampTasks[i]
if task.SampSize == 0 {
Expand Down
10 changes: 10 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ PARTITION BY RANGE ( a ) (
}
}

func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
ctx := tk.Se.(sessionctx.Context)
ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower
tk.MustExec("analyze table t")
}

func (s *testSuite1) TestAnalyzeParameters(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 3 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
if e.idxInfo != nil {
idxKey, err1 := e.encodeIndexKey()
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/kvproto v0.0.0-20190923055505-97975d13947d
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190917040145-a90dba59f50d
github.com/pingcap/pd v0.0.0-20190711034019-ee98bf9063e9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190923055505-97975d13947d h1:vgmgBxetJN6YXdlbpjC0sxVceFrCV2aWlXETt7hGwH0=
github.com/pingcap/kvproto v0.0.0-20190923055505-97975d13947d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
27 changes: 27 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
Pessimistic
// SnapshotTS is defined to set snapshot ts.
SnapshotTS
// Set replica read
ReplicaRead
)

// Priority value for transaction priority.
Expand All @@ -68,6 +70,23 @@ const (
RC
)

// ReplicaReadType is the type of replica to read data from
type ReplicaReadType byte

const (
// ReplicaReadLeader stands for 'read from leader'.
ReplicaReadLeader ReplicaReadType = 1 << iota
// ReplicaReadFollower stands for 'read from follower'.
ReplicaReadFollower
// ReplicaReadLearner stands for 'read from learner'.
ReplicaReadLearner
)

// IsFollowerRead checks if leader is going to be used to read data.
func (r ReplicaReadType) IsFollowerRead() bool {
return r == ReplicaReadFollower
}

// Those limits is enforced to make sure the transaction can be well handled by TiKV.
var (
// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
Expand Down Expand Up @@ -214,6 +233,8 @@ type Request struct {
Streaming bool
// MemTracker is used to trace and control memory usage in co-processor layer.
MemTracker *memory.Tracker
// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead ReplicaReadType
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down Expand Up @@ -245,6 +266,12 @@ type Snapshot interface {
BatchGet(keys []Key) (map[string][]byte, error)
// SetPriority snapshot set the priority
SetPriority(priority int)

// SetOption sets an option with a value, when val is nil, uses the default
// value of this option. Only ReplicaRead is supported for snapshot
SetOption(opt Option, val interface{})
// DelOption deletes an option.
DelOption(opt Option)
}

// Driver is the interface that must be implemented by a KV storage.
Expand Down
3 changes: 3 additions & 0 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,6 @@ func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
}

func (s *mockSnapshot) SetOption(opt Option, val interface{}) {}
func (s *mockSnapshot) DelOption(opt Option) {}
13 changes: 12 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ type session struct {
statsCollector *handle.SessionStatsCollector
// ddlOwnerChecker is used in `select tidb_is_ddl_owner()` statement;
ddlOwnerChecker owner.DDLOwnerChecker

// shared coprocessor client per session
client kv.Client
}

// DDLOwnerChecker returns s.ddlOwnerChecker.
Expand Down Expand Up @@ -495,7 +498,7 @@ func (s *session) RollbackTxn(ctx context.Context) {
}

func (s *session) GetClient() kv.Client {
return s.store.GetClient()
return s.client
}

func (s *session) String() string {
Expand Down Expand Up @@ -1268,6 +1271,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
if s.sessionVars.ReplicaRead.IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
}
return &s.txn, nil
}
Expand All @@ -1291,6 +1297,9 @@ func (s *session) NewTxn(ctx context.Context) error {
}
txn.SetCap(s.getMembufCap())
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().ReplicaRead.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
Expand Down Expand Up @@ -1570,6 +1579,7 @@ func createSession(store kv.Storage) (*session, error) {
parser: parser.New(),
sessionVars: variable.NewSessionVars(),
ddlOwnerChecker: dom.DDL().OwnerManager(),
client: store.GetClient(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand All @@ -1593,6 +1603,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
store: store,
parser: parser.New(),
sessionVars: variable.NewSessionVars(),
client: store.GetClient(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand Down
12 changes: 12 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2714,3 +2714,15 @@ func (s *testSessionSuite) TestGrantViewRelated(c *C) {
tkUser.MustQuery("select current_user();").Check(testkit.Rows("u_version29@%"))
tkUser.MustExec("create view v_version29_c as select * from v_version29;")
}

func (s *testSessionSuite) TestReplicaRead(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
tk.MustExec("set @@tidb_replica_read = 'follower';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower)
tk.MustExec("set @@tidb_replica_read = 'leader';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
}
10 changes: 10 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ type SessionVars struct {

// ConnectionInfo indicates current connection info used by current session, only be lazy assigned by plugin.
ConnectionInfo *ConnectionInfo

// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead kv.ReplicaReadType
}

// ConnectionInfo present connection used by audit.
Expand Down Expand Up @@ -444,6 +447,7 @@ func NewSessionVars() *SessionVars {
SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile,
WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish,
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
ReplicaRead: kv.ReplicaReadLeader,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -828,6 +832,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
// It's a global variable, but it also wants to be cached in server.
case TiDBMaxDeltaSchemaCount:
SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount))
case TiDBReplicaRead:
if strings.EqualFold(val, "follower") {
s.ReplicaRead = kv.ReplicaReadFollower
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.ReplicaRead = kv.ReplicaReadLeader
}
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)},
{ScopeSession, TiDBLowResolutionTSO, "0"},
{ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)},
{ScopeSession, TiDBReplicaRead, "leader"},
{ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"},
}

Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (*testSysVarSuite) TestSysVar(c *C) {

f = GetSysVar("tidb_low_resolution_tso")
c.Assert(f.Value, Equals, "0")

f = GetSysVar("tidb_replica_read")
c.Assert(f.Value, Equals, "leader")
}

func (*testSysVarSuite) TestTxnMode(c *C) {
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,9 @@ const (

// TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds
TiDBLowResolutionTSO = "tidb_low_resolution_tso"

// TiDBReplicaRead is used for reading data from replicas, followers for example.
TiDBReplicaRead = "tidb_replica_read"
)

// TiDB system variable names that both in session and global scope.
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
default:
return value, ErrWrongValueForVar.GenWithStackByArgs(TiDBTxnMode, value)
}
case TiDBReplicaRead:
if strings.EqualFold(value, "follower") {
return "follower", nil
} else if strings.EqualFold(value, "leader") || len(value) == 0 {
return "leader", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBEnableStmtSummary:
switch {
case strings.EqualFold(value, "ON") || value == "1":
Expand Down
Loading

0 comments on commit 7459d01

Please sign in to comment.