Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow rolling aggregations for window functions #464

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
*/
package io.prestosql.operator.aggregation;

import io.prestosql.operator.aggregation.state.NullableDoubleState;
import io.prestosql.operator.aggregation.minmaxby.LongDoubleState;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.function.AggregationFunction;
import io.prestosql.spi.function.AggregationState;
import io.prestosql.spi.function.CombineFunction;
import io.prestosql.spi.function.InputFunction;
import io.prestosql.spi.function.OutputFunction;
import io.prestosql.spi.function.RemoveInputFunction;
import io.prestosql.spi.function.SqlType;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.StandardTypes;
Expand All @@ -30,27 +31,34 @@ public final class DoubleSumAggregation
private DoubleSumAggregation() {}

@InputFunction
public static void sum(@AggregationState NullableDoubleState state, @SqlType(StandardTypes.DOUBLE) double value)
public static void sum(@AggregationState LongDoubleState state, @SqlType(StandardTypes.DOUBLE) double value)
{
state.setNull(false);
state.setDouble(state.getDouble() + value);
state.setFirst(state.getFirst() + 1);
state.setSecond(state.getSecond() + value);
}

@CombineFunction
public static void combine(@AggregationState NullableDoubleState state, @AggregationState NullableDoubleState otherState)
@RemoveInputFunction
public static void removeInput(@AggregationState LongDoubleState state, @SqlType(StandardTypes.DOUBLE) double value)
{
if (state.isNull()) {
state.setNull(false);
state.setDouble(otherState.getDouble());
return;
}
state.setFirst(state.getFirst() - 1);
state.setSecond(state.getSecond() - value);
}

state.setDouble(state.getDouble() + otherState.getDouble());
@CombineFunction
public static void combine(@AggregationState LongDoubleState state, @AggregationState LongDoubleState otherState)
{
state.setFirst(state.getFirst() + otherState.getFirst());
state.setSecond(state.getSecond() + otherState.getSecond());
}

