From 2e8a982cb0f9e42cefb4e7ae9423ac8b8aaf7522 Mon Sep 17 00:00:00 2001 From: YangKeao Date: Thu, 23 Mar 2023 05:34:43 -0400 Subject: [PATCH] session, com_stmt: fetch all rows during EXECUTE command (#42473) close pingcap/tidb#41891, close pingcap/tidb#42424 --- ddl/db_test.go | 26 ++-- domain/infosync/info.go | 16 ++- server/BUILD.bazel | 1 - server/conn.go | 22 --- server/conn_stmt.go | 61 +++++++-- server/conn_stmt_test.go | 187 ++------------------------ server/conn_test.go | 5 +- server/driver.go | 24 +--- server/driver_tidb.go | 85 +++--------- server/driver_tidb_test.go | 25 ---- server/server.go | 34 ----- session/session.go | 1 - sessionctx/variable/session.go | 53 -------- sessionctx/variable/session_test.go | 54 -------- sessiontxn/isolation/readcommitted.go | 4 +- table/tables/BUILD.bazel | 2 +- testkit/mocksessionmanager.go | 24 +--- util/processinfo.go | 28 ---- 18 files changed, 113 insertions(+), 539 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index ca442f15c9ba5..87d941ccb87e8 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -1340,11 +1340,13 @@ func TestLogAndShowSlowLog(t *testing.T) { } func TestReportingMinStartTimestamp(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) - tk := testkit.NewTestKit(t, store) - se := tk.Session() + _, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease) infoSyncer := dom.InfoSyncer() + sm := &testkit.MockSessionManager{ + PS: make([]*util.ProcessInfo, 0), + } + infoSyncer.SetSessionManager(sm) beforeTS := oracle.GoTimeToTS(time.Now()) infoSyncer.ReportMinStartTS(dom.Store()) afterTS := oracle.GoTimeToTS(time.Now()) @@ -1353,21 +1355,13 @@ func TestReportingMinStartTimestamp(t *testing.T) { now := time.Now() validTS := oracle.GoTimeToLowerLimitStartTS(now.Add(time.Minute), tikv.MaxTxnTimeUse) lowerLimit := oracle.GoTimeToLowerLimitStartTS(now, tikv.MaxTxnTimeUse) - sm := se.GetSessionManager().(*testkit.MockSessionManager) sm.PS = []*util.ProcessInfo{ - {CurTxnStartTS: 0, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, - {CurTxnStartTS: math.MaxUint64, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, - {CurTxnStartTS: lowerLimit, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, - {CurTxnStartTS: validTS, ProtectedTSList: &se.GetSessionVars().ProtectedTSList}, + {CurTxnStartTS: 0}, + {CurTxnStartTS: math.MaxUint64}, + {CurTxnStartTS: lowerLimit}, + {CurTxnStartTS: validTS}, } - infoSyncer.ReportMinStartTS(dom.Store()) - require.Equal(t, validTS, infoSyncer.GetMinStartTS()) - - unhold := se.GetSessionVars().ProtectedTSList.HoldTS(validTS - 1) - infoSyncer.ReportMinStartTS(dom.Store()) - require.Equal(t, validTS-1, infoSyncer.GetMinStartTS()) - - unhold() + infoSyncer.SetSessionManager(sm) infoSyncer.ReportMinStartTS(dom.Store()) require.Equal(t, validTS, infoSyncer.GetMinStartTS()) } diff --git a/domain/infosync/info.go b/domain/infosync/info.go index f3ae522d220a2..ef0777572de82 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -797,6 +797,8 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { if sm == nil { return } + pl := sm.ShowProcessList() + innerSessionStartTSList := sm.GetInternalSessionStartTSList() // Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC. currentVer, err := store.CurrentVersion(kv.GlobalTxnScope) @@ -810,8 +812,18 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) { minStartTS := oracle.GoTimeToTS(now) logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("initial minStartTS", minStartTS), zap.Uint64("StartTSLowerLimit", startTSLowerLimit)) - if ts := sm.GetMinStartTS(startTSLowerLimit); ts > startTSLowerLimit && ts < minStartTS { - minStartTS = ts + for _, info := range pl { + if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS { + minStartTS = info.CurTxnStartTS + } + } + + for _, innerTS := range innerSessionStartTSList { + logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("Internal Session Transaction StartTS", innerTS)) + kv.PrintLongTimeInternalTxn(now, innerTS, false) + if innerTS > startTSLowerLimit && innerTS < minStartTS { + minStartTS = innerTS + } } is.minStartTS = kv.GetMinInnerTxnStartTS(now, startTSLowerLimit, minStartTS) diff --git a/server/BUILD.bazel b/server/BUILD.bazel index b01a1f1a4b0c2..af93cefec6fe8 100644 --- a/server/BUILD.bazel +++ b/server/BUILD.bazel @@ -204,7 +204,6 @@ go_test( "//util/plancodec", "//util/resourcegrouptag", "//util/rowcodec", - "//util/sqlexec", "//util/stmtsummary/v2:stmtsummary", "//util/syncutil", "//util/topsql", diff --git a/server/conn.go b/server/conn.go index 3eb2146517732..6f6735a936c26 100644 --- a/server/conn.go +++ b/server/conn.go @@ -2331,34 +2331,12 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool // fetchSize, the desired number of rows to be fetched each time when client uses cursor. func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet, serverStatus uint16, fetchSize int) error { fetchedRows := rs.GetFetchedRows() - // if fetchedRows is not enough, getting data from recordSet. - // NOTE: chunk should not be allocated from the allocator - // the allocator will reset every statement - // but it maybe stored in the result set among statements - // ref https://github.com/pingcap/tidb/blob/7fc6ebbda4ddf84c0ba801ca7ebb636b934168cf/server/conn_stmt.go#L233-L239 - // Here server.tidbResultSet implements Next method. - req := rs.NewChunk(nil) - for len(fetchedRows) < fetchSize { - if err := rs.Next(ctx, req); err != nil { - return err - } - rowCount := req.NumRows() - if rowCount == 0 { - break - } - // filling fetchedRows with chunk - for i := 0; i < rowCount; i++ { - fetchedRows = append(fetchedRows, req.GetRow(i)) - } - req = chunk.Renew(req, cc.ctx.GetSessionVars().MaxChunkSize) - } // tell the client COM_STMT_FETCH has finished by setting proper serverStatus, // and close ResultSet. if len(fetchedRows) == 0 { serverStatus &^= mysql.ServerStatusCursorExists serverStatus |= mysql.ServerStatusLastRowSend - terror.Call(rs.Close) return cc.writeEOF(ctx, serverStatus) } diff --git a/server/conn_stmt.go b/server/conn_stmt.go index f4638ba0afe1f..06462696b8c4d 100644 --- a/server/conn_stmt.go +++ b/server/conn_stmt.go @@ -56,6 +56,7 @@ import ( "github.com/pingcap/tidb/sessiontxn" storeerr "github.com/pingcap/tidb/store/driver/error" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/topsql" @@ -285,7 +286,7 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm } // since there are multiple implementations of ResultSet (the rs might be wrapped), we have to unwrap the rs before // casting it to *tidbResultSet. - if result, ok := unwrapResultSet(rs).(*tidbResultSet); ok { + if result, ok := rs.(*tidbResultSet); ok { if planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt); ok { result.preparedStmt = planCacheStmt } @@ -297,29 +298,54 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm if useCursor { cc.initResultEncoder(ctx) defer cc.rsEncoder.clean() - // fix https://github.com/pingcap/tidb/issues/39447. we need to hold the start-ts here because the process info - // will be set to sleep after fetch returned. - if pi := cc.ctx.ShowProcess(); pi != nil && pi.ProtectedTSList != nil && pi.CurTxnStartTS > 0 { - unhold := pi.HoldTS(pi.CurTxnStartTS) - rs = &rsWithHooks{ResultSet: rs, onClosed: unhold} + // fetch all results of the resultSet, and stored them locally, so that the future `FETCH` command can read + // the rows directly to avoid running executor and accessing shared params/variables in the session + // NOTE: chunk should not be allocated from the connection allocator, which will reset after executing this command + // but the rows are still needed in the following FETCH command. + // + // TODO: trace the memory used here + chk := rs.NewChunk(nil) + var rows []chunk.Row + for { + if err = rs.Next(ctx, chk); err != nil { + return false, err + } + rowCount := chk.NumRows() + if rowCount == 0 { + break + } + // filling fetchedRows with chunk + for i := 0; i < rowCount; i++ { + row := chk.GetRow(i) + rows = append(rows, row) + } + chk = chunk.Renew(chk, vars.MaxChunkSize) } + rs.StoreFetchedRows(rows) + stmt.StoreResultSet(rs) - // also store the preparedParams in the stmt, so we could restore the params in the following fetch command - // the params should have been parsed in `(&cc.ctx).ExecuteStmt(ctx, execStmt)`. - stmt.StorePreparedCtx(&PreparedStatementCtx{ - Params: vars.PreparedParams, - }) if err = cc.writeColumnInfo(rs.Columns()); err != nil { return false, err } if cl, ok := rs.(fetchNotifier); ok { cl.OnFetchReturned() } + + // as the `Next` of `ResultSet` will never be called, all rows have been cached inside it. We could close this + // `ResultSet`. + err = rs.Close() + if err != nil { + return false, err + } + + stmt.SetCursorActive(true) + // explicitly flush columnInfo to client. err = cc.writeEOF(ctx, cc.ctx.Status()) if err != nil { return false, err } + return false, cc.flush(ctx) } defer terror.Call(rs.Close) @@ -351,9 +377,6 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler, strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch"), cc.preparedStmt2String(stmtID)) } - - cc.ctx.GetSessionVars().PreparedParams = stmt.GetPreparedCtx().Params - if topsqlstate.TopSQLEnabled() { prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID) if prepareObj != nil && prepareObj.SQLDigest != nil { @@ -371,10 +394,19 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch_rs"), cc.preparedStmt2String(stmtID)) } + sendingEOF := false + // if the `fetchedRows` are empty before writing result, we could say the `FETCH` command will send EOF + if len(rs.GetFetchedRows()) == 0 { + sendingEOF = true + } _, err = cc.writeResultSet(ctx, rs, true, cc.ctx.Status(), int(fetchSize)) if err != nil { return errors.Annotate(err, cc.preparedStmt2String(stmtID)) } + if sendingEOF { + stmt.SetCursorActive(false) + } + return nil } @@ -712,6 +744,7 @@ func (cc *clientConn) handleStmtClose(data []byte) (err error) { if stmt != nil { return stmt.Close() } + return } diff --git a/server/conn_stmt_test.go b/server/conn_stmt_test.go index 61a7d3ad3e82c..dff61b203bf5e 100644 --- a/server/conn_stmt_test.go +++ b/server/conn_stmt_test.go @@ -20,7 +20,6 @@ import ( "encoding/binary" "testing" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" @@ -257,92 +256,6 @@ func TestParseStmtFetchCmd(t *testing.T) { } } -func TestCursorReadHoldTS(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - srv := CreateMockServer(t, store) - srv.SetDomain(dom) - defer srv.Close() - - appendUint32 := binary.LittleEndian.AppendUint32 - ctx := context.Background() - c := CreateMockConn(t, srv) - tk := testkit.NewTestKitWithSession(t, store, c.Context().Session) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int primary key)") - tk.MustExec("insert into t values (1), (2), (3), (4), (5), (6), (7), (8)") - tk.MustQuery("select count(*) from t").Check(testkit.Rows("8")) - - stmt, _, _, err := c.Context().Prepare("select * from t") - require.NoError(t, err) - require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) - - // should hold ts after executing stmt with cursor - require.NoError(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - ts := tk.Session().ShowProcess().GetMinStartTS(0) - require.Positive(t, ts) - // should unhold ts when result set exhausted - require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5))) - require.Equal(t, ts, tk.Session().ShowProcess().GetMinStartTS(0)) - require.Equal(t, ts, srv.GetMinStartTS(0)) - require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5))) - require.Equal(t, ts, tk.Session().ShowProcess().GetMinStartTS(0)) - require.Equal(t, ts, srv.GetMinStartTS(0)) - require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5))) - require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) - - // should hold ts after executing stmt with cursor - require.NoError(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - require.Positive(t, tk.Session().ShowProcess().GetMinStartTS(0)) - // should unhold ts when stmt reset - require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtReset}, uint32(stmt.ID())))) - require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) - - // should hold ts after executing stmt with cursor - require.NoError(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - require.Positive(t, tk.Session().ShowProcess().GetMinStartTS(0)) - // should unhold ts when stmt closed - require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtClose}, uint32(stmt.ID())))) - require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) - - // create another 2 stmts and execute them - stmt1, _, _, err := c.Context().Prepare("select * from t") - require.NoError(t, err) - require.NoError(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt1.ID())), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - ts1 := tk.Session().ShowProcess().GetMinStartTS(0) - require.Positive(t, ts1) - stmt2, _, _, err := c.Context().Prepare("select * from t") - require.NoError(t, err) - require.NoError(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt2.ID())), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - ts2 := tk.Session().ShowProcess().GetMinStartTS(ts1) - require.Positive(t, ts2) - - require.Less(t, ts1, ts2) - require.Equal(t, ts1, srv.GetMinStartTS(0)) - require.Equal(t, ts2, srv.GetMinStartTS(ts1)) - require.Zero(t, srv.GetMinStartTS(ts2)) - - // should unhold all when session closed - c.Close() - require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0)) - require.Zero(t, srv.GetMinStartTS(0)) -} - func TestCursorExistsFlag(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) srv := CreateMockServer(t, store) @@ -399,93 +312,6 @@ func TestCursorExistsFlag(t *testing.T) { require.False(t, mysql.HasCursorExistsFlag(getLastStatus())) } -func TestCursorReadWithRCCheckTS(t *testing.T) { - store, dom := testkit.CreateMockStoreAndDomain(t) - srv := CreateMockServer(t, store) - srv.SetDomain(dom) - defer srv.Close() - - appendUint32 := binary.LittleEndian.AppendUint32 - ctx := context.Background() - c := CreateMockConn(t, srv).(*mockConn) - out := new(bytes.Buffer) - c.pkt.bufWriter.Reset(out) - c.capability |= mysql.ClientDeprecateEOF | mysql.ClientProtocol41 - tk := testkit.NewTestKitWithSession(t, store, c.Context().Session) - c.Context().GetSessionVars().SetDistSQLScanConcurrency(1) - c.Context().GetSessionVars().InitChunkSize = 1 - c.Context().GetSessionVars().MaxChunkSize = 1 - - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t(a int primary key, b int, c int, key k1(b))") - // Prepare data. - tk.MustQuery("split table t by (100), (1000)") - tk.MustExec("set tidb_txn_mode='pessimistic'") - tk.MustExec("set tx_isolation='READ-COMMITTED'") - tk.MustExec("set tidb_rc_read_check_ts='on'") - tk.MustExec("set tidb_store_batch_size=0") - tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 3), " + - "(104, 104, 104), (105, 105, 105), (106, 106, 106), " + - "(1007, 1007, 1007), (1008, 1008, 1008), (1009, 1009, 1009)") - tk.MustQuery("select count(*) from t").Check(testkit.Rows("9")) - - getLastStatus := func() uint16 { - raw := out.Bytes() - return binary.LittleEndian.Uint16(raw[len(raw)-4 : len(raw)-2]) - } - - stmtCop, _, _, err := c.Context().Prepare("select * from t") - require.NoError(t, err) - - stmtBatchGet, _, _, err := c.Context().Prepare("select * from t where a in (1, 2, 3, 104, 105, 106, 1007, 1008, 1009)") - require.NoError(t, err) - - pauseCopIterTaskSender := "github.com/pingcap/tidb/store/copr/pauseCopIterTaskSender" - defer func() { - require.NoError(t, failpoint.Disable(pauseCopIterTaskSender)) - }() - c.Context().GetSessionVars().MaxChunkSize = 1 - for _, taskType := range []string{"BatchPointGet", "Cop"} { - tk.MustExec("begin pessimistic") - require.NoError(t, failpoint.Enable(pauseCopIterTaskSender, "pause")) - var stmtID uint32 - if taskType == "BatchPointGet" { - stmtID = uint32(stmtBatchGet.ID()) - require.NoError(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, stmtID), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - } else if taskType == "Cop" { - stmtID = uint32(stmtCop.ID()) - require.NoError(t, c.Dispatch(ctx, append( - appendUint32([]byte{mysql.ComStmtExecute}, stmtID), - mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0, - ))) - } - require.True(t, mysql.HasCursorExistsFlag(getLastStatus())) - - // Fetch the first 3 rows. - require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, stmtID), 3))) - require.True(t, mysql.HasCursorExistsFlag(getLastStatus())) - - // The rows are updated by other transactions before the next fetch. - connUpdate := CreateMockConn(t, srv).(*mockConn) - tkUpdate := testkit.NewTestKitWithSession(t, store, connUpdate.Context().Session) - tkUpdate.MustExec("use test") - tkUpdate.MustExec("update t set c = c + 10 where a in (1, 104, 1007)") - - // Fetch the next 3 rows, write conflict error should be reported if the `RCCheckTS` is used. - require.NoError(t, failpoint.Disable(pauseCopIterTaskSender)) - require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, stmtID), 3))) - require.True(t, mysql.HasCursorExistsFlag(getLastStatus())) - - // Finish. - require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtReset}, stmtID))) - tk.MustExec("rollback") - } -} - func TestCursorWithParams(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) srv := CreateMockServer(t, store) @@ -514,6 +340,10 @@ func TestCursorWithParams(t *testing.T) { 0x0, 0x1, 0x3, 0x0, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, ))) + rows := c.Context().stmts[stmt1.ID()].GetResultSet().GetFetchedRows() + require.Len(t, rows, 1) + require.Equal(t, int64(1), rows[0].GetInt64(0)) + require.Equal(t, int64(2), rows[0].GetInt64(1)) // `execute stmt2 using 1` with cursor require.NoError(t, c.Dispatch(ctx, append( @@ -522,6 +352,12 @@ func TestCursorWithParams(t *testing.T) { 0x0, 0x1, 0x3, 0x0, 0x1, 0x0, 0x0, 0x0, ))) + rows = c.Context().stmts[stmt2.ID()].GetResultSet().GetFetchedRows() + require.Len(t, rows, 2) + require.Equal(t, int64(1), rows[0].GetInt64(0)) + require.Equal(t, int64(1), rows[0].GetInt64(1)) + require.Equal(t, int64(1), rows[1].GetInt64(0)) + require.Equal(t, int64(2), rows[1].GetInt64(1)) // fetch stmt2 with fetch size 256 require.NoError(t, c.Dispatch(ctx, append( @@ -529,7 +365,8 @@ func TestCursorWithParams(t *testing.T) { 0x0, 0x1, 0x0, 0x0, ))) - // fetch stmt1 with fetch size 256, as it has more params, if we didn't restore the parameters, it will panic. + // fetch stmt1 with fetch size 256, as it has more params, if we fetch the result at the first execute command, it + // will panic because the params have been overwritten and is not long enough. require.NoError(t, c.Dispatch(ctx, append( appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt1.ID())), 0x0, 0x1, 0x0, 0x0, diff --git a/server/conn_test.go b/server/conn_test.go index c705f1a01b981..286a6819fa3ab 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -524,7 +524,7 @@ func TestDispatchClientProtocol41(t *testing.T) { com: mysql.ComStmtFetch, in: []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, err: nil, - out: []byte{0x5, 0x0, 0x0, 0x9, 0xfe, 0x0, 0x0, 0x82, 0x0}, + out: []byte{0x5, 0x0, 0x0, 0x9, 0xfe, 0x0, 0x0, 0x42, 0x0}, }, { com: mysql.ComStmtReset, @@ -1022,8 +1022,7 @@ func TestTiFlashFallback(t *testing.T) { tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout")) // test COM_STMT_FETCH (cursor mode) - require.NoError(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0})) - require.Error(t, cc.handleStmtFetch(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0})) + require.Error(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0})) tk.MustExec("set @@tidb_allow_fallback_to_tikv=''") require.Error(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0})) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0")) diff --git a/server/driver.go b/server/driver.go index a410d79adf763..7fdebdd2739ff 100644 --- a/server/driver.go +++ b/server/driver.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/extension" - "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/chunk" ) @@ -30,17 +29,6 @@ type IDriver interface { OpenCtx(connID uint64, capability uint32, collation uint8, dbname string, tlsState *tls.ConnectionState, extensions *extension.SessionExtensions) (*TiDBContext, error) } -// PreparedStatementCtx stores the context generated in `execute` statement for a prepared statement -// subsequent stmt fetching could restore the session variables from this context -type PreparedStatementCtx struct { - // Params is the params used in `execute` statement - Params variable.PreparedParams - // TODO: store and restore variables, but be careful that we'll also need to restore the variables after FETCH - // a cleaner way to solve this problem is to always reading params from a statement scope (but not session scope) - // context. But switching in/out related context is simpler on current code base, and the affected radius is more - // controllable. -} - // PreparedStatement is the interface to use a prepared statement. type PreparedStatement interface { // ID returns statement ID @@ -70,17 +58,17 @@ type PreparedStatement interface { // GetResultSet gets ResultSet associated this statement GetResultSet() ResultSet - // StorePreparedCtx stores context in `execute` statement for subsequent stmt fetching - StorePreparedCtx(ctx *PreparedStatementCtx) - - // GetPreparedParams gets the prepared params associated this statement - GetPreparedCtx() *PreparedStatementCtx - // Reset removes all bound parameters. Reset() // Close closes the statement. Close() error + + // GetCursorActive returns whether the statement has active cursor + GetCursorActive() bool + + // SetCursorActive sets whether the statement has active cursor + SetCursorActive(active bool) } // ResultSet is the result set of an query. diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 175e896a07cce..96312127e2da9 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -66,10 +66,13 @@ type TiDBStatement struct { boundParams [][]byte paramsType []byte ctx *TiDBContext - rs ResultSet - sql string + // this result set should have been closed before stored here. Only the `fetchedRows` are used here. This field is + // not moved out to reuse the logic inside functions `writeResultSet...` + // TODO: move the `fetchedRows` into the statement, and remove the `ResultSet` from statement. + rs ResultSet + sql string - preparedStatementCtx *PreparedStatementCtx + hasActiveCursor bool } // ID implements PreparedStatement ID method. @@ -144,27 +147,12 @@ func (ts *TiDBStatement) GetResultSet() ResultSet { return ts.rs } -// StorePreparedCtx implements PreparedStatement StorePreparedCtx method. -func (ts *TiDBStatement) StorePreparedCtx(ctx *PreparedStatementCtx) { - ts.preparedStatementCtx = ctx -} - -// GetPreparedCtx implements PreparedStatement GetPreparedCtx method. -func (ts *TiDBStatement) GetPreparedCtx() *PreparedStatementCtx { - return ts.preparedStatementCtx -} - // Reset implements PreparedStatement Reset method. func (ts *TiDBStatement) Reset() { for i := range ts.boundParams { ts.boundParams[i] = nil } - - // closing previous ResultSet if it exists - if ts.rs != nil { - terror.Call(ts.rs.Close) - ts.rs = nil - } + ts.hasActiveCursor = false } // Close implements PreparedStatement Close method. @@ -195,14 +183,19 @@ func (ts *TiDBStatement) Close() error { ts.ctx.GetSessionVars().RemovePreparedStmt(ts.id) } delete(ts.ctx.stmts, int(ts.id)) - - // close ResultSet associated with this statement - if ts.rs != nil { - terror.Call(ts.rs.Close) - } return nil } +// GetCursorActive implements PreparedStatement GetCursorActive method. +func (ts *TiDBStatement) GetCursorActive() bool { + return ts.hasActiveCursor +} + +// SetCursorActive implements PreparedStatement SetCursorActive method. +func (ts *TiDBStatement) SetCursorActive(fetchEnd bool) { + ts.hasActiveCursor = fetchEnd +} + // OpenCtx implements IDriver. func (qd *TiDBDriver) OpenCtx(connID uint64, capability uint32, collation uint8, dbname string, tlsState *tls.ConnectionState, extensions *extension.SessionExtensions) (*TiDBContext, error) { se, err := session.CreateSession(qd.store) @@ -368,8 +361,8 @@ func (tc *TiDBContext) EncodeSessionStates(ctx context.Context, sctx sessionctx. return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("prepared statements have bound params") } } - if rs := stmt.GetResultSet(); rs != nil && !rs.IsClosed() { - return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("prepared statements have open result sets") + if stmt.GetCursorActive() { + return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("prepared statements have unfetched rows") } preparedStmtInfo.ParamTypes = stmt.GetParamsType() } @@ -492,46 +485,6 @@ func (trs *tidbResultSet) Columns() []*ColumnInfo { return trs.columns } -// rsWithHooks wraps a ResultSet with some hooks (currently only onClosed). -type rsWithHooks struct { - ResultSet - onClosed func() -} - -// Close implements ResultSet#Close -func (rs *rsWithHooks) Close() error { - closed := rs.IsClosed() - err := rs.ResultSet.Close() - if !closed && rs.onClosed != nil { - rs.onClosed() - } - return err -} - -// OnFetchReturned implements fetchNotifier#OnFetchReturned -func (rs *rsWithHooks) OnFetchReturned() { - if impl, ok := rs.ResultSet.(fetchNotifier); ok { - impl.OnFetchReturned() - } -} - -// Unwrap returns the underlying result set -func (rs *rsWithHooks) Unwrap() ResultSet { - return rs.ResultSet -} - -// unwrapResultSet likes errors.Cause but for ResultSet -func unwrapResultSet(rs ResultSet) ResultSet { - var unRS ResultSet - if u, ok := rs.(interface{ Unwrap() ResultSet }); ok { - unRS = u.Unwrap() - } - if unRS == nil { - return rs - } - return unwrapResultSet(unRS) -} - func convertColumnInfo(fld *ast.ResultField) (ci *ColumnInfo) { ci = &ColumnInfo{ Name: fld.ColumnAsName.O, diff --git a/server/driver_tidb_test.go b/server/driver_tidb_test.go index 35dd70f438982..c61319a557d6e 100644 --- a/server/driver_tidb_test.go +++ b/server/driver_tidb_test.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" - "github.com/pingcap/tidb/util/sqlexec" "github.com/stretchr/testify/require" ) @@ -95,27 +94,3 @@ func TestConvertColumnInfo(t *testing.T) { colInfo = convertColumnInfo(&resultField) require.Equal(t, uint32(4), colInfo.ColumnLength) } - -func TestRSWithHooks(t *testing.T) { - closeCount := 0 - rs := &rsWithHooks{ - ResultSet: &tidbResultSet{recordSet: new(sqlexec.SimpleRecordSet)}, - onClosed: func() { closeCount++ }, - } - require.Equal(t, 0, closeCount) - rs.Close() - require.Equal(t, 1, closeCount) - rs.Close() - require.Equal(t, 1, closeCount) -} - -func TestUnwrapRS(t *testing.T) { - var nilRS ResultSet - require.Nil(t, unwrapResultSet(nilRS)) - rs0 := new(tidbResultSet) - rs1 := &rsWithHooks{ResultSet: rs0} - rs2 := &rsWithHooks{ResultSet: rs1} - for _, rs := range []ResultSet{rs0, rs1, rs2} { - require.Equal(t, rs0, unwrapResultSet(rs)) - } -} diff --git a/server/server.go b/server/server.go index 72122bdda6a72..2de9bc762ff4f 100644 --- a/server/server.go +++ b/server/server.go @@ -1012,37 +1012,3 @@ func (s *Server) KillNonFlashbackClusterConn() { s.Kill(id, false) } } - -// GetMinStartTS implements SessionManager interface. -func (s *Server) GetMinStartTS(lowerBound uint64) (ts uint64) { - // sys processes - if s.dom != nil { - for _, pi := range s.dom.SysProcTracker().GetSysProcessList() { - if thisTS := pi.GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { - ts = thisTS - } - } - } - // user sessions - func() { - s.rwlock.RLock() - defer s.rwlock.RUnlock() - for _, client := range s.clients { - if thisTS := client.ctx.ShowProcess().GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { - ts = thisTS - } - } - }() - // internal sessions - func() { - s.sessionMapMutex.Lock() - defer s.sessionMapMutex.Unlock() - analyzeProcID := util.GetAutoAnalyzeProcID(s.ServerID) - for se := range s.internalSessions { - if thisTS, processInfoID := session.GetStartTSFromSession(se); processInfoID != analyzeProcID && thisTS > lowerBound && (thisTS < ts || ts == 0) { - ts = thisTS - } - } - }() - return -} diff --git a/session/session.go b/session/session.go index 64808d01647f1..9dbee83010cde 100644 --- a/session/session.go +++ b/session/session.go @@ -1556,7 +1556,6 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(), MaxExecutionTime: maxExecutionTime, RedactSQL: s.sessionVars.EnableRedactLog, - ProtectedTSList: &s.sessionVars.ProtectedTSList, ResourceGroupName: s.sessionVars.ResourceGroupName, } oldPi := s.ShowProcess() diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 951546e0d32d1..999c287b34fdf 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1384,9 +1384,6 @@ type SessionVars struct { // Resource group name ResourceGroupName string - // ProtectedTSList holds a list of timestamps that should delay GC. - ProtectedTSList protectedTSList - // PessimisticTransactionFairLocking controls whether fair locking for pessimistic transaction // is enabled. PessimisticTransactionFairLocking bool @@ -3315,53 +3312,3 @@ func (s *SessionVars) GetRelatedTableForMDL() *sync.Map { func (s *SessionVars) EnableForceInlineCTE() bool { return s.enableForceInlineCTE } - -// protectedTSList implements util/processinfo#ProtectedTSList -type protectedTSList struct { - sync.Mutex - items map[uint64]int -} - -// HoldTS holds the timestamp to prevent its data from being GCed. -func (lst *protectedTSList) HoldTS(ts uint64) (unhold func()) { - lst.Lock() - if lst.items == nil { - lst.items = map[uint64]int{} - } - lst.items[ts] += 1 - lst.Unlock() - var once sync.Once - return func() { - once.Do(func() { - lst.Lock() - if lst.items != nil { - if lst.items[ts] > 1 { - lst.items[ts] -= 1 - } else { - delete(lst.items, ts) - } - } - lst.Unlock() - }) - } -} - -// GetMinProtectedTS returns the minimum protected timestamp that greater than `lowerBound` (0 if no such one). -func (lst *protectedTSList) GetMinProtectedTS(lowerBound uint64) (ts uint64) { - lst.Lock() - for k, v := range lst.items { - if v > 0 && k > lowerBound && (k < ts || ts == 0) { - ts = k - } - } - lst.Unlock() - return -} - -// Size returns the number of protected timestamps (exported for test). -func (lst *protectedTSList) Size() (size int) { - lst.Lock() - size = len(lst.items) - lst.Unlock() - return -} diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 29848fb723177..047f00efe07ec 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -489,60 +489,6 @@ func TestGetReuseChunk(t *testing.T) { require.Nil(t, sessVars.ChunkPool.Alloc) } -func TestPretectedTSList(t *testing.T) { - lst := &variable.NewSessionVars(nil).ProtectedTSList - - // empty set - require.Equal(t, uint64(0), lst.GetMinProtectedTS(0)) - require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) - require.Equal(t, 0, lst.Size()) - - // hold 1 - unhold1 := lst.HoldTS(1) - require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) - require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) - - // hold 2 twice - unhold2a := lst.HoldTS(2) - unhold2b := lst.HoldTS(2) - require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) - require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) - require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) - require.Equal(t, 2, lst.Size()) - - // unhold 2a - unhold2a() - require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) - require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) - require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) - require.Equal(t, 2, lst.Size()) - // unhold 2a again - unhold2a() - require.Equal(t, uint64(1), lst.GetMinProtectedTS(0)) - require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) - require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) - require.Equal(t, 2, lst.Size()) - - // unhold 1 - unhold1() - require.Equal(t, uint64(2), lst.GetMinProtectedTS(0)) - require.Equal(t, uint64(2), lst.GetMinProtectedTS(1)) - require.Equal(t, uint64(0), lst.GetMinProtectedTS(2)) - require.Equal(t, 1, lst.Size()) - - // unhold 2b - unhold2b() - require.Equal(t, uint64(0), lst.GetMinProtectedTS(0)) - require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) - require.Equal(t, 0, lst.Size()) - - // unhold 2b again - unhold2b() - require.Equal(t, uint64(0), lst.GetMinProtectedTS(0)) - require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) - require.Equal(t, 0, lst.Size()) -} - func TestUserVarConcurrently(t *testing.T) { sv := variable.NewSessionVars(nil) ctx, cancel := context.WithTimeout(context.Background(), time.Second) diff --git a/sessiontxn/isolation/readcommitted.go b/sessiontxn/isolation/readcommitted.go index 3146db3126948..510f6b407c9a7 100644 --- a/sessiontxn/isolation/readcommitted.go +++ b/sessiontxn/isolation/readcommitted.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/ast" - "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/sessionctx" @@ -108,8 +107,7 @@ func NeedSetRCCheckTSFlag(ctx sessionctx.Context, node ast.Node) bool { sessionVars := ctx.GetSessionVars() if sessionVars.ConnectionID > 0 && variable.EnableRCReadCheckTS.Load() && sessionVars.InTxn() && !sessionVars.RetryInfo.Retrying && - plannercore.IsReadOnly(node, sessionVars) && - !ctx.GetSessionVars().GetStatusFlag(mysql.ServerStatusCursorExists) { + plannercore.IsReadOnly(node, sessionVars) { return true } return false diff --git a/table/tables/BUILD.bazel b/table/tables/BUILD.bazel index dac288208d11f..a23c69e4e7239 100644 --- a/table/tables/BUILD.bazel +++ b/table/tables/BUILD.bazel @@ -74,7 +74,7 @@ go_test( ], embed = [":tables"], flaky = True, - shard_count = 10, + shard_count = 50, deps = [ "//ddl", "//domain", diff --git a/testkit/mocksessionmanager.go b/testkit/mocksessionmanager.go index 550ff69132d91..67280bc2e4cbe 100644 --- a/testkit/mocksessionmanager.go +++ b/testkit/mocksessionmanager.go @@ -145,7 +145,7 @@ func (msm *MockSessionManager) KillNonFlashbackClusterConn() { } } -// CheckOldRunningTxn implement SessionManager interface. +// CheckOldRunningTxn is to get all startTS of every transactions running in the current internal sessions func (msm *MockSessionManager) CheckOldRunningTxn(job2ver map[int64]int64, job2ids map[int64]string) { msm.mu.Lock() for _, se := range msm.conn { @@ -153,25 +153,3 @@ func (msm *MockSessionManager) CheckOldRunningTxn(job2ver map[int64]int64, job2i } msm.mu.Unlock() } - -// GetMinStartTS implements SessionManager interface. -func (msm *MockSessionManager) GetMinStartTS(lowerBound uint64) (ts uint64) { - msm.PSMu.RLock() - defer msm.PSMu.RUnlock() - if len(msm.PS) > 0 { - for _, pi := range msm.PS { - if thisTS := pi.GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { - ts = thisTS - } - } - return - } - msm.mu.Lock() - defer msm.mu.Unlock() - for _, s := range msm.conn { - if thisTS := s.ShowProcess().GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { - ts = thisTS - } - } - return -} diff --git a/util/processinfo.go b/util/processinfo.go index 135b1c58d5676..eee5b8fe020b7 100644 --- a/util/processinfo.go +++ b/util/processinfo.go @@ -31,14 +31,6 @@ import ( "github.com/tikv/client-go/v2/oracle" ) -// ProtectedTSList holds a list of timestamps that should delay GC. -type ProtectedTSList interface { - // HoldTS holds the timestamp to prevent its data from being GCed. - HoldTS(ts uint64) (unhold func()) - // GetMinProtectedTS returns the minimum protected timestamp that greater than `lowerBound` (0 if no such one). - GetMinProtectedTS(lowerBound uint64) (ts uint64) -} - // OOMAlarmVariablesInfo is a struct for OOM alarm variables. type OOMAlarmVariablesInfo struct { SessionAnalyzeVersion int @@ -48,7 +40,6 @@ type OOMAlarmVariablesInfo struct { // ProcessInfo is a struct used for show processlist statement. type ProcessInfo struct { - ProtectedTSList Time time.Time ExpensiveLogTime time.Time Plan interface{} @@ -139,23 +130,6 @@ func (pi *ProcessInfo) ToRow(tz *time.Location) []interface{} { return append(pi.ToRowForShow(true), pi.Digest, bytesConsumed, diskConsumed, pi.txnStartTs(tz), pi.ResourceGroupName) } -// GetMinStartTS returns the minimum start-ts (used to delay GC) that greater than `lowerBound` (0 if no such one). -func (pi *ProcessInfo) GetMinStartTS(lowerBound uint64) (ts uint64) { - if pi == nil { - return - } - if thisTS := pi.CurTxnStartTS; thisTS > lowerBound && (thisTS < ts || ts == 0) { - ts = thisTS - } - if pi.ProtectedTSList == nil { - return - } - if thisTS := pi.GetMinProtectedTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) { - ts = thisTS - } - return -} - // ascServerStatus is a slice of all defined server status in ascending order. var ascServerStatus = []uint16{ mysql.ServerStatusInTrans, @@ -224,8 +198,6 @@ type SessionManager interface { CheckOldRunningTxn(job2ver map[int64]int64, job2ids map[int64]string) // KillNonFlashbackClusterConn kill all non flashback cluster connections. KillNonFlashbackClusterConn() - // GetMinStartTS returns the minimum start-ts (used to delay GC) that greater than `lowerBound` (0 if no such one). - GetMinStartTS(lowerBound uint64) uint64 } // GlobalConnID is the global connection ID, providing UNIQUE connection IDs across the whole TiDB cluster.