Skip to content

Commit

Permalink
*: support memTracker.detach for HashJoin, Apply and IndexLookUp in C…
Browse files Browse the repository at this point in the history
…lose func (#54095) (#54261)

close #54005
  • Loading branch information
ti-chi-bot authored Jul 11, 2024
1 parent b90e57a commit 767c75a
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 10 deletions.
12 changes: 10 additions & 2 deletions pkg/executor/aggregate/agg_hash_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,11 @@ func (e *HashAggExec) initForUnparallelExec() {

e.tmpChkForSpill = exec.TryNewCacheChunk(e.Children(0))
if vars := e.Ctx().GetSessionVars(); vars.TrackAggregateMemoryUsage && variable.EnableTmpStorageOnOOM.Load() {
e.diskTracker = disk.NewTracker(e.ID(), -1)
if e.diskTracker != nil {
e.diskTracker.Reset()
} else {
e.diskTracker = disk.NewTracker(e.ID(), -1)
}
e.diskTracker.AttachTo(vars.StmtCtx.DiskTracker)
e.dataInDisk.GetDiskTracker().AttachTo(e.diskTracker)
vars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill())
Expand Down Expand Up @@ -396,7 +400,11 @@ func (e *HashAggExec) initForParallelExec(ctx sessionctx.Context) error {
}, spillChunkFieldTypes)

if isTrackerEnabled && isParallelHashAggSpillEnabled {
e.diskTracker = disk.NewTracker(e.ID(), -1)
if e.diskTracker != nil {
e.diskTracker.Reset()
} else {
e.diskTracker = disk.NewTracker(e.ID(), -1)
}
e.diskTracker.AttachTo(sessionVars.StmtCtx.DiskTracker)
e.spillHelper.diskTracker = e.diskTracker
sessionVars.MemTracker.FallbackOldAndSetNewActionForSoftLimit(e.ActionSpill())
Expand Down
7 changes: 5 additions & 2 deletions pkg/executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,11 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {
// constructed by a "IndexLookUpJoin" and "Open" will not be called in that
// situation.
e.initRuntimeStats()
e.memTracker = memory.NewTracker(e.ID(), -1)
if e.memTracker != nil {
e.memTracker.Reset()
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)

e.finished = make(chan struct{})
Expand Down Expand Up @@ -858,7 +862,6 @@ func (e *IndexLookUpExecutor) Close() error {
e.tblWorkerWg.Wait()
e.finished = nil
e.workerStarted = false
e.memTracker = nil
e.resultCurr = nil
return nil
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func (e *HashJoinExec) Close() error {
}
e.probeSideTupleFetcher.probeChkResourceCh = nil
terror.Call(e.rowContainer.Close)
e.hashJoinCtx.sessCtx.GetSessionVars().MemTracker.UnbindActionFromHardLimit(e.rowContainer.ActionSpill())
e.waiterWg.Wait()
}
e.outerMatchedStatus = e.outerMatchedStatus[:0]
Expand Down Expand Up @@ -214,8 +215,12 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
}
e.hashJoinCtx.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)

e.diskTracker = disk.NewTracker(e.ID(), -1)
e.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker)
if e.hashJoinCtx.diskTracker != nil {
e.hashJoinCtx.diskTracker.Reset()
} else {
e.hashJoinCtx.diskTracker = disk.NewTracker(e.ID(), -1)
}
e.hashJoinCtx.diskTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.DiskTracker)

e.workerWg = util.WaitGroupWrapper{}
e.waiterWg = util.WaitGroupWrapper{}
Expand Down Expand Up @@ -1468,7 +1473,7 @@ func (e *NestedLoopApplyExec) fetchAllInners(ctx context.Context) error {

if e.canUseCache {
// create a new one since it may be in the cache
e.innerList = chunk.NewList(exec.RetTypes(e.innerExec), e.InitCap(), e.MaxChunkSize())
e.innerList = chunk.NewListWithMemTracker(exec.RetTypes(e.innerExec), e.InitCap(), e.MaxChunkSize(), e.innerList.GetMemTracker())
} else {
e.innerList.Reset()
}
Expand Down
11 changes: 8 additions & 3 deletions pkg/util/chunk/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,23 @@ type RowPtr struct {
RowIdx uint32
}

// NewList creates a new List with field types, init chunk size and max chunk size.
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
// NewListWithMemTracker creates a new List with field types, init chunk size, max chunk size and memory tracker.
func NewListWithMemTracker(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int, tracker *memory.Tracker) *List {
l := &List{
fieldTypes: fieldTypes,
initChunkSize: initChunkSize,
maxChunkSize: maxChunkSize,
memTracker: memory.NewTracker(memory.LabelForChunkList, -1),
memTracker: tracker,
consumedIdx: -1,
}
return l
}

// NewList creates a new List with field types, init chunk size and max chunk size.
func NewList(fieldTypes []*types.FieldType, initChunkSize, maxChunkSize int) *List {
return NewListWithMemTracker(fieldTypes, initChunkSize, maxChunkSize, memory.NewTracker(memory.LabelForChunkList, -1))
}

// GetMemTracker returns the memory tracker of this List.
func (l *List) GetMemTracker() *memory.Tracker {
return l.memTracker
Expand Down
21 changes: 21 additions & 0 deletions pkg/util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,27 @@ func (t *Tracker) UnbindActions() {
t.actionMuForHardLimit.actionOnExceed = &LogOnExceed{}
}

// UnbindActionFromHardLimit unbinds action from hardLimit.
func (t *Tracker) UnbindActionFromHardLimit(actionToUnbind ActionOnExceed) {
t.actionMuForHardLimit.Lock()
defer t.actionMuForHardLimit.Unlock()

var prev ActionOnExceed
for current := t.actionMuForHardLimit.actionOnExceed; current != nil; current = current.GetFallback() {
if current == actionToUnbind {
if prev == nil {
// actionToUnbind is the first element
t.actionMuForHardLimit.actionOnExceed = current.GetFallback()
} else {
// actionToUnbind is not the first element
prev.SetFallback(current.GetFallback())
}
break
}
prev = current
}
}

// reArrangeFallback merge two action chains and rearrange them by priority in descending order.
func reArrangeFallback(a ActionOnExceed, b ActionOnExceed) ActionOnExceed {
if a == nil {
Expand Down

0 comments on commit 767c75a

Please sign in to comment.