Skip to content

Commit

Permalink
session, executor: add memTracker for DeleteExec
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu committed Dec 31, 2019
1 parent c1bc9ff commit 9fa2f04
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 8 deletions.
27 changes: 21 additions & 6 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
)

// DeleteExec represents a delete executor.
Expand All @@ -35,6 +36,7 @@ type DeleteExec struct {
// tblColPosInfos stores relationship between column ordinal to its table handle.
// the columns ordinals is present in ordinal range format, @see plannercore.TblColPosInfos
tblColPosInfos plannercore.TblColPosInfoSlice
memTracker *memory.Tracker
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -78,20 +80,22 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
memUsageOfChk := int64(0)
for {
e.memTracker.Consume(-memUsageOfChk)
iter := chunk.NewIterator4Chunk(chk)

err := Next(ctx, e.children[0], chk)
if err != nil {
return err
}
if chk.NumRows() == 0 {
break
}

memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() {
if batchDelete && rowCount >= batchDMLSize {
if err = e.ctx.StmtCommit(nil); err != nil {
if err = e.ctx.StmtCommit(e.memTracker); err != nil {
return err
}
if err = e.ctx.NewTxn(ctx); err != nil {
Expand All @@ -108,7 +112,6 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
}
rowCount++
}
chk = chunk.Renew(chk, e.maxChunkSize)
}

return nil
Expand All @@ -131,7 +134,9 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
tblRowMap := make(tableRowMapType)
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
memUsageOfChk := int64(0)
for {
e.memTracker.Consume(-memUsageOfChk)
iter := chunk.NewIterator4Chunk(chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
Expand All @@ -140,12 +145,13 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
if chk.NumRows() == 0 {
break
}
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)

for joinedChunkRow := iter.Begin(); joinedChunkRow != iter.End(); joinedChunkRow = iter.Next() {
joinedDatumRow := joinedChunkRow.GetDatumRow(fields)
e.composeTblRowMap(tblRowMap, colPosInfos, joinedDatumRow)
}
chk = chunk.Renew(chk, e.maxChunkSize)
}

return e.removeRowsInTblRowMap(tblRowMap)
Expand All @@ -165,10 +171,16 @@ func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error {
}

func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h int64, data []types.Datum) error {
err := t.RemoveRecord(ctx, h, data)
// err is always nil if `active` is false.
txnState, err := e.ctx.Txn(false)
if err != nil {
}
memUsageOfTxnState := txnState.Size()
err = t.RemoveRecord(ctx, h, data)
if err != nil {
return err
}
e.memTracker.Consume(int64(txnState.Size() - memUsageOfTxnState))
ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
return nil
}
Expand All @@ -180,6 +192,9 @@ func (e *DeleteExec) Close() error {

// Open implements the Executor Open interface.
func (e *DeleteExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

return e.children[0].Open(ctx)
}

Expand Down
16 changes: 16 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4449,6 +4449,22 @@ func (s *testSuite) TestOOMPanicAction(c *C) {
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
_, err = tk.Exec("replace into t select a from t1 order by a desc;")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")

tk.MustExec("set @@tidb_mem_quota_query=10000")
tk.MustExec("insert into t values (1),(2),(3),(4),(5);")
// Set the memory quota to 244 to make this SQL panic during the DeleteExec
// instead of the TableReaderExec.
tk.MustExec("set @@tidb_mem_quota_query=244;")
_, err = tk.Exec("delete from t")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")

tk.MustExec("set @@tidb_mem_quota_query=10000;")
tk.MustExec("delete from t1")
tk.MustExec("insert into t1 values(1)")
tk.MustExec("insert into t values (1),(2),(3),(4),(5);")
tk.MustExec("set @@tidb_mem_quota_query=244;")
_, err = tk.Exec("delete t, t1 from t join t1 on t.a = t1.a")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
}

func setOOMAction(action string) {
Expand Down
27 changes: 26 additions & 1 deletion executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,6 @@ func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) {
}

func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) {
//log.SetLevel(zap.FatalLevel)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int, a int, b int, index idx_a(`a`))")
Expand Down Expand Up @@ -1285,6 +1284,32 @@ func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) {
tk.Se.GetSessionVars().MemQuotaQuery = -1
}

func (s *testOOMSuite) TestMemTracker4DeleteExec(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int, a int, b int, index idx_a(`a`))")
tk.MustExec("create table t2 (id int, a int, b int, index idx_a(`a`))")

log.SetLevel(zap.InfoLevel)
tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
s.oom.tracker = ""
tk.MustExec("delete from t1")
c.Assert(s.oom.tracker, Equals, "")
tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)")
tk.Se.GetSessionVars().MemQuotaQuery = 1
tk.MustExec("delete from t1")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")

tk.Se.GetSessionVars().MemQuotaQuery = 10000
tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
tk.MustExec("insert into t2 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
tk.MustExec("delete t1, t2 from t1 join t2 on t1.a=t2.a")
tk.Se.GetSessionVars().MemQuotaQuery = 1
tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
tk.MustExec("insert into t2 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
tk.MustExec("delete t1, t2 from t1 join t2 on t1.a=t2.a")
}

type oomCapturer struct {
zapcore.Core
tracker string
Expand Down
11 changes: 10 additions & 1 deletion session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,16 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture {

// StmtCommit implements the sessionctx.Context interface.
func (s *session) StmtCommit(memTracker *memory.Tracker) error {
defer s.txn.cleanup()
defer func() {
s.txn.cleanup()
// If StmtCommit is called in batch mode, we need to clear the txn size
// in memTracker to avoid double-counting. If it's not batch mode, this
// work has no effect because that no more data will be appended into
// s.txn.
if memTracker != nil {
memTracker.Consume(int64(-s.txn.Size()))
}
}()
st := &s.txn
txnSize := st.Transaction.Size()
var count int
Expand Down

0 comments on commit 9fa2f04

Please sign in to comment.