Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: make prepare analyze killed globally instead of query quota #44352

Merged
merged 3 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2092,10 +2092,20 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.MemTracker.SetBytesLimit(vars.MemQuotaQuery)
vars.MemTracker.ResetMaxConsumed()
vars.DiskTracker.ResetMaxConsumed()
vars.MemTracker.SessionID = vars.ConnectionID
vars.MemTracker.SessionID.Store(vars.ConnectionID)
vars.StmtCtx.TableStats = make(map[int64]interface{})

if _, ok := s.(*ast.AnalyzeTableStmt); ok {
isAnalyze := false
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
if err != nil {
return err
}
_, isAnalyze = prepareStmt.PreparedAst.Stmt.(*ast.AnalyzeTableStmt)
} else if _, ok := s.(*ast.AnalyzeTableStmt); ok {
isAnalyze = true
}
if isAnalyze {
sc.InitMemTracker(memory.LabelForAnalyzeMemory, -1)
vars.MemTracker.SetBytesLimit(-1)
vars.MemTracker.AttachTo(GlobalAnalyzeMemoryTracker)
Expand All @@ -2115,7 +2125,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
action.SetLogHook(logOnQueryExceedMemQuota)
vars.MemTracker.SetActionOnExceed(action)
}
sc.MemTracker.SessionID = vars.ConnectionID
sc.MemTracker.SessionID.Store(vars.ConnectionID)
sc.MemTracker.AttachTo(vars.MemTracker)
sc.InitDiskTracker(memory.LabelForSQLText, -1)
globalConfig := config.GetGlobalConfig()
Expand Down
43 changes: 43 additions & 0 deletions executor/test/analyzetest/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3086,6 +3086,49 @@ func TestGlobalMemoryControlForAnalyze(t *testing.T) {
tk0.MustExec(sql)
}

func TestGlobalMemoryControlForPrepareAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set global tidb_mem_oom_action = 'cancel'")
tk0.MustExec("set global tidb_mem_quota_query = 209715200 ") // 200MB
tk0.MustExec("set global tidb_server_memory_limit = 5GB")
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk0.Session().ShowProcess()},
}
dom.ServerMemoryLimitHandle().SetSessionManager(sm)
go dom.ServerMemoryLimitHandle().Run()

tk0.MustExec("use test")
tk0.MustExec("create table t(a int)")
tk0.MustExec("insert into t select 1")
for i := 1; i <= 8; i++ {
tk0.MustExec("insert into t select * from t") // 256 Lines
}
sqlPrepare := "prepare stmt from 'analyze table t with 1.0 samplerate';"
sqlExecute := "execute stmt;" // Need about 100MB
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/util/memory/ReadMemStats", `return(536870912)`)) // 512MB
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume", `return(100)`))
// won't be killed by tidb_mem_quota_query
tk0.MustExec(sqlPrepare)
tk0.MustExec(sqlExecute)
runtime.GC()
// killed by tidb_server_memory_limit
tk0.MustExec("set global tidb_server_memory_limit = 512MB")
_, err0 := tk0.Exec(sqlPrepare)
require.NoError(t, err0)
_, err1 := tk0.Exec(sqlExecute)
// Killed and the WarnMsg is WarnMsgSuffixForInstance instead of WarnMsgSuffixForSingleQuery
require.True(t, strings.Contains(err1.Error(), memory.PanicMemoryExceedWarnMsg+memory.WarnMsgSuffixForInstance))
runtime.GC()
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/util/memory/ReadMemStats"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockAnalyzeMergeWorkerSlowConsume"))
tk0.MustExec(sqlPrepare)
tk0.MustExec(sqlExecute)
}

