Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topsql: introduce stmtstats and sql execution count #30277

Merged
merged 62 commits into from
Dec 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
ee242e4
Init sql execution count
mornyx Nov 22, 2021
101fd27
Merge branch 'master' into exec-count
mornyx Nov 24, 2021
f9719b7
Merge branch 'master' into exec-count
mornyx Nov 29, 2021
91b2048
Add exec-count for prepared statements; Add comments
mornyx Nov 30, 2021
89f10ba
Add unit-tests
mornyx Nov 30, 2021
9340a27
Add todo
mornyx Nov 30, 2021
136675a
Add license
mornyx Nov 30, 2021
671ccf1
Rename
mornyx Nov 30, 2021
979efa7
Add comments
mornyx Nov 30, 2021
9868bf2
Init sql execution count for tikv
mornyx Dec 3, 2021
1c95a0a
Use exec-count as a separate package to avoid circular dependencies
mornyx Dec 3, 2021
5a53b8e
Integrate ExecCounter and KvExecCounter
mornyx Dec 3, 2021
21df3ec
Add unit-tests
mornyx Dec 3, 2021
5fb0752
Add comments
mornyx Dec 3, 2021
d6919bb
Simplify the code in distsql
mornyx Dec 3, 2021
347480c
Rename
mornyx Dec 3, 2021
684ddf4
Add comments
mornyx Dec 3, 2021
7c0889f
Introduce stmtstats
mornyx Dec 9, 2021
50629ef
Fix npe; Add comments; Remove print
mornyx Dec 9, 2021
a520036
Remove local mod replace
mornyx Dec 9, 2021
119a525
Merge branch 'master' into exec-count
mornyx Dec 9, 2021
b51b394
Fix ut
mornyx Dec 9, 2021
7bc8589
Merge branch 'master' into exec-count
crazycs520 Dec 10, 2021
8fcd43b
Merge branch 'master' into exec-count
mornyx Dec 13, 2021
99dad2c
Merge branch 'master' into exec-count
mornyx Dec 13, 2021
73e2e0a
Remove KvExecCounter pointer in session vars
mornyx Dec 14, 2021
a0f8143
Add plan digest for statement stats
mornyx Dec 14, 2021
dfa0590
Upgrade client-go
mornyx Dec 14, 2021
946ea85
Fix typo
mornyx Dec 14, 2021
1b46677
Comments
mornyx Dec 14, 2021
c2209e9
Comments
mornyx Dec 14, 2021
eed9af8
Merge branch 'master' into exec-count
crazycs520 Dec 15, 2021
f66c79b
Remove session.stmtStats
mornyx Dec 15, 2021
6f1e427
Merge remote-tracking branch 'mornyx/exec-count' into exec-count
mornyx Dec 15, 2021
2381e11
Resolve pr comments
mornyx Dec 15, 2021
bb05c16
Eliminate race
mornyx Dec 15, 2021
f99a7c9
Remove explicit ts param
mornyx Dec 16, 2021
0d74444
Add string method
mornyx Dec 16, 2021
98256cf
Merge branch 'master' into exec-count
mornyx Dec 16, 2021
de99322
Add integration tests for stmt stats
mornyx Dec 17, 2021
816bca2
Add comments
mornyx Dec 17, 2021
d1bce96
Upgrade client-go
mornyx Dec 19, 2021
3bfc8d0
Add kv exec count test case
mornyx Dec 19, 2021
32c85e5
Rename
mornyx Dec 19, 2021
474b6d2
Remove print
mornyx Dec 19, 2021
3a4ff9b
Merge branch 'master' into exec-count
mornyx Dec 19, 2021
fb9b379
Use bytes instead of string
mornyx Dec 20, 2021
78a30ca
Move String() to test file
mornyx Dec 20, 2021
467ac79
Add more UT
mornyx Dec 20, 2021
b8ed21b
Fix compile error in test
mornyx Dec 20, 2021
90d94f5
Simplify globalAggregator
mornyx Dec 20, 2021
e9ecf63
Add StatementObserver
mornyx Dec 20, 2021
699cf7e
Extract method
mornyx Dec 20, 2021
db63338
Merge branch 'master' into exec-count
crazycs520 Dec 21, 2021
b9ed865
Reduce boilerplate code
mornyx Dec 21, 2021
94803d6
Use atomic.Bool instead of uint32
mornyx Dec 21, 2021
a295cf2
Mark SetupAggregator & CloseAggregator as not thread-safe
mornyx Dec 21, 2021
f069615
Rename kvexeccount.go -> kv_exec_count.go
mornyx Dec 21, 2021
f5728d6
Resolve comments
mornyx Dec 21, 2021
b855763
Merge remote-tracking branch 'mornyx/exec-count' into exec-count
mornyx Dec 21, 2021
d6dcca9
Merge branch 'master' into exec-count
mornyx Dec 21, 2021
83dc182
Merge branch 'master' into exec-count
ti-chi-bot Dec 21, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,20 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/trxevents"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"go.uber.org/zap"
)

