Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session, executor: add memTracker for DeleteExec #14289

Merged
merged 6 commits into from
Dec 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
)

// DeleteExec represents a delete executor.
Expand All @@ -35,6 +36,7 @@ type DeleteExec struct {
// tblColPosInfos stores relationship between column ordinal to its table handle.
// the columns ordinals is present in ordinal range format, @see plannercore.TblColPosInfos
tblColPosInfos plannercore.TblColPosInfoSlice
memTracker *memory.Tracker
}

// Next implements the Executor Next interface.
Expand Down Expand Up @@ -78,20 +80,22 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {
batchDMLSize := e.ctx.GetSessionVars().DMLBatchSize
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
memUsageOfChk := int64(0)
for {
e.memTracker.Consume(-memUsageOfChk)
iter := chunk.NewIterator4Chunk(chk)

err := Next(ctx, e.children[0], chk)
if err != nil {
return err
}
if chk.NumRows() == 0 {
break
}

memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() {
if batchDelete && rowCount >= batchDMLSize {
if err = e.ctx.StmtCommit(nil); err != nil {
if err = e.ctx.StmtCommit(e.memTracker); err != nil {
return err
}
if err = e.ctx.NewTxn(ctx); err != nil {
Expand Down Expand Up @@ -131,7 +135,9 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
tblRowMap := make(tableRowMapType)
fields := retTypes(e.children[0])
chk := newFirstChunk(e.children[0])
memUsageOfChk := int64(0)
for {
e.memTracker.Consume(-memUsageOfChk)
iter := chunk.NewIterator4Chunk(chk)
err := Next(ctx, e.children[0], chk)
if err != nil {
Expand All @@ -140,6 +146,8 @@ func (e *DeleteExec) deleteMultiTablesByChunk(ctx context.Context) error {
if chk.NumRows() == 0 {
break
}
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)

for joinedChunkRow := iter.Begin(); joinedChunkRow != iter.End(); joinedChunkRow = iter.Next() {
joinedDatumRow := joinedChunkRow.GetDatumRow(fields)
Expand All @@ -165,10 +173,16 @@ func (e *DeleteExec) removeRowsInTblRowMap(tblRowMap tableRowMapType) error {
}

func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h int64, data []types.Datum) error {
err := t.RemoveRecord(ctx, h, data)
txnState, err := e.ctx.Txn(false)
if err != nil {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
return err
}
memUsageOfTxnState := txnState.Size()
err = t.RemoveRecord(ctx, h, data)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only one line change, is it easier to estimate the size of data?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the existing interface is easier?

if err != nil {
return err
}
e.memTracker.Consume(int64(txnState.Size() - memUsageOfTxnState))
ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
return nil
}
Expand All @@ -180,6 +194,9 @@ func (e *DeleteExec) Close() error {

// Open implements the Executor Open interface.
func (e *DeleteExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

return e.children[0].Open(ctx)
}

Expand Down
15 changes: 15 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4450,6 +4450,21 @@ func (s *testSuite) TestOOMPanicAction(c *C) {
_, err = tk.Exec("replace into t select a from t1 order by a desc;")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")

tk.MustExec("set @@tidb_mem_quota_query=10000")
tk.MustExec("insert into t values (1),(2),(3),(4),(5);")
// Set the memory quota to 244 to make this SQL panic during the DeleteExec
// instead of the TableReaderExec.
tk.MustExec("set @@tidb_mem_quota_query=244;")
_, err = tk.Exec("delete from t")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")

tk.MustExec("set @@tidb_mem_quota_query=10000;")
tk.MustExec("delete from t1")
tk.MustExec("insert into t1 values(1)")
tk.MustExec("insert into t values (1),(2),(3),(4),(5);")
tk.MustExec("set @@tidb_mem_quota_query=244;")
_, err = tk.Exec("delete t, t1 from t join t1 on t.a = t1.a")

tk.MustExec("set @@tidb_mem_quota_query=100000;")
tk.MustExec("truncate table t")
tk.MustExec("insert into t values(1),(2),(3)")
Expand Down
59 changes: 45 additions & 14 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1242,63 +1242,94 @@ func (s *testOOMSuite) TestMemTracker4UpdateExec(c *C) {
}

func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) {
//log.SetLevel(zap.FatalLevel)
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int, a int, b int, index idx_a(`a`))")
tk.MustExec("create table t_MemTracker4InsertAndReplaceExec (id int, a int, b int, index idx_a(`a`))")

log.SetLevel(zap.InfoLevel)
s.oom.tracker = ""
tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)")
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaQuery = 1
tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)")
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")
tk.Se.GetSessionVars().MemQuotaQuery = -1

s.oom.tracker = ""
tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)")
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaQuery = 1
tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)")
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")
tk.Se.GetSessionVars().MemQuotaQuery = -1

s.oom.tracker = ""
tk.MustExec("insert into t1 select * from t")
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaQuery = 1
tk.MustExec("insert into t1 select * from t")
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec select * from t")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")
tk.Se.GetSessionVars().MemQuotaQuery = -1

s.oom.tracker = ""
tk.MustExec("replace into t1 select * from t")
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaQuery = 1
tk.MustExec("replace into t1 select * from t")
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec select * from t")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")
tk.Se.GetSessionVars().MemQuotaQuery = -1

tk.Se.GetSessionVars().DMLBatchSize = 1
tk.Se.GetSessionVars().BatchInsert = true
s.oom.tracker = ""
tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)")
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaQuery = 1
tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)")
tk.MustExec("insert into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")
tk.Se.GetSessionVars().MemQuotaQuery = -1

s.oom.tracker = ""
tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)")
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaQuery = 1
tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)")
tk.MustExec("replace into t_MemTracker4InsertAndReplaceExec values (1,1,1), (2,2,2), (3,3,3)")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")
tk.Se.GetSessionVars().MemQuotaQuery = -1
}

