Skip to content

Commit

Permalink
Add use_cost_based_partitioning
Browse files Browse the repository at this point in the history
Use use_cost_based_partitioning instead of use_exact_partitioning to
control the cost based optimization to prefer parent partitioning.
The motivation is to be able to disable the optimization if the NDV
statistics are overestimated and the optimization would hurt parallelism.
  • Loading branch information
lukasz-stec authored and raunaqmorarka committed Mar 31, 2023
1 parent 21a7a2c commit 975bb3e
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ public final class SystemSessionProperties
public static final String MIN_INPUT_SIZE_PER_TASK = "min_input_size_per_task";
public static final String MIN_INPUT_ROWS_PER_TASK = "min_input_rows_per_task";
public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning";
public static final String USE_COST_BASED_PARTITIONING = "use_cost_based_partitioning";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
public static final String FAULT_TOLERANT_EXECUTION_FORCE_PREFERRED_WRITE_PARTITIONING_ENABLED = "fault_tolerant_execution_force_preferred_write_partitioning_enabled";
public static final String PAGE_PARTITIONING_BUFFER_POOL_SIZE = "page_partitioning_buffer_pool_size";
Expand Down Expand Up @@ -973,6 +974,11 @@ public SystemSessionProperties(
"When enabled this forces data repartitioning unless the partitioning of upstream stage matches exactly what downstream stage expects",
optimizerConfig.isUseExactPartitioning(),
false),
booleanProperty(
USE_COST_BASED_PARTITIONING,
"When enabled the cost based optimizer is used to determine if repartitioning the output of an already partitioned stage is necessary",
optimizerConfig.isUseCostBasedPartitioning(),
false),
booleanProperty(
FORCE_SPILLING_JOIN,
"Force the usage of spliing join operator in favor of the non-spilling one, even if spill is not enabled",
Expand Down Expand Up @@ -1753,6 +1759,11 @@ public static boolean isUseExactPartitioning(Session session)
return session.getSystemProperty(USE_EXACT_PARTITIONING, Boolean.class);
}

public static boolean isUseCostBasedPartitioning(Session session)
{
return session.getSystemProperty(USE_COST_BASED_PARTITIONING, Boolean.class);
}