// DispatchMPPTasks dispatches all tasks and returns an iterator.
func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv.MPPDispatchRequest, fieldTypes []*types.FieldType, planIDs []int, rootID int) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
resp := sctx.GetMPPClient().DispatchMPPTasks(ctx, sctx.GetSessionVars().KVVars, tasks, allowTiFlashFallback)
if resp == nil {
Expand Down Expand Up @@ -88,6 +91,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
zap.String("stmt", originalSQL))
}
}

ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, sctx.GetSessionVars().StmtCtx.MemTracker, enabledRateLimitAction, eventCb)
if resp == nil {
return nil, errors.New("client returns nil response")
Expand Down Expand Up @@ -149,8 +154,9 @@ func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq

// Analyze do a analyze request.
func Analyze(ctx context.Context, client kv.Client, kvReq *kv.Request, vars interface{},
isRestrict bool, sessionMemTracker *memory.Tracker) (SelectResult, error) {
resp := client.Send(ctx, kvReq, vars, sessionMemTracker, false, nil)
isRestrict bool, stmtCtx *stmtctx.StatementContext) (SelectResult, error) {
ctx = WithSQLKvExecCounterInterceptor(ctx, stmtCtx)
resp := client.Send(ctx, kvReq, vars, stmtCtx.MemTracker, false, nil)
if resp == nil {
return nil, errors.New("client returns nil response")
}
Expand Down Expand Up @@ -244,3 +250,15 @@ func init() {
systemEndian = tipb.Endian_LittleEndian
}
}

// WithSQLKvExecCounterInterceptor binds an interceptor for client-go to count the
// number of SQL executions of each TiKV (if any).
func WithSQLKvExecCounterInterceptor(ctx context.Context, stmtCtx *stmtctx.StatementContext) context.Context {
if variable.TopSQLEnabled() && stmtCtx.KvExecCounter != nil {
// Unlike calling Transaction or Snapshot interface, in distsql package we directly
// face tikv Request. So we need to manually bind RPCInterceptor to ctx. Instead of
// calling SetRPCInterceptor on Transaction or Snapshot.
return interceptor.WithRPCInterceptor(ctx, stmtCtx.KvExecCounter.RPCInterceptor())
}
return ctx
}
2 changes: 1 addition & 1 deletion distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestAnalyze(t *testing.T) {
Build()
require.NoError(t, err)

response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx.MemTracker)
response, err := Analyze(context.TODO(), sctx.GetClient(), request, tikvstore.DefaultVars, true, sctx.GetSessionVars().StmtCtx)
require.NoError(t, err)

result, ok := response.(*selectResult)
Expand Down
31 changes: 31 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
ctx = opentracing.ContextWithSpan(ctx, span1)
}
ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()
startTs := uint64(math.MaxUint64)
err := a.Ctx.InitTxnWithStartTS(startTs)
if err != nil {
Expand Down Expand Up @@ -383,6 +384,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}
// ExecuteExec will rewrite `a.Plan`, so set plan label should be executed after `a.buildExecutor`.
ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()

