Skip to content

Commit

Permalink
session, com_stmt: fetch all rows during EXECUTE command (#42473) (#4…
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKeao authored Mar 29, 2023
1 parent 7ba7ae5 commit 44d7fbe
Show file tree
Hide file tree
Showing 23 changed files with 158 additions and 390 deletions.
4 changes: 0 additions & 4 deletions bindinfo/session_handle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,10 +414,6 @@ func (msm *mockSessionManager) GetInternalSessionStartTSList() []uint64 {
return nil
}

func (msm *mockSessionManager) GetMinStartTS(lowerBound uint64) uint64 {
return 0
}

func TestIssue19836(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down
21 changes: 6 additions & 15 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1415,10 +1415,8 @@ func TestLogAndShowSlowLog(t *testing.T) {
}

func TestReportingMinStartTimestamp(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
_, dom, clean := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
defer clean()
tk := testkit.NewTestKit(t, store)
se := tk.Session()

infoSyncer := dom.InfoSyncer()
sm := &testkit.MockSessionManager{
Expand All @@ -1434,19 +1432,12 @@ func TestReportingMinStartTimestamp(t *testing.T) {
validTS := oracle.GoTimeToLowerLimitStartTS(now.Add(time.Minute), tikv.MaxTxnTimeUse)
lowerLimit := oracle.GoTimeToLowerLimitStartTS(now, tikv.MaxTxnTimeUse)
sm.PS = []*util.ProcessInfo{
{CurTxnStartTS: 0, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: math.MaxUint64, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: lowerLimit, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: validTS, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: 0},
{CurTxnStartTS: math.MaxUint64},
{CurTxnStartTS: lowerLimit},
{CurTxnStartTS: validTS},
}
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS, infoSyncer.GetMinStartTS())

unhold := se.GetSessionVars().ProtectedTSList.HoldTS(validTS - 1)
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS-1, infoSyncer.GetMinStartTS())

unhold()
infoSyncer.SetSessionManager(sm)
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS, infoSyncer.GetMinStartTS())
}
16 changes: 14 additions & 2 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
if sm == nil {
return
}
pl := sm.ShowProcessList()
innerSessionStartTSList := sm.GetInternalSessionStartTSList()

// Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC.
currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
Expand All @@ -642,8 +644,18 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
minStartTS := oracle.GoTimeToTS(now)
logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("initial minStartTS", minStartTS),
zap.Uint64("StartTSLowerLimit", startTSLowerLimit))
if ts := sm.GetMinStartTS(startTSLowerLimit); ts > startTSLowerLimit && ts < minStartTS {
minStartTS = ts
for _, info := range pl {
if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS {
minStartTS = info.CurTxnStartTS
}
}

for _, innerTS := range innerSessionStartTSList {
logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("Internal Session Transaction StartTS", innerTS))
kv.PrintLongTimeInternalTxn(now, innerTS, false)
if innerTS > startTSLowerLimit && innerTS < minStartTS {
minStartTS = innerTS
}
}

is.minStartTS = kv.GetMinInnerTxnStartTS(now, startTSLowerLimit, minStartTS)
Expand Down
4 changes: 0 additions & 4 deletions executor/executor_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,6 @@ func (msm *mockSessionManager) GetInternalSessionStartTSList() []uint64 {
return nil
}

func (msm *mockSessionManager) GetMinStartTS(lowerBound uint64) uint64 {
return 0
}

