Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add follower read support to TiDB #11347

Merged
merged 24 commits into from
Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
11f6f5e
Initial effort to add follower read to TiDB
sunxiaoguang Jul 20, 2019
ef3a32b
Add tests for replica read
sunxiaoguang Jul 23, 2019
8beffef
Merge branch 'master' into replica_read
sunxiaoguang Jul 23, 2019
013e0ca
Updated kvproto dependency and fixed a test
sunxiaoguang Jul 23, 2019
7277838
Add more tests
sunxiaoguang Jul 23, 2019
31f1333
Merge branch 'master' of https://github.com/pingcap/tidb into replica…
sunxiaoguang Jul 24, 2019
c43f94b
Merge branch 'master' into replica_read
sunxiaoguang Jul 25, 2019
315aa95
Update kvproto
sunxiaoguang Jul 31, 2019
7bb7d76
Run go mod tidy
sunxiaoguang Aug 1, 2019
3c3f2e0
Merge branch 'master' into replica_read
sunxiaoguang Aug 1, 2019
f4d59d6
Merge branch 'master' into replica_read
sunxiaoguang Aug 2, 2019
237005e
Refactor interface between SQL and KV layers
sunxiaoguang Aug 2, 2019
136d760
Do not store index of followers in region cache
sunxiaoguang Aug 5, 2019
93c88d4
Use session scope coprocessor client
sunxiaoguang Aug 5, 2019
03c9576
Merge branch 'master' into replica_read
sunxiaoguang Aug 5, 2019
3968c6b
Try next follower if selected follower had failed
sunxiaoguang Aug 6, 2019
3bcd5c8
Handle failed follower when reading from it
sunxiaoguang Aug 10, 2019
1775d42
Merge branch 'master' into replica_read
sunxiaoguang Aug 11, 2019
13065e8
Merge branch 'master' into replica_read
sunxiaoguang Aug 11, 2019
39c9add
Merge branch 'master' into replica_read
coocood Aug 12, 2019
d3b64ce
Merge branch 'master' of https://github.com/pingcap/tidb into replica…
sunxiaoguang Aug 13, 2019
b03c2d9
Merge branch 'master' into replica_read
sunxiaoguang Aug 14, 2019
682333a
Assign different replica read seeds to snapshots
sunxiaoguang Aug 15, 2019
b0bf2a3
Merge branch 'master' into replica_read
sre-bot Aug 16, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion distsql/request_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,13 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int {
}

// SetFromSessionVars sets the following fields for "kv.Request" from session variables:
// "Concurrency", "IsolationLevel", "NotFillCache".
// "Concurrency", "IsolationLevel", "NotFillCache", "ReplicaRead".
func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder {
builder.Request.Concurrency = sv.DistSQLScanConcurrency
builder.Request.IsolationLevel = builder.getIsolationLevel()
builder.Request.NotFillCache = sv.StmtCtx.NotFillCache
builder.Request.Priority = builder.getKVPriority(sv)
builder.Request.ReplicaRead = sv.ReplicaRead
return builder
}

Expand Down
33 changes: 33 additions & 0 deletions distsql/request_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -388,6 +389,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -432,6 +434,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) {
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -477,6 +480,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) {
Streaming: true,
NotFillCache: false,
SyncLog: false,
ReplicaRead: kv.ReplicaReadLeader,
}
c.Assert(actual, DeepEquals, expect)
}
Expand Down Expand Up @@ -558,6 +562,35 @@ func (s *testSuite) TestRequestBuilder6(c *C) {
c.Assert(actual, DeepEquals, expect)
}

func (s *testSuite) TestRequestBuilder7(c *C) {
vars := variable.NewSessionVars()
vars.ReplicaRead = kv.ReplicaReadFollower

concurrency := 10

actual, err := (&RequestBuilder{}).
SetFromSessionVars(vars).
SetConcurrency(concurrency).
Build()
c.Assert(err, IsNil)

expect := &kv.Request{
Tp: 0,
StartTs: 0x0,
KeepOrder: false,
Desc: false,
Concurrency: concurrency,
IsolationLevel: 0,
Priority: 0,
NotFillCache: false,
SyncLog: false,
Streaming: false,
ReplicaRead: kv.ReplicaReadFollower,
}

c.Assert(actual, DeepEquals, expect)
}

func (s *testSuite) TestTableRangesToKVRangesWithFbs(c *C) {
ranges := []*ranger.Range{
{
Expand Down
11 changes: 9 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild
})
var resp *tikvrpc.Response
var rpcCtx *tikv.RPCContext
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region)
rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead)
if *err != nil {
return
}
Expand Down Expand Up @@ -925,6 +925,9 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
if err != nil {
return 0, err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetFollowerRead()
}
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(t.StartKey, t.EndKey)
if err != nil {
Expand All @@ -943,10 +946,14 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e
defer e.wg.Done()
var snapshot kv.Snapshot
snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion)
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))
if *err != nil {
return
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
snapshot.SetFollowerRead()
}
rander := rand.New(rand.NewSource(e.randSeed + int64(workID)))

