Skip to content

Commit

Permalink
Cherry pick follower read to TiDB (pingcap#11347) (pingcap#13464)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood committed Nov 20, 2019
1 parent c700fd2 commit 68af256
Show file tree
Hide file tree
Showing 26 changed files with 608 additions and 78 deletions.
3 changes: 2 additions & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,13 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
}

// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
// "Concurrency", "IsolationLevel", "NotFillCache".
// "Concurrency", "IsolationLevel", "NotFillCache", "ReplicaRead".
func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder {
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.ReplicaRead
return builder
}

Expand Down
33 changes: 33 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -385,6 +386,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -429,6 +431,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -474,6 +477,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
Streaming: true,
NotFillCache: false,
SyncLog: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -554,3 +558,32 @@ func (s *testSuite) TestRequestBuilder6(c *C) {

c.Assert(actual, DeepEquals, expect)
}

func (s *testSuite) TestRequestBuilder7(c *C) {
vars := variable.NewSessionVars()
vars.ReplicaRead = kv.ReplicaReadFollower

concurrency := 10

actual, err := (&RequestBuilder{}).
SetFromSessionVars(vars).
SetConcurrency(concurrency).
Build()
c.Assert(err, IsNil)

expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
KeepOrder: false,
Desc: false,
Concurrency: concurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadFollower,
}

c.Assert(actual, DeepEquals, expect)
}
12 changes: 10 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
}
var resp *tikvrpc.Response
var rpcCtx *tikv.RPCContext
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region)
// we always use the first follower when follower read is enabled
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0)
if *err != nil {
return
}
Expand Down Expand Up @@ -917,6 +918,9 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(t.StartKey, t.EndKey)
if err != nil {
Expand All @@ -934,10 +938,14 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
defer e.wg.Done()
var snapshot kv.Snapshot
snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
if *err != nil {
return
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))

for i := workID; i < len(e.sampTasks); i += e.concurrency {
task := e.sampTasks[i]
if task.SampSize == 0 {
Expand Down
10 changes: 10 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ PARTITION BY RANGE ( a ) (
}
}

func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
ctx := tk.Se.(sessionctx.Context)
ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower
tk.MustExec("analyze table t")
}

