From e00887e3b975d07a2217d77490923eae79f8dc56 Mon Sep 17 00:00:00 2001 From: HuaiyuXu <391585975@qq.com> Date: Tue, 31 Dec 2019 21:59:31 +0800 Subject: [PATCH] executor: add memTracker for UpdateExec (#14299) --- executor/executor_test.go | 9 +++++++++ executor/insert.go | 2 +- executor/seqtest/seq_executor_test.go | 14 ++++++++++++++ executor/update.go | 17 ++++++++++++++--- executor/write.go | 11 ++++++++--- 5 files changed, 46 insertions(+), 7 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index ff35bf4edb597..9182e4f40fb58 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4449,6 +4449,15 @@ func (s *testSuite) TestOOMPanicAction(c *C) { c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") _, err = tk.Exec("replace into t select a from t1 order by a desc;") c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + + tk.MustExec("set @@tidb_mem_quota_query=100000;") + tk.MustExec("truncate table t") + tk.MustExec("insert into t values(1),(2),(3)") + // set the memory to quota to make the SQL panic during UpdateExec instead + // of TableReader. + tk.MustExec("set @@tidb_mem_quota_query=244;") + _, err = tk.Exec("update t set a = 4") + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") } func setOOMAction(action string) { diff --git a/executor/insert.go b/executor/insert.go index fc3954a9f9d50..8169927ccd125 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -337,7 +337,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle int64, oldRow [] } newData := e.row4Update[:len(oldRow)] - _, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true) + _, handleChanged, newHandle, err := updateRecord(ctx, e.ctx, handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker) if err != nil { return nil, false, 0, err } diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 40dbce6b762c0..ecbffd5afcbc0 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1227,6 +1227,20 @@ func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) { tk.Se.GetSessionVars().MemQuotaIndexLookupReader = -1 } +func (s *testOOMSuite) TestMemTracker4UpdateExec(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("create table t_MemTracker4UpdateExec (id int, a int, b int, index idx_a(`a`))") + + log.SetLevel(zap.InfoLevel) + s.oom.tracker = "" + tk.MustExec("insert into t_MemTracker4UpdateExec values (1,1,1), (2,2,2), (3,3,3)") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaQuery = 244 + tk.MustExec("update t_MemTracker4UpdateExec set a = 4") + c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") +} + func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { //log.SetLevel(zap.FatalLevel) tk := testkit.NewTestKit(c, s.store) diff --git a/executor/update.go b/executor/update.go index aa2491292e2df..602223f2bb003 100644 --- a/executor/update.go +++ b/executor/update.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" + "github.com/pingcap/tidb/util/memory" ) // UpdateExec represents a new update executor. @@ -49,8 +50,8 @@ type UpdateExec struct { tblColPosInfos plannercore.TblColPosInfoSlice evalBuffer chunk.MutRow allAssignmentsAreConstant bool - - fetched bool + fetched bool + memTracker *memory.Tracker } func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema) ([]types.Datum, error) { @@ -101,7 +102,7 @@ func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema) ([]typ } // Update row - changed, _, _, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false) + changed, _, _, err1 := updateRecord(ctx, e.ctx, handle, oldData, newTableData, flags, tbl, false, e.memTracker) if err1 == nil { e.updatedRowKeys[content.TblID][handle] = changed continue @@ -174,7 +175,9 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error { if !e.allAssignmentsAreConstant { composeFunc = e.composeNewRow } + memUsageOfChk := int64(0) for { + e.memTracker.Consume(-memUsageOfChk) err := Next(ctx, e.children[0], chk) if err != nil { return err @@ -183,6 +186,9 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error { if chk.NumRows() == 0 { break } + memUsageOfChk = chk.MemoryUsage() + e.memTracker.Consume(memUsageOfChk) + firstRowIdx := globalRowIdx for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ { chunkRow := chk.GetRow(rowIdx) datumRow := chunkRow.GetDatumRow(fields) @@ -194,6 +200,8 @@ func (e *UpdateExec) fetchChunkRows(ctx context.Context) error { e.newRowsData = append(e.newRowsData, newRow) globalRowIdx++ } + e.memTracker.Consume(types.EstimatedMemUsage(e.rows[firstRowIdx], globalRowIdx-firstRowIdx)) + e.memTracker.Consume(types.EstimatedMemUsage(e.newRowsData[firstRowIdx], globalRowIdx-firstRowIdx)) chk = chunk.Renew(chk, e.maxChunkSize) } return nil @@ -279,6 +287,9 @@ func (e *UpdateExec) Close() error { // Open implements the Executor Open interface. func (e *UpdateExec) Open(ctx context.Context) error { + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + return e.children[0].Open(ctx) } diff --git a/executor/write.go b/executor/write.go index 55846ea615e29..f5dc79e8f2afa 100644 --- a/executor/write.go +++ b/executor/write.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" ) @@ -48,13 +49,18 @@ var ( // 3. newHandle (int64) : if handleChanged == true, the newHandle means the new handle after update. // 4. err (error) : error in the update. func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData, newData []types.Datum, modified []bool, t table.Table, - onDup bool) (bool, bool, int64, error) { + onDup bool, memTracker *memory.Tracker) (bool, bool, int64, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("executor.updateRecord", opentracing.ChildOf(span.Context())) defer span1.Finish() ctx = opentracing.ContextWithSpan(ctx, span1) } - + txn, err := sctx.Txn(false) + if err != nil { + return false, false, 0, err + } + memUsageOfTxnState := txn.Size() + defer memTracker.Consume(int64(txn.Size() - memUsageOfTxnState)) sc := sctx.GetSessionVars().StmtCtx changed, handleChanged := false, false // onUpdateSpecified is for "UPDATE SET ts_field = old_value", the @@ -150,7 +156,6 @@ func updateRecord(ctx context.Context, sctx sessionctx.Context, h int64, oldData } // 5. If handle changed, remove the old then add the new record, otherwise update the record. - var err error if handleChanged { if sc.DupKeyAsWarning { // For `UPDATE IGNORE`/`INSERT IGNORE ON DUPLICATE KEY UPDATE`