if err = e.Open(ctx); err != nil {
terror.Call(e.Close)
Expand Down Expand Up @@ -896,6 +898,7 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo
// `LowSlowQuery` and `SummaryStmt` must be called before recording `PrevStmt`.
a.LogSlowQuery(txnTS, succ, hasMoreResults)
a.SummaryStmt(succ)
a.observeStmtFinishedForTopSQL()
if sessVars.StmtCtx.IsTiFlash.Load() {
if succ {
totalTiFlashQuerySuccCounter.Inc()
Expand Down Expand Up @@ -1247,3 +1250,31 @@ func (a *ExecStmt) GetTextToLog() string {
}
return sql
}

func (a *ExecStmt) observeStmtBeginForTopSQL() {
if vars := a.Ctx.GetSessionVars(); variable.TopSQLEnabled() && vars.StmtStats != nil {
sqlDigest, planDigest := a.getSQLPlanDigest()
vars.StmtStats.OnExecutionBegin(sqlDigest, planDigest)
// This is a special logic prepared for TiKV's SQLExecCount.
vars.StmtCtx.KvExecCounter = vars.StmtStats.CreateKvExecCounter(sqlDigest, planDigest)
}
}

func (a *ExecStmt) observeStmtFinishedForTopSQL() {
if vars := a.Ctx.GetSessionVars(); variable.TopSQLEnabled() && vars.StmtStats != nil {
sqlDigest, planDigest := a.getSQLPlanDigest()
vars.StmtStats.OnExecutionFinished(sqlDigest, planDigest)
}
}

func (a *ExecStmt) getSQLPlanDigest() ([]byte, []byte) {
var sqlDigest, planDigest []byte
vars := a.Ctx.GetSessionVars()
if _, d := vars.StmtCtx.SQLDigest(); d != nil {
sqlDigest = d.Bytes()
}
if _, d := vars.StmtCtx.GetPlanDigest(); d != nil {
planDigest = d.Bytes()
}
return sqlDigest, planDigest
}
6 changes: 4 additions & 2 deletions executor/analyze.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ func (e *AnalyzeIndexExec) fetchAnalyzeResult(ranges []*ranger.Range, isNullRang
return err
}
ctx := context.TODO()
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker)
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx)
if err != nil {
return err
}
Expand Down Expand Up @@ -763,7 +763,7 @@ func (e *AnalyzeColumnsExec) buildResp(ranges []*ranger.Range) (distsql.SelectRe
return nil, err
}
ctx := context.TODO()
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx.MemTracker)
result, err := distsql.Analyze(ctx, e.ctx.GetClient(), kvReq, e.ctx.GetSessionVars().KVVars, e.ctx.GetSessionVars().InRestrictedSQL, e.ctx.GetSessionVars().StmtCtx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1854,6 +1854,7 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) (keysSize int, err
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), snapshot)
for _, t := range e.scanTasks {
iter, err := snapshot.Iter(kv.Key(t.StartKey), kv.Key(t.EndKey))
if err != nil {
Expand All @@ -1875,6 +1876,7 @@ func (e *AnalyzeFastExec) handleSampTasks(workID int, step uint32, err *error) {
snapshot.SetOption(kv.IsolationLevel, kv.SI)
snapshot.SetOption(kv.Priority, kv.PriorityLow)
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, snapshot)
setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), snapshot)
readReplicaType := e.ctx.GetSessionVars().GetReplicaRead()
if readReplicaType.IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, readReplicaType)
Expand Down
1 change: 1 addition & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
})
}
setResourceGroupTaggerForTxn(stmtCtx, snapshot)
setRPCInterceptorOfExecCounterForTxn(sessVars, snapshot)
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
lock := e.tblInfo.Lock
Expand Down
2 changes: 1 addition & 1 deletion executor/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (e *ChecksumTableExec) checksumWorker(taskCh <-chan *checksumTask, resultCh
}

