Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support AS OF TIMESTAMP read-only begin statement #24740

Merged
merged 11 commits into from
May 20, 2021
9 changes: 5 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,10 +706,11 @@ func (b *executorBuilder) buildSimple(v *plannercore.Simple) Executor {
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
base.initCap = chunk.ZeroCapacity
e := &SimpleExec{
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
baseExecutor: base,
Statement: v.Statement,
IsFromRemote: v.IsFromRemote,
is: b.is,
StalenessTxnOption: v.StalenessTxnOption,
}
return e
}
Expand Down
32 changes: 26 additions & 6 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ type SimpleExec struct {
IsFromRemote bool
done bool
is infoschema.InfoSchema

// StalenessTxnOption is used to execute the staleness txn during a read-only begin statement.
StalenessTxnOption *sessionctx.StalenessTxnOption
}

func (e *baseExecutor) getSysSession() (sessionctx.Context, error) {
Expand Down Expand Up @@ -566,13 +569,16 @@ func (e *SimpleExec) executeUse(s *ast.UseStmt) error {
}

func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
// If `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND` is the first statement in TxnCtx, we should
// If `START TRANSACTION READ ONLY` is the first statement in TxnCtx, we should
// always create a new Txn instead of reusing it.
if s.ReadOnly {
enableNoopFuncs := e.ctx.GetSessionVars().EnableNoopFuncs
if !enableNoopFuncs && s.Bound == nil {
if !enableNoopFuncs && s.AsOf == nil && s.Bound == nil {
return expression.ErrFunctionsNoopImpl.GenWithStackByArgs("READ ONLY")
}
if s.AsOf != nil {
return e.executeStartTransactionReadOnlyWithBoundedStaleness(ctx, s)
}
if s.Bound != nil {
return e.executeStartTransactionReadOnlyWithTimestampBound(ctx, s)
}
Comment on lines 582 to 584
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code cleaning work will be in an independent PR later.

Expand Down Expand Up @@ -614,6 +620,22 @@ func (e *SimpleExec) executeBegin(ctx context.Context, s *ast.BeginStmt) error {
return nil
}

func (e *SimpleExec) executeStartTransactionReadOnlyWithBoundedStaleness(ctx context.Context, s *ast.BeginStmt) error {
if e.StalenessTxnOption == nil {
return errors.New("Failed to get timestamp during start transaction read only as of timestamp")
}
if err := e.ctx.NewTxnWithStalenessOption(ctx, *e.StalenessTxnOption); err != nil {
return err
}

// With START TRANSACTION, autocommit remains disabled until you end
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetInTxn(true)
return nil
}

// TODO: deprecate this syntax and only keep `AS OF TIMESTAMP` statement.
func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx context.Context, s *ast.BeginStmt) error {
opt := sessionctx.StalenessTxnOption{}
opt.Mode = s.Bound.Mode
Expand All @@ -632,8 +654,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
opt.StartTS = oracle.GoTimeToTS(gt)
case ast.TimestampBoundExactStaleness:
// TODO: support funcCallExpr in future
v, ok := s.Bound.Timestamp.(*driver.ValueExpr)
Expand Down Expand Up @@ -668,8 +689,7 @@ func (e *SimpleExec) executeStartTransactionReadOnlyWithTimestampBound(ctx conte
if err != nil {
return err
}
startTS := oracle.GoTimeToTS(gt)
opt.StartTS = startTS
opt.StartTS = oracle.GoTimeToTS(gt)
}
err := e.ctx.NewTxnWithStalenessOption(ctx, opt)
if err != nil {
Expand Down
118 changes: 89 additions & 29 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,30 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) {
zone: "sz",
},
{
name: "begin",
name: "begin after TimestampBoundReadTimestamp",
preSQL: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`,
sql: "begin",
IsStaleness: false,
txnScope: kv.GlobalTxnScope,
zone: "",
},
{
name: "AsOfTimestamp",
preSQL: "begin",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`,
IsStaleness: true,
expectPhysicalTS: 1599321600000,
txnScope: "local",
zone: "sh",
},
{
name: "begin after AsOfTimestamp",
preSQL: `START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`,
sql: "begin",
IsStaleness: false,
txnScope: oracle.GlobalTxnScope,
zone: "",
},
}
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand All @@ -106,8 +123,8 @@ func (s *testStaleTxnSerialSuite) TestExactStalenessTransaction(c *C) {
}
c.Assert(tk.Se.GetSessionVars().TxnCtx.IsStaleness, Equals, testcase.IsStaleness)
tk.MustExec("commit")
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
}
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
}

