Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Move stale read context to TxnManager #33812

Merged
merged 18 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 20 additions & 17 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -195,11 +196,6 @@ type TelemetryInfo struct {
type ExecStmt struct {
// GoCtx stores parent go context.Context for a stmt.
GoCtx context.Context
// SnapshotTS stores the timestamp for stale read.
// It is not equivalent to session variables's snapshot ts, it only use to build the executor.
SnapshotTS uint64
// IsStaleness means whether this statement use stale read.
IsStaleness bool
// ReplicaReadScope indicates the scope the store selector scope the request visited
ReplicaReadScope string
// InfoSchema stores a reference to the schema information.
Expand Down Expand Up @@ -234,6 +230,14 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
defer span1.Finish()
ctx = opentracing.ContextWithSpan(ctx, span1)
}

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
// stale read should not reach here
staleread.AssertStmtStaleness(a.Ctx, false)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
})

ctx = a.setPlanLabelForTopSQL(ctx)
a.observeStmtBeginForTopSQL()
startTs := uint64(math.MaxUint64)
Expand All @@ -257,7 +261,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
}
if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.ReplicaReadScope)
newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
Expand All @@ -266,11 +270,6 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
})

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
return nil, err
Expand Down Expand Up @@ -303,7 +302,7 @@ func (a *ExecStmt) IsReadOnly(vars *variable.SessionVars) bool {
// It returns the current information schema version that 'a' is using.
func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
ret := &plannercore.PreprocessorReturn{}
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil {
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.InitTxnContextProvider, plannercore.WithPreprocessorReturn(ret)); err != nil {
return 0, err
}

Expand All @@ -314,11 +313,10 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
}
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInRebuildPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema)
staleread.AssertStmtStaleness(a.Ctx, ret.IsStaleness)
})

a.InfoSchema = sessiontxn.GetTxnManager(a.Ctx).GetTxnInfoSchema()
a.SnapshotTS = ret.LastSnapshotTS
a.IsStaleness = ret.IsStaleness
a.ReplicaReadScope = ret.ReadReplicaScope
if a.Ctx.GetSessionVars().GetReplicaRead().IsClosestRead() && a.ReplicaReadScope == kv.GlobalReplicaScope {
logutil.BgLogger().Warn(fmt.Sprintf("tidb can't read closest replicas due to it haven't %s label", placement.DCLabelKey))
Expand Down Expand Up @@ -369,8 +367,13 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
}()

failpoint.Inject("assertStaleTSO", func(val failpoint.Value) {
if n, ok := val.(int); ok && a.IsStaleness {
startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
if n, ok := val.(int); ok && staleread.IsStmtStaleness(a.Ctx) {
txnManager := sessiontxn.GetTxnManager(a.Ctx)
ts, err := txnManager.GetReadTS()
if err != nil {
panic(err)
}
startTS := oracle.ExtractPhysical(ts) / 1000
if n != int(startTS) {
panic(fmt.Sprintf("different tso %d != %d", n, startTS))
}
Expand Down Expand Up @@ -863,7 +866,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.IsStaleness, a.ReplicaReadScope)
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.ReplicaReadScope)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
Expand Down
10 changes: 5 additions & 5 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expressi
plan.SetSchema(schema)
plan.Init(ctx, nil, 0)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
exec := b.build(plan)
hashAgg := exec.(*HashAggExec)
hashAgg.children[0] = src
Expand Down Expand Up @@ -344,7 +344,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec Executor, schema *ex
plan = sg
}

b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
return b.build(plan)
}

Expand Down Expand Up @@ -577,7 +577,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
plan = win
}

b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
exec := b.build(plan)
return exec
}
Expand Down Expand Up @@ -1317,7 +1317,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
keyOff2IdxOff[i] = i
}

readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, oracle.GlobalTxnScope).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
Expand Down Expand Up @@ -1391,7 +1391,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
outerCompareFuncs = append(outerCompareFuncs, expression.GetCmpFunction(nil, outerJoinKeys[i], outerJoinKeys[i]))
}

readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, 0, false, oracle.GlobalTxnScope).
readerBuilder, err := newExecutorBuilder(tc.ctx, nil, nil, oracle.GlobalTxnScope).
newDataReaderBuilder(&mockPhysicalIndexReader{e: innerDS})
if err != nil {
return nil, err
Expand Down
63 changes: 46 additions & 17 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -117,16 +119,27 @@ type CTEStorages struct {
IterInTbl cteutil.Storage
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *executorBuilder {
return &executorBuilder{
func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *executorBuilder {
b := &executorBuilder{
ctx: ctx,
is: is,
Ti: ti,
snapshotTSCached: isStaleness,
snapshotTS: snapshotTS,
isStaleness: isStaleness,
isStaleness: staleread.IsStmtStaleness(ctx),
readReplicaScope: replicaReadScope,
}

txnManager := sessiontxn.GetTxnManager(ctx)
if provider, ok := txnManager.GetContextProvider().(*sessiontxn.SimpleTxnContextProvider); ok {
provider.GetReadTSFunc = b.getReadTS
provider.GetForUpdateTSFunc = func() (uint64, error) {
if b.forUpdateTS != 0 {
return b.forUpdateTS, nil
}
return b.getReadTS()
}
}

return b
}

// MockPhysicalPlan is used to return a specified executor in when build.
Expand All @@ -143,9 +156,9 @@ type MockExecutorBuilder struct {
}

// NewMockExecutorBuilderForTest is ONLY used in test.
func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, isStaleness bool, replicaReadScope string) *MockExecutorBuilder {
func NewMockExecutorBuilderForTest(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *MockExecutorBuilder {
return &MockExecutorBuilder{
executorBuilder: newExecutorBuilder(ctx, is, ti, snapshotTS, isStaleness, replicaReadScope)}
executorBuilder: newExecutorBuilder(ctx, is, ti, replicaReadScope)}
}

// Build builds an executor tree according to `p`.
Expand Down Expand Up @@ -733,10 +746,6 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
failpoint.Inject("assertStaleReadValuesSameWithExecuteAndBuilder", func() {
// This fail point is used to assert the behavior after refactoring is exactly the same with the previous implement.
// Some variables in `plannercore.Execute` is deprecated and only be used for asserting now.
if b.snapshotTS != v.SnapshotTS {
panic(fmt.Sprintf("%d != %d", b.snapshotTS, v.SnapshotTS))
}

if b.isStaleness != v.IsStaleness {
panic(fmt.Sprintf("%v != %v", b.isStaleness, v.IsStaleness))
}
Expand All @@ -746,21 +755,36 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
}

if v.SnapshotTS != 0 {
is, err := domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
is, err := domain.GetDomain(b.ctx).GetSnapshotInfoSchema(v.SnapshotTS)
if err != nil {
panic(err)
}

if b.is.SchemaMetaVersion() != is.SchemaMetaVersion() {
panic(fmt.Sprintf("%d != %d", b.is.SchemaMetaVersion(), is.SchemaMetaVersion()))
}

ts, err := sessiontxn.GetTxnManager(b.ctx).GetReadTS()
if err != nil {
panic(e)
}

if v.SnapshotTS != ts {
panic(fmt.Sprintf("%d != %d", ts, v.SnapshotTS))
}
}
})

failpoint.Inject("assertExecutePrepareStatementStalenessOption", func(val failpoint.Value) {
vs := strings.Split(val.(string), "_")
assertTS, assertTxnScope := vs[0], vs[1]
if strconv.FormatUint(b.snapshotTS, 10) != assertTS ||
staleread.AssertStmtStaleness(b.ctx, true)
ts, err := sessiontxn.GetTxnManager(b.ctx).GetReadTS()
if err != nil {
panic(e)
}

if strconv.FormatUint(ts, 10) != assertTS ||
assertTxnScope != b.readReplicaScope {
panic("execute prepare statement have wrong staleness option")
}
Expand Down Expand Up @@ -1541,11 +1565,11 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
return b.dataReaderTS, nil
}

if (b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt) && b.forUpdateTS != 0 {
return b.forUpdateTS, nil
txnManager := sessiontxn.GetTxnManager(b.ctx)
if b.inInsertStmt || b.inUpdateStmt || b.inDeleteStmt || b.inSelectLockStmt {
return txnManager.GetForUpdateTS()
}

return b.getReadTS()
return txnManager.GetReadTS()
}

// getReadTS returns the ts used by select (without for-update clause). The return value is affected by the isolation level
Expand All @@ -1557,6 +1581,11 @@ func (b *executorBuilder) getReadTS() (uint64, error) {
// logics. However for `IndexLookUpMergeJoin` and `IndexLookUpHashJoin`, it requires caching the
// snapshotTS and and may even use it after the txn being destroyed. In this case, mark
// `snapshotTSCached` to skip `refreshForUpdateTSForRC`.
failpoint.Inject("assertNotStaleReadForExecutorGetReadTS", func() {
// after refactoring stale read will use its own context provider
staleread.AssertStmtStaleness(b.ctx, false)
})

if b.snapshotTSCached {
return b.snapshotTS, nil
}
Expand Down
14 changes: 6 additions & 8 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package executor

import (
"context"
"fmt"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/failpoint"
Expand All @@ -28,6 +27,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/staleread"
)

var (
Expand Down Expand Up @@ -71,6 +71,10 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(c.Ctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(c.Ctx, ret.InfoSchema)
if ret.LastSnapshotTS != 0 {
staleread.AssertStmtStaleness(c.Ctx, true)
sessiontxn.AssertTxnManagerReadTS(c.Ctx, ret.LastSnapshotTS)
}
})

is := sessiontxn.GetTxnManager(c.Ctx).GetTxnInfoSchema()
Expand All @@ -80,11 +84,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}

failpoint.Inject("assertStmtCtxIsStaleness", func(val failpoint.Value) {
expected := val.(bool)
got := c.Ctx.GetSessionVars().StmtCtx.IsStaleness
if got != expected {
panic(fmt.Sprintf("stmtctx isStaleness wrong, expected:%v, got:%v", expected, got))
}
staleread.AssertStmtStaleness(c.Ctx, val.(bool))
})

CountStmtNode(stmtNode, c.Ctx.GetSessionVars().InRestrictedSQL)
Expand All @@ -94,8 +94,6 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}
return &ExecStmt{
GoCtx: ctx,
SnapshotTS: ret.LastSnapshotTS,
IsStaleness: ret.IsStaleness,
ReplicaReadScope: ret.ReadReplicaScope,
InfoSchema: is,
Plan: finalPlan,
Expand Down
2 changes: 1 addition & 1 deletion executor/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec
}
plan = core.InjectExtraProjection(plan)
// Build executor.
b := newExecutorBuilder(h.sctx, is, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(h.sctx, is, nil, oracle.GlobalTxnScope)
return b.build(plan), nil
}

Expand Down
3 changes: 2 additions & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import (
topsqlstate "github.com/pingcap/tidb/util/topsql/state"
tikverr "github.com/tikv/client-go/v2/error"
tikvstore "github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
tikvutil "github.com/tikv/client-go/v2/util"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -1263,7 +1264,7 @@ func init() {
ctx = opentracing.ContextWithSpan(ctx, span1)
}

e := &executorBuilder{is: is, ctx: sctx}
e := newExecutorBuilder(sctx, is, nil, oracle.GlobalTxnScope)
exec := e.build(p)
if e.err != nil {
return nil, e.err
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, i
j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(nil, j.LeftJoinKeys[i], j.RightJoinKeys[i]))
}

b := newExecutorBuilder(ctx, nil, nil, 0, false, oracle.GlobalTxnScope)
b := newExecutorBuilder(ctx, nil, nil, oracle.GlobalTxnScope)
return b.build(j)
}

Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3467,7 +3467,7 @@ func TestUnreasonablyClose(t *testing.T) {
&plannercore.PhysicalShuffle{},
&plannercore.PhysicalUnionAll{},
}
executorBuilder := executor.NewMockExecutorBuilderForTest(tk.Session(), is, nil, math.MaxUint64, false, "global")
executorBuilder := executor.NewMockExecutorBuilderForTest(tk.Session(), is, nil, oracle.GlobalTxnScope)

opsNeedsCoveredMask := uint64(1<<len(opsNeedsCovered) - 1)
opsAlreadyCoveredMask := uint64(0)
Expand Down
Loading