Skip to content

Commit

Permalink
session, com_stmt: fetch all rows during EXECUTE command (#42473)
Browse files Browse the repository at this point in the history
close #41891, close #42424
  • Loading branch information
YangKeao authored Mar 23, 2023
1 parent cc02f50 commit 2e8a982
Show file tree
Hide file tree
Showing 18 changed files with 113 additions and 539 deletions.
26 changes: 10 additions & 16 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,11 +1340,13 @@ func TestLogAndShowSlowLog(t *testing.T) {
}

func TestReportingMinStartTimestamp(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
tk := testkit.NewTestKit(t, store)
se := tk.Session()
_, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)

infoSyncer := dom.InfoSyncer()
sm := &testkit.MockSessionManager{
PS: make([]*util.ProcessInfo, 0),
}
infoSyncer.SetSessionManager(sm)
beforeTS := oracle.GoTimeToTS(time.Now())
infoSyncer.ReportMinStartTS(dom.Store())
afterTS := oracle.GoTimeToTS(time.Now())
Expand All @@ -1353,21 +1355,13 @@ func TestReportingMinStartTimestamp(t *testing.T) {
now := time.Now()
validTS := oracle.GoTimeToLowerLimitStartTS(now.Add(time.Minute), tikv.MaxTxnTimeUse)
lowerLimit := oracle.GoTimeToLowerLimitStartTS(now, tikv.MaxTxnTimeUse)
sm := se.GetSessionManager().(*testkit.MockSessionManager)
sm.PS = []*util.ProcessInfo{
{CurTxnStartTS: 0, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: math.MaxUint64, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: lowerLimit, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: validTS, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: 0},
{CurTxnStartTS: math.MaxUint64},
{CurTxnStartTS: lowerLimit},
{CurTxnStartTS: validTS},
}
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS, infoSyncer.GetMinStartTS())

unhold := se.GetSessionVars().ProtectedTSList.HoldTS(validTS - 1)
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS-1, infoSyncer.GetMinStartTS())

unhold()
infoSyncer.SetSessionManager(sm)
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS, infoSyncer.GetMinStartTS())
}
Expand Down
16 changes: 14 additions & 2 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,8 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
if sm == nil {
return
}
pl := sm.ShowProcessList()
innerSessionStartTSList := sm.GetInternalSessionStartTSList()

// Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC.
currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
Expand All @@ -810,8 +812,18 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
minStartTS := oracle.GoTimeToTS(now)
logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("initial minStartTS", minStartTS),
zap.Uint64("StartTSLowerLimit", startTSLowerLimit))
if ts := sm.GetMinStartTS(startTSLowerLimit); ts > startTSLowerLimit && ts < minStartTS {
minStartTS = ts
for _, info := range pl {
if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS {
minStartTS = info.CurTxnStartTS
}
}

for _, innerTS := range innerSessionStartTSList {
logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("Internal Session Transaction StartTS", innerTS))
kv.PrintLongTimeInternalTxn(now, innerTS, false)
if innerTS > startTSLowerLimit && innerTS < minStartTS {
minStartTS = innerTS
}
}