func (e *ChecksumTableExec) handleChecksumRequest(req *kv.Request) (resp *tipb.ChecksumResponse, err error) {
ctx := context.TODO()
ctx := distsql.WithSQLKvExecCounterInterceptor(context.TODO(), e.ctx.GetSessionVars().StmtCtx)
res, err := distsql.Checksum(ctx, e.ctx.GetClient(), req, e.ctx.GetSessionVars().KVVars)
if err != nil {
return nil, err
Expand Down
8 changes: 8 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1914,3 +1914,11 @@ func setResourceGroupTaggerForTxn(sc *stmtctx.StatementContext, snapshot kv.Snap
snapshot.SetOption(kv.ResourceGroupTagger, sc.GetResourceGroupTagger())
}
}

// setRPCInterceptorOfExecCounterForTxn binds an interceptor for client-go to count
// the number of SQL executions of each TiKV.
func setRPCInterceptorOfExecCounterForTxn(vars *variable.SessionVars, snapshot kv.Snapshot) {
if snapshot != nil && variable.TopSQLEnabled() && vars.StmtCtx.KvExecCounter != nil {
breezewish marked this conversation as resolved.
Show resolved Hide resolved
snapshot.SetOption(kv.RPCInterceptor, vars.StmtCtx.KvExecCounter.RPCInterceptor())
}
}
1 change: 1 addition & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
return err
}
setResourceGroupTaggerForTxn(sessVars.StmtCtx, txn)
setRPCInterceptorOfExecCounterForTxn(sessVars, txn)
txnSize := txn.Size()
sessVars.StmtCtx.AddRecordRows(uint64(len(rows)))
// If you use the IGNORE keyword, duplicate-key error that occurs while executing the INSERT statement are ignored.
Expand Down
1 change: 1 addition & 0 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
}
})
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, e.snapshot)
setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), e.snapshot)
return nil
}

Expand Down
1 change: 1 addition & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
}
}
setResourceGroupTaggerForTxn(e.ctx.GetSessionVars().StmtCtx, txn)
setRPCInterceptorOfExecCounterForTxn(e.ctx.GetSessionVars(), txn)
prefetchStart := time.Now()
// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
Expand Down
4 changes: 4 additions & 0 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
txn, err := e.ctx.Txn(true)
if err == nil {
txn.SetOption(kv.ResourceGroupTagger, e.ctx.GetSessionVars().StmtCtx.GetResourceGroupTagger())
if e.ctx.GetSessionVars().StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
txn.SetOption(kv.RPCInterceptor, e.ctx.GetSessionVars().StmtCtx.KvExecCounter.RPCInterceptor())
}
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -712,8 +712,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfK
github.com/tidwall/gjson v1.3.5/go.mod h1:P256ACg0Mn+j1RXIDXoss50DeIABTYK1PULOJHhxOls=
github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50 h1:B+cAIm2P1/SNsVV1vL9/mRaGUVl/vdgV8MU03O0vY28=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211214093715-605f49d3ba50/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0 h1:38Jst/O36MKXAt7aD1Ipnx4nKwclG66ifkcmi4f0NZ4=
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211218050306-6165dbaa95d0/go.mod h1:wRuh+W35daKTiYBld0oBlT6PSkzEVr+pB/vChzJZk+8=
github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI=
Expand Down
4 changes: 3 additions & 1 deletion kv/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ const (
ResourceGroupTagger
// KVFilter indicates the filter to ignore key-values in the transaction's memory buffer.
KVFilter

// SnapInterceptor is used for setting the interceptor for snapshot
SnapInterceptor
// CommitTSUpperBoundChec is used by cached table
// The commitTS must be greater than all the write lock lease of the visited cached table.
CommitTSUpperBoundCheck
// RPCInterceptor is interceptor.RPCInterceptor on Transaction or Snapshot, used to decorate
// additional logic before and after the underlying client-go RPC request.
RPCInterceptor
)

