From e4bffdbaa3d89b6496987976b5623bbae8dc521b Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 23 Dec 2019 14:31:38 +0800 Subject: [PATCH 01/24] *: add memory tracker for InsertExec and ReplaceExec --- executor/delete.go | 2 +- executor/insert.go | 10 ++++-- executor/insert_common.go | 65 +++++++++++++++++++++++++++++++-------- executor/load_data.go | 2 +- executor/replace.go | 10 ++++-- session/session.go | 3 +- session/tidb.go | 2 +- session/txn.go | 11 ++++++- sessionctx/context.go | 3 +- types/datum.go | 24 +++++++++++++++ util/mock/context.go | 2 +- 11 files changed, 111 insertions(+), 23 deletions(-) diff --git a/executor/delete.go b/executor/delete.go index 00727a265d38e..c5c179d5679a5 100644 --- a/executor/delete.go +++ b/executor/delete.go @@ -91,7 +91,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error { for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() { if batchDelete && rowCount >= batchDMLSize { - if err = e.ctx.StmtCommit(); err != nil { + if err = e.ctx.StmtCommit(nil); err != nil { return err } if err = e.ctx.NewTxn(ctx); err != nil { diff --git a/executor/insert.go b/executor/insert.go index fcdc6cd8ccb38..812a617e234ba 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/stringutil" "go.uber.org/zap" ) @@ -40,10 +41,11 @@ type InsertExec struct { curInsertVals chunk.MutRow row4Update []types.Datum - Priority mysql.PriorityEnum + Priority mysql.PriorityEnum + memTracker *memory.Tracker } -func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { +func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum, memTracker *memory.Tracker) error { logutil.Eventf(ctx, "insert %d rows into table `%s`", len(rows), stringutil.MemoizeStr(func() string { var tblName string if meta := e.Table.Meta(); meta != nil { @@ -86,6 +88,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { } } } + memTracker.Consume(int64(txn.Size())) return nil } @@ -266,6 +269,9 @@ func (e *InsertExec) Close() error { // Open implements the Executor Open interface. func (e *InsertExec) Open(ctx context.Context) error { + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + if e.OnDuplicate != nil { e.initEvalBuffer4Dup() } diff --git a/executor/insert_common.go b/executor/insert_common.go index 610e9e78d1619..d0d22f4a70ebf 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" ) @@ -78,14 +79,14 @@ type defaultVal struct { type insertCommon interface { insertCommon() *InsertValues - exec(ctx context.Context, rows [][]types.Datum) error + exec(ctx context.Context, rows [][]types.Datum, memTracker *memory.Tracker) error } func (e *InsertValues) insertCommon() *InsertValues { return e } -func (e *InsertValues) exec(ctx context.Context, rows [][]types.Datum) error { +func (e *InsertValues) exec(_ context.Context, _ [][]types.Datum, _ *memory.Tracker) error { panic("derived should overload exec function") } @@ -200,6 +201,13 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { if err = e.processSetList(); err != nil { return err } + var memTracker *memory.Tracker + if insertExec, ok := base.(*InsertExec); ok { + memTracker = insertExec.memTracker + } else { + replaceExec := base.(*ReplaceExec) + memTracker = replaceExec.memTracker + } sessVars := e.ctx.GetSessionVars() batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML batchSize := sessVars.DMLBatchSize @@ -211,6 +219,7 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { } rows := make([][]types.Datum, 0, len(e.Lists)) + memUsageOfRows := int64(0) for i, list := range e.Lists { e.rowCount++ var row []types.Datum @@ -220,26 +229,40 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { } rows = append(rows, row) if batchInsert && e.rowCount%uint64(batchSize) == 0 { + memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows)) + memTracker.Consume(memUsageOfRows) // Before batch insert, fill the batch allocated autoIDs. rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows) if err != nil { return err } - if err = base.exec(ctx, rows); err != nil { + if err = base.exec(ctx, rows, memTracker); err != nil { return err } rows = rows[:0] - if err = e.doBatchInsert(ctx); err != nil { + memTracker.Consume(-memUsageOfRows) + memUsageOfRows = 0 + if err = e.doBatchInsert(ctx, memTracker); err != nil { return err } } } + if len(rows) != 0 { + memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows)) + memTracker.Consume(memUsageOfRows) + } // Fill the batch allocated autoIDs. rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows) if err != nil { return err } - return base.exec(ctx, rows) + err = base.exec(ctx, rows, memTracker) + if err != nil { + return err + } + rows = rows[:0] + memTracker.Consume(-memUsageOfRows) + return nil } func (e *InsertValues) handleErr(col *table.Column, val *types.Datum, rowIdx int, err error) error { @@ -376,6 +399,13 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { chk := newFirstChunk(selectExec) iter := chunk.NewIterator4Chunk(chk) rows := make([][]types.Datum, 0, chk.Capacity()) + var memTracker *memory.Tracker + if insertExec, ok := base.(*InsertExec); ok { + memTracker = insertExec.memTracker + } else { + replaceExec := base.(*ReplaceExec) + memTracker = replaceExec.memTracker + } sessVars := e.ctx.GetSessionVars() if !sessVars.StrictSQLMode { @@ -384,7 +414,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { } batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML batchSize := sessVars.DMLBatchSize - + memUsageOfRows := int64(0) for { err := Next(ctx, selectExec, chk) if err != nil { @@ -393,7 +423,8 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { if chk.NumRows() == 0 { break } - + chkMemUsage := chk.MemoryUsage() + memTracker.Consume(chkMemUsage) for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() { innerRow := innerChunkRow.GetDatumRow(fields) e.rowCount++ @@ -403,28 +434,38 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { } rows = append(rows, row) if batchInsert && e.rowCount%uint64(batchSize) == 0 { - if err = base.exec(ctx, rows); err != nil { + memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows)) + memTracker.Consume(memUsageOfRows) + if err = base.exec(ctx, rows, memTracker); err != nil { return err } rows = rows[:0] - if err = e.doBatchInsert(ctx); err != nil { + memTracker.Consume(-memUsageOfRows) + memUsageOfRows = 0 + if err = e.doBatchInsert(ctx, memTracker); err != nil { return err } } } - err = base.exec(ctx, rows) + if len(rows) != 0 { + memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows)) + memTracker.Consume(memUsageOfRows) + } + err = base.exec(ctx, rows, memTracker) if err != nil { return err } rows = rows[:0] + memTracker.Consume(-memUsageOfRows) + memTracker.Consume(-chkMemUsage) } return nil } -func (e *InsertValues) doBatchInsert(ctx context.Context) error { +func (e *InsertValues) doBatchInsert(ctx context.Context, memTracker *memory.Tracker) error { sessVars := e.ctx.GetSessionVars() - if err := e.ctx.StmtCommit(); err != nil { + if err := e.ctx.StmtCommit(memTracker); err != nil { return err } if err := e.ctx.NewTxn(ctx); err != nil { diff --git a/executor/load_data.go b/executor/load_data.go index 189ff4ca500b0..f922a877114c8 100644 --- a/executor/load_data.go +++ b/executor/load_data.go @@ -205,7 +205,7 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error failpoint.Inject("commitOneTaskErr", func() error { return errors.New("mock commit one task error") }) - if err = e.Ctx.StmtCommit(); err != nil { + if err = e.Ctx.StmtCommit(nil); err != nil { logutil.Logger(ctx).Error("commit error commit", zap.Error(err)) return err } diff --git a/executor/replace.go b/executor/replace.go index 2571a5a675e78..916081388d3c1 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -25,13 +25,15 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" "go.uber.org/zap" ) // ReplaceExec represents a replace executor. type ReplaceExec struct { *InsertValues - Priority int + Priority int + memTracker *memory.Tracker } // Close implements the Executor Close interface. @@ -45,6 +47,9 @@ func (e *ReplaceExec) Close() error { // Open implements the Executor Open interface. func (e *ReplaceExec) Open(ctx context.Context) error { + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + if e.SelectExec != nil { return e.SelectExec.Open(ctx) } @@ -164,7 +169,7 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r return false, false, nil } -func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { +func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum, memTracker *memory.Tracker) error { /* * MySQL uses the following algorithm for REPLACE (and LOAD DATA ... REPLACE): * 1. Try to insert the new row into the table @@ -202,6 +207,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { return err } } + memTracker.Consume(int64(txn.Size())) return nil } diff --git a/session/session.go b/session/session.go index 7c0f8da70fb61..8681d6e462855 100644 --- a/session/session.go +++ b/session/session.go @@ -689,7 +689,8 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { s.StmtRollback() break } - err = s.StmtCommit() + // TODO: pass the memTracker here. + err = s.StmtCommit(nil) if err != nil { return err } diff --git a/session/tidb.go b/session/tidb.go index 997111000eefa..7fa271cdbed09 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -277,7 +277,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) if err != nil { sctx.StmtRollback() } else { - err = sctx.StmtCommit() + err = sctx.StmtCommit(nil) } } } else { diff --git a/session/txn.go b/session/txn.go index 9bce3b7a957e0..54037e98421d7 100755 --- a/session/txn.go +++ b/session/txn.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" ) @@ -61,6 +62,11 @@ func (st *TxnState) init() { st.mutations = make(map[int64]*binlog.TableMutation) } +// Size implements the MemBuffer interface. +func (st *TxnState) Size() int { + return st.buf.Size() +} + // Valid implements the kv.Transaction interface. func (st *TxnState) Valid() bool { return st.Transaction != nil && st.Transaction.Valid() @@ -431,7 +437,7 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture { } // StmtCommit implements the sessionctx.Context interface. -func (s *session) StmtCommit() error { +func (s *session) StmtCommit(memTracker *memory.Tracker) error { defer s.txn.cleanup() st := &s.txn var count int @@ -455,6 +461,9 @@ func (s *session) StmtCommit() error { st.doNotCommit = err return err } + if memTracker != nil { + memTracker.Consume(int64(st.Transaction.Size())) + } // Need to flush binlog. for tableID, delta := range st.mutations { diff --git a/sessionctx/context.go b/sessionctx/context.go index 6c0b7e100aebe..37b6a4c10e3b8 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/kvcache" + "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tipb/go-binlog" ) @@ -74,7 +75,7 @@ type Context interface { StoreQueryFeedback(feedback interface{}) // StmtCommit flush all changes by the statement to the underlying transaction. - StmtCommit() error + StmtCommit(tracker *memory.Tracker) error // StmtRollback provides statement level rollback. StmtRollback() // StmtGetMutation gets the binlog mutation for current statement. diff --git a/types/datum.go b/types/datum.go index 525ce3be7b2b4..9238dae8e758d 100644 --- a/types/datum.go +++ b/types/datum.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" + "unsafe" ) // Kind constants. @@ -2057,3 +2058,26 @@ func ChangeReverseResultByUpperLowerBound( } return d, nil } + +const sizeOfEmptyDatum = int(unsafe.Sizeof(Datum{})) + +// EstimatedMemUsage returns the estimated bytes consumed of a one-dimensional +// or two-dimensional datum array. +func EstimatedMemUsage(array []Datum, numOfRows int) int64 { + if numOfRows == 0 { + return 0 + } + var bytesConsumed int + for _, d := range array { + switch d.Kind() { + case KindString, KindBytes, KindBinaryLiteral, KindMysqlJSON, KindMysqlEnum, KindMysqlSet, KindMysqlBit: + bytesConsumed += len(d.b) + case KindMysqlDecimal: + bytesConsumed += int(unsafe.Sizeof(d.GetMysqlDecimal())) + case KindMysqlTime: + bytesConsumed += int(unsafe.Sizeof(d.GetMysqlTime())) + } + } + bytesConsumed += len(array) * sizeOfEmptyDatum + return int64(bytesConsumed * numOfRows) +} diff --git a/util/mock/context.go b/util/mock/context.go index 41f5625cca6cd..4bbb3b84f847a 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -199,7 +199,7 @@ func (c *Context) GoCtx() context.Context { func (c *Context) StoreQueryFeedback(_ interface{}) {} // StmtCommit implements the sessionctx.Context interface. -func (c *Context) StmtCommit() error { +func (c *Context) StmtCommit(tracker *memory.Tracker) error { return nil } From 99a657422270dfde2b1d46106de749161a3eeecf Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 23 Dec 2019 17:59:07 +0800 Subject: [PATCH 02/24] test --- ddl/db_test.go | 2 +- executor/distsql.go | 14 +++++++------- executor/executor_test.go | 2 +- executor/hash_table.go | 2 +- executor/insert.go | 6 +++--- executor/insert_common.go | 8 ++++---- executor/replace.go | 6 +++--- executor/write_test.go | 2 +- planner/core/physical_plan_test.go | 6 +++--- session/session.go | 5 ++++- 10 files changed, 28 insertions(+), 25 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index 596271bbe21dc..745431bfd734d 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -3265,7 +3265,7 @@ func (s *testDBSuite4) TestAddColumn2(c *C) { c.Assert(err, IsNil) _, err = writeOnlyTable.AddRecord(s.tk.Se, types.MakeDatums(oldRow[0].GetInt64(), 2, oldRow[2].GetInt64()), table.IsUpdate) c.Assert(err, IsNil) - err = s.tk.Se.StmtCommit() + err = s.tk.Se.StmtCommit(nil) c.Assert(err, IsNil) err = s.tk.Se.CommitTxn(ctx) c.Assert(err, IsNil) diff --git a/executor/distsql.go b/executor/distsql.go index 82e71a9b7c199..b4f4cd35d9f36 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -76,13 +76,13 @@ type lookupTableTask struct { duplicatedIndexOrder map[int64]int // memUsage records the memory usage of this task calculated by table worker. - // memTracker is used to release memUsage after task is done and unused. + // MemTracker is used to release memUsage after task is done and unused. // // The sequence of function calls are: // 1. calculate task.memUsage. - // 2. task.memTracker = tableWorker.memTracker - // 3. task.memTracker.Consume(task.memUsage) - // 4. task.memTracker.Consume(-task.memUsage) + // 2. task.MemTracker = tableWorker.MemTracker + // 3. task.MemTracker.Consume(task.memUsage) + // 4. task.MemTracker.Consume(-task.memUsage) // // Step 1~3 are completed in "tableWorker.executeTask". // Step 4 is completed in "IndexLookUpExecutor.Next". @@ -345,7 +345,7 @@ type IndexLookUpExecutor struct { resultCurr *lookupTableTask feedback *statistics.QueryFeedback - // memTracker is used to track the memory usage of this executor. + // MemTracker is used to track the memory usage of this executor. memTracker *memory.Tracker // checkIndexValue is used to check the consistency of the index data. @@ -390,7 +390,7 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { } func (e *IndexLookUpExecutor) open(ctx context.Context) error { - // We have to initialize "memTracker" and other execution resources in here + // We have to initialize "MemTracker" and other execution resources in here // instead of in function "Open", because this "IndexLookUpExecutor" may be // constructed by a "IndexLookUpJoin" and "Open" will not be called in that // situation. @@ -787,7 +787,7 @@ type tableWorker struct { keepOrder bool handleIdx int - // memTracker is used to track the memory usage of this executor. + // MemTracker is used to track the memory usage of this executor. memTracker *memory.Tracker // checkIndexValue is used to check the consistency of the index data. diff --git a/executor/executor_test.go b/executor/executor_test.go index 7572ea28d4483..5c33830f2dedb 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -538,7 +538,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo, } ld.SetMessage() tk.CheckLastMessage(tt.expectedMsg) - err := ctx.StmtCommit() + err := ctx.StmtCommit(nil) c.Assert(err, IsNil) txn, err := ctx.Txn(true) c.Assert(err, IsNil) diff --git a/executor/hash_table.go b/executor/hash_table.go index c453e781efd21..0c24a0de97b7a 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -86,7 +86,7 @@ type hashRowContainer struct { // hashTable stores the map of hashKey and RowPtr hashTable *rowHashMap - // memTracker is the reference of records.GetMemTracker(). + // MemTracker is the reference of records.GetMemTracker(). // records would be set to nil for garbage collection when spilling is activated // so we need this reference. memTracker *memory.Tracker diff --git a/executor/insert.go b/executor/insert.go index 812a617e234ba..d5df351c77605 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -42,7 +42,7 @@ type InsertExec struct { row4Update []types.Datum Priority mysql.PriorityEnum - memTracker *memory.Tracker + MemTracker *memory.Tracker } func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum, memTracker *memory.Tracker) error { @@ -269,8 +269,8 @@ func (e *InsertExec) Close() error { // Open implements the Executor Open interface. func (e *InsertExec) Open(ctx context.Context) error { - e.memTracker = memory.NewTracker(e.id, -1) - e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.MemTracker = memory.NewTracker(e.id, -1) + e.MemTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) if e.OnDuplicate != nil { e.initEvalBuffer4Dup() diff --git a/executor/insert_common.go b/executor/insert_common.go index d0d22f4a70ebf..563c8501f3b07 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -203,10 +203,10 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { } var memTracker *memory.Tracker if insertExec, ok := base.(*InsertExec); ok { - memTracker = insertExec.memTracker + memTracker = insertExec.MemTracker } else { replaceExec := base.(*ReplaceExec) - memTracker = replaceExec.memTracker + memTracker = replaceExec.MemTracker } sessVars := e.ctx.GetSessionVars() batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML @@ -401,10 +401,10 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { rows := make([][]types.Datum, 0, chk.Capacity()) var memTracker *memory.Tracker if insertExec, ok := base.(*InsertExec); ok { - memTracker = insertExec.memTracker + memTracker = insertExec.MemTracker } else { replaceExec := base.(*ReplaceExec) - memTracker = replaceExec.memTracker + memTracker = replaceExec.MemTracker } sessVars := e.ctx.GetSessionVars() diff --git a/executor/replace.go b/executor/replace.go index 916081388d3c1..479ed0411f657 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -33,7 +33,7 @@ import ( type ReplaceExec struct { *InsertValues Priority int - memTracker *memory.Tracker + MemTracker *memory.Tracker } // Close implements the Executor Close interface. @@ -47,8 +47,8 @@ func (e *ReplaceExec) Close() error { // Open implements the Executor Open interface. func (e *ReplaceExec) Open(ctx context.Context) error { - e.memTracker = memory.NewTracker(e.id, -1) - e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.MemTracker = memory.NewTracker(e.id, -1) + e.MemTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) if e.SelectExec != nil { return e.SelectExec.Open(ctx) diff --git a/executor/write_test.go b/executor/write_test.go index 0faf78128bd6e..e957c86653960 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -2252,7 +2252,7 @@ func (s *testSuite4) TestLoadDataIntoPartitionedTable(c *C) { c.Assert(err, IsNil) ld.SetMaxRowsInBatch(20000) ld.SetMessage() - err = ctx.StmtCommit() + err = ctx.StmtCommit(nil) c.Assert(err, IsNil) txn, err := ctx.Txn(true) c.Assert(err, IsNil) diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 5257289563420..5eadbe5b544b5 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -312,7 +312,7 @@ func (s *testPlanSuite) TestDAGPlanBuilderUnionScan(c *C) { txn, err := se.Txn(true) c.Assert(err, IsNil) txn.Set(kv.Key("AAA"), []byte("BBB")) - c.Assert(se.StmtCommit(), IsNil) + c.Assert(se.StmtCommit(nil), IsNil) p, _, err := planner.Optimize(context.TODO(), se, stmt, s.is) c.Assert(err, IsNil) s.testData.OnRecord(func() { @@ -496,7 +496,7 @@ func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) { txn, err := se.Txn(true) c.Assert(err, IsNil) txn.Set(kv.Key("AAA"), []byte("BBB")) - c.Assert(se.StmtCommit(), IsNil) + c.Assert(se.StmtCommit(nil), IsNil) p, _, err := planner.Optimize(context.TODO(), se, stmt, s.is) c.Assert(err, IsNil, comment) s.testData.OnRecord(func() { @@ -540,7 +540,7 @@ func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) { txn, err := se.Txn(true) c.Assert(err, IsNil) txn.Set(kv.Key("AAA"), []byte("BBB")) - c.Assert(se.StmtCommit(), IsNil) + c.Assert(se.StmtCommit(nil), IsNil) p, _, err := planner.Optimize(context.TODO(), se, stmt, pis) c.Assert(err, IsNil, comment) c.Assert(core.ToString(p), Equals, tt.best, comment) diff --git a/session/session.go b/session/session.go index 8681d6e462855..2fab4a1bb05c5 100644 --- a/session/session.go +++ b/session/session.go @@ -70,6 +70,7 @@ import ( "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" + "github.com/pingcap/tidb/util/memory" ) var ( @@ -689,7 +690,9 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { s.StmtRollback() break } - // TODO: pass the memTracker here. + // We do not need to pass memTracker here, because that retry + // happened after commit, the memory usage was calculated during the + // first execution. err = s.StmtCommit(nil) if err != nil { return err From c6f9e297994be6201c60f20bae583b1bf308ce5a Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 23 Dec 2019 19:42:24 +0800 Subject: [PATCH 03/24] fix ci --- distsql/distsql.go | 4 ++-- executor/distsql.go | 14 +++++++------- executor/hash_table.go | 2 +- executor/insert.go | 6 +++--- executor/insert_common.go | 4 ++-- kv/kv.go | 2 +- session/session.go | 1 - session/tidb.go | 2 +- table/tables/tables_test.go | 2 +- 9 files changed, 18 insertions(+), 19 deletions(-) diff --git a/distsql/distsql.go b/distsql/distsql.go index 80d4960a3328e..21806d3b9eb51 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -56,9 +56,9 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie label = metrics.LblInternal } - // kvReq.MemTracker is used to trace and control memory usage in DistSQL layer; + // kvReq.memTracker is used to trace and control memory usage in DistSQL layer; // for streamResult, since it is a pipeline which has no buffer, it's not necessary to trace it; - // for selectResult, we just use the kvReq.MemTracker prepared for co-processor + // for selectResult, we just use the kvReq.memTracker prepared for co-processor // instead of creating a new one for simplification. if kvReq.Streaming { return &streamResult{ diff --git a/executor/distsql.go b/executor/distsql.go index b4f4cd35d9f36..82e71a9b7c199 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -76,13 +76,13 @@ type lookupTableTask struct { duplicatedIndexOrder map[int64]int // memUsage records the memory usage of this task calculated by table worker. - // MemTracker is used to release memUsage after task is done and unused. + // memTracker is used to release memUsage after task is done and unused. // // The sequence of function calls are: // 1. calculate task.memUsage. - // 2. task.MemTracker = tableWorker.MemTracker - // 3. task.MemTracker.Consume(task.memUsage) - // 4. task.MemTracker.Consume(-task.memUsage) + // 2. task.memTracker = tableWorker.memTracker + // 3. task.memTracker.Consume(task.memUsage) + // 4. task.memTracker.Consume(-task.memUsage) // // Step 1~3 are completed in "tableWorker.executeTask". // Step 4 is completed in "IndexLookUpExecutor.Next". @@ -345,7 +345,7 @@ type IndexLookUpExecutor struct { resultCurr *lookupTableTask feedback *statistics.QueryFeedback - // MemTracker is used to track the memory usage of this executor. + // memTracker is used to track the memory usage of this executor. memTracker *memory.Tracker // checkIndexValue is used to check the consistency of the index data. @@ -390,7 +390,7 @@ func (e *IndexLookUpExecutor) Open(ctx context.Context) error { } func (e *IndexLookUpExecutor) open(ctx context.Context) error { - // We have to initialize "MemTracker" and other execution resources in here + // We have to initialize "memTracker" and other execution resources in here // instead of in function "Open", because this "IndexLookUpExecutor" may be // constructed by a "IndexLookUpJoin" and "Open" will not be called in that // situation. @@ -787,7 +787,7 @@ type tableWorker struct { keepOrder bool handleIdx int - // MemTracker is used to track the memory usage of this executor. + // memTracker is used to track the memory usage of this executor. memTracker *memory.Tracker // checkIndexValue is used to check the consistency of the index data. diff --git a/executor/hash_table.go b/executor/hash_table.go index 0c24a0de97b7a..c453e781efd21 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -86,7 +86,7 @@ type hashRowContainer struct { // hashTable stores the map of hashKey and RowPtr hashTable *rowHashMap - // MemTracker is the reference of records.GetMemTracker(). + // memTracker is the reference of records.GetMemTracker(). // records would be set to nil for garbage collection when spilling is activated // so we need this reference. memTracker *memory.Tracker diff --git a/executor/insert.go b/executor/insert.go index d5df351c77605..812a617e234ba 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -42,7 +42,7 @@ type InsertExec struct { row4Update []types.Datum Priority mysql.PriorityEnum - MemTracker *memory.Tracker + memTracker *memory.Tracker } func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum, memTracker *memory.Tracker) error { @@ -269,8 +269,8 @@ func (e *InsertExec) Close() error { // Open implements the Executor Open interface. func (e *InsertExec) Open(ctx context.Context) error { - e.MemTracker = memory.NewTracker(e.id, -1) - e.MemTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) if e.OnDuplicate != nil { e.initEvalBuffer4Dup() diff --git a/executor/insert_common.go b/executor/insert_common.go index 563c8501f3b07..186a97c832e3d 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -203,7 +203,7 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { } var memTracker *memory.Tracker if insertExec, ok := base.(*InsertExec); ok { - memTracker = insertExec.MemTracker + memTracker = insertExec.memTracker } else { replaceExec := base.(*ReplaceExec) memTracker = replaceExec.MemTracker @@ -401,7 +401,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { rows := make([][]types.Datum, 0, chk.Capacity()) var memTracker *memory.Tracker if insertExec, ok := base.(*InsertExec); ok { - memTracker = insertExec.MemTracker + memTracker = insertExec.memTracker } else { replaceExec := base.(*ReplaceExec) memTracker = replaceExec.MemTracker diff --git a/kv/kv.go b/kv/kv.go index 0f3a75fa61425..e64441da99623 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -264,7 +264,7 @@ type Request struct { IsolationLevel IsoLevel // Priority is the priority of this KV request, its value may be PriorityNormal/PriorityLow/PriorityHigh. Priority int - // MemTracker is used to trace and control memory usage in co-processor layer. + // memTracker is used to trace and control memory usage in co-processor layer. MemTracker *memory.Tracker // KeepOrder is true, if the response should be returned in order. KeepOrder bool diff --git a/session/session.go b/session/session.go index 2fab4a1bb05c5..2043e8aeaa925 100644 --- a/session/session.go +++ b/session/session.go @@ -70,7 +70,6 @@ import ( "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-binlog" "go.uber.org/zap" - "github.com/pingcap/tidb/util/memory" ) var ( diff --git a/session/tidb.go b/session/tidb.go index 7fa271cdbed09..3cfaea52bf6f9 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -277,7 +277,7 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement) if err != nil { sctx.StmtRollback() } else { - err = sctx.StmtCommit(nil) + err = sctx.StmtCommit(sctx.GetSessionVars().StmtCtx.MemTracker) } } } else { diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index 8912df5846b2f..465f5cb3c42de 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -319,7 +319,7 @@ func (ts *testSuite) TestUnsignedPK(c *C) { c.Assert(err, IsNil) c.Assert(len(row), Equals, 2) c.Assert(row[0].Kind(), Equals, types.KindUint64) - c.Assert(ts.se.StmtCommit(), IsNil) + c.Assert(ts.se.StmtCommit(nil), IsNil) txn, err := ts.se.Txn(true) c.Assert(err, IsNil) c.Assert(txn.Commit(context.Background()), IsNil) From 1bb264006d6ee5d25257ae0632152d34f13ff632 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 23 Dec 2019 23:05:48 +0800 Subject: [PATCH 04/24] fix ci --- executor/executor_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/executor/executor_test.go b/executor/executor_test.go index 5c33830f2dedb..d21c44e6a5082 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4507,6 +4507,10 @@ func (s *testSuite) TestOOMPanicAction(c *C) { tk.MustExec("drop table if exists t,t1") tk.MustExec("create table t (a bigint);") tk.MustExec("create table t1 (a bigint);") + _, err = tk.Exec("insert into t1 values (1),(2),(3),(4),(5);") + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + tk.MustExec("set @@tidb_mem_quota_query=10000;") tk.MustExec("insert into t1 values (1),(2),(3),(4),(5);") tk.MustExec("set @@tidb_mem_quota_query=200;") _, err = tk.Exec("insert into t select a from t1 order by a desc;") From ddfbbe1661e887dcfe548ad4b7ab866a75a08927 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 24 Dec 2019 11:09:26 +0800 Subject: [PATCH 05/24] remove useless code --- types/datum.go | 1 - 1 file changed, 1 deletion(-) diff --git a/types/datum.go b/types/datum.go index d7528d3b73cc1..874d6f4d68855 100644 --- a/types/datum.go +++ b/types/datum.go @@ -32,7 +32,6 @@ import ( "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" - "unsafe" ) // Kind constants. From 04d4dad30e0edf28db230fa4e2849c9ae56da5ef Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 24 Dec 2019 11:09:50 +0800 Subject: [PATCH 06/24] refine --- executor/insert_common.go | 4 ++-- executor/replace.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index 186a97c832e3d..d0d22f4a70ebf 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -206,7 +206,7 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { memTracker = insertExec.memTracker } else { replaceExec := base.(*ReplaceExec) - memTracker = replaceExec.MemTracker + memTracker = replaceExec.memTracker } sessVars := e.ctx.GetSessionVars() batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML @@ -404,7 +404,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { memTracker = insertExec.memTracker } else { replaceExec := base.(*ReplaceExec) - memTracker = replaceExec.MemTracker + memTracker = replaceExec.memTracker } sessVars := e.ctx.GetSessionVars() diff --git a/executor/replace.go b/executor/replace.go index 479ed0411f657..916081388d3c1 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -33,7 +33,7 @@ import ( type ReplaceExec struct { *InsertValues Priority int - MemTracker *memory.Tracker + memTracker *memory.Tracker } // Close implements the Executor Close interface. @@ -47,8 +47,8 @@ func (e *ReplaceExec) Close() error { // Open implements the Executor Open interface. func (e *ReplaceExec) Open(ctx context.Context) error { - e.MemTracker = memory.NewTracker(e.id, -1) - e.MemTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) + e.memTracker = memory.NewTracker(e.id, -1) + e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker) if e.SelectExec != nil { return e.SelectExec.Open(ctx) From 554f22c42e9574808a58b6d16128aa28bbfdf2bb Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 24 Dec 2019 16:18:58 +0800 Subject: [PATCH 07/24] remove useless code --- distsql/distsql.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/distsql/distsql.go b/distsql/distsql.go index 21806d3b9eb51..80d4960a3328e 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -56,9 +56,9 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie label = metrics.LblInternal } - // kvReq.memTracker is used to trace and control memory usage in DistSQL layer; + // kvReq.MemTracker is used to trace and control memory usage in DistSQL layer; // for streamResult, since it is a pipeline which has no buffer, it's not necessary to trace it; - // for selectResult, we just use the kvReq.memTracker prepared for co-processor + // for selectResult, we just use the kvReq.MemTracker prepared for co-processor // instead of creating a new one for simplification. if kvReq.Streaming { return &streamResult{ From d43022754e8f2d4cd52307e9ab1a68844008b865 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Tue, 24 Dec 2019 17:01:19 +0800 Subject: [PATCH 08/24] add some tests --- executor/executor_test.go | 1 - executor/seqtest/seq_executor_test.go | 151 ++++++++++++++++++++++++++ 2 files changed, 151 insertions(+), 1 deletion(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index d21c44e6a5082..bd2455cfcdf30 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -112,7 +112,6 @@ var _ = Suite(&testSuite8{&baseTestSuite{}}) var _ = SerialSuites(&testShowStatsSuite{&baseTestSuite{}}) var _ = Suite(&testBypassSuite{}) var _ = Suite(&testUpdateSuite{}) -var _ = Suite(&testOOMSuite{}) var _ = Suite(&testPointGetSuite{}) var _ = Suite(&testBatchPointGetSuite{}) var _ = SerialSuites(&testRecoverTable{}) diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index cf85e0c17167c..e3434e9225275 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -34,6 +34,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/pingcap/log" "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/parser/terror" @@ -53,7 +54,10 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testkit" + "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/testutil" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) func TestT(t *testing.T) { @@ -65,6 +69,7 @@ func TestT(t *testing.T) { var _ = Suite(&seqTestSuite{}) var _ = Suite(&seqTestSuite1{}) +var _ = Suite(&testOOMSuite{}) type seqTestSuite struct { cluster *mocktikv.Cluster @@ -1159,3 +1164,149 @@ func (s *seqTestSuite) TestMaxDeltaSchemaCount(c *C) { c.Assert(variable.GetMaxDeltaSchemaCount(), Equals, int64(2048)) tk.MustQuery("select @@global.tidb_max_delta_schema_count").Check(testkit.Rows("2048")) } + +type testOOMSuite struct { + store kv.Storage + do *domain.Domain + oom *oomCapturer +} + +func (s *testOOMSuite) SetUpSuite(c *C) { + testleak.BeforeTest() + s.registerHook() + var err error + s.store, err = mockstore.NewMockTikvStore() + c.Assert(err, IsNil) + session.SetSchemaLease(0) + domain.RunAutoAnalyze = false + s.do, err = session.BootstrapSession(s.store) + c.Assert(err, IsNil) +} + +func (s *testOOMSuite) registerHook() { + conf := &log.Config{Level: os.Getenv("log_level"), File: log.FileLogConfig{}} + _, r, _ := log.InitLogger(conf) + s.oom = &oomCapturer{r.Core, ""} + lg := zap.New(s.oom) + log.ReplaceGlobals(lg, r) +} + +func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int, a int, b int, index idx_a(`a`))") + tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + + s.oom.tracker = "" + tk.MustQuery("select * from t") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaDistSQL = 1 + tk.MustQuery("select * from t") + c.Assert(s.oom.tracker, Equals, "TableReader_5") + tk.Se.GetSessionVars().MemQuotaDistSQL = -1 + + s.oom.tracker = "" + tk.MustQuery("select a from t") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaDistSQL = 1 + tk.MustQuery("select a from t use index(idx_a)") + c.Assert(s.oom.tracker, Equals, "IndexReader_5") + tk.Se.GetSessionVars().MemQuotaDistSQL = -1 + + s.oom.tracker = "" + tk.MustQuery("select * from t") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaIndexLookupReader = 1 + tk.MustQuery("select * from t use index(idx_a)") + c.Assert(s.oom.tracker, Equals, "IndexLookUp_6") + tk.Se.GetSessionVars().MemQuotaIndexLookupReader = -1 +} + +func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int, a int, b int, index idx_a(`a`))") + + s.oom.tracker = "" + tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaQuery = 1 + tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + c.Assert(len(s.oom.tracker) > 0, IsTrue) + tk.Se.GetSessionVars().MemQuotaQuery = -1 + + s.oom.tracker = "" + tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaQuery = 1 + tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") + c.Assert(len(s.oom.tracker) > 0, IsTrue) + tk.Se.GetSessionVars().MemQuotaQuery = -1 + + s.oom.tracker = "" + tk.MustExec("insert into t select * from t") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaQuery = 1 + tk.MustExec("insert into t select * from t") + c.Assert(len(s.oom.tracker) > 0, IsTrue) + tk.Se.GetSessionVars().MemQuotaQuery = -1 + + s.oom.tracker = "" + tk.MustExec("replace into t select * from t") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaQuery = 1 + tk.MustExec("replace into t select * from t") + c.Assert(len(s.oom.tracker) > 0, IsTrue) + tk.Se.GetSessionVars().MemQuotaQuery = -1 + + tk.Se.GetSessionVars().DMLBatchSize = 1 + tk.Se.GetSessionVars().BatchInsert = true + s.oom.tracker = "" + tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaQuery = 1 + tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + c.Assert(len(s.oom.tracker) > 0, IsTrue) + tk.Se.GetSessionVars().MemQuotaQuery = -1 + + s.oom.tracker = "" + tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") + c.Assert(s.oom.tracker, Equals, "") + tk.Se.GetSessionVars().MemQuotaQuery = 1 + tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") + c.Assert(len(s.oom.tracker) > 0, IsTrue) + tk.Se.GetSessionVars().MemQuotaQuery = -1 +} + +type oomCapturer struct { + zapcore.Core + tracker string +} + +func (h *oomCapturer) Write(entry zapcore.Entry, fields []zapcore.Field) error { + if strings.Contains(entry.Message, "memory exceeds quota") { + err, _ := fields[0].Interface.(error) + str := err.Error() + begin := strings.Index(str, "8001]") + if begin == -1 { + panic("begin not found") + } + end := strings.Index(str, " holds") + if end == -1 { + panic("end not found") + } + h.tracker = str[begin+len("8001]") : end] + return nil + } + h.tracker = entry.Message + return nil +} + +func (h *oomCapturer) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { + if h.Enabled(e.Level) { + return ce.AddCore(e, h) + } + return ce +} From 1f6ed6e94bf8cb273468e5f5101157c85eee9b82 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 25 Dec 2019 11:19:27 +0800 Subject: [PATCH 09/24] refine code --- executor/executor_test.go | 131 -------------------------------------- executor/insert_common.go | 20 +++--- 2 files changed, 10 insertions(+), 141 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index bd2455cfcdf30..3b1e365279242 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -4415,137 +4415,6 @@ func (s *testSuiteP2) TestUnsignedFeedback(c *C) { c.Assert(result.Rows()[2][3], Equals, "table:t, range:[0,+inf], keep order:false") } -type testOOMSuite struct { - store kv.Storage - do *domain.Domain - oom *oomCapturer -} - -func (s *testOOMSuite) SetUpSuite(c *C) { - c.Skip("log.ReplaceGlobals(lg, r) in registerHook() may result in data race") - testleak.BeforeTest() - s.registerHook() - var err error - s.store, err = mockstore.NewMockTikvStore() - c.Assert(err, IsNil) - session.SetSchemaLease(0) - domain.RunAutoAnalyze = false - s.do, err = session.BootstrapSession(s.store) - c.Assert(err, IsNil) -} - -func (s *testOOMSuite) registerHook() { - conf := &log.Config{Level: os.Getenv("log_level"), File: log.FileLogConfig{}} - _, r, _ := log.InitLogger(conf) - s.oom = &oomCapturer{r.Core, ""} - lg := zap.New(s.oom) - log.ReplaceGlobals(lg, r) -} - -func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (id int, a int, b int, index idx_a(`a`))") - tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") - - s.oom.tracker = "" - tk.MustQuery("select * from t") - c.Assert(s.oom.tracker, Equals, "") - tk.Se.GetSessionVars().MemQuotaDistSQL = 1 - tk.MustQuery("select * from t") - c.Assert(s.oom.tracker, Equals, "TableReaderDistSQLTracker") - tk.Se.GetSessionVars().MemQuotaDistSQL = -1 - - s.oom.tracker = "" - tk.MustQuery("select a from t") - c.Assert(s.oom.tracker, Equals, "") - tk.Se.GetSessionVars().MemQuotaDistSQL = 1 - tk.MustQuery("select a from t use index(idx_a)") - c.Assert(s.oom.tracker, Equals, "IndexReaderDistSQLTracker") - tk.Se.GetSessionVars().MemQuotaDistSQL = -1 - - s.oom.tracker = "" - tk.MustQuery("select * from t") - c.Assert(s.oom.tracker, Equals, "") - tk.Se.GetSessionVars().MemQuotaDistSQL = 1 - tk.MustQuery("select * from t use index(idx_a)") - c.Assert(s.oom.tracker, Equals, "IndexLookupDistSQLTracker") - tk.Se.GetSessionVars().MemQuotaDistSQL = -1 -} - -func setOOMAction(action string) { - old := config.GetGlobalConfig() - newConf := *old - newConf.OOMAction = action - config.StoreGlobalConfig(&newConf) -} - -func (s *testSuite) TestOOMPanicAction(c *C) { - tk := testkit.NewTestKit(c, s.store) - tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (a int primary key, b double);") - tk.MustExec("insert into t values (1,1)") - sm := &mockSessionManager1{ - PS: make([]*util.ProcessInfo, 0), - } - tk.Se.SetSessionManager(sm) - s.domain.ExpensiveQueryHandle().SetSessionManager(sm) - orgAction := config.GetGlobalConfig().OOMAction - setOOMAction(config.OOMActionCancel) - defer func() { - setOOMAction(orgAction) - }() - tk.MustExec("set @@tidb_mem_quota_query=1;") - err := tk.QueryToErr("select sum(b) from t group by a;") - c.Assert(err, NotNil) - c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") - - // Test insert from select oom panic. - tk.MustExec("drop table if exists t,t1") - tk.MustExec("create table t (a bigint);") - tk.MustExec("create table t1 (a bigint);") - _, err = tk.Exec("insert into t1 values (1),(2),(3),(4),(5);") - c.Assert(err, NotNil) - c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") - tk.MustExec("set @@tidb_mem_quota_query=10000;") - tk.MustExec("insert into t1 values (1),(2),(3),(4),(5);") - tk.MustExec("set @@tidb_mem_quota_query=200;") - _, err = tk.Exec("insert into t select a from t1 order by a desc;") - c.Assert(err, NotNil) - c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") -} - -type oomCapturer struct { - zapcore.Core - tracker string -} - -func (h *oomCapturer) Write(entry zapcore.Entry, fields []zapcore.Field) error { - if strings.Contains(entry.Message, "memory exceeds quota") { - err, _ := fields[0].Interface.(error) - str := err.Error() - begin := strings.Index(str, "8001]") - if begin == -1 { - panic("begin not found") - } - end := strings.Index(str, " holds") - if end == -1 { - panic("end not found") - } - h.tracker = str[begin+len("8001]") : end] - } - return nil -} - -func (h *oomCapturer) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry { - if h.Enabled(e.Level) { - return ce.AddCore(e, h) - } - return ce -} - type testRecoverTable struct { store kv.Storage dom *domain.Domain diff --git a/executor/insert_common.go b/executor/insert_common.go index d0d22f4a70ebf..25d04764ccdd2 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -202,11 +202,11 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { return err } var memTracker *memory.Tracker - if insertExec, ok := base.(*InsertExec); ok { - memTracker = insertExec.memTracker - } else { - replaceExec := base.(*ReplaceExec) - memTracker = replaceExec.memTracker + switch x := base.(type) { + case *InsertExec: + memTracker = x.memTracker + case *ReplaceExec: + memTracker = x.memTracker } sessVars := e.ctx.GetSessionVars() batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML @@ -400,11 +400,11 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { iter := chunk.NewIterator4Chunk(chk) rows := make([][]types.Datum, 0, chk.Capacity()) var memTracker *memory.Tracker - if insertExec, ok := base.(*InsertExec); ok { - memTracker = insertExec.memTracker - } else { - replaceExec := base.(*ReplaceExec) - memTracker = replaceExec.memTracker + switch x := base.(type) { + case *InsertExec: + memTracker = x.memTracker + case *ReplaceExec: + memTracker = x.memTracker } sessVars := e.ctx.GetSessionVars() From 13edb1f6c2c56acc8be75122262d147fe80970f5 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 25 Dec 2019 11:22:54 +0800 Subject: [PATCH 10/24] remove useless code --- executor/executor_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 3b1e365279242..676d696263951 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -73,8 +73,6 @@ import ( "github.com/pingcap/tidb/util/testutil" "github.com/pingcap/tidb/util/timeutil" "github.com/pingcap/tipb/go-tipb" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) func TestT(t *testing.T) { From b55a29ee1377b154a3e87f0460e72672a56397e0 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 25 Dec 2019 11:36:42 +0800 Subject: [PATCH 11/24] add test --- executor/executor_test.go | 47 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index 676d696263951..ff35bf4edb597 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/log" "github.com/pingcap/parser" "github.com/pingcap/parser/model" "github.com/pingcap/parser/mysql" @@ -4413,6 +4412,52 @@ func (s *testSuiteP2) TestUnsignedFeedback(c *C) { c.Assert(result.Rows()[2][3], Equals, "table:t, range:[0,+inf], keep order:false") } +func (s *testSuite) TestOOMPanicAction(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int primary key, b double);") + tk.MustExec("insert into t values (1,1)") + sm := &mockSessionManager1{ + PS: make([]*util.ProcessInfo, 0), + } + tk.Se.SetSessionManager(sm) + s.domain.ExpensiveQueryHandle().SetSessionManager(sm) + orgAction := config.GetGlobalConfig().OOMAction + setOOMAction(config.OOMActionCancel) + defer func() { + setOOMAction(orgAction) + }() + tk.MustExec("set @@tidb_mem_quota_query=1;") + err := tk.QueryToErr("select sum(b) from t group by a;") + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + + // Test insert from select oom panic. + tk.MustExec("drop table if exists t,t1") + tk.MustExec("create table t (a bigint);") + tk.MustExec("create table t1 (a bigint);") + tk.MustExec("set @@tidb_mem_quota_query=200;") + _, err = tk.Exec("insert into t1 values (1),(2),(3),(4),(5);") + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + _, err = tk.Exec("replace into t1 values (1),(2),(3),(4),(5);") + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + tk.MustExec("set @@tidb_mem_quota_query=10000") + tk.MustExec("insert into t1 values (1),(2),(3),(4),(5);") + tk.MustExec("set @@tidb_mem_quota_query=200;") + _, err = tk.Exec("insert into t select a from t1 order by a desc;") + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") + _, err = tk.Exec("replace into t select a from t1 order by a desc;") + c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*") +} + +func setOOMAction(action string) { + old := config.GetGlobalConfig() + newConf := *old + newConf.OOMAction = action + config.StoreGlobalConfig(&newConf) +} + type testRecoverTable struct { store kv.Storage dom *domain.Domain From dbc820c57a2b79a85f6214cca48ac2385c488068 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 25 Dec 2019 15:05:08 +0800 Subject: [PATCH 12/24] fix ci --- executor/seqtest/seq_executor_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index e3434e9225275..25c56f2ce31ef 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1184,7 +1184,7 @@ func (s *testOOMSuite) SetUpSuite(c *C) { } func (s *testOOMSuite) registerHook() { - conf := &log.Config{Level: os.Getenv("log_level"), File: log.FileLogConfig{}} + conf := &log.Config{Level: "info", File: log.FileLogConfig{}} _, r, _ := log.InitLogger(conf) s.oom = &oomCapturer{r.Core, ""} lg := zap.New(s.oom) @@ -1234,7 +1234,7 @@ func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") - c.Assert(len(s.oom.tracker) > 0, IsTrue) + c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" @@ -1242,7 +1242,7 @@ func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") - c.Assert(len(s.oom.tracker) > 0, IsTrue) + c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" @@ -1250,7 +1250,7 @@ func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 tk.MustExec("insert into t select * from t") - c.Assert(len(s.oom.tracker) > 0, IsTrue) + c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" @@ -1258,7 +1258,7 @@ func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 tk.MustExec("replace into t select * from t") - c.Assert(len(s.oom.tracker) > 0, IsTrue) + c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 tk.Se.GetSessionVars().DMLBatchSize = 1 @@ -1268,7 +1268,7 @@ func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") - c.Assert(len(s.oom.tracker) > 0, IsTrue) + c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" @@ -1276,7 +1276,7 @@ func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") - c.Assert(len(s.oom.tracker) > 0, IsTrue) + c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 } From 0b61a15ccee0f4abc6fbde4a48f9f7af1eefb615 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 25 Dec 2019 16:46:57 +0800 Subject: [PATCH 13/24] test --- executor/seqtest/seq_executor_test.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index 25c56f2ce31ef..ace1336d4f3a2 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1183,8 +1183,13 @@ func (s *testOOMSuite) SetUpSuite(c *C) { c.Assert(err, IsNil) } +func (s *testOOMSuite) TearDownSuite(c *C) { + s.do.Close() + s.store.Close() +} + func (s *testOOMSuite) registerHook() { - conf := &log.Config{Level: "info", File: log.FileLogConfig{}} + conf := &log.Config{Level: os.Getenv("log_level"), File: log.FileLogConfig{}} _, r, _ := log.InitLogger(conf) s.oom = &oomCapturer{r.Core, ""} lg := zap.New(s.oom) @@ -1192,6 +1197,7 @@ func (s *testOOMSuite) registerHook() { } func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) { + log.SetLevel(zap.WarnLevel) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") @@ -1224,6 +1230,7 @@ func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) { } func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { + log.SetLevel(zap.InfoLevel) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") tk.MustExec("drop table if exists t") From 4797740f469dcf2374fffee4e9f74d3a31c06f92 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Wed, 25 Dec 2019 18:00:46 +0800 Subject: [PATCH 14/24] fix ci --- executor/distsql.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/executor/distsql.go b/executor/distsql.go index 0f7347a598c6c..c824d526ee4b9 100644 --- a/executor/distsql.go +++ b/executor/distsql.go @@ -509,6 +509,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency e.tblWorkerWg.Add(lookupConcurrencyLimit) for i := 0; i < lookupConcurrencyLimit; i++ { + workerID := i worker := &tableWorker{ idxLookup: e, workCh: workCh, @@ -517,7 +518,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha keepOrder: e.keepOrder, handleIdx: e.handleIdx, checkIndexValue: e.checkIndexValue, - memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(i) }), + memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(workerID) }), e.ctx.GetSessionVars().MemQuotaIndexLookupReader), } worker.memTracker.AttachTo(e.memTracker) From c498234a4c5831066284eda1d0bd340eb7380d52 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Thu, 26 Dec 2019 11:51:15 +0800 Subject: [PATCH 15/24] address comment --- executor/insert_common.go | 4 ++++ session/txn.go | 8 ++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index 25d04764ccdd2..a736fbfbc7c26 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -207,6 +207,8 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { memTracker = x.memTracker case *ReplaceExec: memTracker = x.memTracker + default: + return errors.Errorf("unexpected executor type %v", x) } sessVars := e.ctx.GetSessionVars() batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML @@ -405,6 +407,8 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { memTracker = x.memTracker case *ReplaceExec: memTracker = x.memTracker + default: + return errors.Errorf("unexpected executor type %v", x) } sessVars := e.ctx.GetSessionVars() diff --git a/session/txn.go b/session/txn.go index 54037e98421d7..d32e1c5905f73 100755 --- a/session/txn.go +++ b/session/txn.go @@ -438,8 +438,12 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture { // StmtCommit implements the sessionctx.Context interface. func (s *session) StmtCommit(memTracker *memory.Tracker) error { - defer s.txn.cleanup() + defer func() { + s.txn.cleanup() + memTracker.Consume(int64(-s.txn.Size())) + }() st := &s.txn + txnSize := st.Transaction.Size() var count int err := kv.WalkMemBuffer(st.buf, func(k kv.Key, v []byte) error { failpoint.Inject("mockStmtCommitError", func(val failpoint.Value) { @@ -462,7 +466,7 @@ func (s *session) StmtCommit(memTracker *memory.Tracker) error { return err } if memTracker != nil { - memTracker.Consume(int64(st.Transaction.Size())) + memTracker.Consume(int64(st.Transaction.Size() - txnSize)) } // Need to flush binlog. From 46e698dea3ac27989b20c66dac5c4f937103b609 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Thu, 26 Dec 2019 14:55:21 +0800 Subject: [PATCH 16/24] fix ci --- executor/seqtest/seq_executor_test.go | 34 +++++++++++++-------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/executor/seqtest/seq_executor_test.go b/executor/seqtest/seq_executor_test.go index ace1336d4f3a2..40dbce6b762c0 100644 --- a/executor/seqtest/seq_executor_test.go +++ b/executor/seqtest/seq_executor_test.go @@ -1177,7 +1177,6 @@ func (s *testOOMSuite) SetUpSuite(c *C) { var err error s.store, err = mockstore.NewMockTikvStore() c.Assert(err, IsNil) - session.SetSchemaLease(0) domain.RunAutoAnalyze = false s.do, err = session.BootstrapSession(s.store) c.Assert(err, IsNil) @@ -1197,13 +1196,12 @@ func (s *testOOMSuite) registerHook() { } func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) { - log.SetLevel(zap.WarnLevel) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("drop table if exists t") tk.MustExec("create table t (id int, a int, b int, index idx_a(`a`))") tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + log.SetLevel(zap.WarnLevel) s.oom.tracker = "" tk.MustQuery("select * from t") c.Assert(s.oom.tracker, Equals, "") @@ -1230,59 +1228,59 @@ func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) { } func (s *testOOMSuite) TestMemTracker4InsertAndReplaceExec(c *C) { - log.SetLevel(zap.InfoLevel) + //log.SetLevel(zap.FatalLevel) tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") - tk.MustExec("drop table if exists t") - tk.MustExec("create table t (id int, a int, b int, index idx_a(`a`))") + tk.MustExec("create table t1 (id int, a int, b int, index idx_a(`a`))") + log.SetLevel(zap.InfoLevel) s.oom.tracker = "" - tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("insert into t select * from t") + tk.MustExec("insert into t1 select * from t") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t select * from t") + tk.MustExec("insert into t1 select * from t") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("replace into t select * from t") + tk.MustExec("replace into t1 select * from t") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("replace into t select * from t") + tk.MustExec("replace into t1 select * from t") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 tk.Se.GetSessionVars().DMLBatchSize = 1 tk.Se.GetSessionVars().BatchInsert = true s.oom.tracker = "" - tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("insert into t1 values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 s.oom.tracker = "" - tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Equals, "") tk.Se.GetSessionVars().MemQuotaQuery = 1 - tk.MustExec("replace into t values (1,1,1), (2,2,2), (3,3,3)") + tk.MustExec("replace into t1 values (1,1,1), (2,2,2), (3,3,3)") c.Assert(s.oom.tracker, Matches, "expensive_query during bootstrap phase") tk.Se.GetSessionVars().MemQuotaQuery = -1 } From 3fb96e98164bde60ae36203dd14d3d3dd971487a Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Thu, 26 Dec 2019 16:03:31 +0800 Subject: [PATCH 17/24] address comment --- session/txn.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/session/txn.go b/session/txn.go index d32e1c5905f73..a8263f73c2322 100755 --- a/session/txn.go +++ b/session/txn.go @@ -438,10 +438,7 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture { // StmtCommit implements the sessionctx.Context interface. func (s *session) StmtCommit(memTracker *memory.Tracker) error { - defer func() { - s.txn.cleanup() - memTracker.Consume(int64(-s.txn.Size())) - }() + defer s.txn.cleanup() st := &s.txn txnSize := st.Transaction.Size() var count int From 6419d091119abd3048b6cdc87c29b475d8dce579 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 27 Dec 2019 11:58:03 +0800 Subject: [PATCH 18/24] address comment --- executor/insert.go | 7 +++---- executor/insert_common.go | 11 ++++++----- executor/replace.go | 7 +++---- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/executor/insert.go b/executor/insert.go index 812a617e234ba..fc3954a9f9d50 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -41,11 +41,10 @@ type InsertExec struct { curInsertVals chunk.MutRow row4Update []types.Datum - Priority mysql.PriorityEnum - memTracker *memory.Tracker + Priority mysql.PriorityEnum } -func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum, memTracker *memory.Tracker) error { +func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { logutil.Eventf(ctx, "insert %d rows into table `%s`", len(rows), stringutil.MemoizeStr(func() string { var tblName string if meta := e.Table.Meta(); meta != nil { @@ -88,7 +87,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum, memTracker } } } - memTracker.Consume(int64(txn.Size())) + e.memTracker.Consume(int64(txn.Size())) return nil } diff --git a/executor/insert_common.go b/executor/insert_common.go index a736fbfbc7c26..8c505b0e37676 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -69,6 +69,7 @@ type InsertValues struct { // Other statements like `insert select from` don't guarantee consecutive autoID. // https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html lazyFillAutoID bool + memTracker *memory.Tracker } type defaultVal struct { @@ -79,7 +80,7 @@ type defaultVal struct { type insertCommon interface { insertCommon() *InsertValues - exec(ctx context.Context, rows [][]types.Datum, memTracker *memory.Tracker) error + exec(ctx context.Context, rows [][]types.Datum) error } func (e *InsertValues) insertCommon() *InsertValues { @@ -238,7 +239,7 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { if err != nil { return err } - if err = base.exec(ctx, rows, memTracker); err != nil { + if err = base.exec(ctx, rows); err != nil { return err } rows = rows[:0] @@ -258,7 +259,7 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { if err != nil { return err } - err = base.exec(ctx, rows, memTracker) + err = base.exec(ctx, rows) if err != nil { return err } @@ -440,7 +441,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { if batchInsert && e.rowCount%uint64(batchSize) == 0 { memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows)) memTracker.Consume(memUsageOfRows) - if err = base.exec(ctx, rows, memTracker); err != nil { + if err = base.exec(ctx, rows); err != nil { return err } rows = rows[:0] @@ -456,7 +457,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows)) memTracker.Consume(memUsageOfRows) } - err = base.exec(ctx, rows, memTracker) + err = base.exec(ctx, rows) if err != nil { return err } diff --git a/executor/replace.go b/executor/replace.go index 916081388d3c1..766839ba4bca8 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -32,8 +32,7 @@ import ( // ReplaceExec represents a replace executor. type ReplaceExec struct { *InsertValues - Priority int - memTracker *memory.Tracker + Priority int } // Close implements the Executor Close interface. @@ -169,7 +168,7 @@ func (e *ReplaceExec) removeIndexRow(ctx context.Context, txn kv.Transaction, r return false, false, nil } -func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum, memTracker *memory.Tracker) error { +func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { /* * MySQL uses the following algorithm for REPLACE (and LOAD DATA ... REPLACE): * 1. Try to insert the new row into the table @@ -207,7 +206,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum, memTrac return err } } - memTracker.Consume(int64(txn.Size())) + e.memTracker.Consume(int64(txn.Size())) return nil } From 1df692ff56a17720230b5c5af4e24ec31487260b Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 27 Dec 2019 11:59:26 +0800 Subject: [PATCH 19/24] address comment --- executor/insert_common.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index 8c505b0e37676..554d4153b6205 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -87,7 +87,7 @@ func (e *InsertValues) insertCommon() *InsertValues { return e } -func (e *InsertValues) exec(_ context.Context, _ [][]types.Datum, _ *memory.Tracker) error { +func (e *InsertValues) exec(_ context.Context, _ [][]types.Datum) error { panic("derived should overload exec function") } From 7b9ed0748021f448753e2bfa1ad1d77c0debd778 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 27 Dec 2019 12:07:03 +0800 Subject: [PATCH 20/24] address comment --- executor/insert_common.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index 554d4153b6205..848bac30a7327 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -468,9 +468,9 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { return nil } -func (e *InsertValues) doBatchInsert(ctx context.Context, memTracker *memory.Tracker) error { +func (e *InsertValues) doBatchInsert(ctx context.Context) error { sessVars := e.ctx.GetSessionVars() - if err := e.ctx.StmtCommit(memTracker); err != nil { + if err := e.ctx.StmtCommit(e.memTracker); err != nil { return err } if err := e.ctx.NewTxn(ctx); err != nil { From 438ee6c6a12b3704e88cae999a4754d7afa008e5 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Fri, 27 Dec 2019 15:31:11 +0800 Subject: [PATCH 21/24] fix ci --- executor/insert_common.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index 848bac30a7327..373a394ed3e55 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -245,7 +245,7 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { rows = rows[:0] memTracker.Consume(-memUsageOfRows) memUsageOfRows = 0 - if err = e.doBatchInsert(ctx, memTracker); err != nil { + if err = e.doBatchInsert(ctx); err != nil { return err } } @@ -447,7 +447,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { rows = rows[:0] memTracker.Consume(-memUsageOfRows) memUsageOfRows = 0 - if err = e.doBatchInsert(ctx, memTracker); err != nil { + if err = e.doBatchInsert(ctx); err != nil { return err } } From 0fd20842690c135d5e343073eff902a9bdae0c4e Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 30 Dec 2019 11:15:11 +0800 Subject: [PATCH 22/24] address comment --- executor/insert_common.go | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index 373a394ed3e55..257b629008585 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -202,15 +202,6 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { if err = e.processSetList(); err != nil { return err } - var memTracker *memory.Tracker - switch x := base.(type) { - case *InsertExec: - memTracker = x.memTracker - case *ReplaceExec: - memTracker = x.memTracker - default: - return errors.Errorf("unexpected executor type %v", x) - } sessVars := e.ctx.GetSessionVars() batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML batchSize := sessVars.DMLBatchSize @@ -223,6 +214,7 @@ func insertRows(ctx context.Context, base insertCommon) (err error) { rows := make([][]types.Datum, 0, len(e.Lists)) memUsageOfRows := int64(0) + memTracker := e.memTracker for i, list := range e.Lists { e.rowCount++ var row []types.Datum @@ -402,15 +394,6 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { chk := newFirstChunk(selectExec) iter := chunk.NewIterator4Chunk(chk) rows := make([][]types.Datum, 0, chk.Capacity()) - var memTracker *memory.Tracker - switch x := base.(type) { - case *InsertExec: - memTracker = x.memTracker - case *ReplaceExec: - memTracker = x.memTracker - default: - return errors.Errorf("unexpected executor type %v", x) - } sessVars := e.ctx.GetSessionVars() if !sessVars.StrictSQLMode { @@ -420,6 +403,7 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error { batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML batchSize := sessVars.DMLBatchSize memUsageOfRows := int64(0) + memTracker := e.memTracker for { err := Next(ctx, selectExec, chk) if err != nil { From 0b63c28f5845ce1070c46ed94249c2e660c87c07 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 30 Dec 2019 11:27:24 +0800 Subject: [PATCH 23/24] address comment --- executor/insert_common.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/executor/insert_common.go b/executor/insert_common.go index 257b629008585..0314b470605c2 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/sirupsen/logrus" "go.uber.org/zap" ) @@ -197,6 +198,7 @@ func (e *InsertValues) processSetList() error { // insertRows processes `insert|replace into values ()` or `insert|replace into set x=y` func insertRows(ctx context.Context, base insertCommon) (err error) { + logrus.Warning("common here") e := base.insertCommon() // For `insert|replace into set x=y`, process the set list here. if err = e.processSetList(); err != nil { From 76253900d1ee03dba6db80c7fba99f50f060a702 Mon Sep 17 00:00:00 2001 From: xuhuaiyu <391585975@qq.com> Date: Mon, 30 Dec 2019 11:27:51 +0800 Subject: [PATCH 24/24] remove useless code --- executor/insert_common.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/executor/insert_common.go b/executor/insert_common.go index d253b00c6a906..3f917c4701aa4 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -31,7 +31,6 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" - "github.com/sirupsen/logrus" "go.uber.org/zap" ) @@ -199,7 +198,6 @@ func (e *InsertValues) processSetList() error { // insertRows processes `insert|replace into values ()` or `insert|replace into set x=y` func insertRows(ctx context.Context, base insertCommon) (err error) { - logrus.Warning("common here") e := base.insertCommon() // For `insert|replace into set x=y`, process the set list here. if err = e.processSetList(); err != nil {