From 0ebdaa0083b5361af2ff098a6b4d24803717d153 Mon Sep 17 00:00:00 2001 From: Karol Sobczak Date: Fri, 25 Nov 2022 22:45:09 +0100 Subject: [PATCH] Fix derivation of LocalExchange single stream property LocalExchange will preserve single stream property only if: 1. global partitioning is either on coordinator or single-node 2. LocalExchange is gathering --- .../optimizations/PropertyDerivations.java | 16 ++++-- .../testing/AbstractTestWindowQueries.java | 51 +++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java index d0fbfdb12be7..68acc751429e 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/PropertyDerivations.java @@ -101,7 +101,9 @@ import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.spi.predicate.TupleDomain.extractFixedValues; import static io.trino.sql.planner.SystemPartitioningHandle.ARBITRARY_DISTRIBUTION; +import static io.trino.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION; import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION; +import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION; import static io.trino.sql.planner.optimizations.ActualProperties.Global.arbitraryPartition; import static io.trino.sql.planner.optimizations.ActualProperties.Global.coordinatorSingleStreamPartition; import static io.trino.sql.planner.optimizations.ActualProperties.Global.partitionedOn; @@ -653,7 +655,7 @@ public ActualProperties visitExchange(ExchangeNode node, List // This is acceptable because AddLocalExchanges does not use global properties and is only // interested in the local properties. // However, for the purpose of validation, some global properties (single-node vs distributed) - // are computed for local exchanges. + // need to be propagated through local exchanges. // TODO: implement full properties for local exchanges if (node.getScope() == LOCAL) { if (inputProperties.size() == 1) { @@ -674,10 +676,18 @@ public ActualProperties visitExchange(ExchangeNode node, List builder.constants(constants); if (inputProperties.stream().anyMatch(ActualProperties::isCoordinatorOnly)) { - builder.global(coordinatorSingleStreamPartition()); + builder.global(partitionedOn( + COORDINATOR_DISTRIBUTION, + ImmutableList.of(), + // only gathering local exchange preserves single stream property + node.getType() == GATHER ? Optional.of(ImmutableList.of()) : Optional.empty())); } else if (inputProperties.stream().anyMatch(ActualProperties::isSingleNode)) { - builder.global(coordinatorSingleStreamPartition()); + builder.global(partitionedOn( + SINGLE_DISTRIBUTION, + ImmutableList.of(), + // only gathering local exchange preserves single stream property + node.getType() == GATHER ? Optional.of(ImmutableList.of()) : Optional.empty())); } else if (node.getOrderingScheme().isPresent() && node.getType() == GATHER) { // Local merging exchange uses passthrough distribution diff --git a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestWindowQueries.java b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestWindowQueries.java index e01a5a05a7d2..533dafc53cc1 100644 --- a/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestWindowQueries.java +++ b/testing/trino-testing/src/main/java/io/trino/testing/AbstractTestWindowQueries.java @@ -700,4 +700,55 @@ public void testMultipleInstancesOfWindowFunction() "(5, 'A', 'e', 'c', null), " + "(6, 'A', null, 'e', 'e')"); } + + @Test + public void testPreSortedInput() + { + assertQueryOrdered("" + + "WITH students_results(student_id, course_id, grade) AS (VALUES " + + " (1000, 100, 17), " + + " (2000, 200, 16), " + + " (3000, 300, 18), " + + " (1000, 100, 18), " + + " (2000, 100, 10), " + + " (3000, 200, 20), " + + " (1000, 200, 16), " + + " (2000, 300, 12), " + + " (3000, 100, 17), " + + " (2000, 200, 15), " + + " (3000, 100, 18), " + + " (1000, 300, 12), " + + " (3000, 100, 20), " + + " (1000, 300, 16), " + + " (2000, 100, 12)) " + + "SELECT student_id, course_id, cnt, avg_w_sum, " + + " avg(sum_w_sum) OVER (" + + " ORDER BY student_id, course_id" + + " ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING" + + " ) AS avg_w " + + "FROM (" + + " SELECT" + + " student_id, course_id, count(*) AS cnt," + + " sum(sum(grade)) OVER (" + + " ORDER BY student_id, course_id" + + " ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING" + + " ) AS avg_w_sum," + + " sum(sum(grade)) OVER (" + + " PARTITION BY student_id" + + " ) AS sum_w_sum" + + " FROM students_results" + + " GROUP BY student_id, course_id" + + ") AS t " + + "ORDER BY student_id, course_id", + "VALUES " + + "(1000, 100, 2, 51, 79.0), " + + "(1000, 200, 1, 79, 79.0), " + + "(1000, 300, 2, 101, 75.5), " + + "(2000, 100, 2, 97, 72.0), " + + "(2000, 200, 2, 93, 68.5), " + + "(2000, 300, 1, 120, 72.0), " + + "(3000, 100, 3, 118, 79.0), " + + "(3000, 200, 1, 105, 86.0), " + + "(3000, 300, 1, 93, 93.0)"); + } }