diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
index ca86dc07c8a9..ae71e08703c3 100644
--- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
+++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
@@ -44,6 +44,7 @@
 import static io.trino.spi.session.PropertyMetadata.doubleProperty;
 import static io.trino.spi.session.PropertyMetadata.enumProperty;
 import static io.trino.spi.session.PropertyMetadata.integerProperty;
+import static io.trino.spi.session.PropertyMetadata.longProperty;
 import static io.trino.spi.session.PropertyMetadata.stringProperty;
 import static io.trino.spi.type.IntegerType.INTEGER;
 import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey;
@@ -156,6 +157,9 @@ public final class SystemSessionProperties
     public static final String FAULT_TOLERANT_EXECUTION_TARGET_TASK_SPLIT_COUNT = "fault_tolerant_execution_target_task_split_count";
     public static final String FAULT_TOLERANT_EXECUTION_MAX_TASK_SPLIT_COUNT = "fault_tolerant_execution_max_task_split_count";
     public static final String FAULT_TOLERANT_EXECUTION_TASK_MEMORY = "fault_tolerant_execution_task_memory";
+    public static final String ADAPTIVE_PARTIAL_AGGREGATION_ENABLED = "adaptive_partial_aggregation_enabled";
+    public static final String ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS = "adaptive_partial_aggregation_min_rows";
+    public static final String ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD = "adaptive_partial_aggregation_unique_rows_ratio_threshold";

     private final List<PropertyMetadata<?>> sessionProperties;

@@ -739,6 +743,21 @@ public SystemSessionProperties(
                         FAULT_TOLERANT_EXECUTION_TASK_MEMORY,
                         "Estimated amount of memory a single task will use when task level retries are used; value is used allocating nodes for tasks execution",
                         memoryManagerConfig.getFaultTolerantTaskMemory(),
+                        false),
+                booleanProperty(
+                        ADAPTIVE_PARTIAL_AGGREGATION_ENABLED,
+                        "When enabled, partial aggregation might be adaptively turned off when it does not provide any performance gain",
+                        optimizerConfig.isAdaptivePartialAggregationEnabled(),
+                        false),
+                longProperty(
+                        ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS,
+                        "Minimum number of processed rows before partial aggregation might be adaptively turned off",
+                        optimizerConfig.getAdaptivePartialAggregationMinRows(),
+                        false),
+                doubleProperty(
+                        ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD,
+                        "Ratio between aggregation output and input rows above which partial aggregation might be adaptively turned off",
+                        optimizerConfig.getAdaptivePartialAggregationUniqueRowsRatioThreshold(),
                         false));
     }

@@ -1330,4 +1349,19 @@ public static DataSize getFaultTolerantExecutionDefaultTaskMemory(Session sessio
     {
         return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_TASK_MEMORY, DataSize.class);
     }
+
+    public static boolean isAdaptivePartialAggregationEnabled(Session session)
+    {
+        return session.getSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_ENABLED, Boolean.class);
+    }
+
+    public static long getAdaptivePartialAggregationMinRows(Session session)
+    {
+        return session.getSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_MIN_ROWS, Long.class);
+    }
+
+    public static double getAdaptivePartialAggregationUniqueRowsRatioThreshold(Session session)
+    {
+        return session.getSystemProperty(ADAPTIVE_PARTIAL_AGGREGATION_UNIQUE_ROWS_RATIO_THRESHOLD, Double.class);
+    }
 }
diff --git a/core/trino-main/src/main/java/io/trino/operator/CompletedWork.java b/core/trino-main/src/main/java/io/trino/operator/CompletedWork.java
index 476584c08c6c..fd1e603b8e57 100644
--- a/core/trino-main/src/main/java/io/trino/operator/CompletedWork.java
+++ b/core/trino-main/src/main/java/io/trino/operator/CompletedWork.java
@@ -13,13 +13,21 @@
  */
 package io.trino.operator;

+import javax.annotation.Nullable;
+
 import static java.util.Objects.requireNonNull;

 public final class CompletedWork<T>
         implements Work<T>
 {
+    @Nullable
     private final T result;

+    public CompletedWork()
+    {
+        this.result = null;
+    }
+
     public CompletedWork(T value)
     {
         this.result = requireNonNull(value);
@@ -31,6 +39,7 @@ public boolean process()
         return true;
     }

+    @Nullable
     @Override
     public T getResult()
     {
diff --git a/core/trino-main/src/main/java/io/trino/operator/HashAggregationOperator.java b/core/trino-main/src/main/java/io/trino/operator/HashAggregationOperator.java
index a5f6eb4fe8d3..a500a160e102 100644
--- a/core/trino-main/src/main/java/io/trino/operator/HashAggregationOperator.java
+++ b/core/trino-main/src/main/java/io/trino/operator/HashAggregationOperator.java
@@ -22,6 +22,8 @@
 import io.trino.operator.aggregation.builder.HashAggregationBuilder;
 import io.trino.operator.aggregation.builder.InMemoryHashAggregationBuilder;
 import io.trino.operator.aggregation.builder.SpillableHashAggregationBuilder;
+import io.trino.operator.aggregation.partial.PartialAggregationController;
+import io.trino.operator.aggregation.partial.SkipAggregationBuilder;
 import io.trino.operator.scalar.CombineHashFunction;
 import io.trino.spi.Page;
 import io.trino.spi.PageBuilder;
@@ -36,6 +38,7 @@
 import java.util.List;
 import java.util.Optional;

+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 import static io.airlift.units.DataSize.Unit.MEGABYTE;
 import static io.trino.operator.aggregation.builder.InMemoryHashAggregationBuilder.toTypes;
@@ -70,6 +73,7 @@ public static class HashAggregationOperatorFactory
         private final SpillerFactory spillerFactory;
         private final JoinCompiler joinCompiler;
         private final BlockTypeOperators blockTypeOperators;
+        private final Optional<PartialAggregationController> partialAggregationController;

         private boolean closed;

@@ -87,7 +91,8 @@ public HashAggregationOperatorFactory(
                 int expectedGroups,
                 Optional<DataSize> maxPartialMemory,
                 JoinCompiler joinCompiler,
-                BlockTypeOperators blockTypeOperators)
+                BlockTypeOperators blockTypeOperators,
+                Optional<PartialAggregationController> partialAggregationController)
         {
             this(operatorId,
                     planNodeId,
@@ -108,7 +113,8 @@ public HashAggregationOperatorFactory(
                         throw new UnsupportedOperationException();
                     },
                     joinCompiler,
-                    blockTypeOperators);
+                    blockTypeOperators,
+                    partialAggregationController);
         }

         public HashAggregationOperatorFactory(
@@ -128,7 +134,8 @@ public HashAggregationOperatorFactory(
                 DataSize unspillMemoryLimit,
                 SpillerFactory spillerFactory,
                 JoinCompiler joinCompiler,
-                BlockTypeOperators blockTypeOperators)
+                BlockTypeOperators blockTypeOperators,
+                Optional<PartialAggregationController> partialAggregationController)
         {
             this(operatorId,
                     planNodeId,
@@ -147,7 +154,8 @@ public HashAggregationOperatorFactory(
                     DataSize.succinctBytes((long) (unspillMemoryLimit.toBytes() * MERGE_WITH_MEMORY_RATIO)),
                     spillerFactory,
                     joinCompiler,
-                    blockTypeOperators);
+                    blockTypeOperators,
+                    partialAggregationController);
         }

         @VisibleForTesting
         public HashAggregationOperatorFactory(
@@ -169,7 +177,8 @@ public HashAggregationOperatorFactory(
                 DataSize memoryLimitForMergeWithMemory,
                 SpillerFactory spillerFactory,
                 JoinCompiler joinCompiler,
-                BlockTypeOperators blockTypeOperators)
+                BlockTypeOperators blockTypeOperators,
+                Optional<PartialAggregationController> partialAggregationController)
         {
             this.operatorId = operatorId;
             this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
@@ -189,6 +198,7 @@ public HashAggregationOperatorFactory(
             this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null");
             this.joinCompiler = requireNonNull(joinCompiler, "joinCompiler is null");
             this.blockTypeOperators = requireNonNull(blockTypeOperators, "blockTypeOperators is null");
+            this.partialAggregationController = requireNonNull(partialAggregationController, "partialAggregationController is null");
         }

         @Override
@@ -214,7 +224,8 @@ public Operator createOperator(DriverContext driverContext)
                     memoryLimitForMergeWithMemory,
                     spillerFactory,
                     joinCompiler,
-                    blockTypeOperators);
+                    blockTypeOperators,
+                    partialAggregationController);
             return hashAggregationOperator;
         }

@@ -245,11 +256,13 @@ public OperatorFactory duplicate()
                     memoryLimitForMergeWithMemory,
                     spillerFactory,
                     joinCompiler,
-                    blockTypeOperators);
+                    blockTypeOperators,
+                    partialAggregationController.map(PartialAggregationController::duplicate));
         }
     }

     private final OperatorContext operatorContext;
+    private final Optional<PartialAggregationController> partialAggregationController;
     private final List<Type> groupByTypes;
     private final List<Integer> groupByChannels;
     private final List<Integer> globalAggregationGroupIds;
@@ -279,6 +292,8 @@ public OperatorFactory duplicate()
     // for yield when memory is not available
     private Work<?> unfinishedWork;
+    private long numberOfInputRowsProcessed;
+    private long numberOfUniqueRowsProduced;

     private HashAggregationOperator(
             OperatorContext operatorContext,
@@ -297,12 +312,15 @@ private HashAggregationOperator(
             DataSize memoryLimitForMergeWithMemory,
             SpillerFactory spillerFactory,
             JoinCompiler joinCompiler,
-            BlockTypeOperators blockTypeOperators)
+            BlockTypeOperators blockTypeOperators,
+            Optional<PartialAggregationController> partialAggregationController)
     {
         this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
+        this.partialAggregationController = requireNonNull(partialAggregationController, "partialAggregationController is null");
         requireNonNull(step, "step is null");
         requireNonNull(aggregatorFactories, "aggregatorFactories is null");
         requireNonNull(operatorContext, "operatorContext is null");
+        checkArgument(partialAggregationController.isEmpty() || step.isOutputPartial(), "partialAggregationController should be present only for partial aggregation");

         this.groupByTypes = ImmutableList.copyOf(groupByTypes);
         this.groupByChannels = ImmutableList.copyOf(groupByChannels);
@@ -368,8 +386,14 @@ public void addInput(Page page)
         inputProcessed = true;

         if (aggregationBuilder == null) {
-            // TODO: We ignore spillEnabled here if any aggregate has ORDER BY clause or DISTINCT because they are not yet implemented for spilling.
-            if (step.isOutputPartial() || !spillEnabled || !isSpillable()) {
+            boolean partialAggregationDisabled = partialAggregationController
+                    .map(PartialAggregationController::isPartialAggregationDisabled)
+                    .orElse(false);
+            if (step.isOutputPartial() && partialAggregationDisabled) {
+                aggregationBuilder = new SkipAggregationBuilder(groupByChannels, hashChannel, aggregatorFactories, memoryContext);
+            }
+            else if (step.isOutputPartial() || !spillEnabled || !isSpillable()) {
+                // TODO: We ignore spillEnabled here if any aggregate has ORDER BY clause or DISTINCT because they are not yet implemented for spilling.
                 aggregationBuilder = new InMemoryHashAggregationBuilder(
                         aggregatorFactories,
                         step,
@@ -418,6 +442,7 @@ public void addInput(Page page)
             unfinishedWork = null;
         }
         aggregationBuilder.updateMemory();
+        numberOfInputRowsProcessed += page.getPositionCount();
     }

     private boolean isSpillable()
@@ -490,7 +515,9 @@ public Page getOutput()
             return null;
         }

-        return outputPages.getResult();
+        Page result = outputPages.getResult();
+        numberOfUniqueRowsProduced += result.getPositionCount();
+        return result;
     }

     @Override
@@ -516,6 +543,10 @@ private void closeAggregationBuilder()
             aggregationBuilder = null;
         }
         memoryContext.setBytes(0);
+        partialAggregationController.ifPresent(
+                controller -> controller.onFlush(numberOfInputRowsProcessed, numberOfUniqueRowsProduced));
+        numberOfInputRowsProcessed = 0;
+        numberOfUniqueRowsProduced = 0;
     }

     private Page getGlobalAggregationOutput()
diff --git a/core/trino-main/src/main/java/io/trino/operator/aggregation/partial/PartialAggregationController.java b/core/trino-main/src/main/java/io/trino/operator/aggregation/partial/PartialAggregationController.java
new file mode 100644
index 000000000000..4408111feabb
--- /dev/null
+++ b/core/trino-main/src/main/java/io/trino/operator/aggregation/partial/PartialAggregationController.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.operator.aggregation.partial;
+
+import io.trino.operator.HashAggregationOperator;
+
+/**
+ * Controls whether partial aggregation is enabled across all {@link HashAggregationOperator}s
+ * for a particular plan node on a single node.
+ * Partial aggregation is disabled once enough rows have been processed ({@link #minNumberOfRowsProcessed})
+ * and the ratio between output (unique) and input rows is too high (> {@link #uniqueRowsRatioThreshold}).
+ * TODO https://github.com/trinodb/trino/issues/11361 add support to adaptively re-enable partial aggregation.
+ * <p>
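+ * For example, with the default configuration (adaptive-partial-aggregation.min-rows = 100_000,
+ * adaptive-partial-aggregation.unique-rows-ratio-threshold = 0.8), partial aggregation is switched off
+ * once at least 100_000 input rows have been flushed while more than 80% of them came out as distinct groups.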
+ * The class is thread safe, and objects of this class may be used by multiple threads/drivers simultaneously.
+ * Different threads either:
+ * - modify fields via the synchronized {@link #onFlush}, or
+ * - read the volatile {@link #partialAggregationDisabled} (volatile gives the required visibility).
+ */
+public class PartialAggregationController
+{
+    private final long minNumberOfRowsProcessed;
+    private final double uniqueRowsRatioThreshold;
+
+    private volatile boolean partialAggregationDisabled;
+    private long totalRowProcessed;
+    private long totalUniqueRowsProduced;
+
+    public PartialAggregationController(long minNumberOfRowsProcessedToDisable, double uniqueRowsRatioThreshold)
+    {
+        this.minNumberOfRowsProcessed = minNumberOfRowsProcessedToDisable;
+        this.uniqueRowsRatioThreshold = uniqueRowsRatioThreshold;
+    }
+
+    public boolean isPartialAggregationDisabled()
+    {
+        return partialAggregationDisabled;
+    }
+
+    public synchronized void onFlush(long rowsProcessed, long uniqueRowsProduced)
+    {
+        if (partialAggregationDisabled) {
+            return;
+        }
+
+        totalRowProcessed += rowsProcessed;
+        totalUniqueRowsProduced += uniqueRowsProduced;
+        if (shouldDisablePartialAggregation()) {
+            partialAggregationDisabled = true;
+        }
+    }
+
+    private boolean shouldDisablePartialAggregation()
+    {
+        return totalRowProcessed >= minNumberOfRowsProcessed
+                && ((double) totalUniqueRowsProduced / totalRowProcessed) > uniqueRowsRatioThreshold;
+    }
+
+    public PartialAggregationController duplicate()
+    {
+        return new PartialAggregationController(minNumberOfRowsProcessed, uniqueRowsRatioThreshold);
+    }
+}
diff --git a/core/trino-main/src/main/java/io/trino/operator/aggregation/partial/SkipAggregationBuilder.java b/core/trino-main/src/main/java/io/trino/operator/aggregation/partial/SkipAggregationBuilder.java
new file mode 100644
index 000000000000..3e9f00b9ad8d
--- /dev/null
+++ b/core/trino-main/src/main/java/io/trino/operator/aggregation/partial/SkipAggregationBuilder.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.operator.aggregation.partial;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import io.trino.memory.context.LocalMemoryContext;
+import io.trino.operator.CompletedWork;
+import io.trino.operator.GroupByIdBlock;
+import io.trino.operator.HashCollisionsCounter;
+import io.trino.operator.Work;
+import io.trino.operator.WorkProcessor;
+import io.trino.operator.aggregation.AggregatorFactory;
+import io.trino.operator.aggregation.GroupedAggregator;
+import io.trino.operator.aggregation.builder.HashAggregationBuilder;
+import io.trino.spi.Page;
+import io.trino.spi.block.Block;
+import io.trino.spi.block.BlockBuilder;
+import io.trino.spi.block.LongArrayBlock;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Optional;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link HashAggregationBuilder} that does not aggregate input rows at all.
+ * It passes the input pages, augmented with the initial accumulator state, to the output.
+ * It can only be used in the partial aggregation step, as it relies on the rows being aggregated at the final step.
+ */
+public class SkipAggregationBuilder
+        implements HashAggregationBuilder
+{
+    private final LocalMemoryContext memoryContext;
+    private final List<GroupedAggregator> groupedAggregators;
+    @Nullable
+    private Page currentPage;
+    private final int[] hashChannels;
+
+    public SkipAggregationBuilder(
+            List<Integer> groupByChannels,
+            Optional<Integer> inputHashChannel,
+            List<AggregatorFactory> aggregatorFactories,
+            LocalMemoryContext memoryContext)
+    {
+        this.memoryContext = requireNonNull(memoryContext, "memoryContext is null");
+        this.groupedAggregators = requireNonNull(aggregatorFactories, "aggregatorFactories is null")
+                .stream()
+                .map(AggregatorFactory::createGroupedAggregator)
+                .collect(toImmutableList());
+        this.hashChannels = new int[groupByChannels.size() + (inputHashChannel.isPresent() ? 1 : 0)];
+        for (int i = 0; i < groupByChannels.size(); i++) {
+            hashChannels[i] = groupByChannels.get(i);
+        }
+        inputHashChannel.ifPresent(channelIndex -> hashChannels[groupByChannels.size()] = channelIndex);
+    }
+
+    @Override
+    public Work<?> processPage(Page page)
+    {
+        checkArgument(currentPage == null);
+        currentPage = page;
+        return new CompletedWork<>();
+    }
+
+    @Override
+    public WorkProcessor<Page> buildResult()
+    {
+        if (currentPage == null) {
+            return WorkProcessor.of();
+        }
+
+        Page result = buildOutputPage(currentPage);
+        currentPage = null;
+        return WorkProcessor.of(result);
+    }
+
+    @Override
+    public boolean isFull()
+    {
+        return currentPage != null;
+    }
+
+    @Override
+    public void updateMemory()
+    {
+        if (currentPage != null) {
+            memoryContext.setBytes(currentPage.getSizeInBytes());
+        }
+    }
+
+    @Override
+    public void recordHashCollisions(HashCollisionsCounter hashCollisionsCounter)
+    {
+        // no op
+    }
+
+    @Override
+    public void close()
+    {
+    }
+
+    @Override
+    public ListenableFuture<Void> startMemoryRevoke()
+    {
+        throw new UnsupportedOperationException("startMemoryRevoke not supported for SkipAggregationBuilder");
+    }
+
+    @Override
+    public void finishMemoryRevoke()
+    {
+        throw new UnsupportedOperationException("finishMemoryRevoke not supported for SkipAggregationBuilder");
+    }
+
+    private Page buildOutputPage(Page page)
+    {
+        populateInitialAccumulatorState(page);
+
+        BlockBuilder[] outputBuilders = serializeAccumulatorState(page.getPositionCount());
+
+        return constructOutputPage(page, outputBuilders);
+    }
+
+    private void populateInitialAccumulatorState(Page page)
+    {
+        GroupByIdBlock groupByIdBlock = getGroupByIdBlock(page.getPositionCount());
+        for (GroupedAggregator groupedAggregator : groupedAggregators) {
+            groupedAggregator.processPage(groupByIdBlock, page);
+        }
+    }
+
+    private GroupByIdBlock getGroupByIdBlock(int positionCount)
+    {
+        return new GroupByIdBlock(
+                positionCount,
+                new LongArrayBlock(positionCount, Optional.empty(), consecutive(positionCount)));
+    }
+
+    private BlockBuilder[] serializeAccumulatorState(int positionCount)
+    {
+        BlockBuilder[] outputBuilders = new BlockBuilder[groupedAggregators.size()];
+        for (int i = 0; i < outputBuilders.length; i++) {
+            outputBuilders[i] = groupedAggregators.get(i).getType().createBlockBuilder(null, positionCount);
+        }
+
+        for (int position = 0; position < positionCount; position++) {
+            for (int i = 0; i < groupedAggregators.size(); i++) {
+                GroupedAggregator groupedAggregator = groupedAggregators.get(i);
+                BlockBuilder output = outputBuilders[i];
+                groupedAggregator.evaluate(position, output);
+            }
+        }
+        return outputBuilders;
+    }
+
+    private Page constructOutputPage(Page page, BlockBuilder[] outputBuilders)
+    {
+        Block[] outputBlocks = new Block[hashChannels.length + outputBuilders.length];
+        for (int i = 0; i < hashChannels.length; i++) {
+            outputBlocks[i] = page.getBlock(hashChannels[i]);
+        }
+        for (int i = 0; i < outputBuilders.length; i++) {
+            outputBlocks[hashChannels.length + i] = outputBuilders[i].build();
+        }
+        return new Page(page.getPositionCount(), outputBlocks);
+    }
+
+    private static long[] consecutive(int positionCount)
+    {
+        long[] longs = new long[positionCount];
+        for (int i = 0; i < positionCount; i++) {
+            longs[i] = i;
+        }
+        return longs;
+    }
+}
diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
index 859a1ff2dcfd..01fc1629468f 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
@@ -102,6 +102,7 @@
 import io.trino.operator.aggregation.DistinctAccumulatorFactory;
 import io.trino.operator.aggregation.LambdaProvider;
 import io.trino.operator.aggregation.OrderedAccumulatorFactory;
+import io.trino.operator.aggregation.partial.PartialAggregationController;
 import io.trino.operator.exchange.LocalExchange.LocalExchangeFactory;
 import io.trino.operator.exchange.LocalExchangeSinkOperator.LocalExchangeSinkOperatorFactory;
 import io.trino.operator.exchange.LocalExchangeSourceOperator.LocalExchangeSourceOperatorFactory;
@@ -278,11 +279,14 @@
 import static com.google.common.collect.Range.closedOpen;
 import static com.google.common.collect.Sets.difference;
 import static io.airlift.concurrent.MoreFutures.addSuccessCallback;
+import static io.trino.SystemSessionProperties.getAdaptivePartialAggregationMinRows;
+import static io.trino.SystemSessionProperties.getAdaptivePartialAggregationUniqueRowsRatioThreshold;
 import static io.trino.SystemSessionProperties.getAggregationOperatorUnspillMemoryLimit;
 import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageRowCount;
 import static io.trino.SystemSessionProperties.getFilterAndProjectMinOutputPageSize;
 import static io.trino.SystemSessionProperties.getTaskConcurrency;
 import static io.trino.SystemSessionProperties.getTaskWriterCount;
+import static io.trino.SystemSessionProperties.isAdaptivePartialAggregationEnabled;
 import static io.trino.SystemSessionProperties.isEnableCoordinatorDynamicFiltersDistribution;
 import static io.trino.SystemSessionProperties.isEnableLargeDynamicFilters;
 import static io.trino.SystemSessionProperties.isExchangeCompressionEnabled;
@@ -3827,11 +3831,21 @@ private OperatorFactory createHashAggregationOperatorFactory(
                         unspillMemoryLimit,
                         spillerFactory,
                         joinCompiler,
-                        blockTypeOperators);
+                        blockTypeOperators,
+                        createPartialAggregationController(step, session));
             }
         }
     }

+    private static Optional<PartialAggregationController> createPartialAggregationController(AggregationNode.Step step, Session session)
+    {
+        return step.isOutputPartial() && isAdaptivePartialAggregationEnabled(session) ?
+                Optional.of(new PartialAggregationController(
+                        getAdaptivePartialAggregationMinRows(session),
+                        getAdaptivePartialAggregationUniqueRowsRatioThreshold(session))) :
+                Optional.empty();
+    }
+
     private int getDynamicFilteringMaxDistinctValuesPerDriver(Session session, boolean isReplicatedJoin)
     {
         if (isEnableLargeDynamicFilters(session)) {
diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java
index a6d950333e88..9cc9ff382578 100644
--- a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java
+++ b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java
@@ -77,6 +77,10 @@ public class OptimizerConfig
     private double tableScanNodePartitioningMinBucketToTaskRatio = 0.5;
     private boolean mergeProjectWithValues = true;
     private boolean forceSingleNodeOutput = true;
+    // adaptive partial aggregation
+    private boolean adaptivePartialAggregationEnabled = true;
+    private long adaptivePartialAggregationMinRows = 100_000;
+    private double adaptivePartialAggregationUniqueRowsRatioThreshold = 0.8;

     public enum JoinReorderingStrategy
     {
@@ -624,4 +628,42 @@ public OptimizerConfig setForceSingleNodeOutput(boolean value)
         this.forceSingleNodeOutput = value;
         return this;
     }
+
+    public boolean isAdaptivePartialAggregationEnabled()
+    {
+        return adaptivePartialAggregationEnabled;
+    }
+
+    @Config("adaptive-partial-aggregation.enabled")
+    public OptimizerConfig setAdaptivePartialAggregationEnabled(boolean adaptivePartialAggregationEnabled)
+    {
+        this.adaptivePartialAggregationEnabled = adaptivePartialAggregationEnabled;
+        return this;
+    }
+
+    public long getAdaptivePartialAggregationMinRows()
+    {
+        return adaptivePartialAggregationMinRows;
+    }
+
+    @Config("adaptive-partial-aggregation.min-rows")
+    @ConfigDescription("Minimum number of processed rows before partial aggregation might be adaptively turned off")
+    public OptimizerConfig setAdaptivePartialAggregationMinRows(long adaptivePartialAggregationMinRows)
+    {
+        this.adaptivePartialAggregationMinRows = adaptivePartialAggregationMinRows;
+        return this;
+    }
+
+    public double getAdaptivePartialAggregationUniqueRowsRatioThreshold()
+    {
+        return adaptivePartialAggregationUniqueRowsRatioThreshold;
+    }
+
+    @Config("adaptive-partial-aggregation.unique-rows-ratio-threshold")
+    @ConfigDescription("Ratio between aggregation output and input rows above which partial aggregation might be adaptively turned off")
+    public OptimizerConfig setAdaptivePartialAggregationUniqueRowsRatioThreshold(double adaptivePartialAggregationUniqueRowsRatioThreshold)
+    {
+        this.adaptivePartialAggregationUniqueRowsRatioThreshold = adaptivePartialAggregationUniqueRowsRatioThreshold;
+        return this;
+    }
 }
diff --git a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java
index c881b8424a29..edc6ec7e1608 100644
--- a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java
+++ b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java
@@ -80,7 +80,10 @@ public void testDefaults()
                 .setUseTableScanNodePartitioning(true)
                 .setTableScanNodePartitioningMinBucketToTaskRatio(0.5)
                 .setMergeProjectWithValues(true)
-                .setForceSingleNodeOutput(true));
+                .setForceSingleNodeOutput(true)
+                .setAdaptivePartialAggregationEnabled(true)
+                .setAdaptivePartialAggregationMinRows(100_000)
+                .setAdaptivePartialAggregationUniqueRowsRatioThreshold(0.8));
     }

     @Test
@@ -129,6 +132,9 @@ public void testExplicitPropertyMappings()
                 .put("optimizer.use-table-scan-node-partitioning", "false")
                 .put("optimizer.table-scan-node-partitioning-min-bucket-to-task-ratio", "0.0")
                 .put("optimizer.merge-project-with-values", "false")
+                .put("adaptive-partial-aggregation.enabled", "false")
+                .put("adaptive-partial-aggregation.min-rows", "1")
+                .put("adaptive-partial-aggregation.unique-rows-ratio-threshold", "0.99")
                 .buildOrThrow();

         OptimizerConfig expected = new OptimizerConfig()
@@ -173,7 +179,10 @@ public void testExplicitPropertyMappings()
                 .setUseTableScanNodePartitioning(false)
                 .setTableScanNodePartitioningMinBucketToTaskRatio(0.0)
                 .setMergeProjectWithValues(false)
-                .setForceSingleNodeOutput(false);
+                .setForceSingleNodeOutput(false)
+                .setAdaptivePartialAggregationEnabled(false)
+                .setAdaptivePartialAggregationMinRows(1)
+                .setAdaptivePartialAggregationUniqueRowsRatioThreshold(0.99);

         assertFullMapping(properties, expected);
     }
 }
diff --git a/core/trino-main/src/test/java/io/trino/operator/BenchmarkHashAndStreamingAggregationOperators.java b/core/trino-main/src/test/java/io/trino/operator/BenchmarkHashAndStreamingAggregationOperators.java
index 0610b0cb3ab4..e25f28f029c9 100644
--- a/core/trino-main/src/test/java/io/trino/operator/BenchmarkHashAndStreamingAggregationOperators.java
+++ b/core/trino-main/src/test/java/io/trino/operator/BenchmarkHashAndStreamingAggregationOperators.java
@@ -266,7 +266,8 @@ private OperatorFactory createHashAggregationOperatorFactory(
                 succinctBytes(Integer.MAX_VALUE),
                 spillerFactory,
                 JOIN_COMPILER,
-                BLOCK_TYPE_OPERATORS);
+                BLOCK_TYPE_OPERATORS,
+                Optional.empty());
     }

     private static void repeatToBigintBlock(long value, int count, BlockBuilder blockBuilder)
diff --git a/core/trino-main/src/test/java/io/trino/operator/OperatorAssertion.java b/core/trino-main/src/test/java/io/trino/operator/OperatorAssertion.java
index 1dd263811d50..593e073de84f 100644
--- a/core/trino-main/src/test/java/io/trino/operator/OperatorAssertion.java
+++ b/core/trino-main/src/test/java/io/trino/operator/OperatorAssertion.java
@@ -161,9 +161,16 @@ public static List<Page> toPages(OperatorFactory operatorFactory, DriverContext
     }

     public static List<Page> toPages(OperatorFactory operatorFactory, DriverContext driverContext, List<Page> input, boolean revokeMemoryWhenAddingPages)
+    {
+        return toPages(operatorFactory, driverContext, input, revokeMemoryWhenAddingPages, true);
+    }
+
+    public static List<Page> toPages(OperatorFactory operatorFactory, DriverContext driverContext, List<Page> input, boolean revokeMemoryWhenAddingPages, boolean closeOperatorFactory)
     {
         try (Operator operator = operatorFactory.createOperator(driverContext)) {
-            operatorFactory.noMoreOperators();
+            if (closeOperatorFactory) {
+                operatorFactory.noMoreOperators();
+            }
             return toPages(operator, input.iterator(), revokeMemoryWhenAddingPages);
         }
         catch (Exception e) {
@@ -225,6 +232,17 @@ public static void assertOperatorEquals(
         assertOperatorEquals(operatorFactory, driverContext, input, expected, false, ImmutableList.of(), revokeMemoryWhenAddingPages);
     }

+    public static void assertOperatorEquals(
+            OperatorFactory operatorFactory,
+            DriverContext driverContext,
+            List<Page> input,
+            MaterializedResult expected,
+            boolean revokeMemoryWhenAddingPages,
+            boolean closeOperatorFactory)
+    {
+        assertOperatorEquals(operatorFactory, driverContext, input, expected, false, ImmutableList.of(), revokeMemoryWhenAddingPages, closeOperatorFactory);
+    }
+
     public static void assertOperatorEquals(OperatorFactory operatorFactory, DriverContext driverContext, List<Page> input, MaterializedResult expected, boolean hashEnabled, List<Integer> hashChannels)
     {
         assertOperatorEquals(operatorFactory, driverContext, input, expected, hashEnabled, hashChannels, true);
     }
@@ -239,7 +257,28 @@ public static void assertOperatorEquals(
             List<Integer> hashChannels,
             boolean revokeMemoryWhenAddingPages)
     {
-        List<Page> pages = toPages(operatorFactory, driverContext, input, revokeMemoryWhenAddingPages);
+        assertOperatorEquals(
+                operatorFactory,
+                driverContext,
+                input,
+                expected,
+                hashEnabled,
+                hashChannels,
+                revokeMemoryWhenAddingPages,
+                true);
+    }
+
+    public static void assertOperatorEquals(
+            OperatorFactory operatorFactory,
+            DriverContext driverContext,
+            List<Page> input,
+            MaterializedResult expected,
+            boolean hashEnabled,
+            List<Integer> hashChannels,
+            boolean revokeMemoryWhenAddingPages,
+            boolean closeOperatorFactory)
+    {
+        List<Page> pages = toPages(operatorFactory, driverContext, input, revokeMemoryWhenAddingPages, closeOperatorFactory);
         if (hashEnabled && !hashChannels.isEmpty()) {
             // Drop the hashChannel for all pages
             pages = dropChannel(pages, hashChannels);
diff --git a/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java
index 0ef38b641ed8..3d3fc7623bb3 100644
--- a/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java
+++ b/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java
@@ -27,6 +27,7 @@
 import io.trino.operator.aggregation.TestingAggregationFunction;
 import io.trino.operator.aggregation.builder.HashAggregationBuilder;
 import io.trino.operator.aggregation.builder.InMemoryHashAggregationBuilder;
+import io.trino.operator.aggregation.partial.PartialAggregationController;
 import io.trino.spi.Page;
 import io.trino.spi.block.BlockBuilder;
 import io.trino.spi.block.PageBuilderStatus;
@@ -66,6 +67,8 @@
 import static io.airlift.units.DataSize.succinctBytes;
 import static io.trino.RowPagesBuilder.rowPagesBuilder;
 import static io.trino.SessionTestUtils.TEST_SESSION;
+import static io.trino.block.BlockAssertions.createLongsBlock;
+import static io.trino.block.BlockAssertions.createRLEBlock;
 import static io.trino.operator.GroupByHashYieldAssertion.GroupByHashYieldResult;
 import static io.trino.operator.GroupByHashYieldAssertion.createPagesWithDistinctHashKeys;
 import static io.trino.operator.GroupByHashYieldAssertion.finishOperatorWithYieldingGroupByHash;
@@ -90,6 +93,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;

 @Test(singleThreaded = true)
@@ -193,7 +197,8 @@ public void testHashAggregation(boolean hashEnabled, boolean spillEnabled, boole
                 succinctBytes(memoryLimitForMergeWithMemory),
                 spillerFactory,
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         DriverContext driverContext = createDriverContext(memoryLimitForMerge);

@@ -246,7 +251,8 @@ public void testHashAggregationWithGlobals(boolean hashEnabled, boolean spillEna
                 succinctBytes(memoryLimitForMergeWithMemory),
                 spillerFactory,
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         DriverContext driverContext = createDriverContext(memoryLimitForMerge);
         MaterializedResult expected = resultBuilder(driverContext.getSession(), VARCHAR, BIGINT, BIGINT, BIGINT, DOUBLE, VARCHAR, BIGINT, BIGINT)
@@ -292,7 +298,8 @@ public void testHashAggregationMemoryReservation(boolean hashEnabled, boolean sp
                 succinctBytes(memoryLimitForMergeWithMemory),
                 spillerFactory,
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         Operator operator = operatorFactory.createOperator(driverContext);
         toPages(operator, input.iterator(), revokeMemoryWhenAddingPages);
@@ -334,7 +341,8 @@ public void testMemoryLimit(boolean hashEnabled)
                 100_000,
                 Optional.of(DataSize.of(16, MEGABYTE)),
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         toPages(operatorFactory, driverContext, input);
     }
@@ -374,7 +382,8 @@ public void testHashBuilderResize(boolean hashEnabled, boolean spillEnabled, boo
                 succinctBytes(memoryLimitForMergeWithMemory),
                 spillerFactory,
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         toPages(operatorFactory, driverContext, input, revokeMemoryWhenAddingPages);
     }
@@ -396,7 +405,8 @@ public void testMemoryReservationYield(Type type)
                 1,
                 Optional.of(DataSize.of(16, MEGABYTE)),
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         // get result with yield; pick a relatively small buffer for aggregator's memory usage
         GroupByHashYieldResult result;
@@ -448,7 +458,8 @@ public void testHashBuilderResizeLimit(boolean hashEnabled)
                 100_000,
                 Optional.of(DataSize.of(16, MEGABYTE)),
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         toPages(operatorFactory, driverContext, input);
     }
@@ -482,7 +493,8 @@ public void testMultiSliceAggregationOutput(boolean hashEnabled)
                 100_000,
                 Optional.of(DataSize.of(16, MEGABYTE)),
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         assertEquals(toPages(operatorFactory, createDriverContext(), input).size(), 2);
     }
@@ -513,7 +525,8 @@ public void testMultiplePartialFlushes(boolean hashEnabled)
                 100_000,
                 Optional.of(DataSize.of(1, KILOBYTE)),
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         DriverContext driverContext = createDriverContext(1024);

@@ -598,7 +611,8 @@ public void testMergeWithMemorySpill()
                 succinctBytes(Integer.MAX_VALUE),
                 spillerFactory,
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         DriverContext driverContext = createDriverContext(smallPagesSpillThresholdSize);

@@ -652,7 +666,8 @@ public void testSpillerFailure()
                 succinctBytes(Integer.MAX_VALUE),
                 new FailingSpillerFactory(),
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         assertThatThrownBy(() -> toPages(operatorFactory, driverContext, input))
                 .isInstanceOf(RuntimeException.class)
@@ -681,7 +696,8 @@ public void testMemoryTracking()
                 100_000,
                 Optional.of(DataSize.of(16, MEGABYTE)),
                 joinCompiler,
-                blockTypeOperators);
+                blockTypeOperators,
+                Optional.empty());

         DriverContext driverContext = createDriverContext(1024);

@@ -698,6 +714,115 @@ public void testMemoryTracking()
         assertEquals(driverContext.getRevocableMemoryUsage(), 0);
     }

+    @Test
+    public void testAdaptivePartialAggregation()
+    {
+        List<Integer> hashChannels = Ints.asList(0);
+
+        PartialAggregationController partialAggregationController = new PartialAggregationController(5, 0.8);
+        HashAggregationOperatorFactory operatorFactory = new HashAggregationOperatorFactory(
+                0,
+                new PlanNodeId("test"),
+                ImmutableList.of(BIGINT),
+                hashChannels,
+                ImmutableList.of(),
+                PARTIAL,
+                ImmutableList.of(LONG_MIN.createAggregatorFactory(PARTIAL, ImmutableList.of(0), OptionalInt.empty())),
+                Optional.empty(),
+                Optional.empty(),
+                100,
+                Optional.of(DataSize.ofBytes(1)), // this setting makes the operator flush after each page
+                joinCompiler,
+                blockTypeOperators,
+                // use a 5-row threshold so that adaptive partial aggregation can trigger after each page flush
+                Optional.of(partialAggregationController));

+        // at the start, partial aggregation is enabled
+        assertFalse(partialAggregationController.isPartialAggregationDisabled());
+        // the first operator will trigger adaptive partial aggregation after the first page
+        List<Page> operator1Input = rowPagesBuilder(false, hashChannels, BIGINT)
+                .addBlocksPage(createLongsBlock(0, 1, 2, 3, 4, 5, 6, 7, 8, 8)) // the first page will be hashed, but since its values are almost all unique, it will trigger the adaptation
+                .addBlocksPage(createRLEBlock(1, 10)) // the second page would hash to the existing value 1, but if adaptive partial aggregation kicks in, the raw values are passed on instead
+                .build();
+        List<Page> operator1Expected = rowPagesBuilder(BIGINT, BIGINT)
+                .addBlocksPage(createLongsBlock(0, 1, 2, 3, 4, 5, 6, 7, 8), createLongsBlock(0, 1, 2, 3, 4, 5, 6, 7, 8)) // the last position was aggregated
+                .addBlocksPage(createRLEBlock(1, 10), createRLEBlock(1, 10)) // we expect the second page to contain raw values
+                .build();
+        assertOperatorEquals(operatorFactory, operator1Input, operator1Expected);

+        // the first operator flush disables partial aggregation
+        assertTrue(partialAggregationController.isPartialAggregationDisabled());
+        // the second operator, created from the same factory, reuses the PartialAggregationController, so it will only produce raw pages (partial aggregation is disabled at this point)
+        List<Page> operator2Input = rowPagesBuilder(false, hashChannels, BIGINT)
+                .addBlocksPage(createRLEBlock(1, 10))
+                .addBlocksPage(createRLEBlock(2, 10))
+                .build();
+        List<Page> operator2Expected = rowPagesBuilder(BIGINT, BIGINT)
+                .addBlocksPage(createRLEBlock(1, 10), createRLEBlock(1, 10))
+                .addBlocksPage(createRLEBlock(2, 10), createRLEBlock(2, 10))
+                .build();

+        assertOperatorEquals(operatorFactory, operator2Input, operator2Expected);
+    }

+    @Test
+    public void testAdaptivePartialAggregationTriggeredOnlyOnFlush()
+    {
+        List<Integer> hashChannels = Ints.asList(0);

+        PartialAggregationController partialAggregationController = new PartialAggregationController(5, 0.8);
+        HashAggregationOperatorFactory operatorFactory = new HashAggregationOperatorFactory(
+                0,
+                new PlanNodeId("test"),
+                ImmutableList.of(BIGINT),
+                hashChannels,
+                ImmutableList.of(),
+                PARTIAL,
+                ImmutableList.of(LONG_MIN.createAggregatorFactory(PARTIAL, ImmutableList.of(0), OptionalInt.empty())),
+                Optional.empty(),
+                Optional.empty(),
+                10,
+                Optional.of(DataSize.of(16, MEGABYTE)), // this setting makes the operator flush only after all pages
+                joinCompiler,
+                blockTypeOperators,
+                // use a 5-row threshold so that adaptive partial aggregation can trigger on flush
+                Optional.of(partialAggregationController));

+        List<Page> operator1Input = rowPagesBuilder(false, hashChannels, BIGINT)
+                .addSequencePage(10, 0) // the first page contains unique values, which alone would trigger the adaptation, but it does not because flush has not been called yet
+                .addBlocksPage(createRLEBlock(1, 2)) // the second page will be hashed to the existing value 1
+                .build();
+        // the total unique rows ratio for the first operator will be 10/12, which is > 0.8 (the adaptive partial aggregation uniqueRowsRatioThreshold)
+        List<Page> operator1Expected = rowPagesBuilder(BIGINT, BIGINT)
+                .addSequencePage(10, 0, 0) // we expect the second page to be squashed with the first
+                .build();
+        assertOperatorEquals(operatorFactory, operator1Input, operator1Expected);

+        // the first operator flush disables partial aggregation
+        assertTrue(partialAggregationController.isPartialAggregationDisabled());

+        // the second operator, created from the same factory, reuses the PartialAggregationController, so it will only produce raw pages (partial aggregation is disabled at this point)
+        List<Page> operator2Input = rowPagesBuilder(false, hashChannels, BIGINT)
+                .addBlocksPage(createRLEBlock(1, 10))
+                .addBlocksPage(createRLEBlock(2, 10))
+                .build();
+        List<Page> operator2Expected = rowPagesBuilder(BIGINT, BIGINT)
+                .addBlocksPage(createRLEBlock(1, 10), createRLEBlock(1, 10))
+                .addBlocksPage(createRLEBlock(2, 10), createRLEBlock(2, 10))
+                .build();

+        assertOperatorEquals(operatorFactory, operator2Input, operator2Expected);
+    }

+    private void assertOperatorEquals(OperatorFactory operatorFactory, List<Page> input, List<Page> expectedPages)
+    {
+        DriverContext driverContext = createDriverContext(1024);
+        MaterializedResult expected = resultBuilder(driverContext.getSession(), BIGINT, BIGINT)
+                .pages(expectedPages)
+                .build();
+        OperatorAssertion.assertOperatorEquals(operatorFactory, driverContext, input, expected, false, false);
+    }

     private DriverContext createDriverContext()
     {
         return createDriverContext(Integer.MAX_VALUE);
diff --git a/core/trino-main/src/test/java/io/trino/sql/query/TestFilterHideInacessibleColumnsSession.java b/core/trino-main/src/test/java/io/trino/sql/query/TestFilterHideInacessibleColumnsSession.java
index cbe3a73bab72..15f7a518013f 100644
--- a/core/trino-main/src/test/java/io/trino/sql/query/TestFilterHideInacessibleColumnsSession.java
+++ b/core/trino-main/src/test/java/io/trino/sql/query/TestFilterHideInacessibleColumnsSession.java
@@ -35,15 +35,7 @@ public void testDisableWhenEnabledByDefault()
     {
         FeaturesConfig featuresConfig = new FeaturesConfig();
         featuresConfig.setHideInaccessibleColumns(true);
-        SessionPropertyManager sessionPropertyManager = new SessionPropertyManager(new SystemSessionProperties(
-                new QueryManagerConfig(),
-                new TaskManagerConfig(),
-                new MemoryManagerConfig(),
-                featuresConfig,
-                new OptimizerConfig(),
-                new NodeMemoryConfig(),
-                new DynamicFilterConfig(),
-                new NodeSchedulerConfig()));
+        SessionPropertyManager sessionPropertyManager = createSessionPropertyManager(featuresConfig);
         assertThatThrownBy(() -> sessionPropertyManager.validateSystemSessionProperty(SystemSessionProperties.HIDE_INACCESSIBLE_COLUMNS, "false"))
                 .hasMessage("hide_inaccessible_columns cannot be disabled with session property when it was enabled with configuration");
     }
@@ -53,15 +45,7 @@ public void testEnableWhenAlreadyEnabledByDefault()
     {
         FeaturesConfig featuresConfig = new FeaturesConfig();
         featuresConfig.setHideInaccessibleColumns(true);
-        SessionPropertyManager sessionPropertyManager = new SessionPropertyManager(new SystemSessionProperties(
-                new QueryManagerConfig(),
-                new TaskManagerConfig(),
-                new MemoryManagerConfig(),
-                featuresConfig,
-                new OptimizerConfig(),
-                new NodeMemoryConfig(),
-                new DynamicFilterConfig(),
-                new NodeSchedulerConfig()));
+        SessionPropertyManager sessionPropertyManager = createSessionPropertyManager(featuresConfig);
         assertThatNoException().isThrownBy(() -> sessionPropertyManager.validateSystemSessionProperty(SystemSessionProperties.HIDE_INACCESSIBLE_COLUMNS, "true"));
     }

@@ -69,15 +53,7 @@ public void testEnableWhenAlreadyEnabledByDefault()
     public void testDisableWhenAlreadyDisabledByDefault()
     {
         FeaturesConfig featuresConfig = new FeaturesConfig();
-        SessionPropertyManager sessionPropertyManager = new SessionPropertyManager(new SystemSessionProperties(
-                new QueryManagerConfig(),
-                new TaskManagerConfig(),
-                new MemoryManagerConfig(),
-                featuresConfig,
-                new OptimizerConfig(),
-                new NodeMemoryConfig(),
-                new DynamicFilterConfig(),
-                new NodeSchedulerConfig()));
+        SessionPropertyManager sessionPropertyManager = createSessionPropertyManager(featuresConfig);
         assertThatNoException().isThrownBy(() -> sessionPropertyManager.validateSystemSessionProperty(SystemSessionProperties.HIDE_INACCESSIBLE_COLUMNS, "false"));
     }

@@ -85,7 +61,13 @@ public void testDisableWhenAlreadyDisabledByDefault()
     public void testEnableWhenDisabledByDefault()
     {
         FeaturesConfig featuresConfig = new FeaturesConfig();
-        SessionPropertyManager sessionPropertyManager = new SessionPropertyManager(new SystemSessionProperties(
+        SessionPropertyManager sessionPropertyManager = createSessionPropertyManager(featuresConfig);
+        assertThatNoException().isThrownBy(() -> sessionPropertyManager.validateSystemSessionProperty(SystemSessionProperties.HIDE_INACCESSIBLE_COLUMNS, "true"));
+    }
+
+    private SessionPropertyManager createSessionPropertyManager(FeaturesConfig featuresConfig)
+    {
+        return new SessionPropertyManager(new SystemSessionProperties(
                 new QueryManagerConfig(),
                 new TaskManagerConfig(),
                 new MemoryManagerConfig(),
@@ -94,6 +76,5 @@ public void testEnableWhenDisabledByDefault()
                 new NodeMemoryConfig(),
                 new DynamicFilterConfig(),
                 new NodeSchedulerConfig()));
-        assertThatNoException().isThrownBy(() -> sessionPropertyManager.validateSystemSessionProperty(SystemSessionProperties.HIDE_INACCESSIBLE_COLUMNS, "true"));
     }
 }
diff --git a/core/trino-main/src/test/java/io/trino/sql/query/TestShowQueries.java b/core/trino-main/src/test/java/io/trino/sql/query/TestShowQueries.java
index b2326fd675b7..6309bacc4042 100644
--- a/core/trino-main/src/test/java/io/trino/sql/query/TestShowQueries.java
+++ b/core/trino-main/src/test/java/io/trino/sql/query/TestShowQueries.java
@@ -90,8 +90,9 @@ public void testShowCatalogsLikeWithEscape()
     public void testShowFunctionLike()
     {
         assertThat(assertions.query("SHOW FUNCTIONS LIKE 'split%'"))
+                .skippingTypesCheck()
                 .matches("VALUES " +
-                        "(cast('split' AS VARCHAR(30)), cast('array(varchar(x))' AS VARCHAR(28)), cast('varchar(x), varchar(y)' AS VARCHAR(68)), cast('scalar' AS VARCHAR(9)), true, cast('' AS VARCHAR(131)))," +
+                        "('split', 'array(varchar(x))', 'varchar(x), varchar(y)', 'scalar', true, '')," +
                         "('split', 'array(varchar(x))', 'varchar(x), varchar(y), bigint', 'scalar', true, '')," +
                         "('split_part', 'varchar(x)', 'varchar(x), varchar(y), bigint', 'scalar', true, 'Splits a string by a delimiter and returns the specified field (counting from one)')," +
                         "('split_to_map', 'map(varchar,varchar)', 'varchar, varchar, varchar', 'scalar', true, 'Creates a map using entryDelimiter and keyValueDelimiter')," +
@@ -102,8 +103,9 @@ public void testShowFunctionsLikeWithEscape()
     {
         assertThat(assertions.query("SHOW FUNCTIONS LIKE 'split$_to$_%' ESCAPE '$'"))
+                .skippingTypesCheck()
                 .matches("VALUES " +
-                        "(cast('split_to_map' AS VARCHAR(30)), cast('map(varchar,varchar)' AS VARCHAR(28)), cast('varchar, varchar, varchar' AS VARCHAR(68)), cast('scalar' AS VARCHAR(9)), true, cast('Creates a map using entryDelimiter and keyValueDelimiter' AS VARCHAR(131)))," +
+                        "('split_to_map', 'map(varchar,varchar)', 'varchar, varchar, varchar', 'scalar', true, 'Creates a map using entryDelimiter and keyValueDelimiter')," +
                         "('split_to_multimap', 'map(varchar,array(varchar))', 'varchar, varchar, varchar', 'scalar', true, 'Creates a multimap by splitting a string into key/value pairs')");
     }

@@ -112,7 +114,8 @@ public void testShowSessionLike()
     {
         assertThat(assertions.query(
                 "SHOW SESSION LIKE '%page_row_c%'"))
-                .matches("VALUES (cast('filter_and_project_min_output_page_row_count' as VARCHAR(53)), cast('256' as VARCHAR(14)), cast('256' as VARCHAR(14)), 'integer', cast('Experimental: Minimum output page row count for filter and project operators' as VARCHAR(142)))");
+                .skippingTypesCheck()
+                .matches("VALUES ('filter_and_project_min_output_page_row_count', '256', '256', 'integer', 'Experimental: Minimum output page row count for filter and project operators')");
     }

@@ -124,7 +127,8 @@ public void testShowSessionLikeWithEscape()
                 .hasMessage("Escape string must be a single character");
         assertThat(assertions.query(
                 "SHOW SESSION LIKE '%page$_row$_c%' ESCAPE '$'"))
-                .matches("VALUES (cast('filter_and_project_min_output_page_row_count' as VARCHAR(53)), cast('256' as VARCHAR(14)), cast('256' as VARCHAR(14)), 'integer', cast('Experimental: Minimum output page row count for filter and project operators' as VARCHAR(142)))");
+                .skippingTypesCheck()
+                .matches("VALUES ('filter_and_project_min_output_page_row_count', '256', '256', 'integer', 'Experimental: Minimum output page row count for filter and project operators')");
     }

     @Test
diff --git a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HandTpchQuery1.java b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HandTpchQuery1.java
index 1d5302c975c5..636988d87e61 100644
--- a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HandTpchQuery1.java
+++ b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HandTpchQuery1.java
@@ -121,7 +121,8 @@ protected List<OperatorFactory> createOperatorFactories()
                 10_000,
                 Optional.of(DataSize.of(16, MEGABYTE)),
                 new JoinCompiler(localQueryRunner.getTypeOperators()),
-                localQueryRunner.getBlockTypeOperators());
+                localQueryRunner.getBlockTypeOperators(),
+                Optional.empty());

         return ImmutableList.of(tableScanOperator, tpchQuery1Operator, aggregationOperator);
     }
diff --git a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashAggregationBenchmark.java b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashAggregationBenchmark.java
index 7105a6e74f7c..8eec3fd5bd98 100644
--- a/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashAggregationBenchmark.java
+++ b/testing/trino-benchmark/src/main/java/io/trino/benchmark/HashAggregationBenchmark.java
@@ -61,7 +61,8 @@ protected List<OperatorFactory> createOperatorFactories()
                 100_000,
                 Optional.of(DataSize.of(16, MEGABYTE)),
                 new JoinCompiler(localQueryRunner.getTypeOperators()),
-                localQueryRunner.getBlockTypeOperators());
+                localQueryRunner.getBlockTypeOperators(),
+                Optional.empty());

         return ImmutableList.of(tableScanOperator, aggregationOperator);
     }
diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestAdaptivePartialAggregation.java b/testing/trino-tests/src/test/java/io/trino/tests/TestAdaptivePartialAggregation.java
new file mode 100644
index 000000000000..3b9fa208c216
--- /dev/null
+++ b/testing/trino-tests/src/test/java/io/trino/tests/TestAdaptivePartialAggregation.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.tests;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.testing.AbstractTestAggregations;
+import io.trino.testing.QueryRunner;
+import io.trino.tests.tpch.TpchQueryRunnerBuilder;
+
+public class TestAdaptivePartialAggregation
+        extends AbstractTestAggregations
+{
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        return TpchQueryRunnerBuilder.builder()
+                .setExtraProperties(ImmutableMap.of(
+                        "adaptive-partial-aggregation.min-rows", "0",
+                        "task.max-partial-aggregation-memory", "0B"))
+                .build();
+    }
+}
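Reviewer note, not part of the patch: below is a minimal, self-contained sketch of how flush statistics drive the PartialAggregationController added above. The example class and the printed output are illustrative only; the thresholds mirror the ones used in the new unit tests.

import io.trino.operator.aggregation.partial.PartialAggregationController;

public final class PartialAggregationControllerExample
{
    private PartialAggregationControllerExample() {}

    public static void main(String[] args)
    {
        // Same thresholds as the new tests: disable once >= 5 rows were flushed
        // and more than 80% of them came out as distinct groups.
        PartialAggregationController controller = new PartialAggregationController(5, 0.8);

        // 4 rows processed, all unique: still below the row threshold, so the ratio is not acted upon yet.
        controller.onFlush(4, 4);
        System.out.println(controller.isPartialAggregationDisabled()); // false

        // Cumulative totals are now 8 processed / 8 unique: 8 >= 5 and 1.0 > 0.8, so it switches off.
        controller.onFlush(4, 4);
        System.out.println(controller.isPartialAggregationDisabled()); // true

        // duplicate() yields a fresh controller with counters reset, as used by
        // HashAggregationOperatorFactory.duplicate() when a new pipeline is created.
        PartialAggregationController copy = controller.duplicate();
        System.out.println(copy.isPartialAggregationDisabled()); // false
    }
}

Note that the decision is sticky by design: once disabled, onFlush short-circuits and the operator keeps emitting raw pages via SkipAggregationBuilder; re-enabling is left as a TODO (trinodb/trino#11361).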