@OutputFunction(StandardTypes.DOUBLE)
public static void output(@AggregationState NullableDoubleState state, BlockBuilder out)
public static void output(@AggregationState LongDoubleState state, BlockBuilder out)
{
NullableDoubleState.write(DoubleType.DOUBLE, state, out);
if (state.getFirst() == 0) {
out.appendNull();
}
else {
DoubleType.DOUBLE.writeDouble(out, state.getSecond());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
*/
package io.prestosql.operator.aggregation;

import io.prestosql.operator.aggregation.state.NullableLongState;
import io.prestosql.operator.aggregation.minmaxby.LongLongState;
import io.prestosql.spi.block.BlockBuilder;
import io.prestosql.spi.function.AggregationFunction;
import io.prestosql.spi.function.AggregationState;
import io.prestosql.spi.function.CombineFunction;
import io.prestosql.spi.function.InputFunction;
import io.prestosql.spi.function.OutputFunction;
import io.prestosql.spi.function.RemoveInputFunction;
import io.prestosql.spi.function.SqlType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.StandardTypes;
Expand All @@ -31,27 +32,34 @@ public final class LongSumAggregation
private LongSumAggregation() {}

@InputFunction
public static void sum(@AggregationState NullableLongState state, @SqlType(StandardTypes.BIGINT) long value)
public static void sum(@AggregationState LongLongState state, @SqlType(StandardTypes.BIGINT) long value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this actually causes significant perf regression as long long stats is much bigger to serialize, cc @martint @dain

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, and just filed an issue to showcase the problem #18818

{
state.setNull(false);
state.setLong(BigintOperators.add(state.getLong(), value));
state.setFirst(state.getFirst() + 1);
state.setSecond(BigintOperators.add(state.getSecond(), value));
}

@CombineFunction
public static void combine(@AggregationState NullableLongState state, @AggregationState NullableLongState otherState)
@RemoveInputFunction
public static void removeInput(@AggregationState LongLongState state, @SqlType(StandardTypes.BIGINT) long value)
{
if (state.isNull()) {
state.setNull(false);
state.setLong(otherState.getLong());
return;
}
state.setFirst(state.getFirst() - 1);
state.setSecond(BigintOperators.subtract(state.getSecond(), value));
}

state.setLong(BigintOperators.add(state.getLong(), otherState.getLong()));
@CombineFunction
public static void combine(@AggregationState LongLongState state, @AggregationState LongLongState otherState)
{
state.setFirst(state.getFirst() + otherState.getFirst());
state.setSecond(BigintOperators.add(state.getSecond(), otherState.getSecond()));
}

@OutputFunction(StandardTypes.BIGINT)
public static void output(@AggregationState NullableLongState state, BlockBuilder out)
public static void output(@AggregationState LongLongState state, BlockBuilder out)
{
NullableLongState.write(BigintType.BIGINT, state, out);
if (state.getFirst() == 0) {
out.appendNull();
}
else {
BigintType.BIGINT.writeLong(out, state.getSecond());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ public class TestHashAggregationOperator
new Signature("sum", AGGREGATE, BIGINT.getTypeSignature(), BIGINT.getTypeSignature()));
private static final InternalAggregationFunction COUNT = metadata.getAggregateFunctionImplementation(
new Signature("count", AGGREGATE, BIGINT.getTypeSignature()));
private static final InternalAggregationFunction LONG_MIN = metadata.getAggregateFunctionImplementation(
new Signature("min", AGGREGATE, BIGINT.getTypeSignature(), BIGINT.getTypeSignature()));

private static final int MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024;

Expand Down Expand Up @@ -240,7 +242,7 @@ public void testHashAggregationWithGlobals(boolean hashEnabled, boolean spillEna
Step.SINGLE,
true,
ImmutableList.of(COUNT.bind(ImmutableList.of(0), Optional.empty()),
LONG_SUM.bind(ImmutableList.of(4), Optional.empty()),
LONG_MIN.bind(ImmutableList.of(4), Optional.empty()),
LONG_AVERAGE.bind(ImmutableList.of(4), Optional.empty()),
maxVarcharColumn.bind(ImmutableList.of(2), Optional.empty()),
countVarcharColumn.bind(ImmutableList.of(0), Optional.empty()),
Expand Down Expand Up @@ -336,7 +338,7 @@ public void testMemoryLimit(boolean hashEnabled)
ImmutableList.of(),
Step.SINGLE,
ImmutableList.of(COUNT.bind(ImmutableList.of(0), Optional.empty()),
LONG_SUM.bind(ImmutableList.of(3), Optional.empty()),
LONG_MIN.bind(ImmutableList.of(3), Optional.empty()),
LONG_AVERAGE.bind(ImmutableList.of(3), Optional.empty()),
maxVarcharColumn.bind(ImmutableList.of(2), Optional.empty())),
rowPagesBuilder.getHashChannel(),
Expand Down Expand Up @@ -517,7 +519,7 @@ public void testMultiplePartialFlushes(boolean hashEnabled)
hashChannels,
ImmutableList.of(),
Step.PARTIAL,
ImmutableList.of(LONG_SUM.bind(ImmutableList.of(0), Optional.empty())),
ImmutableList.of(LONG_MIN.bind(ImmutableList.of(0), Optional.empty())),
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
Expand Down Expand Up @@ -599,7 +601,7 @@ public void testMergeWithMemorySpill()
ImmutableList.of(),
Step.SINGLE,
false,
ImmutableList.of(LONG_SUM.bind(ImmutableList.of(0), Optional.empty())),
ImmutableList.of(LONG_MIN.bind(ImmutableList.of(0), Optional.empty())),
rowPagesBuilder.getHashChannel(),
Optional.empty(),
1,
Expand Down Expand Up @@ -653,7 +655,7 @@ public void testSpillerFailure()
Step.SINGLE,
false,
ImmutableList.of(COUNT.bind(ImmutableList.of(0), Optional.empty()),
LONG_SUM.bind(ImmutableList.of(3), Optional.empty()),
LONG_MIN.bind(ImmutableList.of(3), Optional.empty()),
LONG_AVERAGE.bind(ImmutableList.of(3), Optional.empty()),
maxVarcharColumn.bind(ImmutableList.of(2), Optional.empty())),
rowPagesBuilder.getHashChannel(),
Expand Down Expand Up @@ -700,7 +702,7 @@ private void testMemoryTracking(boolean useSystemMemory)
hashChannels,
ImmutableList.of(),
Step.SINGLE,
ImmutableList.of(LONG_SUM.bind(ImmutableList.of(0), Optional.empty())),
ImmutableList.of(LONG_MIN.bind(ImmutableList.of(0), Optional.empty())),
rowPagesBuilder.getHashChannel(),
Optional.empty(),
100_000,
Expand Down