func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
Expand Down
2 changes: 1 addition & 1 deletion executor/test/issuetest/executor_issue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ func TestIssue42662(t *testing.T) {
tk := testkit.NewTestKit(t, store)
tk.Session().GetSessionVars().ConnectionID = 12345
tk.Session().GetSessionVars().MemTracker = memory.NewTracker(memory.LabelForSession, -1)
tk.Session().GetSessionVars().MemTracker.SessionID = 12345
tk.Session().GetSessionVars().MemTracker.SessionID.Store(12345)
tk.Session().GetSessionVars().MemTracker.IsRootTrackerOfSess = true

sm := &testkit.MockSessionManager{
Expand Down
2 changes: 1 addition & 1 deletion util/memory/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (a *PanicOnExceed) Action(t *Tracker) {
if !a.acted {
if a.logHook == nil {
logutil.BgLogger().Warn("memory exceeds quota",
zap.Uint64("conn", t.SessionID), zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String())))
zap.Uint64("conn", t.SessionID.Load()), zap.Error(errMemExceedThreshold.GenWithStackByArgs(t.label, t.BytesConsumed(), t.GetBytesLimit(), t.String())))
} else {
a.logHook(a.ConnID)
}
Expand Down
2 changes: 1 addition & 1 deletion util/memory/memstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func ReadMemStats() (memStats *runtime.MemStats) {
}
failpoint.Inject("ReadMemStats", func(val failpoint.Value) {
injectedSize := val.(int)
memStats.HeapInuse += uint64(injectedSize)
memStats = &runtime.MemStats{HeapInuse: memStats.HeapInuse + uint64(injectedSize)}
})
return
}
Expand Down
12 changes: 6 additions & 6 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,11 @@ type Tracker struct {
}
label int // Label of this "Tracker".
// following fields are used with atomic operations, so make them 64-byte aligned.
bytesConsumed int64 // Consumed bytes.
bytesReleased int64 // Released bytes.
maxConsumed atomicutil.Int64 // max number of bytes consumed during execution.
SessionID uint64 // SessionID indicates the sessionID the tracker is bound.
NeedKill atomic.Bool // NeedKill indicates whether this session need kill because OOM
bytesConsumed int64 // Consumed bytes.
bytesReleased int64 // Released bytes.
maxConsumed atomicutil.Int64 // max number of bytes consumed during execution.
SessionID atomicutil.Uint64 // SessionID indicates the sessionID the tracker is bound.
Copy link
Contributor Author

@chrysan chrysan Jun 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is to fix race. cc @XuHuaiyu

NeedKill atomic.Bool // NeedKill indicates whether this session need kill because OOM
hawkingrei marked this conversation as resolved.
Show resolved Hide resolved
NeedKillReceived sync.Once
IsRootTrackerOfSess bool // IsRootTrackerOfSess indicates whether this tracker is bound for session
isGlobal bool // isGlobal indicates whether this tracker is global tracker
Expand Down Expand Up @@ -462,7 +462,7 @@ func (t *Tracker) Consume(bs int64) {
sessionRootTracker.NeedKillReceived.Do(
func() {
logutil.BgLogger().Warn("global memory controller, NeedKill signal is received successfully",
zap.Uint64("conn", sessionRootTracker.SessionID))
zap.Uint64("conn", sessionRootTracker.SessionID.Load()))
})
tryActionLastOne(&sessionRootTracker.actionMuForHardLimit, sessionRootTracker)
}
Expand Down
5 changes: 3 additions & 2 deletions util/servermemorylimit/servermemorylimit.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,13 @@ func killSessIfNeeded(s *sessionToBeKilled, bt uint64, sm util.SessionManager) {
if instanceStats.HeapInuse > bt {
t := memory.MemUsageTop1Tracker.Load()
if t != nil {
sessionID := t.SessionID.Load()
memUsage := t.BytesConsumed()
// If the memory usage of the top1 session is less than tidb_server_memory_limit_sess_min_size, we do not need to kill it.
if uint64(memUsage) < limitSessMinSize {
memory.MemUsageTop1Tracker.CompareAndSwap(t, nil)
t = nil
} else if info, ok := sm.GetProcessInfo(t.SessionID); ok {
} else if info, ok := sm.GetProcessInfo(sessionID); ok {
logutil.BgLogger().Warn("global memory controller tries to kill the top1 memory consumer",
zap.Uint64("conn", info.ID),
zap.String("sql digest", info.Digest),
Expand All @@ -152,7 +153,7 @@ func killSessIfNeeded(s *sessionToBeKilled, bt uint64, sm util.SessionManager) {
zap.Uint64("heap inuse", instanceStats.HeapInuse),
zap.Int64("sql memory usage", info.MemTracker.BytesConsumed()),
)
s.sessionID = t.SessionID
s.sessionID = sessionID
s.sqlStartTime = info.Time
s.isKilling = true
s.sessionTracker = t
Expand Down