Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: move more session vars to stmt context for retrying #8034

Merged
merged 20 commits into from
Dec 10, 2018
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func testCheckJobCancelled(c *C, d *ddl, job *model.Job, state *model.SchemaStat
t := meta.NewMeta(txn)
historyJob, err := t.GetHistoryDDLJob(job.ID)
c.Assert(err, IsNil)
c.Assert(historyJob.IsCancelled() || historyJob.IsRollbackDone(), IsTrue, Commentf("histroy job %s", historyJob))
c.Assert(historyJob.IsCancelled() || historyJob.IsRollbackDone(), IsTrue, Commentf("history job %s", historyJob))
if state != nil {
c.Assert(historyJob.SchemaState, Equals, *state)
}
Expand Down
9 changes: 7 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1257,10 +1257,15 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
}
}
if vars.LastInsertID > 0 {
vars.PrevLastInsertID = vars.LastInsertID
sc.PrevLastInsertID = vars.LastInsertID
vars.LastInsertID = 0
}
vars.ResetPrevAffectedRows()
sc.PrevAffectedRows = 0
if vars.StmtCtx.InUpdateOrDeleteStmt || vars.StmtCtx.InInsertStmt {
sc.PrevAffectedRows = int64(vars.StmtCtx.AffectedRows())
} else if vars.StmtCtx.InSelectStmt {
sc.PrevAffectedRows = -1
}
err = vars.SetSystemVar("warning_count", fmt.Sprintf("%d", vars.StmtCtx.NumWarnings(false)))
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (e *SimpleExec) executeBegin(s *ast.BeginStmt) error {
// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
// need to call NewTxn, which commits the existing transaction and begins a new one.
txnCtx := e.ctx.GetSessionVars().TxnCtx
if txnCtx.Histroy != nil {
if txnCtx.History != nil {
err := e.ctx.NewTxn()
if err != nil {
return errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions expression/builtin_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (b *builtinLastInsertIDSig) Clone() builtinFunc {
// evalInt evals LAST_INSERT_ID().
// See https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_last-insert-id.
func (b *builtinLastInsertIDSig) evalInt(row chunk.Row) (res int64, isNull bool, err error) {
res = int64(b.ctx.GetSessionVars().PrevLastInsertID)
res = int64(b.ctx.GetSessionVars().StmtCtx.PrevLastInsertID)
return res, false, nil
}

Expand Down Expand Up @@ -437,6 +437,6 @@ func (b *builtinRowCountSig) Clone() builtinFunc {
// evalInt evals ROW_COUNT().
// See https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_row-count.
func (b *builtinRowCountSig) evalInt(_ chunk.Row) (res int64, isNull bool, err error) {
res = int64(b.ctx.GetSessionVars().PrevAffectedRows)
res = int64(b.ctx.GetSessionVars().StmtCtx.PrevAffectedRows)
return res, false, nil
}
4 changes: 2 additions & 2 deletions expression/builtin_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *testEvaluatorSuite) TestRowCount(c *C) {
defer testleak.AfterTest(c)()
ctx := mock.NewContext()
sessionVars := ctx.GetSessionVars()
sessionVars.PrevAffectedRows = 10
sessionVars.StmtCtx.PrevAffectedRows = 10

f, err := funcs[ast.RowCount].getFunction(ctx, nil)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -203,7 +203,7 @@ func (s *testEvaluatorSuite) TestLastInsertID(c *C) {
err error
)
if t.insertID > 0 {
s.ctx.GetSessionVars().PrevLastInsertID = t.insertID
s.ctx.GetSessionVars().StmtCtx.PrevLastInsertID = t.insertID
}

if t.args != nil {
Expand Down
5 changes: 3 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,9 @@ func (s *session) retry(ctx context.Context, maxCnt uint) error {
if st.IsReadOnly() {
continue
}
s.sessionVars.StmtCtx = sr.stmtCtx
s.sessionVars.StmtCtx.ResetForRetry()
s.sessionVars.PreparedParams = s.sessionVars.PreparedParams[:0]
schemaVersion, err = st.RebuildPlan()
if err != nil {
return errors.Trace(err)
Expand All @@ -489,8 +492,6 @@ func (s *session) retry(ctx context.Context, maxCnt uint) error {
} else {
log.Warnf("con:%d schema_ver:%d retry_cnt:%d query_num:%d", connID, schemaVersion, retryCnt, i)
}
s.sessionVars.StmtCtx = sr.stmtCtx
s.sessionVars.StmtCtx.ResetForRetry()
jackysp marked this conversation as resolved.
Show resolved Hide resolved
_, err = st.Exec(ctx)
if err != nil {
s.StmtRollback()
Expand Down
42 changes: 36 additions & 6 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ func (s *testSessionSuite) TestLastInsertID(c *C) {
tk.MustExec("execute stmt1 using @v1")
tk.MustExec("execute stmt1 using @v2")
tk.MustExec("deallocate prepare stmt1")
currLastInsertID := tk.Se.GetSessionVars().PrevLastInsertID
currLastInsertID := tk.Se.GetSessionVars().StmtCtx.PrevLastInsertID
tk.MustQuery("select c1 from t where c2 = 20").Check(testkit.Rows(fmt.Sprint(currLastInsertID)))
c.Assert(lastInsertID+2, Equals, currLastInsertID)
}
Expand Down Expand Up @@ -727,7 +727,7 @@ func (s *testSessionSuite) TestAutoIncrementWithRetry(c *C) {
tk.MustExec("commit")

tk.MustQuery("select c1 from t where c2 = 11").Check(testkit.Rows("6"))
currLastInsertID := tk.Se.GetSessionVars().PrevLastInsertID
currLastInsertID := tk.Se.GetSessionVars().StmtCtx.PrevLastInsertID
c.Assert(lastInsertID+5, Equals, currLastInsertID)

// insert set
Expand All @@ -742,7 +742,7 @@ func (s *testSessionSuite) TestAutoIncrementWithRetry(c *C) {
tk.MustExec("commit")

tk.MustQuery("select c1 from t where c2 = 31").Check(testkit.Rows("9"))
currLastInsertID = tk.Se.GetSessionVars().PrevLastInsertID
currLastInsertID = tk.Se.GetSessionVars().StmtCtx.PrevLastInsertID
c.Assert(lastInsertID+3, Equals, currLastInsertID)

// replace
Expand All @@ -757,7 +757,7 @@ func (s *testSessionSuite) TestAutoIncrementWithRetry(c *C) {
tk.MustExec("commit")

tk.MustQuery("select c1 from t where c2 = 21").Check(testkit.Rows("10"))
currLastInsertID = tk.Se.GetSessionVars().PrevLastInsertID
currLastInsertID = tk.Se.GetSessionVars().StmtCtx.PrevLastInsertID
c.Assert(lastInsertID+1, Equals, currLastInsertID)

// update
Expand All @@ -773,7 +773,7 @@ func (s *testSessionSuite) TestAutoIncrementWithRetry(c *C) {
tk.MustExec("commit")

tk.MustQuery("select c1 from t where c2 = 41").Check(testkit.Rows("0"))
currLastInsertID = tk.Se.GetSessionVars().PrevLastInsertID
currLastInsertID = tk.Se.GetSessionVars().StmtCtx.PrevLastInsertID
c.Assert(lastInsertID+3, Equals, currLastInsertID)

// prepare
Expand All @@ -795,7 +795,7 @@ func (s *testSessionSuite) TestAutoIncrementWithRetry(c *C) {
tk.MustExec("commit")

tk.MustQuery("select c1 from t where c2 = 12").Check(testkit.Rows("7"))
currLastInsertID = tk.Se.GetSessionVars().PrevLastInsertID
currLastInsertID = tk.Se.GetSessionVars().StmtCtx.PrevLastInsertID
c.Assert(lastInsertID+3, Equals, currLastInsertID)
}

Expand Down Expand Up @@ -1255,6 +1255,36 @@ func (s *testSessionSuite) TestDelete(c *C) {
tk1.MustQuery("select * from t;").Check(testkit.Rows("1"))
}

func (s *testSessionSuite) TestResetCtx(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("create table t (i int auto_increment not null key);")
tk.MustExec("insert into t values (1);")
tk.MustExec("begin;")
tk.MustExec("insert into t values (10);")
tk.MustExec("update t set i = i + row_count();")
tk.MustQuery("select * from t;").Check(testkit.Rows("2", "11"))

tk1.MustExec("update t set i = 0 where i = 1;")
jackysp marked this conversation as resolved.
Show resolved Hide resolved
tk1.MustQuery("select * from t;").Check(testkit.Rows("0"))

tk.MustExec("commit;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1", "11"))

tk.MustExec("delete from t where i = 11;")
tk.MustExec("begin;")
tk.MustExec("insert into t values ();")
tk.MustExec("update t set i = i + last_insert_id() + 1;")
tk.MustQuery("select * from t;").Check(testkit.Rows("14", "25"))

tk1.MustExec("update t set i = 0 where i = 1;")
tk1.MustQuery("select * from t;").Check(testkit.Rows("0"))

tk.MustExec("commit;")
tk.MustQuery("select * from t;").Check(testkit.Rows("13", "25"))
}

func (s *testSessionSuite) TestUnique(c *C) {
// test for https://github.com/pingcap/tidb/pull/461

Expand Down
4 changes: 2 additions & 2 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)

// GetHistory get all stmtHistory in current txn. Exported only for test.
func GetHistory(ctx sessionctx.Context) *StmtHistory {
hist, ok := ctx.GetSessionVars().TxnCtx.Histroy.(*StmtHistory)
hist, ok := ctx.GetSessionVars().TxnCtx.History.(*StmtHistory)
if ok {
return hist
}
hist = new(StmtHistory)
ctx.GetSessionVars().TxnCtx.Histroy = hist
ctx.GetSessionVars().TxnCtx.History = hist
return hist
}

Expand Down
6 changes: 6 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type StatementContext struct {
histogramsNotLoad bool
execDetails execdetails.ExecDetails
}
// PrevAffectedRows is the affected-rows value(DDL is 0, DML is the number of affected rows).
PrevAffectedRows int64
// PrevLastInsertID is the last insert ID of previous statement.
PrevLastInsertID uint64

// Copied from SessionVars.TimeZone.
TimeZone *time.Location
Expand Down Expand Up @@ -241,6 +245,8 @@ func (sc *StatementContext) ResetForRetry() {
sc.mu.foundRows = 0
sc.mu.warnings = nil
sc.mu.Unlock()
sc.TableIDs = sc.TableIDs[:0]
jackysp marked this conversation as resolved.
Show resolved Hide resolved
sc.IndexIDs = sc.IndexIDs[:0]
}

// MergeExecDetails merges a single region execution details into self, used to print
Expand Down
23 changes: 4 additions & 19 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type TransactionContext struct {
DirtyDB interface{}
Binlog interface{}
InfoSchema interface{}
Histroy interface{}
History interface{}
SchemaVersion int64
StartTS uint64
Shard *int64
Expand Down Expand Up @@ -195,12 +195,9 @@ type SessionVars struct {

// Following variables are special for current session.

Status uint16
PrevLastInsertID uint64 // PrevLastInsertID is the last insert ID of previous statement.
LastInsertID uint64 // LastInsertID is the auto-generated ID in the current statement.
InsertID uint64 // InsertID is the given insert ID of an auto_increment column.
// PrevAffectedRows is the affected-rows value(DDL is 0, DML is the number of affected rows).
PrevAffectedRows int64
Status uint16
LastInsertID uint64 // LastInsertID is the auto-generated ID in the current statement.
InsertID uint64 // InsertID is the given insert ID of an auto_increment column.

// ClientCapability is client's capability.
ClientCapability uint32
Expand Down Expand Up @@ -461,18 +458,6 @@ func (s *SessionVars) Location() *time.Location {
return loc
}

// ResetPrevAffectedRows reset the prev-affected-rows variable.
func (s *SessionVars) ResetPrevAffectedRows() {
s.PrevAffectedRows = 0
if s.StmtCtx != nil {
if s.StmtCtx.InUpdateOrDeleteStmt || s.StmtCtx.InInsertStmt {
s.PrevAffectedRows = int64(s.StmtCtx.AffectedRows())
} else if s.StmtCtx.InSelectStmt {
s.PrevAffectedRows = -1
}
}
}

// GetExecuteArgumentsInfo gets the argument list as a string of execute statement.
func (s *SessionVars) GetExecuteArgumentsInfo() string {
if len(s.PreparedParams) == 0 {
Expand Down