is.minStartTS = kv.GetMinInnerTxnStartTS(now, startTSLowerLimit, minStartTS)
Expand Down
1 change: 0 additions & 1 deletion server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ go_test(
"//util/plancodec",
"//util/resourcegrouptag",
"//util/rowcodec",
"//util/sqlexec",
"//util/stmtsummary/v2:stmtsummary",
"//util/syncutil",
"//util/topsql",
Expand Down
22 changes: 0 additions & 22 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -2331,34 +2331,12 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
// fetchSize, the desired number of rows to be fetched each time when client uses cursor.
func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet, serverStatus uint16, fetchSize int) error {
fetchedRows := rs.GetFetchedRows()
// if fetchedRows is not enough, getting data from recordSet.
// NOTE: chunk should not be allocated from the allocator
// the allocator will reset every statement
// but it maybe stored in the result set among statements
// ref https://github.com/pingcap/tidb/blob/7fc6ebbda4ddf84c0ba801ca7ebb636b934168cf/server/conn_stmt.go#L233-L239
// Here server.tidbResultSet implements Next method.
req := rs.NewChunk(nil)
for len(fetchedRows) < fetchSize {
if err := rs.Next(ctx, req); err != nil {
return err
}
rowCount := req.NumRows()
if rowCount == 0 {
break
}
// filling fetchedRows with chunk
for i := 0; i < rowCount; i++ {
fetchedRows = append(fetchedRows, req.GetRow(i))
}
req = chunk.Renew(req, cc.ctx.GetSessionVars().MaxChunkSize)
}

// tell the client COM_STMT_FETCH has finished by setting proper serverStatus,
// and close ResultSet.
if len(fetchedRows) == 0 {
serverStatus &^= mysql.ServerStatusCursorExists
serverStatus |= mysql.ServerStatusLastRowSend
terror.Call(rs.Close)
return cc.writeEOF(ctx, serverStatus)
}

Expand Down
61 changes: 47 additions & 14 deletions server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/sessiontxn"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/topsql"
Expand Down Expand Up @@ -285,7 +286,7 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
}
// since there are multiple implementations of ResultSet (the rs might be wrapped), we have to unwrap the rs before
// casting it to *tidbResultSet.
if result, ok := unwrapResultSet(rs).(*tidbResultSet); ok {
if result, ok := rs.(*tidbResultSet); ok {
if planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt); ok {
result.preparedStmt = planCacheStmt
}
Expand All @@ -297,29 +298,54 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
if useCursor {
cc.initResultEncoder(ctx)
defer cc.rsEncoder.clean()
// fix https://github.com/pingcap/tidb/issues/39447. we need to hold the start-ts here because the process info
// will be set to sleep after fetch returned.
if pi := cc.ctx.ShowProcess(); pi != nil && pi.ProtectedTSList != nil && pi.CurTxnStartTS > 0 {
unhold := pi.HoldTS(pi.CurTxnStartTS)
rs = &rsWithHooks{ResultSet: rs, onClosed: unhold}
// fetch all results of the resultSet, and stored them locally, so that the future `FETCH` command can read
// the rows directly to avoid running executor and accessing shared params/variables in the session
// NOTE: chunk should not be allocated from the connection allocator, which will reset after executing this command
// but the rows are still needed in the following FETCH command.
//
// TODO: trace the memory used here
chk := rs.NewChunk(nil)
var rows []chunk.Row
for {
if err = rs.Next(ctx, chk); err != nil {
return false, err
}
rowCount := chk.NumRows()
if rowCount == 0 {
break
}
// filling fetchedRows with chunk
for i := 0; i < rowCount; i++ {
row := chk.GetRow(i)
rows = append(rows, row)
}
chk = chunk.Renew(chk, vars.MaxChunkSize)
}
rs.StoreFetchedRows(rows)

stmt.StoreResultSet(rs)
// also store the preparedParams in the stmt, so we could restore the params in the following fetch command
// the params should have been parsed in `(&cc.ctx).ExecuteStmt(ctx, execStmt)`.
stmt.StorePreparedCtx(&PreparedStatementCtx{
Params: vars.PreparedParams,
})
if err = cc.writeColumnInfo(rs.Columns()); err != nil {
return false, err
}
if cl, ok := rs.(fetchNotifier); ok {
cl.OnFetchReturned()
}

// as the `Next` of `ResultSet` will never be called, all rows have been cached inside it. We could close this
// `ResultSet`.
err = rs.Close()
if err != nil {
return false, err
}

stmt.SetCursorActive(true)

// explicitly flush columnInfo to client.
err = cc.writeEOF(ctx, cc.ctx.Status())
if err != nil {
return false, err
}

return false, cc.flush(ctx)
}
defer terror.Call(rs.Close)
Expand Down Expand Up @@ -351,9 +377,6 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch"), cc.preparedStmt2String(stmtID))
}

cc.ctx.GetSessionVars().PreparedParams = stmt.GetPreparedCtx().Params

if topsqlstate.TopSQLEnabled() {
prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID)
if prepareObj != nil && prepareObj.SQLDigest != nil {
Expand All @@ -371,10 +394,19 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch_rs"), cc.preparedStmt2String(stmtID))
}

sendingEOF := false
// if the `fetchedRows` are empty before writing result, we could say the `FETCH` command will send EOF
if len(rs.GetFetchedRows()) == 0 {
sendingEOF = true
}
_, err = cc.writeResultSet(ctx, rs, true, cc.ctx.Status(), int(fetchSize))
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
}
if sendingEOF {
stmt.SetCursorActive(false)
}

return nil
}

Expand Down Expand Up @@ -712,6 +744,7 @@ func (cc *clientConn) handleStmtClose(data []byte) (err error) {
if stmt != nil {
return stmt.Close()
}

return
}

Expand Down
Loading

0 comments on commit 2e8a982

Please sign in to comment.