public static boolean isForceSpillingOperator(Session session)
{
return session.getSystemProperty(FORCE_SPILLING_JOIN, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class OptimizerConfig
private boolean mergeProjectWithValues = true;
private boolean forceSingleNodeOutput;
private boolean useExactPartitioning;
private boolean useCostBasedPartitioning = true;
// adaptive partial aggregation
private boolean adaptivePartialAggregationEnabled = true;
private long adaptivePartialAggregationMinRows = 100_000;
Expand Down Expand Up @@ -802,4 +803,17 @@ public OptimizerConfig setUseExactPartitioning(boolean useExactPartitioning)
this.useExactPartitioning = useExactPartitioning;
return this;
}

public boolean isUseCostBasedPartitioning()
{
return useCostBasedPartitioning;
}

@Config("optimizer.use-cost-based-partitioning")
@ConfigDescription("When enabled the cost based optimizer is used to determine if repartitioning the output of an already partitioned stage is necessary")
public OptimizerConfig setUseCostBasedPartitioning(boolean useCostBasedPartitioning)
{
this.useCostBasedPartitioning = useCostBasedPartitioning;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
import static io.trino.SystemSessionProperties.isColocatedJoinEnabled;
import static io.trino.SystemSessionProperties.isDistributedSortEnabled;
import static io.trino.SystemSessionProperties.isForceSingleNodeOutput;
import static io.trino.SystemSessionProperties.isUseCostBasedPartitioning;
import static io.trino.SystemSessionProperties.isUseExactPartitioning;
import static io.trino.SystemSessionProperties.isUsePartialDistinctLimit;
import static io.trino.sql.planner.FragmentTableScanCounter.countSources;
Expand Down Expand Up @@ -276,7 +277,7 @@ else if ((!isStreamPartitionedOn(child.getProperties(), partitioningRequirement)
*/
private Optional<List<Symbol>> useParentPreferredPartitioning(AggregationNode node, Set<Symbol> parentPreferredPartitioningColumns)
{
if (isUseExactPartitioning(session)) {
if (isUseExactPartitioning(session) || !isUseCostBasedPartitioning(session)) {
return Optional.empty();
}
if (parentPreferredPartitioningColumns.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ public void testDefaults()
.setJoinPartitionedBuildMinRowCount(1_000_000)
.setMinInputSizePerTask(DataSize.of(5, GIGABYTE))
.setMinInputRowsPerTask(10_000_000L)
.setUseExactPartitioning(false));
.setUseExactPartitioning(false)
.setUseCostBasedPartitioning(true));
}

@Test
Expand Down Expand Up @@ -150,6 +151,7 @@ public void testExplicitPropertyMappings()
.put("optimizer.min-input-size-per-task", "1MB")
.put("optimizer.min-input-rows-per-task", "1000000")
.put("optimizer.use-exact-partitioning", "true")
.put("optimizer.use-cost-based-partitioning", "false")
.buildOrThrow();

OptimizerConfig expected = new OptimizerConfig()
Expand Down Expand Up @@ -204,7 +206,8 @@ public void testExplicitPropertyMappings()
.setJoinPartitionedBuildMinRowCount(1)
.setMinInputSizePerTask(DataSize.of(1, MEGABYTE))
.setMinInputRowsPerTask(1_000_000L)
.setUseExactPartitioning(true);
.setUseExactPartitioning(true)
.setUseCostBasedPartitioning(false);
assertFullMapping(properties, expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static io.trino.SystemSessionProperties.MARK_DISTINCT_STRATEGY;
import static io.trino.SystemSessionProperties.SPILL_ENABLED;
import static io.trino.SystemSessionProperties.TASK_CONCURRENCY;
import static io.trino.SystemSessionProperties.USE_COST_BASED_PARTITIONING;
import static io.trino.SystemSessionProperties.USE_EXACT_PARTITIONING;
import static io.trino.spi.type.VarcharType.createVarcharType;
import static io.trino.sql.planner.OptimizerConfig.JoinDistributionType.PARTITIONED;
Expand Down Expand Up @@ -657,6 +658,8 @@ SELECT suppkey, partkey, count(*) as count
anyTree(tableScan("lineitem", ImmutableMap.of(
"partkey", "partkey",
"suppkey", "suppkey"))))))))))))));
// parent partitioning would be preferable but use_cost_based_partitioning=false prevents it
assertDistributedPlan(singleColumnParentGroupBy, doNotUseCostBasedPartitioning(), exactPartitioningPlan);
// parent partitioning would be preferable but use_exact_partitioning prevents it
assertDistributedPlan(singleColumnParentGroupBy, useExactPartitioning(), exactPartitioningPlan);
// no stats. fallback to exact partitioning expected
Expand Down Expand Up @@ -1014,6 +1017,13 @@ private Session useExactPartitioning()
.build();
}

private Session doNotUseCostBasedPartitioning()
{
return Session.builder(getQueryRunner().getDefaultSession())
.setSystemProperty(USE_COST_BASED_PARTITIONING, "false")
.build();
}

private Session disableStats()
{
return Session.builder(getQueryRunner().getDefaultSession())
Expand Down
10 changes: 10 additions & 0 deletions docs/src/main/sphinx/admin/properties-optimizer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,13 @@ increases concurrency on large clusters where multiple small queries are running
The estimated value will always be between ``min_hash_partition_count`` and
``max_hash_partition_count`` session property.
A value of ``0`` disables this optimization.

``optimizer.use-cost-based-partitioning``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** :ref:`prop-type-boolean`
* **Default value:** ``true``
* **Session property:** ``use_cost_based_partitioning``

When enabled the cost based optimizer is used to determine if repartitioning the output of an
already partitioned stage is necessary.

0 comments on commit 975bb3e

Please sign in to comment.