// ReplicaReadType is the type of replica to read data from
Expand Down
7 changes: 7 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,10 @@ func (s *session) doCommit(ctx context.Context) error {
s.txn.SetOption(kv.EnableAsyncCommit, sessVars.EnableAsyncCommit)
s.txn.SetOption(kv.Enable1PC, sessVars.Enable1PC)
s.txn.SetOption(kv.ResourceGroupTagger, sessVars.StmtCtx.GetResourceGroupTagger())
if sessVars.StmtCtx.KvExecCounter != nil {
// Bind an interceptor for client-go to count the number of SQL executions of each TiKV.
s.txn.SetOption(kv.RPCInterceptor, sessVars.StmtCtx.KvExecCounter.RPCInterceptor())
}
// priority of the sysvar is lower than `start transaction with causal consistency only`
if val := s.txn.GetOption(kv.GuaranteeLinearizability); val == nil || val.(bool) {
// We needn't ask the TiKV client to guarantee linearizability for auto-commit transactions
Expand Down Expand Up @@ -2311,6 +2315,9 @@ func (s *session) Close() {
s.RollbackTxn(ctx)
if s.sessionVars != nil {
s.sessionVars.WithdrawAllPreparedStmt()
if s.sessionVars.StmtStats != nil {
s.sessionVars.StmtStats.SetFinished()
}
}
s.ClearDiskFullOpt()
}
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/topsql/stmtstats"
"github.com/pingcap/tidb/util/tracing"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -207,6 +208,12 @@ type StatementContext struct {

// WaitLockLeaseTime is the duration of cached table read lease expiration time.
WaitLockLeaseTime time.Duration

// KvExecCounter is created from SessionVars.StmtStats to count the number of SQL
// executions of the kv layer during the current execution of the statement.
// Its life cycle is limited to this execution, and a new KvExecCounter is
// always created during each statement execution.
KvExecCounter *stmtstats.KvExecCounter
}

// StmtHints are SessionVars related sql hints.
Expand Down
12 changes: 10 additions & 2 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"sync/atomic"
"time"

utilMath "github.com/pingcap/tidb/util/math"

"github.com/pingcap/errors"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
"github.com/pingcap/tidb/config"
Expand All @@ -48,10 +46,12 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
utilMath "github.com/pingcap/tidb/util/math"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tableutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tidb/util/topsql/stmtstats"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/twmb/murmur3"
Expand Down Expand Up @@ -969,6 +969,13 @@ type SessionVars struct {

// EnablePaging indicates whether enable paging in coprocessor requests.
EnablePaging bool

// StmtStats is used to count various indicators of each SQL in this session
// at each point in time. These data will be periodically taken away by the
// background goroutine. The background goroutine will continue to aggregate
// all the local data in each session, and finally report them to the remote
// regularly.
StmtStats *stmtstats.StatementStats
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down Expand Up @@ -1203,6 +1210,7 @@ func NewSessionVars() *SessionVars {
MPPStoreFailTTL: DefTiDBMPPStoreFailTTL,
EnablePlacementChecks: DefEnablePlacementCheck,
Rng: utilMath.NewWithTime(),
StmtStats: stmtstats.CreateStatementStats(),
}
vars.KVVars = tikvstore.NewVariables(&vars.Killed)
vars.Concurrency = Concurrency{
Expand Down
3 changes: 3 additions & 0 deletions store/driver/txn/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
derr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/driver/options"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/txnkv/txnutil"
)
Expand Down Expand Up @@ -120,6 +121,8 @@ func (s *tikvSnapshot) SetOption(opt int, val interface{}) {
s.KVSnapshot.SetReadReplicaScope(val.(string))
case kv.SnapInterceptor:
s.interceptor = val.(kv.SnapshotInterceptor)
case kv.RPCInterceptor:
s.KVSnapshot.SetRPCInterceptor(val.(interceptor.RPCInterceptor))
}
}

Expand Down
3 changes: 3 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/tikvrpc/interceptor"
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
)

Expand Down Expand Up @@ -232,6 +233,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn.snapshotInterceptor = val.(kv.SnapshotInterceptor)
case kv.CommitTSUpperBoundCheck:
txn.KVTxn.SetCommitTSUpperBoundCheck(val.(func(commitTS uint64) bool))
case kv.RPCInterceptor:
txn.KVTxn.SetRPCInterceptor(val.(interceptor.RPCInterceptor))
}
}

Expand Down
Loading