executor: support spill intermediate data for unparalleled hash agg #25714
Simple Benchmark
cpu: AMD Ryzen 7 3700X 8-Core Processor
Memory usage test
Workload: tpch-sf=3g 1tidb 1tikv 1pd
Memory usage in Grafana monitor
return e.spillAction
}

const maxSpillTimes = 10
Add a comment for this
executor/aggregate.go
Outdated
listInDisk *chunk.ListInDisk
lastChunkNum int
processIdx int
spillMode uint32
spillChunk *chunk.Chunk
spillAction *AggSpillDiskAction
childDrained bool
We need comments for these variables
executor/aggregate.go
Outdated
return err
}
}
if e.spillAction != nil {
What's this for?
It was useless. I've removed the code now.
executor/aggregate.go
Outdated
@@ -233,6 +243,15 @@ func (e *HashAggExec) Close() error {
if e.memTracker != nil {
e.memTracker.ReplaceBytesUsed(0)
}
if e.listInDisk != nil {
if err := e.listInDisk.Close(); err != nil {
return err
Don't we close the childrenExec? This may cause leaks.
Addressed.
executor/aggregate.go
Outdated
e.executed, e.childDrained = false, false
e.listInDisk = chunk.NewListInDisk(retTypes(e.children[0]))
e.spillChunk = newFirstChunk(e.children[0])
if e.ctx.GetSessionVars().TrackAggregateMemoryUsage {
Why do we need to check this?
If TiDB doesn't track the aggregate executor's memory usage, should we still try to spill hashAgg when the limit is exceeded?
In addition, oom-use-tmp-storage should also be checked. I've added the check now. PTAL again.
executor/aggregate.go
Outdated
@@ -922,6 +974,17 @@ func (e *HashAggExec) execute(ctx context.Context) (err error) {
for j := 0; j < e.childResult.NumRows(); j++ {
groupKey := string(e.groupKeyBuffer[j]) // do memory copy here, because e.groupKeyBuffer may be reused.
if !e.groupSet.Exist(groupKey) {
if atomic.LoadUint32(&e.spillMode) == 1 && e.groupSet.Count() > 0 {
e.spillChunk.Append(e.childResult, j, j+1)
Can we use Chunk.sel to optimize this if-block?
- We can check e.groupSet.Exist(groupKey) and build the sel first, and then invoke e.spillChunk.Append based on the sel.
- Further, if len(sel) == len(e.childResult), we can invoke e.listInDisk.Add(e.childResult) directly.
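For illustration, here is a minimal, self-contained sketch of the two-pass idea suggested above: first build a selection vector of the rows whose group key is not in the group set, then copy the selected rows in one pass. It uses plain slices and a map instead of TiDB's chunk types, and the names buildSel and spillRows are hypothetical, not functions from this PR.

```go
package main

import "fmt"

// buildSel returns the indices of rows whose group key is not yet in
// groupSet, i.e. the rows that would have to be spilled.
// (Hypothetical helper; a map stands in for the executor's group set.)
func buildSel(groupKeys []string, groupSet map[string]struct{}) []int {
	sel := make([]int, 0, len(groupKeys))
	for i, key := range groupKeys {
		if _, ok := groupSet[key]; !ok {
			sel = append(sel, i)
		}
	}
	return sel
}

// spillRows appends the selected rows to the spill buffer as one batch.
// If every row is selected, the whole input batch is handed over as-is,
// which mirrors the "add the chunk directly" fast path from the review.
func spillRows(rows []string, sel []int, spill [][]string) [][]string {
	if len(sel) == 0 {
		return spill // nothing to spill
	}
	if len(sel) == len(rows) {
		return append(spill, rows) // fast path: no per-row copy
	}
	picked := make([]string, 0, len(sel))
	for _, i := range sel {
		picked = append(picked, rows[i])
	}
	return append(spill, picked)
}

func main() {
	groupSet := map[string]struct{}{"a": {}}
	rows := []string{"a", "b", "a", "c"}

	sel := buildSel(rows, groupSet)
	spill := spillRows(rows, sel, nil)
	fmt.Println(sel, spill)
}
```

Building sel first keeps the existence check and the copy in separate loops, so the copy step can choose between the per-row path and the whole-batch path.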
Addressed.
executor/aggregate.go
Outdated
// spill unprocessed data when exceeded.
if len(sel) > 0 {
err = e.spillUnprocessedData(sel)
Is the input argument sel useless?
e.childResult.SetSel(sel)
e.childResult.SetSel(sel) would make len(sel) == len(e.childResult) always true, so e.listInDisk.Add(e.childResult) would always be called directly. If there are only a few elements in sel, this may cause a performance issue.
I've removed the e.listInDisk.Add(e.childResult) logic and now always append to tmpChkForSpill. PTAL.
executor/aggregate.go
Outdated
listInDisk *chunk.ListInDisk // listInDisk is the chunks to store row values for spilling data.
lastChunkNum int // lastChunkNum indicates the num of spilling chunk.
processIdx int // processIdx indicates the num of processed chunk in disk.
spillMode uint32 // spillMode means that no new groups are added to hash table.
- isSpillModeSet?
- Add an explanation of what 0 and 1 mean.
Co-authored-by: HuaiyuXu <xuhuaiyu@pingcap.com>
LGTM
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review.
/merge
This pull request has been accepted and is ready to merge. Commit hash: ffbcf52
@wshwsh12: Your PR was out of date, I have automatically updated it for you. At the same time I will also trigger all tests for you: /run-all-tests If the CI test fails, you just re-trigger the test that failed and the bot will merge the PR for you after the CI passes. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository.
/run-unit-test
What problem does this PR solve?
Issue Number: close #xxx
Problem Summary:
What is changed and how it works?
Proposal: Design
What's Changed:
Based on PR #25820, introduce a soft limit.
Support spilling intermediate data for unparalleled hashAgg.
How it Works:
a. If the key exists in the Map, aggregate the result.
b. If the key doesn't exist in the Map, spill the data to disk.
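The two rules above can be sketched as a small standalone Go program. This is an illustration under stated assumptions, not the implementation in this PR: maxGroups is a stand-in for the memory quota, an in-memory slice stands in for the on-disk list, and the sketch assumes that spilled rows are re-read as the input of the next round once the current groups have been emitted. The names row and aggregateWithSpill are hypothetical.

```go
package main

import "fmt"

// row is a simplified input record: a group key and a value to sum.
type row struct {
	key string
	val int
}

// aggregateWithSpill computes SUM(val) GROUP BY key.
// Once the map holds maxGroups groups, the round is in "spill mode":
//   a. a row whose key is already in the map is aggregated;
//   b. a row whose key is not in the map is set aside (spilled).
// Spilled rows become the input of the next round (an assumption of this sketch).
func aggregateWithSpill(input []row, maxGroups int) map[string]int {
	if maxGroups < 1 {
		maxGroups = 1 // at least one group per round, so every round makes progress
	}
	result := make(map[string]int)
	for len(input) > 0 {
		groups := make(map[string]int)
		var spilled []row // stands in for the list on disk
		for _, r := range input {
			if _, ok := groups[r.key]; ok {
				groups[r.key] += r.val // rule a
				continue
			}
			if len(groups) >= maxGroups {
				spilled = append(spilled, r) // rule b
				continue
			}
			groups[r.key] = r.val // not in spill mode yet: start a new group
		}
		// A key is either fully aggregated in this round or fully spilled,
		// so the groups of this round are final and can be emitted.
		for k, v := range groups {
			result[k] = v
		}
		input = spilled
	}
	return result
}

func main() {
	input := []row{{"a", 1}, {"b", 2}, {"a", 3}, {"c", 4}, {"b", 5}, {"c", 6}}
	fmt.Println(aggregateWithSpill(input, 1))
}
```

Because no new group is created after spill mode starts, the hash table stops growing, while rows for existing groups are still folded in without touching the disk.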
Related changes
pingcap/docs/pingcap/docs-cn:
Check List
Tests
Set AggregateConcurrency to 1 and run all correctness tests in the TiDB repo.
Side effects
Release note