func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
Expand Down Expand Up @@ -147,13 +164,17 @@ func (s *testStaleTxnSerialSuite) TestStaleReadKVRequest(c *C) {
failpoint.Enable("github.com/pingcap/tidb/config/injectTxnScope", fmt.Sprintf(`return("%v")`, testcase.zone))
failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStoreLabels", fmt.Sprintf(`return("%v_%v")`, placement.DCLabelKey, testcase.txnScope))
failpoint.Enable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag", `return(true)`)
// Using NOW() will cause the loss of fsp precision, so we use NOW(3) to be accurate to the millisecond.
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW(3);`)
tk.MustQuery(testcase.sql)
tk.MustExec(`commit`)
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND EXACT STALENESS '00:00:00';`)
tk.MustQuery(testcase.sql)
tk.MustExec(`commit`)
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag")
}
failpoint.Disable("github.com/pingcap/tidb/config/injectTxnScope")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStoreLabels")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/assertStaleReadFlag")
}

func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) {
Expand All @@ -169,6 +190,17 @@ func (s *testStaleTxnSerialSuite) TestStalenessAndHistoryRead(c *C) {
tk.MustExec(updateSafePoint)
// set @@tidb_snapshot before staleness txn
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`)
// 1599321600000 == 2020-09-06 00:00:00
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
tk.MustExec("commit")
// set @@tidb_snapshot during staleness txn
tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP '2020-09-06 00:00:00';`)
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")
// set @@tidb_snapshot before staleness txn
tk.MustExec(`set @@tidb_snapshot="2016-10-08 16:45:26";`)
tk.MustExec(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND READ TIMESTAMP '2020-09-06 00:00:00';`)
c.Assert(oracle.ExtractPhysical(tk.Se.GetSessionVars().TxnCtx.StartTS), Equals, int64(1599321600000))
tk.MustExec("commit")
Expand All @@ -190,60 +222,76 @@ func (s *testStaleTxnSerialSuite) TestTimeBoundedStalenessTxn(c *C) {
name string
sql string
injectSafeTS uint64
useSafeTS bool
// compareWithSafeTS will be 0 if StartTS==SafeTS, -1 if StartTS < SafeTS, and +1 if StartTS > SafeTS.
compareWithSafeTS int
}{
{
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`,
injectSafeTS: func() uint64 {
return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second))
}(),
useSafeTS: true,
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:20'`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)),
compareWithSafeTS: 0,
},
{
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`,
injectSafeTS: func() uint64 {
return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second))
}(),
useSafeTS: false,
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MAX STALENESS '00:00:10'`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "max 20 seconds ago, safeTS 10 secs ago",
sql: func() string {
return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`,
time.Now().Add(-20*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: func() uint64 {
return oracle.GoTimeToTS(time.Now().Add(-10 * time.Second))
}(),
useSafeTS: true,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)),
compareWithSafeTS: 0,
},
{
name: "max 10 seconds ago, safeTS 20 secs ago",
sql: func() string {
return fmt.Sprintf(`START TRANSACTION READ ONLY WITH TIMESTAMP BOUND MIN READ TIMESTAMP '%v'`,
time.Now().Add(-10*time.Second).Format("2006-01-02 15:04:05"))
}(),
injectSafeTS: func() uint64 {
return oracle.GoTimeToTS(time.Now().Add(-20 * time.Second))
}(),
useSafeTS: false,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "20 seconds ago to now, safeTS 10 secs ago",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 20 SECOND, NOW())`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-10 * time.Second)),
compareWithSafeTS: 0,
},
{
name: "10 seconds ago to now, safeTS 20 secs ago",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 10 SECOND, NOW())`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-20 * time.Second)),
compareWithSafeTS: 1,
},
{
name: "20 seconds ago to 10 seconds ago, safeTS 5 secs ago",
sql: `START TRANSACTION READ ONLY AS OF TIMESTAMP tidb_bounded_staleness(NOW() - INTERVAL 20 SECOND, NOW() - INTERVAL 10 SECOND)`,
injectSafeTS: oracle.GoTimeToTS(time.Now().Add(-5 * time.Second)),
compareWithSafeTS: -1,
},
}
for _, testcase := range testcases {
c.Log(testcase.name)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", testcase.injectSafeTS)), IsNil)
tk.MustExec(testcase.sql)
if testcase.useSafeTS {
if testcase.compareWithSafeTS == 1 {
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Greater, testcase.injectSafeTS)
} else if testcase.compareWithSafeTS == 0 {
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Equals, testcase.injectSafeTS)
} else {
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Greater, testcase.injectSafeTS)
c.Assert(tk.Se.GetSessionVars().TxnCtx.StartTS, Less, testcase.injectSafeTS)
}
tk.MustExec("commit")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS")
}
failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")
failpoint.Disable("github.com/pingcap/tidb/store/tikv/injectSafeTS")
}

func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) {
Expand All @@ -263,4 +311,16 @@ func (s *testStaleTxnSerialSuite) TestStalenessTransactionSchemaVer(c *C) {
schemaVer3 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
// got an old infoSchema
c.Assert(schemaVer3, Equals, schemaVer1)

schemaVer4 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
time.Sleep(time.Second)
tk.MustExec("create table t (id int primary key);")
schemaVer5 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
// confirm schema changed
c.Assert(schemaVer4, Less, schemaVer5)

tk.MustExec(`START TRANSACTION READ ONLY AS OF TIMESTAMP NOW() - INTERVAL 1 SECOND`)
schemaVer6 := tk.Se.GetSessionVars().GetInfoSchema().SchemaMetaVersion()
// got an old infoSchema
c.Assert(schemaVer6, Equals, schemaVer4)
}
4 changes: 2 additions & 2 deletions expression/builtin_time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2928,7 +2928,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) {

// Test whether it's deterministic.
safeTime1 := t2.Add(-1 * time.Second)
safeTS1 := oracle.ComposeTS(safeTime1.Unix()*1000, 0)
safeTS1 := oracle.GoTimeToTS(safeTime1)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", safeTS1)), IsNil)
f, err := fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)}))
Expand All @@ -2941,7 +2941,7 @@ func (s *testEvaluatorSuite) TestTiDBBoundedStaleness(c *C) {
c.Assert(resultTime, Equals, safeTime1.Format(types.TimeFormat))
// SafeTS updated.
safeTime2 := t2.Add(1 * time.Second)
safeTS2 := oracle.ComposeTS(safeTime2.Unix()*1000, 0)
safeTS2 := oracle.GoTimeToTS(safeTime2)
c.Assert(failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS",
fmt.Sprintf("return(%v)", safeTS2)), IsNil)
f, err = fc.getFunction(s.ctx, s.datumsToConstants([]types.Datum{types.NewDatum(t1Str), types.NewDatum(t2Str)}))
Expand Down
3 changes: 3 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,9 @@ type Simple struct {
// and executing in co-processor.
// Used for `global kill`. See https://github.com/pingcap/tidb/blob/master/docs/design/2020-06-01-global-kill.md.
IsFromRemote bool

// StalenessTxnOption is the transaction option that will be built when planner builder calls buildSimple.
StalenessTxnOption *sessionctx.StalenessTxnOption
}

// PhysicalSimpleWrapper is a wrapper of `Simple` to implement physical plan interface.
Expand Down
36 changes: 34 additions & 2 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
Expand Down Expand Up @@ -643,7 +644,7 @@ func (b *PlanBuilder) Build(ctx context.Context, node ast.Node) (Plan, error) {
*ast.GrantStmt, *ast.DropUserStmt, *ast.AlterUserStmt, *ast.RevokeStmt, *ast.KillStmt, *ast.DropStatsStmt,
*ast.GrantRoleStmt, *ast.RevokeRoleStmt, *ast.SetRoleStmt, *ast.SetDefaultRoleStmt, *ast.ShutdownStmt,
*ast.RenameUserStmt:
return b.buildSimple(node.(ast.StmtNode))
return b.buildSimple(ctx, node.(ast.StmtNode))
case ast.DDLNode:
return b.buildDDL(ctx, x)
case *ast.CreateBindingStmt:
Expand Down Expand Up @@ -2259,7 +2260,7 @@ func (b *PlanBuilder) buildShow(ctx context.Context, show *ast.ShowStmt) (Plan,
return np, nil
}

func (b *PlanBuilder) buildSimple(node ast.StmtNode) (Plan, error) {
func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (Plan, error) {
p := &Simple{Statement: node}

switch raw := node.(type) {
Expand Down Expand Up @@ -2325,10 +2326,41 @@ func (b *PlanBuilder) buildSimple(node ast.StmtNode) (Plan, error) {
}
case *ast.ShutdownStmt:
b.visitInfo = appendVisitInfo(b.visitInfo, mysql.ShutdownPriv, "", "", "", nil)
case *ast.BeginStmt:
if raw.AsOf != nil {
startTS, err := b.calculateTsExpr(raw.AsOf)
if err != nil {
return nil, err
}
p.StalenessTxnOption = &sessionctx.StalenessTxnOption{
Mode: ast.TimestampBoundReadTimestamp,
StartTS: startTS,
}
}
}
return p, nil
}

// calculateTsExpr calculates the TsExpr of AsOfClause to get a StartTS.
func (b *PlanBuilder) calculateTsExpr(asOfClause *ast.AsOfClause) (uint64, error) {
tsVal, err := evalAstExpr(b.ctx, asOfClause.TsExpr)
if err != nil {
return 0, err
}
toTypeTimestamp := types.NewFieldType(mysql.TypeTimestamp)
// We need at least the millionsecond here, so set fsp to 3.
toTypeTimestamp.Decimal = 3
tsTimestamp, err := tsVal.ConvertTo(b.ctx.GetSessionVars().StmtCtx, toTypeTimestamp)
if err != nil {
return 0, err
}
tsTime, err := tsTimestamp.GetMysqlTime().GoTime(b.ctx.GetSessionVars().TimeZone)
if err != nil {
return 0, err
}
return oracle.GoTimeToTS(tsTime), nil
}

func collectVisitInfoFromRevokeStmt(sctx sessionctx.Context, vi []visitInfo, stmt *ast.RevokeStmt) []visitInfo {
// To use REVOKE, you must have the GRANT OPTION privilege,
// and you must have the privileges that you are granting.
Expand Down
Loading