Skip to content

Commit

Permalink
*: add memory tracker for InsertExec and ReplaceExec (#14179)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored Dec 30, 2019
1 parent ca9ecf9 commit c1bc9ff
Show file tree
Hide file tree
Showing 18 changed files with 245 additions and 119 deletions.
2 changes: 1 addition & 1 deletion ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3265,7 +3265,7 @@ func (s *testDBSuite4) TestAddColumn2(c *C) {
c.Assert(err, IsNil)
_, err = writeOnlyTable.AddRecord(s.tk.Se, types.MakeDatums(oldRow[0].GetInt64(), 2, oldRow[2].GetInt64()), table.IsUpdate)
c.Assert(err, IsNil)
err = s.tk.Se.StmtCommit()
err = s.tk.Se.StmtCommit(nil)
c.Assert(err, IsNil)
err = s.tk.Se.CommitTxn(ctx)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (e *DeleteExec) deleteSingleTableByChunk(ctx context.Context) error {

for chunkRow := iter.Begin(); chunkRow != iter.End(); chunkRow = iter.Next() {
if batchDelete && rowCount >= batchDMLSize {
if err = e.ctx.StmtCommit(); err != nil {
if err = e.ctx.StmtCommit(nil); err != nil {
return err
}
if err = e.ctx.NewTxn(ctx); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
e.tblWorkerWg.Add(lookupConcurrencyLimit)
for i := 0; i < lookupConcurrencyLimit; i++ {
workerID := i
worker := &tableWorker{
idxLookup: e,
workCh: workCh,
Expand All @@ -517,7 +518,7 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
checkIndexValue: e.checkIndexValue,
memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(i) }),
memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(workerID) }),
e.ctx.GetSessionVars().MemQuotaIndexLookupReader),
}
worker.memTracker.AttachTo(e.memTracker)
Expand Down
113 changes: 14 additions & 99 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -73,8 +72,6 @@ import (
"github.com/pingcap/tidb/util/testutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func TestT(t *testing.T) {
Expand Down Expand Up @@ -112,7 +109,6 @@ var _ = Suite(&testSuite8{&baseTestSuite{}})
var _ = SerialSuites(&testShowStatsSuite{&baseTestSuite{}})
var _ = Suite(&testBypassSuite{})
var _ = Suite(&testUpdateSuite{})
var _ = Suite(&testOOMSuite{})
var _ = Suite(&testPointGetSuite{})
var _ = Suite(&testBatchPointGetSuite{})
var _ = SerialSuites(&testRecoverTable{})
Expand Down Expand Up @@ -538,7 +534,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
}
ld.SetMessage()
tk.CheckLastMessage(tt.expectedMsg)
err := ctx.StmtCommit()
err := ctx.StmtCommit(nil)
c.Assert(err, IsNil)
txn, err := ctx.Txn(true)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -4416,72 +4412,6 @@ func (s *testSuiteP2) TestUnsignedFeedback(c *C) {
c.Assert(result.Rows()[2][3], Equals, "table:t, range:[0,+inf], keep order:false")
}

type testOOMSuite struct {
store kv.Storage
do *domain.Domain
oom *oomCapturer
}

func (s *testOOMSuite) SetUpSuite(c *C) {
c.Skip("log.ReplaceGlobals(lg, r) in registerHook() may result in data race")
testleak.BeforeTest()
s.registerHook()
var err error
s.store, err = mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
session.SetSchemaLease(0)
domain.RunAutoAnalyze = false
s.do, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testOOMSuite) registerHook() {
conf := &log.Config{Level: os.Getenv("log_level"), File: log.FileLogConfig{}}
_, r, _ := log.InitLogger(conf)
s.oom = &oomCapturer{r.Core, ""}
lg := zap.New(s.oom)
log.ReplaceGlobals(lg, r)
}

func (s *testOOMSuite) TestDistSQLMemoryControl(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int, a int, b int, index idx_a(`a`))")
tk.MustExec("insert into t values (1,1,1), (2,2,2), (3,3,3)")

s.oom.tracker = ""
tk.MustQuery("select * from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaDistSQL = 1
tk.MustQuery("select * from t")
c.Assert(s.oom.tracker, Equals, "TableReaderDistSQLTracker")
tk.Se.GetSessionVars().MemQuotaDistSQL = -1

s.oom.tracker = ""
tk.MustQuery("select a from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaDistSQL = 1
tk.MustQuery("select a from t use index(idx_a)")
c.Assert(s.oom.tracker, Equals, "IndexReaderDistSQLTracker")
tk.Se.GetSessionVars().MemQuotaDistSQL = -1

s.oom.tracker = ""
tk.MustQuery("select * from t")
c.Assert(s.oom.tracker, Equals, "")
tk.Se.GetSessionVars().MemQuotaDistSQL = 1
tk.MustQuery("select * from t use index(idx_a)")
c.Assert(s.oom.tracker, Equals, "IndexLookupDistSQLTracker")
tk.Se.GetSessionVars().MemQuotaDistSQL = -1
}

func setOOMAction(action string) {
old := config.GetGlobalConfig()
newConf := *old
newConf.OOMAction = action
config.StoreGlobalConfig(&newConf)
}

func (s *testSuite) TestOOMPanicAction(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand All @@ -4507,40 +4437,25 @@ func (s *testSuite) TestOOMPanicAction(c *C) {
tk.MustExec("drop table if exists t,t1")
tk.MustExec("create table t (a bigint);")
tk.MustExec("create table t1 (a bigint);")
tk.MustExec("set @@tidb_mem_quota_query=200;")
_, err = tk.Exec("insert into t1 values (1),(2),(3),(4),(5);")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
_, err = tk.Exec("replace into t1 values (1),(2),(3),(4),(5);")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
tk.MustExec("set @@tidb_mem_quota_query=10000")
tk.MustExec("insert into t1 values (1),(2),(3),(4),(5);")
tk.MustExec("set @@tidb_mem_quota_query=200;")
_, err = tk.Exec("insert into t select a from t1 order by a desc;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
_, err = tk.Exec("replace into t select a from t1 order by a desc;")
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
}

type oomCapturer struct {
zapcore.Core
tracker string
}

func (h *oomCapturer) Write(entry zapcore.Entry, fields []zapcore.Field) error {
if strings.Contains(entry.Message, "memory exceeds quota") {
err, _ := fields[0].Interface.(error)
str := err.Error()
begin := strings.Index(str, "8001]")
if begin == -1 {
panic("begin not found")
}
end := strings.Index(str, " holds")
if end == -1 {
panic("end not found")
}
h.tracker = str[begin+len("8001]") : end]
}
return nil
}

func (h *oomCapturer) Check(e zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if h.Enabled(e.Level) {
return ce.AddCore(e, h)
}
return ce
func setOOMAction(action string) {
old := config.GetGlobalConfig()
newConf := *old
newConf.OOMAction = action
config.StoreGlobalConfig(&newConf)
}

type testRecoverTable struct {
Expand Down
5 changes: 5 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -86,6 +87,7 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error {
}
}
}
e.memTracker.Consume(int64(txn.Size()))
return nil
}

Expand Down Expand Up @@ -266,6 +268,9 @@ func (e *InsertExec) Close() error {

// Open implements the Executor Open interface.
func (e *InsertExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

if e.OnDuplicate != nil {
e.initEvalBuffer4Dup()
}
Expand Down
40 changes: 35 additions & 5 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -68,6 +69,7 @@ type InsertValues struct {
// Other statements like `insert select from` don't guarantee consecutive autoID.
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
lazyFillAutoID bool
memTracker *memory.Tracker
}

type defaultVal struct {
Expand All @@ -85,7 +87,7 @@ func (e *InsertValues) insertCommon() *InsertValues {
return e
}

func (e *InsertValues) exec(ctx context.Context, rows [][]types.Datum) error {
func (e *InsertValues) exec(_ context.Context, _ [][]types.Datum) error {
panic("derived should overload exec function")
}

Expand Down Expand Up @@ -212,6 +214,8 @@ func insertRows(ctx context.Context, base insertCommon) (err error) {
}

rows := make([][]types.Datum, 0, len(e.Lists))
memUsageOfRows := int64(0)
memTracker := e.memTracker
for i, list := range e.Lists {
e.rowCount++
var row []types.Datum
Expand All @@ -221,6 +225,8 @@ func insertRows(ctx context.Context, base insertCommon) (err error) {
}
rows = append(rows, row)
if batchInsert && e.rowCount%uint64(batchSize) == 0 {
memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows))
memTracker.Consume(memUsageOfRows)
// Before batch insert, fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows)
if err != nil {
Expand All @@ -230,17 +236,29 @@ func insertRows(ctx context.Context, base insertCommon) (err error) {
return err
}
rows = rows[:0]
memTracker.Consume(-memUsageOfRows)
memUsageOfRows = 0
if err = e.doBatchInsert(ctx); err != nil {
return err
}
}
}
if len(rows) != 0 {
memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows))
memTracker.Consume(memUsageOfRows)
}
// Fill the batch allocated autoIDs.
rows, err = e.lazyAdjustAutoIncrementDatum(ctx, rows)
if err != nil {
return err
}
return base.exec(ctx, rows)
err = base.exec(ctx, rows)
if err != nil {
return err
}
rows = rows[:0]
memTracker.Consume(-memUsageOfRows)
return nil
}

func (e *InsertValues) handleErr(col *table.Column, val *types.Datum, rowIdx int, err error) error {
Expand Down Expand Up @@ -385,7 +403,8 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {
}
batchInsert := sessVars.BatchInsert && !sessVars.InTxn() && config.GetGlobalConfig().EnableBatchDML
batchSize := sessVars.DMLBatchSize

memUsageOfRows := int64(0)
memTracker := e.memTracker
for {
err := Next(ctx, selectExec, chk)
if err != nil {
Expand All @@ -394,7 +413,8 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {
if chk.NumRows() == 0 {
break
}

chkMemUsage := chk.MemoryUsage()
memTracker.Consume(chkMemUsage)
for innerChunkRow := iter.Begin(); innerChunkRow != iter.End(); innerChunkRow = iter.Next() {
innerRow := innerChunkRow.GetDatumRow(fields)
e.rowCount++
Expand All @@ -404,28 +424,38 @@ func insertRowsFromSelect(ctx context.Context, base insertCommon) error {
}
rows = append(rows, row)
if batchInsert && e.rowCount%uint64(batchSize) == 0 {
memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows))
memTracker.Consume(memUsageOfRows)
if err = base.exec(ctx, rows); err != nil {
return err
}
rows = rows[:0]
memTracker.Consume(-memUsageOfRows)
memUsageOfRows = 0
if err = e.doBatchInsert(ctx); err != nil {
return err
}
}
}

if len(rows) != 0 {
memUsageOfRows = types.EstimatedMemUsage(rows[0], len(rows))
memTracker.Consume(memUsageOfRows)
}
err = base.exec(ctx, rows)
if err != nil {
return err
}
rows = rows[:0]
memTracker.Consume(-memUsageOfRows)
memTracker.Consume(-chkMemUsage)
}
return nil
}

func (e *InsertValues) doBatchInsert(ctx context.Context) error {
sessVars := e.ctx.GetSessionVars()
if err := e.ctx.StmtCommit(); err != nil {
if err := e.ctx.StmtCommit(e.memTracker); err != nil {
return err
}
if err := e.ctx.NewTxn(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func (e *LoadDataInfo) CommitOneTask(ctx context.Context, task CommitTask) error
failpoint.Inject("commitOneTaskErr", func() error {
return errors.New("mock commit one task error")
})
if err = e.Ctx.StmtCommit(); err != nil {
if err = e.Ctx.StmtCommit(nil); err != nil {
logutil.Logger(ctx).Error("commit error commit", zap.Error(err))
return err
}
Expand Down
5 changes: 5 additions & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)

Expand All @@ -45,6 +46,9 @@ func (e *ReplaceExec) Close() error {

// Open implements the Executor Open interface.
func (e *ReplaceExec) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

if e.SelectExec != nil {
return e.SelectExec.Open(ctx)
}
Expand Down Expand Up @@ -202,6 +206,7 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
return err
}
}
e.memTracker.Consume(int64(txn.Size()))
return nil
}

Expand Down
Loading

0 comments on commit c1bc9ff

Please sign in to comment.