func TestShowProcessList(t *testing.T) {
// Compose schema.
names := []string{"Id", "User", "Host", "db", "Command", "Time", "State", "Info"}
Expand Down
4 changes: 0 additions & 4 deletions executor/infoschema_cluster_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,6 @@ func (sm *mockSessionManager) SetServerID(serverID uint64) {
sm.serverID = serverID
}

func (sm *mockSessionManager) GetMinStartTS(lowerBound uint64) uint64 {
return 0
}

type mockStore struct {
helper.Storage
host string
Expand Down
4 changes: 0 additions & 4 deletions executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,6 @@ func (sm *mockSessionManager2) GetInternalSessionStartTSList() []uint64 {
return nil
}

func (sm *mockSessionManager2) GetMinStartTS(lowerBound uint64) uint64 {
return 0
}

func TestPreparedStmtWithHint(t *testing.T) {
// see https://github.com/pingcap/tidb/issues/18535
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
Expand Down
4 changes: 0 additions & 4 deletions executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,10 +898,6 @@ func (msm *mockSessionManager1) GetInternalSessionStartTSList() []uint64 {
return nil
}

func (msm *mockSessionManager1) GetMinStartTS(lowerBound uint64) uint64 {
return 0
}

func TestPreparedIssue17419(t *testing.T) {
store, dom, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()
Expand Down
4 changes: 0 additions & 4 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,10 +326,6 @@ func (sm *mockSessionManager) GetInternalSessionStartTSList() []uint64 {
return nil
}

func (sm *mockSessionManager) GetMinStartTS(lowerBound uint64) uint64 {
return 0
}

func TestSomeTables(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down
7 changes: 7 additions & 0 deletions parser/mysql/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,3 +595,10 @@ const (
// DefaultDecimal defines the default decimal value when the value out of range.
DefaultDecimal = "99999999999999999999999999999999999999999999999999999999999999999"
)

// This is enum_cursor_type in MySQL
const (
CursorTypeReadOnly = 1 << iota
CursorTypeForUpdate
CursorTypeScrollable
)
22 changes: 0 additions & 22 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2259,34 +2259,12 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
// fetchSize, the desired number of rows to be fetched each time when client uses cursor.
func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet, serverStatus uint16, fetchSize int) error {
fetchedRows := rs.GetFetchedRows()
// if fetchedRows is not enough, getting data from recordSet.
// NOTE: chunk should not be allocated from the allocator
// the allocator will reset every statement
// but it maybe stored in the result set among statements
// ref https://github.com/pingcap/tidb/blob/7fc6ebbda4ddf84c0ba801ca7ebb636b934168cf/server/conn_stmt.go#L233-L239
// Here server.tidbResultSet implements Next method.
req := rs.NewChunk(nil)
for len(fetchedRows) < fetchSize {
if err := rs.Next(ctx, req); err != nil {
return err
}
rowCount := req.NumRows()
if rowCount == 0 {
break
}
// filling fetchedRows with chunk
for i := 0; i < rowCount; i++ {
fetchedRows = append(fetchedRows, req.GetRow(i))
}
req = chunk.Renew(req, cc.ctx.GetSessionVars().MaxChunkSize)
}

// tell the client COM_STMT_FETCH has finished by setting proper serverStatus,
// and close ResultSet.
if len(fetchedRows) == 0 {
serverStatus &^= mysql.ServerStatusCursorExists
serverStatus |= mysql.ServerStatusLastRowSend
terror.Call(rs.Close)
return cc.writeEOF(serverStatus)
}

Expand Down
44 changes: 39 additions & 5 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/pingcap/tidb/sessiontxn"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/topsql"
Expand Down Expand Up @@ -231,11 +232,15 @@ func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err e
// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried.
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []types.Datum, useCursor bool) (bool, error) {
vars := (&cc.ctx).GetSessionVars()
rs, err := stmt.Execute(ctx, args)
if err != nil {
return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
}
if rs == nil {
if useCursor {
vars.SetStatusFlag(mysql.ServerStatusCursorExists, false)
}
return false, cc.writeOK(ctx)
}

Expand All @@ -245,12 +250,31 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
if useCursor {
cc.initResultEncoder(ctx)
defer cc.rsEncoder.clean()
// fix https://github.com/pingcap/tidb/issues/39447. we need to hold the start-ts here because the process info
// will be set to sleep after fetch returned.
if pi := cc.ctx.ShowProcess(); pi != nil && pi.ProtectedTSList != nil && pi.CurTxnStartTS > 0 {
unhold := pi.HoldTS(pi.CurTxnStartTS)
rs = &rsWithHooks{ResultSet: rs, onClosed: unhold}
// fetch all results of the resultSet, and stored them locally, so that the future `FETCH` command can read
// the rows directly to avoid running executor and accessing shared params/variables in the session
// NOTE: chunk should not be allocated from the connection allocator, which will reset after executing this command
// but the rows are still needed in the following FETCH command.
//
// TODO: trace the memory used here
chk := rs.NewChunk(nil)
var rows []chunk.Row
for {
if err = rs.Next(ctx, chk); err != nil {
return false, err
}
rowCount := chk.NumRows()
if rowCount == 0 {
break
}
// filling fetchedRows with chunk
for i := 0; i < rowCount; i++ {
row := chk.GetRow(i)
rows = append(rows, row)
}
chk = chunk.Renew(chk, vars.MaxChunkSize)
}
rs.StoreFetchedRows(rows)

stmt.StoreResultSet(rs)
err = cc.writeColumnInfo(rs.Columns(), mysql.ServerStatusCursorExists)
if err != nil {
Expand All @@ -259,6 +283,14 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
if cl, ok := rs.(fetchNotifier); ok {
cl.OnFetchReturned()
}

// as the `Next` of `ResultSet` will never be called, all rows have been cached inside it. We could close this
// `ResultSet`.
err = rs.Close()
if err != nil {
return false, err
}

// explicitly flush columnInfo to client.
return false, cc.flush(ctx)
}
Expand Down Expand Up @@ -309,6 +341,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
}

return nil
}

Expand Down Expand Up @@ -639,6 +672,7 @@ func (cc *clientConn) handleStmtClose(data []byte) (err error) {
if stmt != nil {
return stmt.Close()
}

return
}

Expand Down
Loading

0 comments on commit 44d7fbe

Please sign in to comment.