func (s *testSuite1) TestAnalyzeParameters(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 3 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
if e.idxInfo != nil {
idxKey, err1 := e.encodeIndexKey()
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
Expand Down
27 changes: 27 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
Pessimistic
// SnapshotTS is defined to set snapshot ts.
SnapshotTS
// Set replica read
ReplicaRead
)

// Priority value for transaction priority.
Expand All @@ -68,6 +70,23 @@ const (
RC
)

// ReplicaReadType is the type of replica to read data from
type ReplicaReadType byte

const (
// ReplicaReadLeader stands for 'read from leader'.
ReplicaReadLeader ReplicaReadType = 1 << iota
// ReplicaReadFollower stands for 'read from follower'.
ReplicaReadFollower
// ReplicaReadLearner stands for 'read from learner'.
ReplicaReadLearner
)

// IsFollowerRead checks if leader is going to be used to read data.
func (r ReplicaReadType) IsFollowerRead() bool {
return r == ReplicaReadFollower
}

// Those limits is enforced to make sure the transaction can be well handled by TiKV.
var (
// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
Expand Down Expand Up @@ -216,6 +235,8 @@ type Request struct {
Streaming bool
// MemTracker is used to trace and control memory usage in co-processor layer.
MemTracker *memory.Tracker
// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead ReplicaReadType
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down Expand Up @@ -247,6 +268,12 @@ type Snapshot interface {
BatchGet(keys []Key) (map[string][]byte, error)
// SetPriority snapshot set the priority
SetPriority(priority int)

// SetOption sets an option with a value, when val is nil, uses the default
// value of this option. Only ReplicaRead is supported for snapshot
SetOption(opt Option, val interface{})
// DelOption deletes an option.
DelOption(opt Option)
}

// Driver is the interface that must be implemented by a KV storage.
Expand Down
3 changes: 3 additions & 0 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,6 @@ func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
}

func (s *mockSnapshot) SetOption(opt Option, val interface{}) {}
func (s *mockSnapshot) DelOption(opt Option) {}
13 changes: 12 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ type session struct {
statsCollector *handle.SessionStatsCollector
// ddlOwnerChecker is used in `select tidb_is_ddl_owner()` statement;
ddlOwnerChecker owner.DDLOwnerChecker

// shared coprocessor client per session
client kv.Client
}

// DDLOwnerChecker returns s.ddlOwnerChecker.
Expand Down Expand Up @@ -495,7 +498,7 @@ func (s *session) RollbackTxn(ctx context.Context) {
}

func (s *session) GetClient() kv.Client {
return s.store.GetClient()
return s.client
}

func (s *session) String() string {
Expand Down Expand Up @@ -1271,6 +1274,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
if s.sessionVars.ReplicaRead.IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
}
return &s.txn, nil
}
Expand All @@ -1294,6 +1300,9 @@ func (s *session) NewTxn(ctx context.Context) error {
}
txn.SetCap(s.getMembufCap())
txn.SetVars(s.sessionVars.KVVars)
if s.GetSessionVars().ReplicaRead.IsFollowerRead() {
txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
Expand Down Expand Up @@ -1573,6 +1582,7 @@ func createSession(store kv.Storage) (*session, error) {
parser: parser.New(),
sessionVars: variable.NewSessionVars(),
ddlOwnerChecker: dom.DDL().OwnerManager(),
client: store.GetClient(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand All @@ -1596,6 +1606,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
store: store,
parser: parser.New(),
sessionVars: variable.NewSessionVars(),
client: store.GetClient(),
}
if plannercore.PreparedPlanCacheEnabled() {
s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity,
Expand Down
12 changes: 12 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2714,3 +2714,15 @@ func (s *testSessionSuite) TestGrantViewRelated(c *C) {
tkUser.MustQuery("select current_user();").Check(testkit.Rows("u_version29@%"))
tkUser.MustExec("create view v_version29_c as select * from v_version29;")
}

func (s *testSessionSuite) TestReplicaRead(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
tk.MustExec("set @@tidb_replica_read = 'follower';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower)
tk.MustExec("set @@tidb_replica_read = 'leader';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
}
10 changes: 10 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,9 @@ type SessionVars struct {
// ConnectionInfo indicates current connection info used by current session, only be lazy assigned by plugin.
ConnectionInfo *ConnectionInfo

// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead kv.ReplicaReadType

// StartTime is the start time of the last query.
StartTime time.Time

Expand Down Expand Up @@ -474,6 +477,7 @@ func NewSessionVars() *SessionVars {
SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile,
WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish,
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
ReplicaRead: kv.ReplicaReadLeader,
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
}
Expand Down Expand Up @@ -847,6 +851,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
// It's a global variable, but it also wants to be cached in server.
case TiDBMaxDeltaSchemaCount:
SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount))
case TiDBReplicaRead:
if strings.EqualFold(val, "follower") {
s.ReplicaRead = kv.ReplicaReadFollower
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.ReplicaRead = kv.ReplicaReadLeader
}
case TiDBStoreLimit:
storeutil.StoreLimit.Store(tidbOptInt64(val, DefTiDBStoreLimit))
}
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)},
{ScopeSession, TiDBLowResolutionTSO, "0"},
{ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)},
{ScopeSession, TiDBReplicaRead, "leader"},
{ScopeSession, TiDBAllowRemoveAutoInc, BoolToIntStr(DefTiDBAllowRemoveAutoInc)},
{ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"},
{ScopeGlobal | ScopeSession, TiDBStoreLimit, strconv.FormatInt(atomic.LoadInt64(&config.GetGlobalConfig().TiKVClient.StoreLimit), 10)},
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (*testSysVarSuite) TestSysVar(c *C) {

f = GetSysVar("tidb_low_resolution_tso")
c.Assert(f.Value, Equals, "0")

f = GetSysVar("tidb_replica_read")
c.Assert(f.Value, Equals, "leader")
}

func (*testSysVarSuite) TestTxnMode(c *C) {
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ const (
// TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds
TiDBLowResolutionTSO = "tidb_low_resolution_tso"

// TiDBReplicaRead is used for reading data from replicas, followers for example.
TiDBReplicaRead = "tidb_replica_read"

// TiDBAllowRemoveAutoInc indicates whether a user can drop the auto_increment column attribute or not.
TiDBAllowRemoveAutoInc = "tidb_allow_remove_auto_inc"
)
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,13 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
default:
return value, ErrWrongValueForVar.GenWithStackByArgs(TiDBTxnMode, value)
}
case TiDBReplicaRead:
if strings.EqualFold(value, "follower") {
return "follower", nil
} else if strings.EqualFold(value, "leader") || len(value) == 0 {
return "leader", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
case TiDBAllowRemoveAutoInc:
switch {
case strings.EqualFold(value, "ON") || value == "1":
Expand Down
13 changes: 13 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testleak"
)
Expand Down Expand Up @@ -294,6 +295,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(err, IsNil)
c.Assert(val, Equals, "0")
c.Assert(v.CorrelationThreshold, Equals, float64(0))

SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("follower"))
val, err = GetSessionSystemVar(v, TiDBReplicaRead)
c.Assert(err, IsNil)
c.Assert(val, Equals, "follower")
c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadFollower)
SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("leader"))
val, err = GetSessionSystemVar(v, TiDBReplicaRead)
c.Assert(err, IsNil)
c.Assert(val, Equals, "leader")
c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadLeader)
}

func (s *testVarsutilSuite) TestValidate(c *C) {
Expand Down Expand Up @@ -348,6 +360,7 @@ func (s *testVarsutilSuite) TestValidate(c *C) {
{TiDBTxnMode, "pessimistic", false},
{TiDBTxnMode, "optimistic", false},
{TiDBTxnMode, "", false},
{TiDBReplicaRead, "invalid", true},
}

for _, t := range tests {
Expand Down
Loading

0 comments on commit 68af256

Please sign in to comment.