for i := workID; i < len(e.sampTasks); i += e.concurrency {
task := e.sampTasks[i]
if task.SampSize == 0 {
Expand Down
10 changes: 10 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,16 @@ PARTITION BY RANGE ( a ) (
}
}

func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
ctx := tk.Se.(sessionctx.Context)
ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower
tk.MustExec("analyze table t")
}

func (s *testSuite1) TestAnalyzeRestrict(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
3 changes: 3 additions & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() {
e.snapshot.SetFollowerRead()
}
if e.idxInfo != nil {
idxKey, err1 := e.encodeIndexKey()
if err1 != nil && !kv.ErrNotExist.Equal(err1) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531
github.com/pingcap/kvproto v0.0.0-20190720003038-b134cf7a671b
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190719041739-ff945b25f903
github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531 h1:8xk2HobDwClB5E3Hv9TEPiS7K7bv3ykWHLyZzuUYywI=
lzmhhh123 marked this conversation as resolved.
Show resolved Hide resolved
github.com/pingcap/kvproto v0.0.0-20190703131923-d9830856b531/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190720003038-b134cf7a671b h1:4z0dvijaS0QTtnzBn+91d3JHVnPqf0PPb4ou6hDRq8Q=
github.com/pingcap/kvproto v0.0.0-20190720003038-b134cf7a671b/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190214045112-b37da76f67a7/go.mod h1:xsfkWVaFVV5B8e1K9seWfyJWFrIhbtUTAD8NV1Pq3+w=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3FwJptMTt6MEPdzK1Wt99oaefQ=
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
Expand Down
29 changes: 29 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ const (
RC
)

// ReplicaReadType is the type of replica to read data from
type ReplicaReadType byte

const (
// ReplicaReadLeader stands for 'read from leader'.
ReplicaReadLeader ReplicaReadType = iota
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer using 1 << iota here, so we can use bit operations to easily check the type is set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I though about this initially but doubt if it is rational to read from all kind of replicas. Read from different type of replicas may have different latency characteristics and pose different burden to leader, we can have more discussion about this.

// ReplicaReadFollower stands for 'read from follower'.
ReplicaReadFollower
// ReplicaReadLearner stands for 'read from learner'.
ReplicaReadLearner
)

// IsFollowerRead checks if leader is going to be used to read data.
func (r ReplicaReadType) IsFollowerRead() bool {
return r == ReplicaReadFollower
}

// Those limits is enforced to make sure the transaction can be well handled by TiKV.
var (
// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
Expand Down Expand Up @@ -158,6 +175,11 @@ type Transaction interface {
// BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage.
BatchGet(keys []Key) (map[string][]byte, error)
IsPessimistic() bool

// SetFollowerRead sets current transaction to read data from follower
SetFollowerRead()
// ClearFollowerRead disables follower read on current transaction
ClearFollowerRead()
}

// AssertionProto is an interface defined for the assertion protocol.
Expand Down Expand Up @@ -222,6 +244,8 @@ type Request struct {
Streaming bool
// MemTracker is used to trace and control memory usage in co-processor layer.
MemTracker *memory.Tracker
// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead ReplicaReadType
}

// ResultSubset represents a result subset from a single storage unit.
Expand Down Expand Up @@ -253,6 +277,11 @@ type Snapshot interface {
BatchGet(keys []Key) (map[string][]byte, error)
// SetPriority snapshot set the priority
SetPriority(priority int)

// SetFollowerRead sets current snapshot to read data from follower
SetFollowerRead()
// ClearFollowerRead disables follower read on current snapshot
ClearFollowerRead()
}

// Driver is the interface that must be implemented by a KV storage.
Expand Down
7 changes: 7 additions & 0 deletions kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ func (t *mockTxn) SetVars(vars *Variables) {
func (t *mockTxn) SetAssertion(key Key, assertion AssertionType) {}
func (t *mockTxn) ConfirmAssertions(succ bool) {}

func (t *mockTxn) SetFollowerRead() {}
func (t *mockTxn) ClearFollowerRead() {}

// NewMockTxn new a mockTxn.
func NewMockTxn() Transaction {
return &mockTxn{
Expand Down Expand Up @@ -229,3 +232,7 @@ func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
return s.store.IterReverse(k)
}

func (s *mockSnapshot) SetFollowerRead() {}

func (s *mockSnapshot) ClearFollowerRead() {}
8 changes: 7 additions & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
if s.sessionVars.ReplicaRead.IsFollowerRead() {
s.txn.SetFollowerRead()
}
}
return &s.txn, nil
}
Expand Down Expand Up @@ -1315,7 +1318,10 @@ func (s *session) NewTxn(ctx context.Context) error {
return err
}
txn.SetCap(s.getMembufCap())
txn.SetVars(s.sessionVars.KVVars)
txn.SetVars(s.GetSessionVars().KVVars)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
if s.GetSessionVars().ReplicaRead.IsFollowerRead() {
txn.SetFollowerRead()
}
s.txn.changeInvalidToValid(txn)
is := domain.GetDomain(s).InfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
Expand Down
12 changes: 12 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2796,3 +2796,15 @@ func (s *testSessionSuite) TestLoadClientInteractive(c *C) {
tk.Se.GetSessionVars().ClientCapability = tk.Se.GetSessionVars().ClientCapability | mysql.ClientInteractive
tk.MustQuery("select @@wait_timeout").Check(testkit.Rows("28800"))
}

func (s *testSessionSuite) TestReplicaRead(c *C) {
var err error
tk := testkit.NewTestKit(c, s.store)
tk.Se, err = session.CreateSession4Test(s.store)
c.Assert(err, IsNil)
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
tk.MustExec("set @@tidb_replica_read = 'follower';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower)
tk.MustExec("set @@tidb_replica_read = 'leader';")
c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader)
}
9 changes: 9 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,9 @@ type SessionVars struct {

// use noop funcs or not
EnableNoopFuncs bool

// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
ReplicaRead kv.ReplicaReadType
}

// ConnectionInfo present connection used by audit.
Expand Down Expand Up @@ -831,6 +834,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
s.EnableIndexMerge = TiDBOptOn(val)
case TiDBEnableNoopFuncs:
s.EnableNoopFuncs = TiDBOptOn(val)
case TiDBReplicaRead:
if strings.EqualFold(val, "follower") {
s.ReplicaRead = kv.ReplicaReadFollower
} else if strings.EqualFold(val, "leader") || len(val) == 0 {
s.ReplicaRead = kv.ReplicaReadLeader
}
sunxiaoguang marked this conversation as resolved.
Show resolved Hide resolved
}
s.systems[name] = val
return nil
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ var defaultSysVars = []*SysVar{
{ScopeSession, TiDBLowResolutionTSO, "0"},
{ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)},
{ScopeGlobal | ScopeSession, TiDBEnableNoopFuncs, BoolToIntStr(DefTiDBEnableNoopFuncs)},
{ScopeSession, TiDBReplicaRead, "leader"},
}

// SynonymsSysVariables is synonyms of system variables.
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func (*testSysVarSuite) TestSysVar(c *C) {

f = GetSysVar("tidb_low_resolution_tso")
c.Assert(f.Value, Equals, "0")

f = GetSysVar("tidb_replica_read")
c.Assert(f.Value, Equals, "leader")
}

func (*testSysVarSuite) TestTxnMode(c *C) {
Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ const (

// TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds
TiDBLowResolutionTSO = "tidb_low_resolution_tso"

// TiDBReplicaRead is used for reading data from replicas, followers for example.
TiDBReplicaRead = "tidb_replica_read"
)

// TiDB system variable names that both in session and global scope.
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,13 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string,
if v <= 0 {
return value, errors.Errorf("tidb_wait_split_region_timeout(%d) cannot be smaller than 1", v)
}
case TiDBReplicaRead:
if strings.EqualFold(value, "follower") {
return "follower", nil
} else if strings.EqualFold(value, "leader") || len(value) == 0 {
return "leader", nil
}
return value, ErrWrongValueForVar.GenWithStackByArgs(name, value)
}
return value, nil
}
Expand Down
13 changes: 13 additions & 0 deletions sessionctx/variable/varsutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/testleak"
)
Expand Down Expand Up @@ -294,6 +295,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) {
c.Assert(err, IsNil)
c.Assert(val, Equals, "0")
c.Assert(v.CorrelationThreshold, Equals, float64(0))

SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("follower"))
val, err = GetSessionSystemVar(v, TiDBReplicaRead)
c.Assert(err, IsNil)
c.Assert(val, Equals, "follower")
c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadFollower)
SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("leader"))
val, err = GetSessionSystemVar(v, TiDBReplicaRead)
c.Assert(err, IsNil)
c.Assert(val, Equals, "leader")
c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadLeader)
}

func (s *testVarsutilSuite) TestSetOverflowBehave(c *C) {
Expand Down Expand Up @@ -362,6 +374,7 @@ func (s *testVarsutilSuite) TestValidate(c *C) {
{TiDBMaxChunkSize, "-1", true},
{TiDBOptJoinReorderThreshold, "a", true},
{TiDBOptJoinReorderThreshold, "-1", true},
{TiDBReplicaRead, "invalid", true},
}

for _, t := range tests {
Expand Down
1 change: 1 addition & 0 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
NotFillCache: worker.req.NotFillCache,
HandleTime: true,
ScanDetail: true,
FollowerRead: worker.req.ReplicaRead.IsFollowerRead(),
})
startTime := time.Now()
resp, rpcCtx, err := sender.SendReqCtx(bo, req, task.region, ReadTimeoutMedium)
Expand Down
Loading