func (s *testOOMSuite) TestMemTracker4DeleteExec(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("create table MemTracker4DeleteExec1 (id int, a int, b int, index idx_a(`a`))")
tk.MustExec("create table MemTracker4DeleteExec2 (id int, a int, b int, index idx_a(`a`))")

// delete from single table
log.SetLevel(zap.InfoLevel)
tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5)")
s.oom.tracker = ""
tk.MustExec("delete from MemTracker4DeleteExec1")
c.Assert(s.oom.tracker, Equals, "")
tk.MustExec("insert into MemTracker4DeleteExec1 values (1,1,1), (2,2,2), (3,3,3)")
tk.Se.GetSessionVars().MemQuotaQuery = 1
tk.MustExec("delete from MemTracker4DeleteExec1")
c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase")

// delete from multiple table
tk.Se.GetSessionVars().MemQuotaQuery = 100000
tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1)")
tk.MustExec("insert into MemTracker4DeleteExec2 values(1,1,1)")
s.oom.tracker = ""
tk.MustExec("delete MemTracker4DeleteExec1, MemTracker4DeleteExec2 from MemTracker4DeleteExec1 join MemTracker4DeleteExec2 on MemTracker4DeleteExec1.a=MemTracker4DeleteExec2.a")
c.Assert(s.oom.tracker, Equals, "")
tk.MustExec("insert into MemTracker4DeleteExec1 values(1,1,1)")
tk.MustExec("insert into MemTracker4DeleteExec2 values(1,1,1)")
s.oom.tracker = ""
tk.Se.GetSessionVars().MemQuotaQuery = 10000
tk.MustExec("delete MemTracker4DeleteExec1, MemTracker4DeleteExec2 from MemTracker4DeleteExec1 join MemTracker4DeleteExec2 on MemTracker4DeleteExec1.a=MemTracker4DeleteExec2.a")
c.Assert(s.oom.tracker, Equals, "expensive_query during bootstrap phase")
}

type oomCapturer struct {
zapcore.Core
tracker string
Expand Down
11 changes: 10 additions & 1 deletion session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,16 @@ func (s *session) HasDirtyContent(tid int64) bool {

// StmtCommit implements the sessionctx.Context interface.
func (s *session) StmtCommit(memTracker *memory.Tracker) error {
defer s.txn.cleanup()
defer func() {
// If StmtCommit is called in batch mode, we need to clear the txn size
// in memTracker to avoid double-counting. If it's not batch mode, this
// work has no effect because that no more data will be appended into
// s.txn.
if memTracker != nil {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
memTracker.Consume(int64(-s.txn.Size()))
}
s.txn.cleanup()
}()
st := &s.txn
txnSize := st.Transaction.Size()
var count int
Expand Down