Implement segmented aggregation execution #17886
Conversation
// If the current segment ends in the current page, flush it with all the segments (if exist) except the last segment of the current page.
int lastSegmentStart = findLastSegmentStart(preGroupedHashStrategy.get(), page.extractChannels(preGroupedChannels));
unfinishedWork = aggregationBuilder.processPage(page.getRegion(0, lastSegmentStart));
remainingPage = page.getRegion(0, lastRowInPage - lastSegmentStart + 1);
Shouldn't the remainingPage be page.getRegion(lastSegmentStart, lastRowInPage - lastSegmentStart + 1)?
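To make the off-by-one concrete with plain numbers (a hypothetical 10-row page, not taken from the PR):

int positionCount = 10;                 // hypothetical page with rows 0-9
int lastRowInPage = positionCount - 1;  // 9
int lastSegmentStart = 6;               // the last segment starts at row 6
// Completed segments: page.getRegion(0, lastSegmentStart) -> rows 0-5.
// Remaining last segment: page.getRegion(lastSegmentStart, lastRowInPage - lastSegmentStart + 1) -> rows 6-9.
// The original page.getRegion(0, lastRowInPage - lastSegmentStart + 1) would instead return rows 0-3,
// re-processing rows that were already handled and dropping rows 6-9.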
I think this indicates that our test coverage is not sufficient, since it didn't catch this issue; we need to enhance our test coverage.
Added all the tests I can think of so far. Correctness should now be well covered.
I just tested again and found out why our unit test didn't catch the bug above: in the test we only used one page. We need to add tests that cover multiple pages; let's test all the scenarios we discussed above, off the top of my head (but not limited to).
Another point: let's separate out an independent test class, TestSegmentedHashAggregationOperator, given that our test cases are growing and I expect more and more segmented-aggregation-specific tests to be added in the future. But it's up to you whether you want to do it in this PR.
// Record the last segment.
firstUnfinishedSegment = page.getRegion(lastRowInPage, 1);
}
else {
Handle the smaller branch first and return to avoid indents for the larger branch. Reducing indents can improve the readability of the code.
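For example, something of this shape (the names here are only illustrative):

if (segmentMayContinueInNextPage) {
    // smaller branch: feed the whole page to the builder and return early
    unfinishedWork = aggregationBuilder.processPage(page);
    return;
}
// the larger branch continues here without an extra level of indentation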
// If the current segment might have more data in the incoming pages, process the whole page.
unfinishedWork = aggregationBuilder.processPage(page);
}
else if (preGroupedHashStrategy.get().rowEqualsRow(0, firstUnfinishedSegment.extractChannels(preGroupedChannels), 0, page.extractChannels(preGroupedChannels))) {
If this is the first page, line 504 is executed, so there is no need to make this comparison. Please try to avoid it in that case.
firstUnfinishedSegment = page.getRegion(0, 1);
}

if (preGroupedHashStrategy.get().rowEqualsRow(0, firstUnfinishedSegment.extractChannels(preGroupedChannels), lastRowInPage, page.extractChannels(preGroupedChannels))) {
There is an assumption that preGroupedChannels must be a subset of groupByChannels. Maybe better to add a check for that.
I think this is guaranteed during planning. The operator should be able to trust the generated plan, right?
firstUnfinishedSegment = page.getRegion(0, 1);
}

if (preGroupedHashStrategy.get().rowEqualsRow(0, firstUnfinishedSegment.extractChannels(preGroupedChannels), lastRowInPage, page.extractChannels(preGroupedChannels))) {
extractChannels is called many times in this function. Let's call it once and reuse the result.
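For example (roughly what a later revision in this PR does with a local pageOnPreGroupedChannels variable):

Page pageOnPreGroupedChannels = page.extractChannels(preGroupedChannels);
// reuse pageOnPreGroupedChannels in every rowEqualsRow call instead of re-extracting the channels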
unfinishedWork = aggregationBuilder.processPage(page);
}
else if (preGroupedHashStrategy.get().rowEqualsRow(0, firstUnfinishedSegment.extractChannels(preGroupedChannels), 0, page.extractChannels(preGroupedChannels))) {
// If the current page starts with a new segment, flush before processing it.
IIUC, we get here when the first row in the page is the same as the last unfinished segment. In that case, the current page does not start with a new segment, right? Did I miss anything?
remainingPageForSegmentedAggregation = page.getRegion(lastSegmentStart, lastRowInPage - lastSegmentStart + 1);
}
// Record the last segment.
firstUnfinishedSegment = page.getRegion(lastRowInPage, 1);
Seems that lastRowInPage is never updated. So this firstUnfinishedSegment is always the last row in the page? What does it mean? Could you add member field comments for the new fields added in this PR?
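Something along these lines would help (the wording below is only a suggestion based on my reading of the code):

// Single-row region holding the pre-grouped key of the most recently seen segment;
// that segment may still continue in subsequent pages.
private Page firstUnfinishedSegment;
// Tail of the current page (the last, possibly unfinished, segment) that is processed
// only after the completed segments have been flushed.
private Page remainingPageForSegmentedAggregation;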
High level design looks good. Just left some comments on the implementation details.
There are some design details Ke and I would like to clarify. For example, say we have 3 pages:
The segments will be [1, 1, 1], [2, 2, 2, 3, 4], [5, 5], [6, 6], [7, 7, 7].
A few nits, otherwise LGTM.
Let's open an issue as a follow-up: introduce a threshold config / session property to control the flush timing of segmented aggregation. With that, we can tune the config to strike a balance between memory and latency.
private final List<Integer> groupByChannels;
private final List<Integer> preGroupedChannels;
Aren't these two serving the same purpose except that one is sorted and the other is not? Can we merge these two into one and use something else (like a flag) to indicate the difference?
The channels in preGroupedChannels are already sorted, while not all channels in groupByChannels are. preGroupedChannels is a subset of groupByChannels, so they don't contain exactly the same elements. I think it makes sense to keep them separate.
If that is the case, I highly recommend adding a comment to explain. Also, can we check that preGroupedChannels is fully contained in groupByChannels?
if (preGroupedChannels.isEmpty()) {
    preGroupedHashStrategy = Optional.empty();
}
else {
There is a lot of if/else branching in the logic. Check my other comment below.
if (!preGroupedHashStrategy.isPresent()) {
    unfinishedWork = aggregationBuilder.processPage(page);
    return;
}

// 2. segmented aggregation
if (firstUnfinishedSegment == null) {
Same here. Whether to use pre-grouped channels or not is actually determined at the beginning of the execution or planning phase. There is no need to check if/else for every page input. Instead, it would be good to abstract the design a bit: for example, can we have a base abstract hash operator with two implementations, Hash and Segmented? Then we can leave the branches within the different implementations.
> Whether to use pre-grouped channels or not is actually determined at the beginning of the execution or planning phase. There is no need to check if/else for every page input.

Great point.

> can we have a base abstract hash operator with two implementations: Hash and Segmented.

I thought about this idea as well before implementing the PR. Not sure it's worth restructuring the whole operator, given that segmented aggregation is essentially still hash aggregation, just with a few tricks. The difference is not that big. WDYT? @kewang1024
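Purely as an illustration of the shape being discussed (hypothetical names, not part of this PR):

abstract class AbstractHashAggregationOperator
{
    // builder lifecycle, memory accounting and other machinery shared by both flavors would live here
    abstract void addInput(Page page);
}

class PlainHashAggregationOperator extends AbstractHashAggregationOperator
{
    @Override
    void addInput(Page page)
    {
        // plain hash aggregation: every page goes straight into the builder
    }
}

class SegmentedHashAggregationOperator extends AbstractHashAggregationOperator
{
    @Override
    void addInput(Page page)
    {
        // segmented aggregation: split the page on segment boundaries and flush eagerly
    }
}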
// The whole page is in one segment.
if (preGroupedHashStrategy.get().rowEqualsRow(0, firstUnfinishedSegment.extractChannels(preGroupedChannels), 0, pageOnPreGroupedChannels)) {
    // All rows in this page belong to the previous unfinished segment, process the whole page.
    unfinishedWork = aggregationBuilder.processPage(page);
Assume we don't have to care about the pages for a segment once it's done, right? Do we need to close the aggregationBuilder to clear the memory after having processed a segment? It would be good to reflect the memory usage in your benchmark as well.
> Do we need to close the aggregationBuilder to clear the memory after having processed a segment?

Yes, that's how it is implemented currently. Once it has fully processed at least one segment, the aggregationBuilder is closed and rebuilt if there are more segments to process.

> It would be good to reflect the memory usage in your benchmark as well.

Regarding the memory usage, I thought the memory comparison was covered by the manual test attached in the PR description. Could you please elaborate a bit on how to benchmark the memory usage? Any pointers I can refer to? Thanks!
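For reference, the close-and-rebuild cycle described above, as a simplified sketch (not the verbatim operator code):

// once the completed segments have been emitted, drop the builder and its hash table ...
aggregationBuilder.close();
aggregationBuilder = null;
// ... and lazily rebuild it when the remaining (last) segment or the next page arrives
initializeAggregationBuilderIfNeeded();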
- Segmented hash-based aggregation itself doesn't have much specific logic and heavily shares the hash-based aggregation logic, so just separating it out would still look a bit hacky.
- Currently the hash aggregation operator is intertwined with a lot of other features (spilling, partial aggregation mode, etc.) in a hacky fashion; to do the refactor properly, we need to come up with a better design for HashAggregationOperatorFactory as a whole, considering all the features currently inside this class, which could be a non-trivial amount of work.
What I think could be the next steps:
- Fix the current code to be more concise and reduce nested if statements
- Systematically refactor HashAggregationOperatorFactory as a follow-up
if (remainingPageForSegmentedAggregation != null) {
    // Running in segmented aggregation mode, reopen the aggregation builder and process the remaining page.
    initializeAggregationBuilderIfNeeded();
    unfinishedWork = aggregationBuilder.processPage(remainingPageForSegmentedAggregation);
    remainingPageForSegmentedAggregation = null;
}
Extract it to a private function to:
- reduce the nested if statements
- allow reusing this function when we introduce the threshold
private void processRemainingPageForSegmentedAggregation()
{
// Running in segmented aggregation mode, reopen the aggregation builder and process the remaining page.
if (remainingPageForSegmentedAggregation != null) {
initializeAggregationBuilderIfNeeded();
unfinishedWork = aggregationBuilder.processPage(remainingPageForSegmentedAggregation);
remainingPageForSegmentedAggregation = null;
}
}
if (preGroupedChannels.isEmpty()) {
    preGroupedHashStrategy = Optional.empty();
}
else {
NIT: make it more concise
preGroupedHashStrategy = preGroupedChannels.isEmpty() ? Optional.empty() : Optional.of(joinCompiler.compilePagesHashStrategyFactory(
preGroupedChannels.stream().map(channel -> groupByTypes.get(channel)).collect(toImmutableList()),
preGroupedChannels,
Optional.empty()).createPagesHashStrategy(groupByTypes.stream().map(type -> ImmutableList.<Block>of()).collect(toImmutableList()), OptionalInt.empty()));
Changing it back to linear search from binary, as no improvement was observed and the linear version is more readable and clean. cc: @kewang1024
Attach the issue where we introduce a config to tune between linear and binary search; it could be a good bootcamp task.
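For reference, the linear version is essentially a backward scan over the pre-grouped key columns; a sketch (not the exact code in this PR):

private static int findLastSegmentStart(PagesHashStrategy pagesHashStrategy, Page page)
{
    // walk backwards from the last row until the pre-grouped key changes;
    // that position is where the last segment of the page starts
    for (int position = page.getPositionCount() - 1; position > 0; position--) {
        if (!pagesHashStrategy.rowEqualsRow(position - 1, page, position, page)) {
            return position;
        }
    }
    return 0;
}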
private final List<Integer> groupByChannels;
private final List<Integer> preGroupedChannels;
If that is the case, I highly recommend adding a comment to explain. Also, can we check that preGroupedChannels is fully contained in groupByChannels?
this.preGroupedHashStrategy = preGroupedChannels.isEmpty()
        ? Optional.empty()
        : Optional.of(joinCompiler.compilePagesHashStrategyFactory(
                preGroupedChannels.stream().map(channel -> groupByTypes.get(channel)).collect(toImmutableList()), preGroupedChannels, Optional.empty())
nit: groupByTypes::get
Checking if preGroupedChannels is fully contained in groupByChannels might not be necessary. It has been checked thoroughly during the planning phase.
It would be really good to check the containment in the constructor. The planner and the execution engine are usually separate modules, just like the Presto frontend and the Velox backend, so modularization can break assumptions that planning is supposed to guarantee. Also, an explicit check makes the logic much clearer when reading the code.
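For example, a single precondition in the constructor would cover it (assuming Guava's checkArgument, which is already used throughout the codebase):

checkArgument(groupByChannels.containsAll(preGroupedChannels),
        "preGroupedChannels must be a subset of groupByChannels");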
Split from #17618
When running in segmented aggregation mode, each time a segment is finished we need to close the aggregation builder, destroy the hash table, then reopen the aggregation builder and recreate the hash table. If the segments are very small, this process has to be repeated many times, resulting in significant overhead.
To address the issue, we adjust the design: for each page, we process all the data before the last segment in the page together (because we don't know whether the last segment has more data in the next page, but we do know the segments before it are done), flush, and then process the last segment. For the next page, we repeat the process: find where the last segment starts, process all the data before that point and flush, then process the last segment.
For example, say we have 3 pages:
The segments will be [1, 1, 1], [2, 2, 2, 3, 4], [5, 5], [6, 6], [7, 7, 7].
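In code, the per-page flow is roughly the following (a simplified sketch of the logic described above; the real addInput also deals with spilling, partial aggregation, etc.):

Page pageOnPreGroupedChannels = page.extractChannels(preGroupedChannels);
int lastRowInPage = page.getPositionCount() - 1;
if (firstUnfinishedSegment != null
        && preGroupedHashStrategy.get().rowEqualsRow(0, firstUnfinishedSegment.extractChannels(preGroupedChannels), lastRowInPage, pageOnPreGroupedChannels)) {
    // the open segment covers the whole page and may continue in the next page: just process it
    unfinishedWork = aggregationBuilder.processPage(page);
}
else {
    // everything before the last segment of this page belongs to finished segments:
    // process it now so it can be flushed; the tail is processed after the flush
    int lastSegmentStart = findLastSegmentStart(preGroupedHashStrategy.get(), pageOnPreGroupedChannels);
    unfinishedWork = aggregationBuilder.processPage(page.getRegion(0, lastSegmentStart));
    remainingPageForSegmentedAggregation = page.getRegion(lastSegmentStart, lastRowInPage - lastSegmentStart + 1);
}
// remember the key of the last row; the next page is compared against it
firstUnfinishedSegment = page.getRegion(lastRowInPage, 1);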
Benchmark:
Manual test (Input: 799,180,100,298 rows / 3.36 TB):
A latency increase was observed during testing, which is expected: in order to enable segmented aggregation, file splitting needs to be disabled to preserve the ordering. As a result, far fewer splits are generated, which drastically decreases table scan concurrency, especially when there are many large files to scan.