From a7ff591368af4e9e24d23a30136ba3225c0a330a Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Tue, 31 Dec 2019 22:06:20 +0800 Subject: [PATCH] session, executor: add memTracker for DeleteExec (#14289) --- executor/delete.go | 25 ++++++++++-- executor/executor_test.go | 15 +++++++ executor/seqtest/seq_executor_test.go | 59 ++++++++++++++++++++------- session/txn.go | 11 ++++- 4 files changed, 91 insertions(+), 19 deletions(-) diff --git a/executor/delete.go b/executor/delete.go index c5c179d5679a5..69d0c5a9b4503 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/memory" ) // DeleteExec represents a delete executor. @@ -35,6 +36,7 @@ type DeleteExec struct { // tblColPosInfos stores relationship between column ordinal to its table handle. // the columns ordinals is present in ordinal range format, @see plannercore.TblColPosInfos tblColPosInfos plannercore.TblColPosInfoSlice + memTracker *memory.Tracker } // Next implements the Executor Next interface. @@ -78,9 +80,10 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize fields := retTypes(e.children[0]) chk := newFirstChunk(e.children[0]) + memUsageOfChk := int64(0) for { + e.memTracker.Consume(-memUsageOfChk) iter := chunk.NewIterator4Chunk(chk) - err := Next(ctx, e.children[0], chk) if err != nil { return err @@ -88,10 +91,11 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { if chk.NumRows() == 0 { break } - + memUsageOfChk = chk.MemoryUsage() + e.memTracker.Consume(memUsageOfChk) for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() { if batchDelete && rowCount >= batchDMLSize { - if err = e.ctx.StmtCommit(nil); err != nil { + if err = e.ctx.StmtCommit(e.memTracker); err != nil { return err } if err = e.ctx.NewTxn(ctx); err != nil { @@ -131,7 +135,9 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { tblRowMap := make(tableRowMapType) fields := retTypes(e.children[0]) chk := newFirstChunk(e.children[0]) + memUsageOfChk := int64(0) for { + e.memTracker.Consume(-memUsageOfChk) iter := chunk.NewIterator4Chunk(chk) err := Next(ctx, e.children[0], chk) if err != nil { @@ -140,6 +146,8 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { if chk.NumRows() == 0 { break } + memUsageOfChk = chk.MemoryUsage() + e.memTracker.Consume(memUsageOfChk) for joinedChunkRow := iter.Begin(); joinedChunkRow != iter.End(); joinedChunkRow = iter.Next() { joinedDatumRow := joinedChunkRow.GetDatumRow(fields) @@ -165,10 +173,16 @@ func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error { } func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h int64, data []types.Datum) error { - err := t.RemoveRecord(ctx, h, data) + txnState, err := e.ctx.Txn(false) + if err != nil { + return err + } + memUsageOfTxnState := txnState.Size() + err = t.RemoveRecord(ctx, h, data) if err != nil { return err } + e.memTracker.Consume(int64(txnState.Size() - memUsageOfTxnState)) ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) return nil } @@ -180,6 +194,9 @@ func (e *DeleteExec) Close() error { // Open implements the Executor Open interface. func (e *DeleteExec) Open(ctx context.Context) error { + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + return e.children[0].Open(ctx) } diff --git a/executor/executor_test.go b/executor/executor_test.go index 9182e4f40fb58..531139ac4fee2 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4450,6 +4450,21 @@ func (s *testSuite) TestOOMPanicAction(c *C) { _, err = tk.Exec("replace into t select a from t1 order by a desc;") c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + tk.MustExec("set @@tidb_mem_quota_query=10000") + tk.MustExec("insert into t values (1),(2),(3),(4),(5);") + // Set the memory quota to 244 to make this SQL panic during the DeleteExec + // instead of the TableReaderExec. + tk.MustExec("set @@tidb_mem_quota_query=244;") + _, err = tk.Exec("delete from t") + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + + tk.MustExec("set @@tidb_mem_quota_query=10000;") + tk.MustExec("delete from t1") + tk.MustExec("insert into t1 values(1)") + tk.MustExec("insert into t values (1),(2),(3),(4),(5);") + tk.MustExec("set @@tidb_mem_quota_query=244;") + _, err = tk.Exec("delete t, t1 from t join t1 on t.a = t1.a") + tk.MustExec("set @@tidb_mem_quota_query=100000;") tk.MustExec("truncate table t") tk.MustExec("insert into t values(1),(2),(3)") diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index ecbffd5afcbc0..dd6d64ea4e249 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1242,63 +1242,94 @@ func (s *testOOMSuite) TestMemTracker4UpdateExec(c *C) { } func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { - //log.SetLevel(zap.FatalLevel) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("create table t1 (id int, a int, b int, index idx_a(`a`))") + tk.MustExec("create table t_MemTracker4InsertAndReplaceExec (id int, a int, b int, index idx_a(`a`))") log.SetLevel(zap.InfoLevel) s.oom.tracker = "" - tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("insert into t1 select * from t") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t1 select * from t") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("replace into t1 select * from t") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("replace into t1 select * from t") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 tk.Se.GetSessionVars().DMLBatchSize = 1 tk.Se.GetSessionVars().BatchInsert = true s.oom.tracker = "" - tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 } +func (s *testOOMSuite) TestMemTracker4DeleteExec(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table MemTracker4DeleteExec1 (id int, a int, b int, index idx_a(`a`))") + tk.MustExec("create table MemTracker4DeleteExec2 (id int, a int, b int, index idx_a(`a`))") + + // delete from single table + log.SetLevel(zap.InfoLevel) + tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") + s.oom.tracker = "" + tk.MustExec("delete from MemTracker4DeleteExec1") + c.Assert(s.oom.tracker, Equals, "") + tk.MustExec("insert into MemTracker4DeleteExec1 values (1,1,1), (2,2,2), (3,3,3)") + tk.Se.GetSessionVars().MemQuotaQuery = 1 + tk.MustExec("delete from MemTracker4DeleteExec1") + c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") + + // delete from multiple table + tk.Se.GetSessionVars().MemQuotaQuery = 100000 + tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1)") + tk.MustExec("insert into MemTracker4DeleteExec2 values(1,1,1)") + s.oom.tracker = "" + tk.MustExec("delete MemTracker4DeleteExec1, MemTracker4DeleteExec2 from MemTracker4DeleteExec1 join MemTracker4DeleteExec2 on MemTracker4DeleteExec1.a=MemTracker4DeleteExec2.a") + c.Assert(s.oom.tracker, Equals, "") + tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1)") + tk.MustExec("insert into MemTracker4DeleteExec2 values(1,1,1)") + s.oom.tracker = "" + tk.Se.GetSessionVars().MemQuotaQuery = 10000 + tk.MustExec("delete MemTracker4DeleteExec1, MemTracker4DeleteExec2 from MemTracker4DeleteExec1 join MemTracker4DeleteExec2 on MemTracker4DeleteExec1.a=MemTracker4DeleteExec2.a") + c.Assert(s.oom.tracker, Equals, "expensive_query during bootstrap phase") +} + type oomCapturer struct { zapcore.Core tracker string diff --git a/session/txn.go b/session/txn.go index 948fc42866415..a487b358e6a19 100755 --- a/session/txn.go +++ b/session/txn.go @@ -448,7 +448,16 @@ func (s *session) HasDirtyContent(tid int64) bool { // StmtCommit implements the sessionctx.Context interface. func (s *session) StmtCommit(memTracker *memory.Tracker) error { - defer s.txn.cleanup() + defer func() { + // If StmtCommit is called in batch mode, we need to clear the txn size + // in memTracker to avoid double-counting. If it's not batch mode, this + // work has no effect because that no more data will be appended into + // s.txn. + if memTracker != nil { + memTracker.Consume(int64(-s.txn.Size())) + } + s.txn.cleanup() + }() st := &s.txn txnSize := st.Transaction.Size() var count int