Skip to content

Commit

Permalink
session,executor: run statement without transaction (#8260)
Browse files Browse the repository at this point in the history
`set @@autocommit = 0`
`select 1` // This statement should not make the session enter a transaction,
because it never need Txn().
Make txn lazy initialize
  • Loading branch information
tiancaiamao authored Nov 14, 2018
1 parent 73a3497 commit 0c3e9c1
Show file tree
Hide file tree
Showing 15 changed files with 156 additions and 45 deletions.
16 changes: 9 additions & 7 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,14 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
return a.handleNoDelayExecutor(ctx, sctx, e)
}

var txnStartTS uint64
if sctx.Txn(false).Valid() {
txnStartTS = sctx.Txn().StartTS()
}
return &recordSet{
executor: e,
stmt: a,
txnStartTS: sctx.Txn().StartTS(),
txnStartTS: txnStartTS,
}, nil
}

Expand All @@ -263,7 +267,8 @@ func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Co
defer func() {
terror.Log(errors.Trace(e.Close()))
txnTS := uint64(0)
if sctx.Txn() != nil {
// Don't active pending txn here.
if sctx.Txn(false).Valid() {
txnTS = sctx.Txn().StartTS()
}
a.LogSlowQuery(txnTS, err == nil)
Expand All @@ -287,9 +292,6 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {
if isPointGet {
log.Debugf("con:%d InitTxnWithStartTS %s", ctx.GetSessionVars().ConnectionID, a.Text)
err = ctx.InitTxnWithStartTS(math.MaxUint64)
} else {
log.Debugf("con:%d ActivePendingTxn %s", ctx.GetSessionVars().ConnectionID, a.Text)
err = ctx.ActivePendingTxn()
}
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -405,7 +407,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {

// IsPointGetWithPKOrUniqueKeyByAutoCommit returns true when meets following conditions:
// 1. ctx is auto commit tagged
// 2. txn is nil
// 2. txn is not valid
// 2. plan is point get by pk or unique key
func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannercore.Plan) bool {
// check auto commit
Expand All @@ -414,7 +416,7 @@ func IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx sessionctx.Context, p plannerco
}

// check txn
if ctx.Txn() != nil {
if ctx.Txn(false).Valid() {
return false
}

Expand Down
9 changes: 6 additions & 3 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -1173,8 +1174,6 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu
baseExecutor: base,
numDualRows: v.RowCount,
}
// Init the startTS for later use.
b.getStartTS()
return e
}

Expand All @@ -1185,10 +1184,14 @@ func (b *executorBuilder) getStartTS() uint64 {
}

startTS := b.ctx.GetSessionVars().SnapshotTS
if startTS == 0 && b.ctx.Txn() != nil {
if startTS == 0 && b.ctx.Txn().Valid() {
startTS = b.ctx.Txn().StartTS()
}
b.startTS = startTS
if b.startTS == 0 {
// The the code should never run here if there is no bug.
log.Error(errors.ErrorStack(errors.Trace(ErrGetStartTS)))
}
return startTS
}

Expand Down
2 changes: 2 additions & 0 deletions executor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const (
codeResultIsEmpty
codeErrBuildExec
codeBatchInsertFail
codeGetStartTS
)

// Error instances.
var (
ErrGetStartTS = terror.ClassExecutor.New(codeGetStartTS, "Can not get start ts")
ErrUnknownPlan = terror.ClassExecutor.New(codeUnknownPlan, "Unknown plan")
ErrPrepareMulti = terror.ClassExecutor.New(codePrepareMulti, "Can not prepare multiple statements")
ErrPrepareDDL = terror.ClassExecutor.New(codePrepareDDL, "Can not prepare DDL statements")
Expand Down
4 changes: 0 additions & 4 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,10 +735,6 @@ func init() {
// but the plan package cannot import the executor package because of the dependency cycle.
// So we assign a function implemented in the executor package to the plan package to avoid the dependency cycle.
plannercore.EvalSubquery = func(p plannercore.PhysicalPlan, is infoschema.InfoSchema, sctx sessionctx.Context) (rows [][]types.Datum, err error) {
err = sctx.ActivePendingTxn()
if err != nil {
return rows, errors.Trace(err)
}
e := &executorBuilder{is: is, ctx: sctx}
exec := e.build(p)
if e.err != nil {
Expand Down
2 changes: 1 addition & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2147,7 +2147,7 @@ func (s *testSuite) TestSelectForUpdate(c *C) {

tk.MustExec("drop table if exists t, t1")

c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)
tk.MustExec("create table t (c1 int, c2 int, c3 int)")
tk.MustExec("insert t values (11, 2, 3)")
tk.MustExec("insert t values (12, 2, 3)")
Expand Down
19 changes: 19 additions & 0 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,25 @@ type innerWorker struct {

// Open implements the Executor interface.
func (e *IndexLookUpJoin) Open(ctx context.Context) error {
// Be careful, very dirty hack in this line!!!
// IndexLookUpJoin need to rebuild executor (the dataReaderBuilder) during
// executing. However `executor.Next()` is lazy evaluation when the RecordSet
// result is drained.
// Lazy evaluation means the saved session context may change during executor's
// building and its running.
// A specific sequence for example:
//
// e := buildExecutor() // txn at build time
// recordSet := runStmt(e)
// session.CommitTxn() // txn closed
// recordSet.Next()
// e.dataReaderBuilder.Build() // txn is used again, which is already closed
//
// The trick here is `getStartTS` will cache start ts in the dataReaderBuilder,
// so even txn is destroyed later, the dataReaderBuilder could still use the
// cached start ts to construct DAG.
e.innerCtx.readerBuilder.getStartTS()

err := e.children[0].Open(ctx)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (e *SimpleExec) executeBegin(s *ast.BeginStmt) error {
// the transaction with COMMIT or ROLLBACK. The autocommit mode then
// reverts to its previous state.
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true)
// Call ctx.Txn() to active pending txn.
e.ctx.Txn()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1907,7 +1907,7 @@ func (b *PlanBuilder) buildDataSource(tn *ast.TableName) (LogicalPlan, error) {
// If this SQL is executed in a non-readonly transaction, we need a
// "UnionScan" operator to read the modifications of former SQLs, which is
// buffered in tidb-server memory.
if b.ctx.Txn() != nil && !b.ctx.Txn().IsReadOnly() {
if b.ctx.Txn(false).Valid() && !b.ctx.Txn(false).IsReadOnly() {
us := LogicalUnionScan{}.Init(b.ctx)
us.SetChildren(ds)
result = us
Expand Down
20 changes: 14 additions & 6 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,9 +939,20 @@ func (s *session) DropPreparedStmt(stmtID uint32) error {
return nil
}

func (s *session) Txn() kv.Transaction {
if !s.txn.Valid() {
return nil
func (s *session) Txn(opt ...bool) kv.Transaction {
if s.txn.pending() && len(opt) == 0 {
// Transaction is lazy intialized.
// PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn,
// If Txn() is called later, wait for the future to get a valid txn.
txnCap := s.getMembufCap()
if err := s.txn.changePendingToValid(txnCap); err != nil {
s.txn.fail = errors.Trace(err)
} else {
s.sessionVars.TxnCtx.StartTS = s.txn.StartTS()
}
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
}
return &s.txn
}
Expand Down Expand Up @@ -1359,9 +1370,6 @@ func (s *session) PrepareTxnCtx(ctx context.Context) {
SchemaVersion: is.SchemaMetaVersion(),
CreateTime: time.Now(),
}
if !s.sessionVars.IsAutocommit() {
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true)
}
}

// RefreshTxnCtx implements context.RefreshTxnCtx interface.
Expand Down
73 changes: 64 additions & 9 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (s *testSessionSuite) TestRowLock(c *C) {
tk2 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("drop table if exists t")
c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)
tk.MustExec("create table t (c1 int, c2 int, c3 int)")
tk.MustExec("insert t values (11, 2, 3)")
tk.MustExec("insert t values (12, 2, 3)")
Expand Down Expand Up @@ -301,6 +301,61 @@ func (s *testSessionSuite) TestAutocommit(c *C) {
c.Assert(int(tk.Se.Status()&mysql.ServerStatusAutocommit), Equals, 0)
tk.MustExec("set autocommit='On'")
c.Assert(int(tk.Se.Status()&mysql.ServerStatusAutocommit), Greater, 0)

// When autocommit is 0, transaction start ts should be the first *valid*
// statement, rather than *any* statement.
tk.MustExec("create table t (id int)")
tk.MustExec("set @@autocommit = 0")
tk.MustExec("rollback")
tk.MustExec("set @@autocommit = 0")
tk1 := testkit.NewTestKitWithInit(c, s.store)
tk1.MustExec("insert into t select 1")
tk.MustQuery("select * from t").Check(testkit.Rows("1"))

// TODO: MySQL compatibility for setting global variable.
// tk.MustExec("begin")
// tk.MustExec("insert into t values (42)")
// tk.MustExec("set @@global.autocommit = 1")
// tk.MustExec("rollback")
// tk.MustQuery("select count(*) from t where id = 42").Check(testkit.Rows("0"))
// Even the transaction is rollbacked, the set statement succeed.
// tk.MustQuery("select @@global.autocommit").Rows("1")
}

// TestTxnLazyInitialize tests that when autocommit = 0, not all statement starts
// a new transaction.
func (s *testSessionSuite) TestTxnLazyInitialize(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

tk.MustExec("set @@autocommit = 0")
c.Assert(tk.Se.Txn(false).Valid(), IsFalse)
tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0"))
tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0"))

// Those statement should not start a new transaction automacally.
tk.MustQuery("select 1")
tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0"))

tk.MustExec("set @@tidb_general_log = 0")
tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0"))

tk.MustQuery("explain select * from t")
tk.MustQuery("select @@tidb_current_ts").Check(testkit.Rows("0"))

// Begin statement should start a new transaction.
tk.MustExec("begin")
c.Assert(tk.Se.Txn(false).Valid(), IsTrue)
tk.MustExec("rollback")

tk.MustExec("select * from t")
c.Assert(tk.Se.Txn(false).Valid(), IsTrue)
tk.MustExec("rollback")

tk.MustExec("insert into t values (1)")
c.Assert(tk.Se.Txn(false).Valid(), IsTrue)
tk.MustExec("rollback")
}

func (s *testSessionSuite) TestGlobalVarAccessor(c *C) {
Expand Down Expand Up @@ -428,7 +483,7 @@ func (s *testSessionSuite) TestRetryCleanTxn(c *C) {
history.Add(0, stmt, tk.Se.GetSessionVars().StmtCtx)
_, err = tk.Exec("commit")
c.Assert(err, NotNil)
c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)
c.Assert(tk.Se.GetSessionVars().InTxn(), IsFalse)
}

Expand Down Expand Up @@ -507,11 +562,11 @@ func (s *testSessionSuite) TestInTrans(c *C) {
tk.MustExec("insert t values ()")
c.Assert(tk.Se.Txn().Valid(), IsTrue)
tk.MustExec("drop table if exists t;")
c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)
tk.MustExec("create table t (id BIGINT PRIMARY KEY AUTO_INCREMENT NOT NULL)")
c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)
tk.MustExec("insert t values ()")
c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)
tk.MustExec("commit")
tk.MustExec("insert t values ()")

Expand All @@ -521,11 +576,11 @@ func (s *testSessionSuite) TestInTrans(c *C) {
tk.MustExec("insert t values ()")
c.Assert(tk.Se.Txn().Valid(), IsTrue)
tk.MustExec("commit")
c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)
tk.MustExec("insert t values ()")
c.Assert(tk.Se.Txn().Valid(), IsTrue)
tk.MustExec("commit")
c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)

tk.MustExec("set autocommit=1")
tk.MustExec("drop table if exists t")
Expand All @@ -535,7 +590,7 @@ func (s *testSessionSuite) TestInTrans(c *C) {
tk.MustExec("insert t values ()")
c.Assert(tk.Se.Txn().Valid(), IsTrue)
tk.MustExec("rollback")
c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)
}

func (s *testSessionSuite) TestRetryPreparedStmt(c *C) {
Expand All @@ -544,7 +599,7 @@ func (s *testSessionSuite) TestRetryPreparedStmt(c *C) {
tk2 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("drop table if exists t")
c.Assert(tk.Se.Txn(), IsNil)
c.Assert(tk.Se.Txn().Valid(), IsFalse)
tk.MustExec("create table t (c1 int, c2 int, c3 int)")
tk.MustExec("insert t values (11, 2, 3)")

Expand Down
13 changes: 12 additions & 1 deletion session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
if err == nil {
GetHistory(sctx).Add(0, s, se.sessionVars.StmtCtx)
}
if sctx.Txn() != nil {
if sctx.Txn(false).Valid() {
if err != nil {
sctx.StmtRollback()
} else {
Expand All @@ -178,6 +178,17 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)
history.Count(), sctx.GetSessionVars().IsAutocommit())
}
}
if se.txn.pending() {
// After run statement finish, txn state is still pending means the
// statement never need a Txn(), such as:
//
// set @@tidb_general_log = 1
// set @@autocommit = 0
// select 1
//
// Reset txn state to invalid to dispose the pending start ts.
se.txn.changeToInvalid()
}
return rs, errors.Trace(err)
}

Expand Down
6 changes: 5 additions & 1 deletion session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,15 @@ func (st *TxnState) init() {
st.mutations = make(map[int64]*binlog.TableMutation)
}

// Valid overrides Transaction interface.
// Valid implements the kv.Transaction interface.
func (st *TxnState) Valid() bool {
return st.Transaction != nil && st.Transaction.Valid()
}

func (st *TxnState) pending() bool {
return st.Transaction == nil && st.txnFuture != nil
}

func (st *TxnState) validOrPending() bool {
return st.txnFuture != nil || st.Valid()
}
Expand Down
3 changes: 2 additions & 1 deletion sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ type Context interface {
NewTxn() error

// Txn returns the current transaction which is created before executing a statement.
Txn() kv.Transaction
// The returned kv.Transaction is not nil, but maybe pending or invalid.
Txn(...bool) kv.Transaction

// GetClient gets a kv.Client.
GetClient() kv.Client
Expand Down
2 changes: 1 addition & 1 deletion table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,7 +1042,7 @@ func (ctx *ctxForPartitionExpr) NewTxn() error {
}

// Txn returns the current transaction which is created before executing a statement.
func (ctx *ctxForPartitionExpr) Txn() kv.Transaction {
func (ctx *ctxForPartitionExpr) Txn(...bool) kv.Transaction {
panic("not support")
}

Expand Down
Loading

0 comments on commit 0c3e9c1

Please sign in to comment.