From 9fa2f04a9fef0727001ac2fe4455ec78616a1cec Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 31 Dec 2019 10:55:51 +0800 Subject: [PATCH 1/4] session, executor: add memTracker for DeleteExec --- executor/delete.go | 27 +++++++++++++++++++++------ executor/executor_test.go | 16 ++++++++++++++++ executor/seqtest/seq_executor_test.go | 27 ++++++++++++++++++++++++++- session/txn.go | 11 ++++++++++- 4 files changed, 73 insertions(+), 8 deletions(-) diff --git a/executor/delete.go b/executor/delete.go index c5c179d5679a5..85bc798457999 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/memory" ) // DeleteExec represents a delete executor. @@ -35,6 +36,7 @@ type DeleteExec struct { // tblColPosInfos stores relationship between column ordinal to its table handle. // the columns ordinals is present in ordinal range format, @see plannercore.TblColPosInfos tblColPosInfos plannercore.TblColPosInfoSlice + memTracker *memory.Tracker } // Next implements the Executor Next interface. @@ -78,9 +80,10 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize fields := retTypes(e.children[0]) chk := newFirstChunk(e.children[0]) + memUsageOfChk := int64(0) for { + e.memTracker.Consume(-memUsageOfChk) iter := chunk.NewIterator4Chunk(chk) - err := Next(ctx, e.children[0], chk) if err != nil { return err @@ -88,10 +91,11 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { if chk.NumRows() == 0 { break } - + memUsageOfChk = chk.MemoryUsage() + e.memTracker.Consume(memUsageOfChk) for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() { if batchDelete && rowCount >= batchDMLSize { - if err = e.ctx.StmtCommit(nil); err != nil { + if err = e.ctx.StmtCommit(e.memTracker); err != nil { return err } if err = e.ctx.NewTxn(ctx); err != nil { @@ -108,7 +112,6 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { } rowCount++ } - chk = chunk.Renew(chk, e.maxChunkSize) } return nil @@ -131,7 +134,9 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { tblRowMap := make(tableRowMapType) fields := retTypes(e.children[0]) chk := newFirstChunk(e.children[0]) + memUsageOfChk := int64(0) for { + e.memTracker.Consume(-memUsageOfChk) iter := chunk.NewIterator4Chunk(chk) err := Next(ctx, e.children[0], chk) if err != nil { @@ -140,12 +145,13 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { if chk.NumRows() == 0 { break } + memUsageOfChk = chk.MemoryUsage() + e.memTracker.Consume(memUsageOfChk) for joinedChunkRow := iter.Begin(); joinedChunkRow != iter.End(); joinedChunkRow = iter.Next() { joinedDatumRow := joinedChunkRow.GetDatumRow(fields) e.composeTblRowMap(tblRowMap, colPosInfos, joinedDatumRow) } - chk = chunk.Renew(chk, e.maxChunkSize) } return e.removeRowsInTblRowMap(tblRowMap) @@ -165,10 +171,16 @@ func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error { } func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h int64, data []types.Datum) error { - err := t.RemoveRecord(ctx, h, data) + // err is always nil if `active` is false. + txnState, err := e.ctx.Txn(false) + if err != nil { + } + memUsageOfTxnState := txnState.Size() + err = t.RemoveRecord(ctx, h, data) if err != nil { return err } + e.memTracker.Consume(int64(txnState.Size() - memUsageOfTxnState)) ctx.GetSessionVars().StmtCtx.AddAffectedRows(1) return nil } @@ -180,6 +192,9 @@ func (e *DeleteExec) Close() error { // Open implements the Executor Open interface. func (e *DeleteExec) Open(ctx context.Context) error { + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + return e.children[0].Open(ctx) } diff --git a/executor/executor_test.go b/executor/executor_test.go index ff35bf4edb597..07441a90024cb 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4449,6 +4449,22 @@ func (s *testSuite) TestOOMPanicAction(c *C) { c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") _, err = tk.Exec("replace into t select a from t1 order by a desc;") c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + + tk.MustExec("set @@tidb_mem_quota_query=10000") + tk.MustExec("insert into t values (1),(2),(3),(4),(5);") + // Set the memory quota to 244 to make this SQL panic during the DeleteExec + // instead of the TableReaderExec. + tk.MustExec("set @@tidb_mem_quota_query=244;") + _, err = tk.Exec("delete from t") + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + + tk.MustExec("set @@tidb_mem_quota_query=10000;") + tk.MustExec("delete from t1") + tk.MustExec("insert into t1 values(1)") + tk.MustExec("insert into t values (1),(2),(3),(4),(5);") + tk.MustExec("set @@tidb_mem_quota_query=244;") + _, err = tk.Exec("delete t, t1 from t join t1 on t.a = t1.a") + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") } func setOOMAction(action string) { diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 40dbce6b762c0..768634b9667e0 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1228,7 +1228,6 @@ func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) { } func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { - //log.SetLevel(zap.FatalLevel) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("create table t1 (id int, a int, b int, index idx_a(`a`))") @@ -1285,6 +1284,32 @@ func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { tk.Se.GetSessionVars().MemQuotaQuery = -1 } +func (s *testOOMSuite) TestMemTracker4DeleteExec(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table t1 (id int, a int, b int, index idx_a(`a`))") + tk.MustExec("create table t2 (id int, a int, b int, index idx_a(`a`))") + + log.SetLevel(zap.InfoLevel) + tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") + s.oom.tracker = "" + tk.MustExec("delete from t1") + c.Assert(s.oom.tracker, Equals, "") + tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.Se.GetSessionVars().MemQuotaQuery = 1 + tk.MustExec("delete from t1") + c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") + + tk.Se.GetSessionVars().MemQuotaQuery = 10000 + tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") + tk.MustExec("insert into t2 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") + tk.MustExec("delete t1, t2 from t1 join t2 on t1.a=t2.a") + tk.Se.GetSessionVars().MemQuotaQuery = 1 + tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") + tk.MustExec("insert into t2 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") + tk.MustExec("delete t1, t2 from t1 join t2 on t1.a=t2.a") +} + type oomCapturer struct { zapcore.Core tracker string diff --git a/session/txn.go b/session/txn.go index a8263f73c2322..dbc10c1ec62d0 100755 --- a/session/txn.go +++ b/session/txn.go @@ -438,7 +438,16 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture { // StmtCommit implements the sessionctx.Context interface. func (s *session) StmtCommit(memTracker *memory.Tracker) error { - defer s.txn.cleanup() + defer func() { + s.txn.cleanup() + // If StmtCommit is called in batch mode, we need to clear the txn size + // in memTracker to avoid double-counting. If it's not batch mode, this + // work has no effect because that no more data will be appended into + // s.txn. + if memTracker != nil { + memTracker.Consume(int64(-s.txn.Size())) + } + }() st := &s.txn txnSize := st.Transaction.Size() var count int From 3824de95a9421f83bef67e651009adc77edc9e04 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 31 Dec 2019 11:16:17 +0800 Subject: [PATCH 2/4] refine test --- executor/seqtest/seq_executor_test.go | 58 +++++++++++++++------------ 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 768634b9667e0..f3357ddc465fe 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1230,56 +1230,56 @@ func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) { func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("create table t1 (id int, a int, b int, index idx_a(`a`))") + tk.MustExec("create table t_MemTracker4InsertAndReplaceExec (id int, a int, b int, index idx_a(`a`))") log.SetLevel(zap.InfoLevel) s.oom.tracker = "" - tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("insert into t1 select * from t") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t1 select * from t") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("replace into t1 select * from t") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("replace into t1 select * from t") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 tk.Se.GetSessionVars().DMLBatchSize = 1 tk.Se.GetSessionVars().BatchInsert = true s.oom.tracker = "" - tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 } @@ -1287,27 +1287,33 @@ func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { func (s *testOOMSuite) TestMemTracker4DeleteExec(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("create table t1 (id int, a int, b int, index idx_a(`a`))") - tk.MustExec("create table t2 (id int, a int, b int, index idx_a(`a`))") + tk.MustExec("create table MemTracker4DeleteExec1 (id int, a int, b int, index idx_a(`a`))") + tk.MustExec("create table MemTracker4DeleteExec2 (id int, a int, b int, index idx_a(`a`))") + // delete from single table log.SetLevel(zap.InfoLevel) - tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") + tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") s.oom.tracker = "" - tk.MustExec("delete from t1") + tk.MustExec("delete from MemTracker4DeleteExec1") c.Assert(s.oom.tracker, Equals, "") - tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into MemTracker4DeleteExec1 values (1,1,1), (2,2,2), (3,3,3)") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("delete from t1") + tk.MustExec("delete from MemTracker4DeleteExec1") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") + // delete from multiple table + tk.Se.GetSessionVars().MemQuotaQuery = 100000 + tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1)") + tk.MustExec("insert into MemTracker4DeleteExec2 values(1,1,1)") + s.oom.tracker = "" + tk.MustExec("delete MemTracker4DeleteExec1, MemTracker4DeleteExec2 from MemTracker4DeleteExec1 join MemTracker4DeleteExec2 on MemTracker4DeleteExec1.a=MemTracker4DeleteExec2.a") + c.Assert(s.oom.tracker, Equals, "") + tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1)") + tk.MustExec("insert into MemTracker4DeleteExec2 values(1,1,1)") + s.oom.tracker = "" tk.Se.GetSessionVars().MemQuotaQuery = 10000 - tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") - tk.MustExec("insert into t2 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") - tk.MustExec("delete t1, t2 from t1 join t2 on t1.a=t2.a") - tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") - tk.MustExec("insert into t2 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)") - tk.MustExec("delete t1, t2 from t1 join t2 on t1.a=t2.a") + tk.MustExec("delete MemTracker4DeleteExec1, MemTracker4DeleteExec2 from MemTracker4DeleteExec1 join MemTracker4DeleteExec2 on MemTracker4DeleteExec1.a=MemTracker4DeleteExec2.a") + c.Assert(s.oom.tracker, Equals, "expensive_query during bootstrap phase") } type oomCapturer struct { From f04c0e63f56fd865cda310fd69d44ba523334404 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 31 Dec 2019 14:35:56 +0800 Subject: [PATCH 3/4] address comment --- executor/delete.go | 2 +- session/txn.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/delete.go b/executor/delete.go index 85bc798457999..09613baf4f9ca 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -171,9 +171,9 @@ func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error { } func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h int64, data []types.Datum) error { - // err is always nil if `active` is false. txnState, err := e.ctx.Txn(false) if err != nil { + return err } memUsageOfTxnState := txnState.Size() err = t.RemoveRecord(ctx, h, data) diff --git a/session/txn.go b/session/txn.go index dbc10c1ec62d0..c4ec8dfa90c5e 100755 --- a/session/txn.go +++ b/session/txn.go @@ -439,7 +439,6 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture { // StmtCommit implements the sessionctx.Context interface. func (s *session) StmtCommit(memTracker *memory.Tracker) error { defer func() { - s.txn.cleanup() // If StmtCommit is called in batch mode, we need to clear the txn size // in memTracker to avoid double-counting. If it's not batch mode, this // work has no effect because that no more data will be appended into @@ -447,6 +446,7 @@ func (s *session) StmtCommit(memTracker *memory.Tracker) error { if memTracker != nil { memTracker.Consume(int64(-s.txn.Size())) } + s.txn.cleanup() }() st := &s.txn txnSize := st.Transaction.Size() From 75cd749f293e65aba84649075d8848b5c66d9da1 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 31 Dec 2019 19:18:02 +0800 Subject: [PATCH 4/4] address comment --- executor/delete.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/delete.go b/executor/delete.go index 09613baf4f9ca..69d0c5a9b4503 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -112,6 +112,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { } rowCount++ } + chk = chunk.Renew(chk, e.maxChunkSize) } return nil @@ -152,6 +153,7 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error { joinedDatumRow := joinedChunkRow.GetDatumRow(fields) e.composeTblRowMap(tblRowMap, colPosInfos, joinedDatumRow) } + chk = chunk.Renew(chk, e.maxChunkSize) } return e.removeRowsInTblRowMap(tblRowMap)