diff --git a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java index 03be31c6e14a..e06323d0287d 100644 --- a/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java +++ b/core/trino-main/src/main/java/io/trino/SystemSessionProperties.java @@ -192,6 +192,7 @@ public final class SystemSessionProperties public static final String MIN_INPUT_SIZE_PER_TASK = "min_input_size_per_task"; public static final String MIN_INPUT_ROWS_PER_TASK = "min_input_rows_per_task"; public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning"; + public static final String USE_COST_BASED_PARTITIONING = "use_cost_based_partitioning"; public static final String FORCE_SPILLING_JOIN = "force_spilling_join"; public static final String FAULT_TOLERANT_EXECUTION_FORCE_PREFERRED_WRITE_PARTITIONING_ENABLED = "fault_tolerant_execution_force_preferred_write_partitioning_enabled"; public static final String PAGE_PARTITIONING_BUFFER_POOL_SIZE = "page_partitioning_buffer_pool_size"; @@ -973,6 +974,11 @@ public SystemSessionProperties( "When enabled this forces data repartitioning unless the partitioning of upstream stage matches exactly what downstream stage expects", optimizerConfig.isUseExactPartitioning(), false), + booleanProperty( + USE_COST_BASED_PARTITIONING, + "When enabled the cost based optimizer is used to determine if repartitioning the output of an already partitioned stage is necessary", + optimizerConfig.isUseCostBasedPartitioning(), + false), booleanProperty( FORCE_SPILLING_JOIN, "Force the usage of spliing join operator in favor of the non-spilling one, even if spill is not enabled", @@ -1753,6 +1759,11 @@ public static boolean isUseExactPartitioning(Session session) return session.getSystemProperty(USE_EXACT_PARTITIONING, Boolean.class); } + public static boolean isUseCostBasedPartitioning(Session session) + { + return session.getSystemProperty(USE_COST_BASED_PARTITIONING, 
Boolean.class); + } + public static boolean isForceSpillingOperator(Session session) { return session.getSystemProperty(FORCE_SPILLING_JOIN, Boolean.class); diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java index d718c5a9adff..d770761cb75f 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/OptimizerConfig.java @@ -87,6 +87,7 @@ public class OptimizerConfig private boolean mergeProjectWithValues = true; private boolean forceSingleNodeOutput; private boolean useExactPartitioning; + private boolean useCostBasedPartitioning = true; // adaptive partial aggregation private boolean adaptivePartialAggregationEnabled = true; private long adaptivePartialAggregationMinRows = 100_000; @@ -802,4 +803,17 @@ public OptimizerConfig setUseExactPartitioning(boolean useExactPartitioning) this.useExactPartitioning = useExactPartitioning; return this; } + + public boolean isUseCostBasedPartitioning() + { + return useCostBasedPartitioning; + } + + @Config("optimizer.use-cost-based-partitioning") + @ConfigDescription("When enabled the cost based optimizer is used to determine if repartitioning the output of an already partitioned stage is necessary") + public OptimizerConfig setUseCostBasedPartitioning(boolean useCostBasedPartitioning) + { + this.useCostBasedPartitioning = useCostBasedPartitioning; + return this; + } } diff --git a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java index 223f8be90162..4da3d3b17902 100644 --- a/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java +++ b/core/trino-main/src/main/java/io/trino/sql/planner/optimizations/AddExchanges.java @@ -109,6 +109,7 @@ import static 
io.trino.SystemSessionProperties.isColocatedJoinEnabled; import static io.trino.SystemSessionProperties.isDistributedSortEnabled; import static io.trino.SystemSessionProperties.isForceSingleNodeOutput; +import static io.trino.SystemSessionProperties.isUseCostBasedPartitioning; import static io.trino.SystemSessionProperties.isUseExactPartitioning; import static io.trino.SystemSessionProperties.isUsePartialDistinctLimit; import static io.trino.sql.planner.FragmentTableScanCounter.countSources; @@ -276,7 +277,7 @@ else if ((!isStreamPartitionedOn(child.getProperties(), partitioningRequirement) */ private Optional> useParentPreferredPartitioning(AggregationNode node, Set parentPreferredPartitioningColumns) { - if (isUseExactPartitioning(session)) { + if (isUseExactPartitioning(session) || !isUseCostBasedPartitioning(session)) { return Optional.empty(); } if (parentPreferredPartitioningColumns.isEmpty()) { diff --git a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java index db574345a214..3749a199eb0b 100644 --- a/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java +++ b/core/trino-main/src/test/java/io/trino/cost/TestOptimizerConfig.java @@ -91,7 +91,8 @@ public void testDefaults() .setJoinPartitionedBuildMinRowCount(1_000_000) .setMinInputSizePerTask(DataSize.of(5, GIGABYTE)) .setMinInputRowsPerTask(10_000_000L) - .setUseExactPartitioning(false)); + .setUseExactPartitioning(false) + .setUseCostBasedPartitioning(true)); } @Test @@ -150,6 +151,7 @@ public void testExplicitPropertyMappings() .put("optimizer.min-input-size-per-task", "1MB") .put("optimizer.min-input-rows-per-task", "1000000") .put("optimizer.use-exact-partitioning", "true") + .put("optimizer.use-cost-based-partitioning", "false") .buildOrThrow(); OptimizerConfig expected = new OptimizerConfig() @@ -204,7 +206,8 @@ public void testExplicitPropertyMappings() .setJoinPartitionedBuildMinRowCount(1) 
.setMinInputSizePerTask(DataSize.of(1, MEGABYTE)) .setMinInputRowsPerTask(1_000_000L) - .setUseExactPartitioning(true); + .setUseExactPartitioning(true) + .setUseCostBasedPartitioning(false); assertFullMapping(properties, expected); } } diff --git a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesPlans.java b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesPlans.java index 6bc499eb221f..b4c5cc1abb7b 100644 --- a/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesPlans.java +++ b/core/trino-main/src/test/java/io/trino/sql/planner/optimizations/TestAddExchangesPlans.java @@ -48,6 +48,7 @@ import static io.trino.SystemSessionProperties.MARK_DISTINCT_STRATEGY; import static io.trino.SystemSessionProperties.SPILL_ENABLED; import static io.trino.SystemSessionProperties.TASK_CONCURRENCY; +import static io.trino.SystemSessionProperties.USE_COST_BASED_PARTITIONING; import static io.trino.SystemSessionProperties.USE_EXACT_PARTITIONING; import static io.trino.spi.type.VarcharType.createVarcharType; import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.PARTITIONED; @@ -657,6 +658,8 @@ SELECT suppkey, partkey, count(*) as count anyTree(tableScan("lineitem", ImmutableMap.of( "partkey", "partkey", "suppkey", "suppkey")))))))))))))); + // parent partitioning would be preferable but use_cost_based_partitioning=false prevents it + assertDistributedPlan(singleColumnParentGroupBy, doNotUseCostBasedPartitioning(), exactPartitioningPlan); // parent partitioning would be preferable but use_exact_partitioning prevents it assertDistributedPlan(singleColumnParentGroupBy, useExactPartitioning(), exactPartitioningPlan); // no stats. 
fallback to exact partitioning expected @@ -1014,6 +1017,13 @@ private Session useExactPartitioning() .build(); } + private Session doNotUseCostBasedPartitioning() + { + return Session.builder(getQueryRunner().getDefaultSession()) + .setSystemProperty(USE_COST_BASED_PARTITIONING, "false") + .build(); + } + private Session disableStats() { return Session.builder(getQueryRunner().getDefaultSession()) diff --git a/docs/src/main/sphinx/admin/properties-optimizer.rst b/docs/src/main/sphinx/admin/properties-optimizer.rst index 618bb47f9441..cb676d8fc307 100644 --- a/docs/src/main/sphinx/admin/properties-optimizer.rst +++ b/docs/src/main/sphinx/admin/properties-optimizer.rst @@ -272,3 +272,13 @@ increases concurrency on large clusters where multiple small queries are running The estimated value will always be between ``min_hash_partition_count`` and ``max_hash_partition_count`` session property. A value of ``0`` disables this optimization. + +``optimizer.use-cost-based-partitioning`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +* **Type:** :ref:`prop-type-boolean` +* **Default value:** ``true`` +* **Session property:** ``use_cost_based_partitioning`` + +When enabled, the cost-based optimizer is used to determine whether repartitioning the output of an +already partitioned stage is necessary.