-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow rolling aggregations for window functions #8974
Conversation
61ff46b
to
1e4aec4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like this patch. Added two comments though. Think of them as notes, not necessarily requests for changes.
Speaking of second commit, I think it's generally OK to proceed with this patch in current shape and schedule further refactoring to separate WF aggregates from non-WF aggregates to not pay the cost of rolling accumulators in non-rolling contexts because it'll be slightly bigger refactor.
Although, it should be weighted how much this can potentially affect network load in production systems. E.g. the extreme case of having sum(x) group by x
with x
being a column of all distinct values (cardinality = NDV). The overhead is an extra long (8B) per intermediate state, which in this degenerated case can end up being 1 long per row of an overhead.
@@ -130,7 +131,9 @@ public static GenericAccumulatorFactoryBinder generateAccumulatorFactoryBinder(A | |||
|
|||
// Generate methods | |||
generateAddInput(definition, stateField, inputChannelsField, maskChannelField, metadata.getInputMetadata(), metadata.getInputFunction(), callSiteBinder, grouped); | |||
generateAddInputWindowIndex(definition, stateField, metadata.getInputMetadata(), metadata.getInputFunction(), callSiteBinder); | |||
generateAddOrRemoveInputWindowIndex(definition, stateField, metadata.getInputMetadata(), metadata.getInputFunction(), "addInput", callSiteBinder); | |||
metadata.getRemoveInputFunction().ifPresent( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in case of not having remove input function, we still should generate a function matching the implemented interface but with, let's say the only statement in the body to be throw new IllegalStateException
.
However, I didn't find anything to support the statement that your implementation may be illegal/may be prone to the undefined behavior, based on https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-4.html and https://docs.oracle.com/javase/specs/jvms/se8/html/jvms-6.html#jvms-6.5.invokeinterface
But I think a good rule of thumb is not to generate the code which cannot be generated straightforwardly by the compiler.
So, let's treat this comment as a note/warning, not necessarily an "error"/must change.
.collect(toImmutableList()); | ||
|
||
// Note that a missing removeInput function is not an error -- they are optional. | ||
checkArgument(removeInputFunctions.size() <= 1, "Ambiguous aggregation removeInput functions"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's rather checkState
, not an checkArgument
because it may happen because of the wrong content of the class not because someone passed wrong argument to the method, although I understand the transitivity of "wrong method comes from the wrong class which was passed as an argument"...
@electrum, @martint this is the patch implementing the idea I was discussing with you when I was visiting Menlo campus for the meetup in May. Please have a look, it speeds up sliding windows significantly, reducing the time complexity from O(n^2) to O(n) in some cases. |
@@ -93,8 +93,8 @@ | |||
|
|||
private static final InternalAggregationFunction LONG_AVERAGE = metadata.getFunctionRegistry().getAggregateFunctionImplementation( | |||
new Signature("avg", AGGREGATE, DOUBLE.getTypeSignature(), BIGINT.getTypeSignature())); | |||
private static final InternalAggregationFunction LONG_SUM = metadata.getFunctionRegistry().getAggregateFunctionImplementation( | |||
new Signature("sum", AGGREGATE, BIGINT.getTypeSignature(), BIGINT.getTypeSignature())); | |||
private static final InternalAggregationFunction LONG_MIN = metadata.getFunctionRegistry().getAggregateFunctionImplementation( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this? Does sum no longer work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second commit in the PR changes the aggregation state for LongSumAggregation to LongLongState. The test is validating the aggregation state, and thus needs some change. It was simplest for me to change the aggregation in the test to be one with the same aggregation state that LongSumAggregation used previously.
@CombineFunction | ||
public static void combine(@AggregationState NullableDoubleState state, @AggregationState NullableDoubleState otherState) | ||
@RemoveInputFunction | ||
public static void removeInput(@AggregationState LongDoubleState state, @SqlType(StandardTypes.DOUBLE) double value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any tests for these. Could we add them to TestDoubleSumAggregation
and TestLongSumAggregation
? Or as window function queries?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not at all sure that the second commit, which adds removeInput for some Sum aggregations, is a good idea. It increases the size of the aggregation state, which will hurt straightforward grouping queries that don't even use window functions. There are several other aggregations, however, that can support rolling without any additional costs: I attempted to triage the aggregations here: #8982
I think for testing, it makes sense to extend AbstractTestAggregationFunction to use removeInput() if it exists. I can look into doing this next week. Please keep in mind that I have only one week left at Teradata -- the 22nd will be my last day.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added AbstractTestAggregationFunction.testSlidingWindow in the latest version of this PR.
1e4aec4
to
a51529a
Compare
Add a test for the WindowNode use case for Accumulators. Fix a few bugs in the aggregation tests uncovered by the additional coverage. The new tests didn't uncover any product bugs.
2197cbc
to
22b3b5f
Compare
Add a removeInput() function to some Accumulators, and when it exists, use it in aggregate window functions to roll the aggregation forward incrementally. Dramatically speeds up queries such as: SELECT COUNT(quantity) OVER (ROWS BETWEEN 2000 PRECEDING AND 2000 FOLLOWING)
Implement removeInput() in some SUM aggregations, to speed up rolling window functions. This requires additional storage in the AggregationState for the input count, so that the aggregator knows when its result should become null.
22b3b5f
to
1809213
Compare
@electrum It would be great if you could take another look at this patch. I think it's good to go, with good unit test coverage of the new feature. Leaving out the changes to Sum (by dropping the last commit in the PR) would be reasonable in my opinion. Tomorrow is my last day at Teradata. |
This pull request has been automatically marked as stale because it has not had recent activity. If you'd still like this PR merged, please comment on the task, make sure you've addressed reviewer comments, and rebase on the latest master. Thank you for your contributions! |
Add a removeInput() function to some Accumulators, and when it exists, use it in aggregate window functions to roll the aggregation forward incrementally. Dramatically speeds up queries such as: SELECT COUNT(quantity) OVER (ROWS BETWEEN 2000 PRECEDING AND 2000 FOLLOWING) Extracted from: prestodb/presto#8974
Implement removeInput() in some SUM aggregations, to speed up rolling window functions. This requires additional storage in the AggregationState for the input count, so that the aggregator knows when its result should become null. Extracted from: prestodb/presto#8974
Add a removeInput() function to some Accumulators, and when it exists, use it in aggregate window functions to roll the aggregation forward incrementally. Dramatically speeds up queries such as: SELECT COUNT(quantity) OVER (ROWS BETWEEN 2000 PRECEDING AND 2000 FOLLOWING) Extracted from: prestodb/presto#8974
Implement removeInput() in some SUM aggregations, to speed up rolling window functions. This requires additional storage in the AggregationState for the input count, so that the aggregator knows when its result should become null. Extracted from: prestodb/presto#8974
Add a removeInput() function to some Accumulators, and when it exists,
use it in aggregate window functions to roll the aggregation forward
incrementally. Dramatically speeds up queries such as:
SELECT COUNT(quantity) OVER
(ROWS BETWEEN 2000 PRECEDING AND 2000 FOLLOWING)
This PR also includes a commit that adds removeInput() to some SUM
aggregations, but at the cost of increasing the size of the aggregation
state. That may not be desired, and avoiding the bad effect there would
require refactoring into more window-function-specific code paths.
In the long term, we want to be able to use different algorithms with
different state structures for rolling window aggregations as opposed
to standard grouping. For example, a rolling min() needs to hold onto
all the values seen, whereas a non-rolling min() just needs to hold onto
the minimum value seen.
See #8982 for a list of the
aggregations and what changes they would need to support rolling.