
Make partial aggregation adaptive #11011

Merged · 4 commits · Mar 11, 2022

Conversation

lukasz-stec (Member)

Description

This is an optimization for the HashAggregationOperator that is split into partial and final steps.
When the partial aggregation step does not reduce the number of rows much (e.g. 90% of rows are unique), it brings only a small benefit in terms of network savings but costs a lot of CPU.
In that case it would be beneficial to skip partial aggregation altogether at planning time,
but since we don't always have reliable statistics for the number of unique values, especially in intermediate query stages, that is not easy to do.
Instead (although it is complementary to planner changes), this PR adds a simple runtime adaptation to the partial aggregation step that sends raw, ungrouped rows to the final step if the ratio of unique rows to input rows is high enough (0.8 by default).

With this change there is still significant overhead on the partial step, mainly in the PartitionedOutputOperator, which has to handle the superfluous accumulator state for the raw rows, and in the HashAggregationOperator, which needs to create and populate this state.
There are potential improvements in both HashAggregationOperator and PartitionedOutputOperator that would limit this overhead.
Another possible approach is to have a separate pipe (as it has a different layout) from the partial to the final step carrying only the input pages without the accumulator state. This would eliminate almost all of the overhead but would require larger changes in the core engine.
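As a rough illustration of the adaptation described above, here is a minimal standalone sketch (not this PR's actual code; the class name, method names, and bookkeeping are assumptions): after each flush of the partial aggregation buffer the operator reports how many input rows it consumed and how many unique rows it produced, and once enough rows have been seen with a high unique-to-input ratio, partial aggregation is switched off.

```java
// Sketch only: flush-driven decision to disable partial aggregation.
class PartialAggregationControllerSketch
{
    private final long minRowsProcessed;           // sanity threshold, 100_000 by default
    private final double uniqueRowsRatioThreshold; // 0.8 by default

    private long totalRowsProcessed;
    private long totalUniqueRowsProduced;
    // volatile so operator threads see the flip without locking on the read path
    private volatile boolean partialAggregationDisabled;

    PartialAggregationControllerSketch(long minRowsProcessed, double uniqueRowsRatioThreshold)
    {
        this.minRowsProcessed = minRowsProcessed;
        this.uniqueRowsRatioThreshold = uniqueRowsRatioThreshold;
    }

    // called after each flush of the partial aggregation hash table
    synchronized void onFlush(long rowsProcessed, long uniqueRowsProduced)
    {
        totalRowsProcessed += rowsProcessed;
        totalUniqueRowsProduced += uniqueRowsProduced;
        if (totalRowsProcessed >= minRowsProcessed
                && (double) totalUniqueRowsProduced / totalRowsProcessed > uniqueRowsRatioThreshold) {
            // aggregation barely reduces the row count; send raw rows to the final step instead
            partialAggregationDisabled = true;
        }
    }

    boolean isPartialAggregationDisabled()
    {
        return partialAggregationDisabled;
    }
}
```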

TPC-H/TPC-DS benchmark results for ORC sf1000

part (partitioned)

Overall ~6% TPC-H and ~1.5% TPC-DS improvement. Most queries are not affected; some gain between 10% and 35%.
adaptive-pa-part-nocode.pdf

unpart (unpartitioned)

Overall ~3.5% for TPC-H and ~2.5% for TPC-DS.
adaptive-pa-unpart-nocode.pdf

General information

Is this change a fix, improvement, new feature, refactoring, or other?

performance improvement

Is this a change to the core query engine, a connector, client library, or the SPI interfaces? (be specific)

core query engine (HashAggregationOperator)

How would you describe this change to a non-technical end user or system administrator?

Improves GROUP BY performance by skipping the partial aggregation step when it does not reduce the number of rows

Related issues, pull requests, and links

Documentation

(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.

Release notes

( ) No release notes entries required.
(x) Release notes entries required with the following suggested text:

Improve performance of GROUP BY with a large number of groups.

@cla-bot cla-bot bot added the cla-signed label Feb 10, 2022
@lukasz-stec lukasz-stec requested a review from sopel39 February 10, 2022 21:09
@lukasz-stec lukasz-stec force-pushed the ls/adaptive-pa branch 2 times, most recently from 3f52d8c to dcbae65 Compare February 11, 2022 12:36
@skrzypo987 (Member) left a comment:

This looks much easier than I anticipated.

false),
booleanProperty(
ADAPTIVE_PARTIAL_AGGREGATION_ENABLED,
"When enabled, partial aggregation can be skipped on a operator level if it's not beneficial",
Member:

a->an

{
private boolean adaptivePartialAggregationEnabled = true;
private long minNumberOfRowsProcessed = 100_000;
private double uniqueRowsRatio = 0.8;
Member:

Have you tested different thresholds?

Member Author:

No, I haven't. minRows is more or less constant per partial aggregation memory limit (PA will either compute the whole split, or stop when the memory limit is reached, but then the number of rows will be > 100K). So this is mainly a sanity check for some small splits.
For uniqueRowsRatio, I suspect that in tpcds/tpch most of the cases are either well below 0.8 or close to 1, so the exact number does not matter. That said, I will run some benchmarks to confirm that.

Member Author:

I additionally ran tpch/tpcds sf1000 ORC part with the ratio set to 0.5, 0.9, 0.95. As I expected, mostly no change: most of the queries either trigger adaptation or don't, consistently across ratios. There are some differences in the overall result, but I think this is due to variability.
adaptive-pa-part-ratios-nocode.pdf

@@ -59,6 +60,7 @@ public static SessionBuilder testSessionBuilder(SessionPropertyManager sessionPr
         .setTimeZoneKey(DEFAULT_TIME_ZONE_KEY)
         .setLocale(ENGLISH)
         .setRemoteUserAddress("address")
-        .setUserAgent("agent");
+        .setUserAgent("agent")
+        .setSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS, "1000");
Member:

Why is this needed here?

Member Author:

The default value of 100K is way too large for tests (it would never fire), and I think it's good to check the adaptive part outside of unit tests, but checking all possible aggregation variations with and without adaptation seems too much.

Member:

That is a bit fishy. For me it's ok, but I guess someone can have problems with that.

Member Author:

I think this is an example of a general issue with trino properties in tests. Some properties, like this one, are targeted at larger data scales, which means the functionality behind the property will never fire if not explicitly tested.
This is especially true for memory-related properties, e.g. task.max-partial-aggregation-memory.
It seems to me we should use scaled-down property values in "query" tests.

Member:

Do not change the property here. Change property in unit tests. You can also add another child of AbstractTestAggregations with minimal PA limits.

Member Author:

wouldn't it be better if all query tests were run with the lower limit and not just AbstractTestAggregations?

Member:

wouldn't it be better if all query tests were run with the lower limit and not just AbstractTestAggregations?

no. We specifically split different queries into different abstract test classes (join, aggregation, etc) so that we don't have to cross test everything. It's a mess.

Member Author:

ok, I added TestAdaptivePartialAggregation and removed the change here.

@lukasz-stec (Member Author) left a comment:

review comments addressed


@skrzypo987 (Member) left a comment:

LGTM % comments


@lukasz-stec (Member Author) left a comment:

last set of comments addressed


@sopel39 (Member) left a comment:

some initial comments


import io.airlift.configuration.Config;

public class AdaptivePartialAggregationConfig
Member:

This should be called AggregationConfig and should have all aggregation properties from FeaturesConfig. Also, it should be a different commit

Member Author:

I added AggregationConfig and moved some properties there. PTAL if this is full list

false),
longProperty(
ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS,
"Minimum number of processed rows before partial aggregation can be skipped",
Member:

Ideally, this should be adaptive in both directions (on and off), not only on => off. Do you have ideas for how this could be done?

Member Author:

One idea is to choose one or more splits randomly to use partial aggregation once in a while (e.g. once enough rows have been processed since the last check). Depending on the results of PA, we could switch PA back on for every ongoing or new split.
This would complicate the code a lot and only help with the unusual data distribution where you first process a lot of unique groups and then switch to splits with a small number of groups.
If we add an off switch, we will also be vulnerable to ping-pong cases where we do the opposite of what we should be doing because of the data distribution (i.e. we do PA for unique rows and send raw rows for heavily duplicated rows).
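A hypothetical sketch of that sampling idea (not part of this PR; the names and the sampling trigger are made up for illustration): once PA is disabled, periodically aggregate one batch as a sample and re-enable PA if the sample groups well, accepting the ping-pong risk mentioned above.

```java
// Sketch only: two-way adaptation via periodic sampling.
class TwoWayAdaptiveControllerSketch
{
    private final double ratioThreshold;
    private final long sampleEveryRows;

    private boolean disabled;
    private boolean samplingFlush;
    private long rawRowsSinceDisabled;

    TwoWayAdaptiveControllerSketch(double ratioThreshold, long sampleEveryRows)
    {
        this.ratioThreshold = ratioThreshold;
        this.sampleEveryRows = sampleEveryRows;
    }

    // should the next batch be partially aggregated?
    synchronized boolean shouldAggregate()
    {
        return !disabled || samplingFlush;
    }

    synchronized void onFlush(long rows, long uniqueRows)
    {
        boolean highRatio = (double) uniqueRows / rows > ratioThreshold;
        if (samplingFlush) {
            // decide from the sample alone whether to stay disabled
            disabled = highRatio;
            samplingFlush = false;
            rawRowsSinceDisabled = 0;
            return;
        }
        disabled = highRatio;
    }

    synchronized void onRawRowsPassedThrough(long rows)
    {
        rawRowsSinceDisabled += rows;
        if (disabled && rawRowsSinceDisabled >= sampleEveryRows) {
            samplingFlush = true; // aggregate the next batch as a sample
        }
    }
}
```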

Member:

This would complicate the code a lot and only help with the unusual data distribution where you first process a lot of unique groups and then switch to splits with a small number of groups.

How do you know it's unusual?
You can turn this argument around and say we hit a bad file and turned off PA completely while the rest of the data is rather flat.

In other places we turn adaptiveness on/off: DictionaryAwarePageProjectionWork#createDictionaryBlockProjection. Generally, I think that is the preferred way, because the prefix of a query might not be representative of the remainder of the query.

At the very least we need to have a TODO and a plan for that

Member Author:

How do you know it's unusual?

No such case in tpch/tpcds that I know of + it's hard for me to come up with a real-life data set that would have this.
Also, if the partial aggregation is an intermediate node after a partitioned exchange, it seems unlikely to have this kind of skew.
For the source stage, if we had split-level NDV stats, we could decide per split to disable adaptation if the expected number of groups is small, or even decide to skip partial aggregation if we know the number of distinct values is big

Member:

No such case in tpch/tpcds that I know of + it's hard for me to come up with a real-life data set that would have this.

Tpch/Tpcds is just a benchmark, but I could imagine some social data where some event generates a lot of unique rows, but only 10% of the time. If we hit that data at the beginning, we would turn PA off even when it's efficient

Member:

please create an issue and add a TODO (in PartialAggregationController) for enabling partial aggregation adaptively

Member Author:

done #11361

*/
package io.trino.operator.aggregation.partial;

public interface PartialAggregationControl
Member:

missing javadoc

Member Author:

added javadoc

public class AdaptivePartialAggregationConfig
{
private boolean adaptivePartialAggregationEnabled = true;
private long minRows = 100_000;
Member:

That seems low. Preferably you should make the decision after you've flushed the PA buffer at least once, because then you can check whether PA managed to reduce anything or not.

Member Author:

this is what is actually happening (the check is after a flush). This threshold is just a failsafe for some strange case of very small splits (i.e. a split with 10 rows, potentially due to partitioning or something else).
So for a normal case, the check happens after a full split, so more than 1M rows, or when the PA hits the memory limit, and then it should be around 100K to 400K rows for the default 16M limit.
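A back-of-the-envelope check of those numbers (the per-row state sizes of 40 and 160 bytes are illustrative assumptions, not measured values): with the default 16MB task.max-partial-aggregation-memory limit, the memory-limit flush fires roughly in the 100K-400K row range.

```java
// Sketch only: rows buffered before a memory-limit flush, for assumed per-row state sizes.
class PartialAggregationFlushMath
{
    static final long MEMORY_LIMIT_BYTES = 16L * 1024 * 1024; // default 16MB limit

    // assumed group-by key + accumulator state per row (illustrative)
    static final long ROWS_AT_160_BYTES = MEMORY_LIMIT_BYTES / 160; // ~105K rows
    static final long ROWS_AT_40_BYTES = MEMORY_LIMIT_BYTES / 40;   // ~419K rows
}
```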

* - modify fields via synchronized {@link #onPartialAggregationFlush}.
* - read volatile {@link #partialAggregationDisabled} (volatile here gives visibility).
*/
public class ActivePartialAggregationControl
Member:

Let's just make it class PartialAggregationController

Member Author:

do you mean class PartialAggregationController implements PartialAggregationControl or drop the interface?

Member:

Drop the interface, drop the tracker. Just keep the class PartialAggregationController

if (!isPartialAggregationDisabled()) {
onPartialAggregationFlush(numberOfRowsProcessed, numberOfUniqueRowsProduced);
}
clear();
Member:

why?

Member Author:

we would duplicate counts during the next flush otherwise

this.uniqueRowsRatio = uniqueRowsRatio;
}

public synchronized void onPartialAggregationFlush(long rowsProcessed, long uniqueRowsProduced)
Member:

onPartialAggregationFlush -> onFlush

@Override
public void onFlush()
{
if (!isPartialAggregationDisabled()) {
Member:

Just move this logic to onPartialAggregationFlush (make tracker dumb). onFlush is not frequent

import static io.trino.SystemSessionProperties.getAdaptivePartialAggregationUniqueRowsRatio;
import static io.trino.SystemSessionProperties.isAdaptivePartialAggregationEnabled;

public class PartialAggregationControlFactory
Member:

Let's remove factory and NoOpPartialAggregationControl and just make Optional<PartialAggregationController> partialAggregationControler in HashAggregationOperatorFactory

Member Author:

What is the benefit from Optional<PartialAggregationController>?
With the current setup the code is simple in the HashAggregationOperator e.g. partialAggregationTracker.onFlush() vs partialAggregationController.ifPresent(controller -> controller.onFlush(x, y))

Member:

What is the benefit from Optional?

If you use Optional then you don't have to have PartialAggregationControl or PartialAggregationControlFactory as interfaces (since there is just single implementation). There isn't really going to be another implementation and it won't be pluggable really. Hence, an interface just to have noop is an overkill.
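The two alternatives being weighed here can be sketched side by side (hypothetical toy types, not Trino's actual interfaces):

```java
import java.util.Optional;

class TrackerStyles
{
    interface Tracker
    {
        void onFlush(long rows, long uniqueRows);
    }

    // null-object style: call sites stay unconditional, at the cost of an
    // extra interface plus a no-op implementation
    static final Tracker NO_OP = (rows, uniqueRows) -> {};

    // concrete implementation standing in for the real controller
    static final class Counting
            implements Tracker
    {
        long flushes;

        @Override
        public void onFlush(long rows, long uniqueRows)
        {
            flushes++;
        }
    }

    static void demo()
    {
        // null-object style: always safe to call
        Tracker tracker = NO_OP;
        tracker.onFlush(10, 5);

        // Optional style: one concrete class, call sites guard explicitly
        Optional<Counting> controller = Optional.of(new Counting());
        controller.ifPresent(c -> c.onFlush(10, 5));
    }
}
```

The review settled on the Optional style because there is only ever one real implementation.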

}

@Override
public void onFlush()
Member:

you can just make it onFlush(int totalPositionCount, int uniquePositionCount). Make row tracking internal to HashAggregationOperator. Then you don't even need this tracker at all

Member Author:

This would complicate HashAggregationOperator, especially because it handles more cases than partial aggregation. Having these counts here makes it explicit that this is only relevant for partial aggregation.

@sopel39 (Member) Mar 2, 2022:

But you call these methods anyway. What we do here is simple row tracking. We don't need factory, interfaces and tracker for that. It's an overkill for what is a simple increment

Member Author:

This adds 3 instead of 1 field to the class that already has ~25 fields. I don't consider this a good practice.
That said, I refactored this as requested.

@lukasz-stec lukasz-stec force-pushed the ls/adaptive-pa branch 2 times, most recently from c693891 to 52eb84d Compare February 28, 2022 15:57
@lukasz-stec (Member Author) left a comment:

I extracted the AggregationConfig to a separate commit + some comments addressed, some replied to.


@@ -70,6 +74,8 @@
private final SpillerFactory spillerFactory;
private final JoinCompiler joinCompiler;
private final BlockTypeOperators blockTypeOperators;
private final PartialAggregationControlFactory partialAggregationControlFactory;
private final PartialAggregationControl partialAggregationControl;
Member Author:

I used the 'null object pattern' here. For non-partial aggregation, this is going to be NoOpPartialAggregationControl.
Optional or null here makes the code more convoluted, as it needs to be handled on every access.

@@ -368,8 +386,11 @@ public void addInput(Page page)
inputProcessed = true;

if (aggregationBuilder == null) {
// TODO: We ignore spillEnabled here if any aggregate has ORDER BY clause or DISTINCT because they are not yet implemented for spilling.
if (step.isOutputPartial() || !spillEnabled || !isSpillable()) {
if (partialAggregationTracker.isPartialAggregationDisabled()) {
Member Author:

for the !step.isOutputPartial() case, NoOpPartialAggregationTracker.isPartialAggregationDisabled returns false


@lukasz-stec lukasz-stec requested a review from sopel39 February 28, 2022 15:58
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;

public class AggregationConfig
Member:

See #11066 (comment). We should extract an OptimizerConfig config class. I suggest you skip this extraction for now since it will be part of #11066


aggregationConfig.getAdaptivePartialAggregationMinRows(),
false),
doubleProperty(
ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO,
Member:

ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO -> ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD

false),
doubleProperty(
ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO,
"Adaptive partial aggregation unique rows ratio threshold",
Member:

Expand this description, threshold for what (on, off)?



@lukasz-stec lukasz-stec force-pushed the ls/adaptive-pa branch 2 times, most recently from 1c0b3b0 to 23b0367 Compare March 3, 2022 12:01
Member Author

@lukasz-stec lukasz-stec left a comment

AggregationConfig extraction dropped, properties moved to the FeaturesConfig for now.
NoOpPartialAggregationControl refactored to Optional

false),
longProperty(
ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS,
"Minimum number of processed rows before partial aggregation can be skipped",
Member Author

How do you know it's unusual?

No such case in tpch/tpcds that I know of, and it's hard for me to come up with a real-life data set that would have this.
Also, if the partial aggregation is an intermediate node after a partitioned exchange, it seems unlikely to have this kind of skew.
For the source stage, if we had split-level NDV stats, we could decide per split to disable adaptation if the expected number of groups is small, or even decide to skip partial aggregation if we know the number of distinct values is big

}

@Override
public void onFlush()
Member Author

This adds 3 fields instead of 1 to a class that already has ~25 fields. I don't consider this good practice.
That said, I refactored it as requested.

@lukasz-stec lukasz-stec requested a review from sopel39 March 3, 2022 12:04
Member

@sopel39 sopel39 left a comment

mostly lgtm % comments

@@ -149,6 +149,10 @@
private boolean allowSetViewAuthorization;

private boolean hideInaccessibleColumns;
// adaptive partial aggregation
Member

you need to rebase. There is new OptimizerConfig class

Member Author

moved to OptimizerConfig

return adaptivePartialAggregationUniqueRowsRatioThreshold;
}

@Config("adaptive-partial-aggregation.unique-rows-ratio-threshold")
Member

add description

return adaptivePartialAggregationMinRows;
}

@Config("adaptive-partial-aggregation.min-rows")
Member

add description

false),
longProperty(
ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS,
"Minimum number of processed rows before partial aggregation can be skipped",
Member

No such case in tpch/tpcds that I know of, and it's hard for me to come up with a real-life data set that would have this.

Tpch/Tpcds is just a benchmark, but I could imagine some social data where some event generates a lot of unique rows, but only 10% of the time. If we hit that data at the beginning, we would turn off PA even when it's efficient

import static io.trino.SystemSessionProperties.getAdaptivePartialAggregationUniqueRowsRatioThreshold;
import static io.trino.SystemSessionProperties.isAdaptivePartialAggregationEnabled;

public class PartialAggregationControllerFactory
Member

You don't need a controller factory. Just create PartialAggregationController in LocalExecutionPlanner.Visitor#createHashAggregationOperatorFactory.

Member Author

removed the factory, added io.trino.operator.aggregation.partial.PartialAggregationController#duplicate

return outputBuilders;
}

private Block[] constructOutputPage(BlockBuilder[] outputBuilders)
Member

constructOutputPage should return Page

@@ -59,6 +60,7 @@ public static SessionBuilder testSessionBuilder(SessionPropertyManager sessionPr
.setTimeZoneKey(DEFAULT_TIME_ZONE_KEY)
.setLocale(ENGLISH)
.setRemoteUserAddress("address")
.setUserAgent("agent");
.setUserAgent("agent")
.setSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS, "1000");
Member

Do not change the property here. Change the property in unit tests. You can also add another child of AbstractTestAggregations with minimal PA limits.

.addBlocksPage(createRLEBlock(1, 10)) // second page would be hashed to existing value 1. but if adaptive PA kicks in, the raw values will be passed on
.build();
List<Page> operator1Expected = rowPagesBuilder(BIGINT, BIGINT)
.addSequencePage(10, 0, 0)
Member

so this intermediate page has 10 positions? Could we use aggregation and input data that would actually squash input positions (e.g. all rows belong to same group)

Member Author

not sure I understand but the reason we need (almost) unique groups is to trigger adaptation.

Member

Ok, so can we use 9 unique rows out of 10 rows? Or maybe you could change the ratio from 0.8 to 0.5? The reason is that I would like to see that aggregation actually happens before it's disabled

Member Author

refactored to 9 out of 10 unique (0, 1, 2, 3, 4, 5, 6, 7, 8, 8)
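For the record, the arithmetic behind this choice of test data (hypothetical helper, not code from the PR): 9 distinct keys out of 10 rows give a 0.9 unique-rows ratio, above the 0.8 threshold, while the duplicate key still lets the first flush perform real aggregation.

```java
import java.util.stream.LongStream;

public class UniqueRatio
{
    // Fraction of input rows that form their own group (hypothetical helper)
    public static double uniqueRatio(long[] groupKeys)
    {
        long unique = LongStream.of(groupKeys).distinct().count();
        return (double) unique / groupKeys.length;
    }

    public static void main(String[] args)
    {
        long[] keys = {0, 1, 2, 3, 4, 5, 6, 7, 8, 8}; // test page from this discussion
        System.out.println(uniqueRatio(keys)); // 0.9 > 0.8, so adaptation would trigger
    }
}
```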

joinCompiler,
blockTypeOperators,
// use 5 rows threshold to trigger adaptive partial aggregation after each page flush
new PartialAggregationControllerFactory(true, 5, 0.8));
Member

Also add a test (or change this one) verifying that there needs to be at least one flush before PA is disabled (e.g. low min row count, and a flush happening after the second input page).

Member Author

Added separate test case as this requires using HashAggregationOperatorFactory with a different maxPartialMemory limit

@sopel39
Member

sopel39 commented Mar 4, 2022

please rebase due to conflicts

Member Author

@lukasz-stec lukasz-stec left a comment

rebased on the master and moved config properties to OptimizerConfig + other comments addressed

@@ -149,6 +149,10 @@
private boolean allowSetViewAuthorization;

private boolean hideInaccessibleColumns;
// adaptive partial aggregation
Member Author

moved to OptimizerConfig

@@ -368,8 +387,11 @@ public void addInput(Page page)
inputProcessed = true;

if (aggregationBuilder == null) {
// TODO: We ignore spillEnabled here if any aggregate has ORDER BY clause or DISTINCT because they are not yet implemented for spilling.
if (step.isOutputPartial() || !spillEnabled || !isSpillable()) {
if (partialAggregationController.map(PartialAggregationController::isPartialAggregationDisabled).orElse(false)) {
Member Author

I don't want to have if (step.isOutputPartial()) because it influences other branches/ifs below, but I added step.isOutputPartial() to the condition to make it clear that this works only for partial aggregation.
I added a partialAggregationController.isPresent() vs step.isOutputPartial() check to the constructor (we don't need to check it for every addInput)

@@ -59,6 +60,7 @@ public static SessionBuilder testSessionBuilder(SessionPropertyManager sessionPr
.setTimeZoneKey(DEFAULT_TIME_ZONE_KEY)
.setLocale(ENGLISH)
.setRemoteUserAddress("address")
.setUserAgent("agent");
.setUserAgent("agent")
.setSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS, "1000");
Member Author

wouldn't it be better if all query tests were run with the lower limit and not just AbstractTestAggregations?

.addBlocksPage(createRLEBlock(1, 10)) // second page would be hashed to existing value 1. but if adaptive PA kicks in, the raw values will be passed on
.build();
List<Page> operator1Expected = rowPagesBuilder(BIGINT, BIGINT)
.addSequencePage(10, 0, 0)
Member Author

not sure I understand but the reason we need (almost) unique groups is to trigger adaptation.

joinCompiler,
blockTypeOperators,
// use 5 rows threshold to trigger adaptive partial aggregation after each page flush
new PartialAggregationControllerFactory(true, 5, 0.8));
Member Author

Added separate test case as this requires using HashAggregationOperatorFactory with a different maxPartialMemory limit

import static io.trino.SystemSessionProperties.getAdaptivePartialAggregationUniqueRowsRatioThreshold;
import static io.trino.SystemSessionProperties.isAdaptivePartialAggregationEnabled;

public class PartialAggregationControllerFactory
Member Author

removed the factory, added io.trino.operator.aggregation.partial.PartialAggregationController#duplicate

@lukasz-stec lukasz-stec requested a review from sopel39 March 7, 2022 11:38
Member

@sopel39 sopel39 left a comment

mostly lgtm

}

@Config("adaptive-partial-aggregation.min-rows")
@ConfigDescription("Minimum number of processed rows before partial aggregation can be skipped")
Member

Use descriptions from session properties (make these consistent)

for (int i = 0; i < groupByChannels.size(); i++) {
hashChannels[i] = groupByChannels.get(i);
}
inputHashChannel.ifPresent(inputHash -> hashChannels[groupByChannels.size()] = inputHash);
Member

nit: inputHash -> channelIndex

@@ -59,6 +60,7 @@ public static SessionBuilder testSessionBuilder(SessionPropertyManager sessionPr
.setTimeZoneKey(DEFAULT_TIME_ZONE_KEY)
.setLocale(ENGLISH)
.setRemoteUserAddress("address")
.setUserAgent("agent");
.setUserAgent("agent")
.setSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS, "1000");
Member

wouldn't it be better if all query tests were run with the lower limit and not just AbstractTestAggregations?

no. We specifically split different queries into different abstract test classes (join, aggregation, etc) so that we don't have to cross test everything. It's a mess.

.addBlocksPage(createRLEBlock(1, 10)) // second page would be hashed to existing value 1. but if adaptive PA kicks in, the raw values will be passed on
.build();
List<Page> operator1Expected = rowPagesBuilder(BIGINT, BIGINT)
.addSequencePage(10, 0, 0)
Member

Ok, so can we use 9 unique rows out of 10 rows? Or maybe you could change the ratio from 0.8 to 0.5? The reason is that I would like to see that aggregation actually happens before it's disabled

assertOutputEquals(operatorFactory, operator2Input, operator2Expected);
}

private void assertOutputEquals(OperatorFactory operatorFactory, List<Page> input, List<Page> expectedPages)
Member

you can use one of io.trino.operator.OperatorAssertion#assertOperatorEquals instead of creating a new method (see TestHashJoinOperator)

Member Author

OperatorAssertion#assertOperatorEquals closes the factory, and in this case I need the factory to be reused, but I extended assertOperatorEquals with a boolean closeOperatorFactory flag, so it works now.

joinCompiler,
blockTypeOperators,
// use 5 rows threshold to trigger adaptive partial aggregation after each page flush
Optional.of(new PartialAggregationController(5, 0.8)));
Member

please also add assertion on PartialAggregationController#isPartialAggregationDisabled

Member Author

done

joinCompiler,
blockTypeOperators,
// use 5 rows threshold to trigger adaptive partial aggregation after each page flush
Optional.of(new PartialAggregationController(5, 0.8)));
Member

please also add assertion on PartialAggregationController#isPartialAggregationDisabled

Member Author

added

Member Author

@lukasz-stec lukasz-stec left a comment

comments addressed + added a commit to skip the types check in TestShowQueries (tests failed because new properties with longer names were added)

false),
longProperty(
ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS,
"Minimum number of processed rows before partial aggregation can be skipped",
Member Author

done #11361

@@ -279,6 +295,8 @@ public OperatorFactory duplicate()

// for yield when memory is not available
private Work<?> unfinishedWork;
private long numberOfRowsProcessed;
Member Author

renamed

{
private final LocalMemoryContext memoryContext;
private final List<GroupedAggregator> groupedAggregators;
private volatile Page currentPage;
Member Author

done

@@ -59,6 +60,7 @@ public static SessionBuilder testSessionBuilder(SessionPropertyManager sessionPr
.setTimeZoneKey(DEFAULT_TIME_ZONE_KEY)
.setLocale(ENGLISH)
.setRemoteUserAddress("address")
.setUserAgent("agent");
.setUserAgent("agent")
.setSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS, "1000");
Member Author

ok, I added TestAdaptivePartialAggregation and removed the change here.

joinCompiler,
blockTypeOperators,
// use 5 rows threshold to trigger adaptive partial aggregation after each page flush
Optional.of(new PartialAggregationController(5, 0.8)));
Member Author

added

.addBlocksPage(createRLEBlock(1, 10)) // second page would be hashed to existing value 1. but if adaptive PA kicks in, the raw values will be passed on
.build();
List<Page> operator1Expected = rowPagesBuilder(BIGINT, BIGINT)
.addSequencePage(10, 0, 0)
Member Author

refactored to 9 out of 10 unique (0, 1, 2, 3, 4, 5, 6, 7, 8, 8)

joinCompiler,
blockTypeOperators,
// use 5 rows threshold to trigger adaptive partial aggregation after each page flush
Optional.of(new PartialAggregationController(5, 0.8)));
Member Author

done

assertOutputEquals(operatorFactory, operator2Input, operator2Expected);
}

private void assertOutputEquals(OperatorFactory operatorFactory, List<Page> input, List<Page> expectedPages)
Member Author

OperatorAssertion#assertOperatorEquals closes the factory, and in this case I need the factory to be reused, but I extended assertOperatorEquals with a boolean closeOperatorFactory flag, so it works now.

@lukasz-stec lukasz-stec requested a review from sopel39 March 8, 2022 13:55
Member

@sopel39 sopel39 left a comment

lgtm % comments

@@ -20,6 +20,11 @@
{
private final T result;
Member

mark result as @Nullable

requireNonNull(step, "step is null");
requireNonNull(aggregatorFactories, "aggregatorFactories is null");
requireNonNull(operatorContext, "operatorContext is null");

Member

nit: restore newline

Member Author

restored after the checkArgument

.map(PartialAggregationController::isPartialAggregationDisabled)
.orElse(false);
if (step.isOutputPartial() && partialAggregationDisabled) {
aggregationBuilder = new SkipAggregationBuilder(groupByChannels, hashChannel, aggregatorFactories, memoryContext);
Member

future improvement. It would be great to actually collect metrics

  • how many pages were processed via the skip aggregation builder
  • how many flushes there were for PA
  • what the average row count per flush was
  • etc.

This can be returned via io.trino.operator.OperatorContext#setLatestMetrics

@lukasz-stec Maybe create an issue for that?

Member Author

Good idea. This would allow easier monitoring of the adaptation.
#11376 created.

return outputBuilders;
}

private Page constructOutputPage(Page page, BlockBuilder[] outputBuilders)
Member

technically, your output page might become quite large since we add more columns. However, this probably isn't a big issue.
In io.trino.operator.project.PageProcessor we avoid this by keeping pages under 4MB

Member Author

Good point. I guess this currently can increase page size easily 10x by adding the accumulator state (given enough aggregations).
Without much overhead, we could use Page.getRegion to partition the output page into smaller pages but that would retain the full page until all pages are processed. WDYT?

.addBlocksPage(createRLEBlock(1, 10)) // second page would be hashed to existing value 1. but if adaptive PA kicks in, the raw values will be passed on
.build();
List<Page> operator1Expected = rowPagesBuilder(BIGINT, BIGINT)
.addBlocksPage(createLongsBlock(0, 1, 2, 3, 4, 5, 6, 7, 8), createLongsBlock(0, 1, 2, 3, 4, 5, 6, 7, 8)) // tha last position was aggregated
Member

nit: tha -> the

OperatorAssertion.assertOperatorEquals(operatorFactory, driverContext, input, expected, false, false);
}

private MaterializedResult processInput(Operator operator, List<Page> input, MaterializedResult expected)
Member

unused, remove

{
return TpchQueryRunnerBuilder.builder()
.setExtraProperties(ImmutableMap.of(
"adaptive-partial-aggregation.min-rows", "1000"))
Member

inline with previous line

set it to 0?

you should also reduce PA buffer to 0 so that flush happens after every page

Member Author

yeah, min-rows=0 makes sense.
I would leave task.max-partial-aggregation-memory at the default, as flushing per page is not a realistic scenario, and there should be enough splits in the test data for a flush per split to trigger adaptation.

Member

and there should be enough splits in the test data for flush per split to use adaptation.

I'm not so sure, since we use the tiny schema. I would rather reduce it to make sure it triggers in most cases

Member Author

ok, I added task.max-partial-aggregation-memory=0B
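So the query-runner overrides discussed in this thread amount to (property names as they appear above; values chosen only to force adaptation in tests):

```properties
# Test-only overrides from this thread: disable the min-rows gate and
# force a partial-aggregation flush on every page
adaptive-partial-aggregation.min-rows=0
task.max-partial-aggregation-memory=0B
```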

* It passes the input pages, augmented with initial accumulator state, to the output.
* It can only be used at the partial aggregation step, as it relies on rows being aggregated at the final step.
*/
public class SkipAggregationBuilder
Member

Does this produce pages with a schema that matches the one defined by the query plan?

Member Author

Yes. This produces a "projected" input page by selecting the group-by channels + optionally the hash channel + the aggregation accumulator state channels, which matches the partial aggregation step's output schema.

Member

I'm not sure I understand how the input data is passed to the final aggregation, then. Specifically, the comment above that says: "It passes the input pages, augmented with initial accumulator state to the output". If that's the case, then the columns from the input to the aggregation would not match the expected input of the final aggregation (according to the query plan)

Member Author

The comment may not be precise enough. What happens is that we take the group-by (or hash) channels from the input and add the accumulator state. This is done in the constructOutputPage method.
The BlockBuilder[] outputBuilders contain the accumulator state.

private Page constructOutputPage(Page page, BlockBuilder[] outputBuilders)
{
    Block[] outputBlocks = new Block[hashChannels.length + outputBuilders.length];
    for (int i = 0; i < hashChannels.length; i++) {
        outputBlocks[i] = page.getBlock(hashChannels[i]);
    }
    for (int i = 0; i < outputBuilders.length; i++) {
        outputBlocks[hashChannels.length + i] = outputBuilders[i].build();
    }
    return new Page(page.getPositionCount(), outputBlocks);
}

Member

@sopel39 sopel39 Mar 10, 2022

@martint
With this PR, partial aggregation will still produce intermediate rows when it's turned off. We haven't implemented a dual representation (raw rows vs intermediate rows). The approach in this PR is simpler and will be improved.
For example, @radek-starburst is working on splitting a single decimal aggregation state into smaller, primitive states. When this is done, we will be able to essentially pass through input decimals when PA is adaptively disabled, without any extra CPU cost. For example, for the sum decimal aggregation, overflow can be represented as an RLE block.
@lukasz-stec is working on sending RLE blocks via partitioned exchange, so that intermediate aggregation rows can be serialized and transmitted over the network more efficiently.

In case when partial aggregation
operator does not reduce the number
of rows too much, disable aggregation
and send raw rows to the final step
@sopel39 sopel39 merged commit c950e30 into trinodb:master Mar 11, 2022
@sopel39 sopel39 mentioned this pull request Mar 11, 2022
@sopel39
Member

sopel39 commented Mar 11, 2022

thanks!

@github-actions github-actions bot added this to the 374 milestone Mar 11, 2022
prithvip added a commit to prithvip/presto that referenced this pull request Nov 27, 2023
When partial aggregation is not effectively reducing cardinality, instead send
raw input rows directly to the final aggregation step.

Port of:
github.com/trinodb/trino/pull/11011
github.com/trinodb/trino/pull/17143

Co-authored-by: Lukasz Stec <lukasz.s.stec@gmail.com>
Co-authored-by: Karol Sobczak <sopel39@users.noreply.github.com>
prithvip added a commit to prestodb/presto that referenced this pull request Nov 27, 2023
When partial aggregation is not effectively reducing cardinality, instead send
raw input rows directly to the final aggregation step.

Port of:
github.com/trinodb/trino/pull/11011
github.com/trinodb/trino/pull/17143

Co-authored-by: Lukasz Stec <lukasz.s.stec@gmail.com>
Co-authored-by: Karol Sobczak <sopel39@users.noreply.github.com>
mlyublena pushed a commit to prestodb/presto that referenced this pull request Nov 28, 2023
When partial aggregation is not effectively reducing cardinality, instead send
raw input rows directly to the final aggregation step.

Port of:
github.com/trinodb/trino/pull/11011
github.com/trinodb/trino/pull/17143

Co-authored-by: Lukasz Stec <lukasz.s.stec@gmail.com>
Co-authored-by: Karol Sobczak <sopel39@users.noreply.github.com>
kaikalur pushed a commit to kaikalur/presto that referenced this pull request Mar 14, 2024
When partial aggregation is not effectively reducing cardinality, instead send
raw input rows directly to the final aggregation step.

Port of:
github.com/trinodb/trino/pull/11011
github.com/trinodb/trino/pull/17143

Co-authored-by: Lukasz Stec <lukasz.s.stec@gmail.com>
Co-authored-by: Karol Sobczak <sopel39@users.noreply.github.com>