Add the execution support for segmented aggregation #17618
Force-pushed from 80a46c3 to 959b1e6.
Force-pushed from eb3107e to f59a0fa.
Haven't finished reviewing the segmented aggregation logic inside HashAggregationOperator; I'll post some of the suggestions here first.
- We need some changes to the release note under Hive Changes: there is no segmented aggregation logic added to the Hive connector. For the Hive connector, we added the order_based_execution_enabled session property and the hive.order-based-execution-enabled configuration property to disable splitting and expose the data properties that segmented aggregation leverages.
- For the refactor commit, one optional suggestion is to add comments in the class explaining the purpose of the different fields, plus a succinct summary of the logic inside the key functions (needsInput, addInput, getOutput, finish, etc.). I think it would help people understand better when you introduce the extra logic for segmented aggregation in the later commit.
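To make that release-note item concrete, here is a hedged sketch of how the two switches mentioned above might be used. The property names come from this discussion; the file location, default value, and exact syntax are assumptions.

```properties
# Assumed Hive catalog configuration file (e.g. etc/catalog/hive.properties).
# Disables file splitting so that on-disk sort order is preserved, which is
# the data property segmented aggregation relies on. Assumed default: false.
hive.order-based-execution-enabled=true
```

The matching order_based_execution_enabled session property would presumably toggle the same behavior per session, without restarting the server.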
@@ -3154,7 +3159,7 @@ private OperatorFactory createHashAggregationOperatorFactory(
                .map(entry -> source.getTypes().get(entry))
                .collect(toImmutableList());

-       if (isStreamable) {
+       if (isStreamable && preGroupbyVariables.size() == groupbyVariables.size()) {
The size check here is not needed any more; we can pass in isStreamable and isSegmentedAggregationEligible from AggregationNode.
The check is not for segmented aggregation but for streaming aggregation, right?
You can take a look at the definition of isStreamable; it already includes the size-check logic. Or you may need to pull the newest code if you can't see it locally right now.
Got it. Thanks for the pointer!
presto-main/src/test/java/com/facebook/presto/operator/OperatorAssertion.java (review thread, resolved)
if (operatorType.equalsIgnoreCase("segmented")) {
    operatorFactory = createHashAggregationOperatorFactory(pagesBuilder.getHashChannel(), ImmutableList.of(VARCHAR, BIGINT), ImmutableList.of(0));
}
else {
    operatorFactory = createHashAggregationOperatorFactory(pagesBuilder.getHashChannel(), ImmutableList.of(), ImmutableList.of());
}
NIT: we can similarly create another boolean segmentedAggregation:
if (!hashAggregation) {
}
else if (segmentedAggregation) {
}
else {
}
IIUC, segmented aggregation essentially is still hash aggregation; it's just that in some cases we can flush the output when possible and rebuild the hash. The proposed code structure makes them look mutually exclusive to me. wdyt?
Mine is the same logic as yours, just less nested; it's equivalent to:
if (streaming aggregation)
else if (hash aggregation)
    if (segmented hash aggregation)
    if (normal hash aggregation)
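As a sketch of the flatter branching being proposed (all names here are hypothetical; the real code constructs operator factories rather than returning strings):

```java
// Hedged sketch of the proposed flat branching. Segmented aggregation is
// still hash aggregation, but checking the booleans in this order keeps the
// logic one level deep instead of nesting "segmented" inside "hash".
public class OperatorChoiceSketch
{
    static String chooseOperator(boolean hashAggregation, boolean segmentedAggregation)
    {
        if (!hashAggregation) {
            return "streaming"; // no hash table needed at all
        }
        else if (segmentedAggregation) {
            return "segmented"; // hash aggregation that can flush per segment
        }
        else {
            return "hash"; // plain hash aggregation
        }
    }

    public static void main(String[] args)
    {
        System.out.println(chooseOperator(false, false)); // prints streaming
        System.out.println(chooseOperator(true, true));   // prints segmented
        System.out.println(chooseOperator(true, false));  // prints hash
    }
}
```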
@@ -127,7 +128,12 @@ public void setup()
        pages = pagesBuilder.build();

        if (hashAggregation) {
-           operatorFactory = createHashAggregationOperatorFactory(pagesBuilder.getHashChannel());
+           if (operatorType.equalsIgnoreCase("segmented")) {
+               operatorFactory = createHashAggregationOperatorFactory(pagesBuilder.getHashChannel(), ImmutableList.of(VARCHAR, BIGINT), ImmutableList.of(0));
For segmented aggregation, this benchmark doesn't seem right: in createHashAggregationOperatorFactory, we only create a HashAggregationOperatorFactory with groupByChannels as ImmutableList.of(0), and in that case it would actually do streaming aggregation.
So I think we should create a new benchmark test where we group by at least two fields; then we can benchmark hash, segmented, and streaming aggregation.
Makes sense. Created one comparing hash and segmented aggregation.
@@ -702,6 +727,51 @@ public void testMask()
        assertEquals(outputPage.getBlock(1).getLong(0), 1L);
    }

    @Test
I would suggest putting segmented aggregation's functionality and correctness tests in a separate file, similar to streaming aggregation: https://github.com/mbasmanova/presto/blob/cc916c4af244cd04d2ef14c3ffad2942a656b431/presto-main/src/test/java/com/facebook/presto/operator/TestStreamingAggregationOperator.java
StreamingAggregationOperator.java is a separate operator while segmented aggregation is part of HashAggregationOperator.
Yeah, the reason why I suggested moving to a new class is that you would have many different cases specific to segmented aggregation that we should test, and moving it to a new class would be clearer IMO
presto-main/src/main/java/com/facebook/presto/operator/HashAggregationOperator.java (multiple review threads, resolved)
Force-pushed from 8378e30 to 709683a.
Hey @kewang1024, sorry for the delay. I've addressed most of the comments, including creating a new benchmark for hash vs segmented aggregation. As for the functionality and correctness tests, if a separate file is needed, could you please elaborate on what other cases should be covered?
Force-pushed from e0fa371 to 52f563a.
What I had in mind initially is to cover the cases in the streaming aggregation test.
For the SegmentedAggregation benchmark, the performance results show segmented aggregation is much worse than hash aggregation; we need to investigate what's causing the performance degradation.
        || (finishing && currentPage == null);
}

private boolean isAggregationBuilderFull()
NIT: can we add a comment here explaining that this means "partial aggregation reached the memory limit", because it's also used in other places. I found this function really confusing and needed to pick up the context each time I reviewed it; a comment would help:
// It only returns true when both of the following hold:
// 1. This is a partial aggregation.
// 2. The aggregationBuilder reached the memory limit.
Another option would be renaming the function to partialAggregationReachedMemoryLimit.
I agree with you. It took me a while to figure out the logic behind it. The new function name makes more sense to me.
For the first PR: let's start a new PR for the refactor so that we can merge it quickly; I have left all the change suggestions in comments.
For the second PR: let's also start a new PR and keep this one for reference.
The design where we flush by segment:
- It's very costly performance-wise, because it closes and rebuilds the hashBuilder for each segment, especially when one page has multiple segments.
- It makes the logic too complicated: we're introducing too many variables, which makes the HashAggregationOperator logic more error-prone.
My suggestion:
- Set a threshold for segmented aggregation flushing.
- Process by the minimum unit of a Page (same as the current HashAggregationOperator behavior); trigger the flush when the threshold limit is hit.
Let me know if you want to have a VC to discuss.
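The threshold-based alternative suggested above could look roughly like the following. This is a minimal sketch with hypothetical names; the real operator would flush an aggregation builder full of group-by state, not a row counter.

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch: process whole Pages (the operator's existing unit of work)
// and flush only when a row-count threshold is crossed, instead of closing
// and rebuilding the hash builder on every segment boundary.
public class ThresholdFlushSketch
{
    private final long flushRowThreshold;
    private long bufferedRows;
    private final List<Long> flushedRowCounts = new ArrayList<>();

    public ThresholdFlushSketch(long flushRowThreshold)
    {
        this.flushRowThreshold = flushRowThreshold;
    }

    // Called once per page; pages are never split into per-segment pieces.
    public void addPage(long pageRowCount)
    {
        bufferedRows += pageRowCount;
        if (bufferedRows >= flushRowThreshold) {
            flush();
        }
    }

    // Emits everything buffered so far; stands in for rebuilding the builder.
    public void flush()
    {
        if (bufferedRows > 0) {
            flushedRowCounts.add(bufferedRows);
            bufferedRows = 0;
        }
    }

    public List<Long> getFlushedRowCounts()
    {
        return flushedRowCounts;
    }

    public static void main(String[] args)
    {
        ThresholdFlushSketch sketch = new ThresholdFlushSketch(1000);
        sketch.addPage(400);
        sketch.addPage(400); // 800 buffered, under threshold, no flush
        sketch.addPage(400); // crosses 1000, flushes the 1200 buffered rows
        sketch.flush();      // nothing left to flush
        System.out.println(sketch.getFlushedRowCounts()); // prints [1200]
    }
}
```

The point of the design is that a page containing many small segments triggers at most one flush, bounding the rebuild cost regardless of segment density.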
private void initializeAggregationBuilder()
{
    if (aggregationBuilder == null) {
        if (step.isOutputPartial() || !spillEnabled) {
            aggregationBuilder = new InMemoryHashAggregationBuilder(
                    accumulatorFactories,
                    step,
                    expectedGroups,
                    groupByTypes,
                    groupByChannels,
                    hashChannel,
                    operatorContext,
                    maxPartialMemory,
                    joinCompiler,
                    true,
                    useSystemMemory);
        }
        else {
            verify(!useSystemMemory, "using system memory in spillable aggregations is not supported");
            aggregationBuilder = new SpillableHashAggregationBuilder(
                    accumulatorFactories,
                    step,
                    expectedGroups,
                    groupByTypes,
                    groupByChannels,
                    hashChannel,
                    operatorContext,
                    memoryLimitForMerge,
                    memoryLimitForMergeWithMemory,
                    spillerFactory,
                    joinCompiler);
        }
        // assume initial aggregationBuilder is not full
    }
    else {
        checkState(!aggregationBuilder.isFull(), "Aggregation buffer is full");
    }
}
NIT: it can be rewritten in a more concise version; we can also change the name to reflect the logic:
private void initializeAggregationBuilderIfNeeded()
{
if (aggregationBuilder != null) {
checkState(!aggregationBuilder.isFull(), "Aggregation buffer is full");
return;
}
if (step.isOutputPartial() || !spillEnabled) {
aggregationBuilder = new InMemoryHashAggregationBuilder(
accumulatorFactories,
step,
expectedGroups,
groupByTypes,
groupByChannels,
hashChannel,
operatorContext,
maxPartialMemory,
joinCompiler,
true,
useSystemMemory);
}
else {
verify(!useSystemMemory, "using system memory in spillable aggregations is not supported");
aggregationBuilder = new SpillableHashAggregationBuilder(
accumulatorFactories,
step,
expectedGroups,
groupByTypes,
groupByChannels,
hashChannel,
operatorContext,
memoryLimitForMerge,
memoryLimitForMergeWithMemory,
spillerFactory,
joinCompiler);
}
// assume initial aggregationBuilder is not full
}
        || (finishing && currentPage == null);
}

private boolean isAggregationBuilderFull() {
move the rename (partialAggregationReachedMemoryLimit) to the refactor commit
checkState(!finishing, "Operator is already finishing");
requireNonNull(page, "page is null");
currentPage = requireNonNull(page, "page is null");
We never set currentPage to null in this refactor PR, which makes the logic incorrect.
Also, I would strongly suggest removing currentPage from the refactor PR; it is tightly coupled to the upcoming segmented aggregation implementation, and we need to rethink the current segmented aggregation design.
checkState(!finishing, "Operator is already finishing");
requireNonNull(page, "page is null");
currentPage = requireNonNull(page, "page is null");
We never set currentPage to null in our logic, which makes this refactor PR incorrect.
// Produce results if one of the following is true:
// - partial aggregation reached memory limit.
// - received finish() signal and there is no more input remaining to process.
private boolean shouldFlush()
{
    return isAggregationBuilderFull()
            || (finishing && currentPage == null);
}
NIT: make the comment more descriptive and adjust the order:
// Flush in any of the scenarios below:
// 1. finishing: received the finish() signal (no more input to come).
// 2. This is a partial aggregation and it has reached the memory limit.
private boolean shouldFlush()
{
    return finishing ||
            partialAggregationReachedMemoryLimit();
}
        return false;
    }
    else {
-       return unfinishedWork == null;
+       return unfinishedWork == null && currentPage == null;
    }
}
NIT: let's also include this change to make the function easier to understand:
// This operator only needs input when:
// 1. It hasn't received the finish() signal (more input to come).
// 2. The current page has been processed.
// 3. The aggregation output has been flushed.
// 4. If this is a partial aggregation, it hasn't reached the memory limit.
@Override
public boolean needsInput()
{
    return !finishing &&
            unfinishedWork == null &&
            outputPages == null &&
            !partialAggregationReachedMemoryLimit();
}
Depends on #17458
Benchmark:
Manual testing (717,977,748,003 rows / 3.02 TB):
Latency regression was observed during testing, which is expected. To enable segmented aggregation, file splitting needs to be disabled to preserve order. As a result, far fewer splits are generated, which drastically decreased table scan concurrency, especially when there are many big files to scan.